001/*
002 * Copyright 2018-2019 Ping Identity Corporation
003 * All Rights Reserved.
004 */
005/*
006 * Copyright (C) 2018-2019 Ping Identity Corporation
007 *
008 * This program is free software; you can redistribute it and/or modify
009 * it under the terms of the GNU General Public License (GPLv2 only)
010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011 * as published by the Free Software Foundation.
012 *
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Public License for more details.
017 *
018 * You should have received a copy of the GNU General Public License
019 * along with this program; if not, see <http://www.gnu.org/licenses>.
020 */
021package com.unboundid.util;
022
023
024
025import java.io.InputStream;
026import java.io.IOException;
027
028
029
030/**
031 * This class provides an {@code InputStream} implementation that uses a
032 * {@link FixedRateBarrier} to impose an upper bound on the rate (in bytes per
033 * second) at which data can be read from a wrapped {@code InputStream}.
034 */
035@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
036public final class RateLimitedInputStream
037       extends InputStream
038{
039  // The fixed-rate barrier that will serve as a rate limiter for this class.
040  private final FixedRateBarrier rateLimiter;
041
042  // The input stream from which the data will actually be read.
043  private final InputStream wrappedStream;
044
045  // The maximum number of bytes that can be read in any single call to the
046  // rate limiter.
047  private final int maxBytesPerRead;
048
049
050
051  /**
052   * Creates a new instance of this rate-limited input stream that wraps the
053   * provided input stream.
054   *
055   * @param  wrappedStream      The input stream from which the data will
056   *                            actually be read.  It must not be {@code null}.
057   * @param  maxBytesPerSecond  The maximum number of bytes per second that can
058   *                            be read using this input stream.  It must be
059   *                            greater than zero.
060   */
061  public RateLimitedInputStream(final InputStream wrappedStream,
062                                final int maxBytesPerSecond)
063  {
064    Validator.ensureTrue((wrappedStream != null),
065         "RateLimitedInputStream.wrappedStream must not be null.");
066    Validator.ensureTrue((maxBytesPerSecond > 0),
067         "RateLimitedInputStream.maxBytesPerSecond must be greater than " +
068              "zero.  The provided value was " + maxBytesPerSecond);
069
070    this.wrappedStream = wrappedStream;
071
072    rateLimiter = new FixedRateBarrier(1000L, maxBytesPerSecond);
073    maxBytesPerRead = Math.max(1, (maxBytesPerSecond / 100));
074  }
075
076
077
078  /**
079   * Closes this input stream and the wrapped stream.
080   *
081   * @throws  IOException  If a problem is encountered while closing the wrapped
082   *                       input stream.
083   */
084  @Override()
085  public void close()
086         throws IOException
087  {
088    wrappedStream.close();
089  }
090
091
092
093  /**
094   * Reads a single byte of input from the wrapped input stream.
095   *
096   * @return  The byte that was read, or -1 if the end of the input stream has
097   *          been reached.
098   *
099   * @throws  IOException  If a problem is encountered while attempting to read
100   *                       data from the underlying input stream.
101   */
102  @Override()
103  public int read()
104         throws IOException
105  {
106    rateLimiter.await();
107    return wrappedStream.read();
108  }
109
110
111
112  /**
113   * Reads data from the wrapped input stream into the provided array.
114   *
115   * @param  b  The array into which the data will be placed.
116   *
117   * @return  The number of bytes that were read, or -1 if the end of the input
118   *          stream has been reached.
119   *
120   * @throws  IOException  If a problem is encountered while attempting to read
121   *                       data from the underlying input stream.
122   */
123  @Override()
124  public int read(final byte[] b)
125         throws IOException
126  {
127    return read(b, 0, b.length);
128  }
129
130
131
132  /**
133   * Reads data from the wrapped input stream into the specified portion of the
134   * provided array.
135   *
136   * @param  b       The array into which the data will be placed.
137   * @param  offset  The index into the provided array at which the data should
138   *                 start being added.
139   * @param  length  The maximum number of bytes to be added into the array.
140   *
141   * @return  The number of bytes that were read, or -1 if the end of the input
142   *          stream has been reached.
143   *
144   * @throws  IOException  If a problem is encountered while attempting to read
145   *                       data from the underlying input stream.
146   */
147  @Override()
148  public int read(final byte[] b, final int offset, final int length)
149         throws IOException
150  {
151    if (length <= 0)
152    {
153      return 0;
154    }
155
156    if (length <= maxBytesPerRead)
157    {
158      rateLimiter.await(length);
159      return wrappedStream.read(b, offset, length);
160    }
161    else
162    {
163      int pos = offset;
164      int remainingLength = length;
165      int totalBytesRead = 0;
166      while (remainingLength > 0)
167      {
168        final int lengthThisRead = Math.min(remainingLength, maxBytesPerRead);
169        rateLimiter.await(lengthThisRead);
170        final int bytesRead = wrappedStream.read(b, pos, lengthThisRead);
171        if (bytesRead < 0)
172        {
173          break;
174        }
175
176        pos += bytesRead;
177        totalBytesRead += bytesRead;
178        remainingLength -= bytesRead;
179      }
180
181      return totalBytesRead;
182    }
183  }
184
185
186
187  /**
188   * Retrieves the number of bytes that are immediately available to be read,
189   * if the wrapped stream supports this operation.
190   *
191   * @return  The number of bytes that are immediately available to be read, or
192   *          zero if there are no bytes to be read, if the end of the input
193   *          stream has been reached, or if the wrapped input stream does not
194   *          support this operation.
195   */
196  @Override()
197  public int available()
198         throws IOException
199  {
200    return wrappedStream.available();
201  }
202
203
204
205  /**
206   * Indicates whether this {@code InputStream} implementation supports the use
207   * of the {@link #mark(int)} and {@link #reset()} methods.  This
208   * implementation will support those methods if the wrapped stream supports
209   * them.
210   *
211   * @return  {@code true} if this {@code InputStream} supports the
212   *          {@code mark} and {@code reset} methods, or {@code false} if not.
213   */
214  @Override()
215  public boolean markSupported()
216  {
217    return wrappedStream.markSupported();
218  }
219
220
221
222  /**
223   * Attempts to mark the current position in the wrapped input stream so that
224   * it can optionally be reset after some amount of data has been read.
225   * fun
226   *
227   * @param  readLimit  The maximum number of bytes expected to be read before a
228   *                    call to the {@link #reset()} method before the mark will
229   *                    no longer be honored.
230   */
231  @Override()
232  public void mark(final int readLimit)
233  {
234    wrappedStream.mark(readLimit);
235  }
236
237
238
239  /**
240   * Attempts to reset the position of this input stream to the last mark
241   * position.
242   *
243   * @throws  IOException  If the input stream cannot be repositioned to the
244   *                       marked location, or if no mark has been set.
245   */
246  @Override()
247  public void reset()
248         throws IOException
249  {
250    wrappedStream.reset();
251  }
252}