-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathBufferedInputStream.java
More file actions
580 lines (542 loc) · 21.2 KB
/
BufferedInputStream.java
File metadata and controls
580 lines (542 loc) · 21.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
/*
* Copyright (c) 1994, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.io;
import org.jspecify.annotations.NullMarked;
import org.jspecify.annotations.Nullable;
import java.util.Arrays;
import java.util.Objects;
import jdk.internal.misc.Unsafe;
import jdk.internal.util.ArraysSupport;
/**
* A {@code BufferedInputStream} adds
* functionality to another input stream-namely,
* the ability to buffer the input and to
* support the {@code mark} and {@code reset}
* methods. When the {@code BufferedInputStream}
* is created, an internal buffer array is
* created. As bytes from the stream are read
* or skipped, the internal buffer is refilled
* as necessary from the contained input stream,
* many bytes at a time. The {@code mark}
* operation remembers a point in the input
* stream and the {@code reset} operation
* causes all the bytes read since the most
* recent {@code mark} operation to be
* reread before new bytes are taken from
* the contained input stream.
*
* @apiNote
* Once wrapped in a {@code BufferedInputStream}, the underlying
* {@code InputStream} should not be used directly nor wrapped with
* another stream.
*
* @author Arthur van Hoff
* @since 1.0
*/
@NullMarked
public class BufferedInputStream extends FilterInputStream {
private static final int DEFAULT_BUFFER_SIZE = 8192;
private static final byte[] EMPTY = new byte[0];
/**
* As this class is used early during bootstrap, it's motivated to use
* Unsafe.compareAndSetReference instead of AtomicReferenceFieldUpdater
* (or VarHandles) to reduce dependencies and improve startup time.
*/
private static final Unsafe U = Unsafe.getUnsafe();
private static final long BUF_OFFSET
= U.objectFieldOffset(BufferedInputStream.class, "buf");
// initial buffer size (DEFAULT_BUFFER_SIZE or size specified to constructor)
private final int initialSize;
/**
* The internal buffer array where the data is stored. When necessary,
* it may be replaced by another array of
* a different size.
*/
/*
* We null this out with a CAS on close(), which is necessary since
* closes can be asynchronous. We use nullness of buf[] as primary
* indicator that this stream is closed. (The "in" field is also
* nulled out on close.)
*/
protected volatile byte @Nullable [] buf;
/**
* The index one greater than the index of the last valid byte in
* the buffer.
* This value is always
* in the range {@code 0} through {@code buf.length};
* elements {@code buf[0]} through {@code buf[count-1]}
* contain buffered input data obtained
* from the underlying input stream.
*/
protected int count;
/**
* The current position in the buffer. This is the index of the next
* byte to be read from the {@code buf} array.
* <p>
* This value is always in the range {@code 0}
* through {@code count}. If it is less
* than {@code count}, then {@code buf[pos]}
* is the next byte to be supplied as input;
* if it is equal to {@code count}, then
* the next {@code read} or {@code skip}
* operation will require more bytes to be
* read from the contained input stream.
*
* @see java.io.BufferedInputStream#buf
*/
protected int pos;
/**
* The value of the {@code pos} field at the time the last
* {@code mark} method was called.
* <p>
* This value is always
* in the range {@code -1} through {@code pos}.
* If there is no marked position in the input
* stream, this field is {@code -1}. If
* there is a marked position in the input
* stream, then {@code buf[markpos]}
* is the first byte to be supplied as input
* after a {@code reset} operation. If
* {@code markpos} is not {@code -1},
* then all bytes from positions {@code buf[markpos]}
* through {@code buf[pos-1]} must remain
* in the buffer array (though they may be
* moved to another place in the buffer array,
* with suitable adjustments to the values
* of {@code count}, {@code pos},
* and {@code markpos}); they may not
* be discarded unless and until the difference
* between {@code pos} and {@code markpos}
* exceeds {@code marklimit}.
*
* @see java.io.BufferedInputStream#mark(int)
* @see java.io.BufferedInputStream#pos
*/
protected int markpos = -1;
/**
* The maximum read ahead allowed after a call to the
* {@code mark} method before subsequent calls to the
* {@code reset} method fail.
* Whenever the difference between {@code pos}
* and {@code markpos} exceeds {@code marklimit},
* then the mark may be dropped by setting
* {@code markpos} to {@code -1}.
*
* @see java.io.BufferedInputStream#mark(int)
* @see java.io.BufferedInputStream#reset()
*/
protected int marklimit;
/**
* Check to make sure that underlying input stream has not been
* nulled out due to close; if not return it;
*/
private InputStream getInIfOpen() throws IOException {
InputStream input = in;
if (input == null)
throw new IOException("Stream closed");
return input;
}
/**
* Returns the internal buffer, optionally allocating it if empty.
* @param allocateIfEmpty true to allocate if empty
* @throws IOException if the stream is closed (buf is null)
*/
private byte[] getBufIfOpen(boolean allocateIfEmpty) throws IOException {
byte[] buffer = buf;
if (allocateIfEmpty && buffer == EMPTY) {
buffer = new byte[initialSize];
if (!U.compareAndSetReference(this, BUF_OFFSET, EMPTY, buffer)) {
// re-read buf
buffer = buf;
}
}
if (buffer == null) {
throw new IOException("Stream closed");
}
return buffer;
}
/**
* Returns the internal buffer, allocating it if empty.
* @throws IOException if the stream is closed (buf is null)
*/
private byte[] getBufIfOpen() throws IOException {
return getBufIfOpen(true);
}
/**
* Throws IOException if the stream is closed (buf is null).
*/
private void ensureOpen() throws IOException {
if (buf == null) {
throw new IOException("Stream closed");
}
}
/**
* Creates a {@code BufferedInputStream}
* and saves its argument, the input stream
* {@code in}, for later use. An internal
* buffer array is created and stored in {@code buf}.
*
* @param in the underlying input stream.
*/
public BufferedInputStream(InputStream in) {
this(in, DEFAULT_BUFFER_SIZE);
}
/**
* Creates a {@code BufferedInputStream}
* with the specified buffer size,
* and saves its argument, the input stream
* {@code in}, for later use. An internal
* buffer array of length {@code size}
* is created and stored in {@code buf}.
*
* @param in the underlying input stream.
* @param size the buffer size.
* @throws IllegalArgumentException if {@code size <= 0}.
*/
public BufferedInputStream(InputStream in, int size) {
super(in);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
initialSize = size;
if (getClass() == BufferedInputStream.class) {
// lazily create buffer when not subclassed
buf = EMPTY;
} else {
buf = new byte[size];
}
}
/**
* Fills the buffer with more data, taking into account
* shuffling and other tricks for dealing with marks.
* Assumes that it is being called by a synchronized method.
* This method also assumes that all data has already been read in,
* hence pos > count.
*/
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos == -1)
pos = 0; /* no mark: throw away the buffer */
else if (pos >= buffer.length) { /* no room left in buffer */
if (markpos > 0) { /* can throw away early part of the buffer */
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
} else if (buffer.length >= marklimit) {
markpos = -1; /* buffer got too big, invalidate mark */
pos = 0; /* drop buffer contents */
} else { /* grow buffer */
int nsz = ArraysSupport.newLength(pos,
1, /* minimum growth */
pos /* preferred growth */);
if (nsz > marklimit)
nsz = marklimit;
byte[] nbuf = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
if (!U.compareAndSetReference(this, BUF_OFFSET, buffer, nbuf)) {
// Can't replace buf if there was an async close.
// Note: This would need to be changed if fill()
// is ever made accessible to multiple threads.
// But for now, the only way CAS can fail is via close.
// assert buf == null;
throw new IOException("Stream closed");
}
buffer = nbuf;
}
}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}
/**
* See
* the general contract of the {@code read}
* method of {@code InputStream}.
*
* @return the next byte of data, or {@code -1} if the end of the
* stream is reached.
* @throws IOException if this input stream has been closed by
* invoking its {@link #close()} method,
* or an I/O error occurs.
* @see java.io.FilterInputStream#in
*/
public synchronized int read() throws IOException {
if (pos >= count) {
fill();
if (pos >= count)
return -1;
}
return getBufIfOpen()[pos++] & 0xff;
}
/**
* Read bytes into a portion of an array, reading from the underlying
* stream at most once if necessary.
*/
private int read1(byte[] b, int off, int len) throws IOException {
int avail = count - pos;
if (avail <= 0) {
/* If the requested length is at least as large as the buffer, and
if there is no mark/reset activity, do not bother to copy the
bytes into the local buffer. In this way buffered streams will
cascade harmlessly. */
int size = Math.max(getBufIfOpen(false).length, initialSize);
if (len >= size && markpos == -1) {
return getInIfOpen().read(b, off, len);
}
fill();
avail = count - pos;
if (avail <= 0) return -1;
}
int cnt = (avail < len) ? avail : len;
System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
pos += cnt;
return cnt;
}
/**
* Reads bytes from this byte-input stream into the specified byte array,
* starting at the given offset.
*
* <p> This method implements the general contract of the corresponding
* {@link InputStream#read(byte[], int, int) read} method of
* the {@link InputStream} class. As an additional
* convenience, it attempts to read as many bytes as possible by repeatedly
* invoking the {@code read} method of the underlying stream. This
* iterated {@code read} continues until one of the following
* conditions becomes true: <ul>
*
* <li> The specified number of bytes have been read,
*
* <li> The {@code read} method of the underlying stream returns
* {@code -1}, indicating end-of-file, or
*
* <li> The {@code available} method of the underlying stream
* returns zero, indicating that further input requests would block.
*
* </ul> If the first {@code read} on the underlying stream returns
* {@code -1} to indicate end-of-file then this method returns
* {@code -1}. Otherwise, this method returns the number of bytes
* actually read.
*
* <p> Subclasses of this class are encouraged, but not required, to
* attempt to read as many bytes as possible in the same fashion.
*
* @param b destination buffer.
* @param off offset at which to start storing bytes.
* @param len maximum number of bytes to read.
* @return the number of bytes read, or {@code -1} if the end of
* the stream has been reached.
* @throws IOException if this input stream has been closed by
* invoking its {@link #close()} method,
* or an I/O error occurs.
* @throws IndexOutOfBoundsException {@inheritDoc}
*/
public synchronized int read(byte[] b, int off, int len) throws IOException {
ensureOpen();
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
int n = 0;
for (;;) {
int nread = read1(b, off + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
if (n >= len)
return n;
// if not closed but no bytes available, return
InputStream input = in;
if (input != null && input.available() <= 0)
return n;
}
}
/**
* See the general contract of the {@code skip}
* method of {@code InputStream}.
*
* @throws IOException if this input stream has been closed by
* invoking its {@link #close()} method,
* {@code in.skip(n)} throws an IOException,
* or an I/O error occurs.
*/
public synchronized long skip(long n) throws IOException {
ensureOpen();
if (n <= 0) {
return 0;
}
long avail = count - pos;
if (avail <= 0) {
// If no mark position set then don't keep in buffer
if (markpos == -1)
return getInIfOpen().skip(n);
// Fill in buffer to save bytes for reset
fill();
avail = count - pos;
if (avail <= 0)
return 0;
}
long skipped = (avail < n) ? avail : n;
pos += (int)skipped;
return skipped;
}
/**
* Returns an estimate of the number of bytes that can be read (or
* skipped over) from this input stream without blocking by the next
* invocation of a method for this input stream. The next invocation might be
* the same thread or another thread. A single read or skip of this
* many bytes will not block, but may read or skip fewer bytes.
* <p>
* This method returns the sum of the number of bytes remaining to be read in
* the buffer ({@code count - pos}) and the result of calling the
* {@link java.io.FilterInputStream#in in}{@code .available()}.
*
* @return an estimate of the number of bytes that can be read (or skipped
* over) from this input stream without blocking.
* @throws IOException if this input stream has been closed by
* invoking its {@link #close()} method,
* or an I/O error occurs.
*/
public synchronized int available() throws IOException {
int n = count - pos;
int avail = getInIfOpen().available();
return n > (Integer.MAX_VALUE - avail)
? Integer.MAX_VALUE
: n + avail;
}
/**
* See the general contract of the {@code mark}
* method of {@code InputStream}.
*
* @param readlimit the maximum limit of bytes that can be read before
* the mark position becomes invalid.
* @see java.io.BufferedInputStream#reset()
*/
public synchronized void mark(int readlimit) {
marklimit = readlimit;
markpos = pos;
}
/**
* See the general contract of the {@code reset}
* method of {@code InputStream}.
* <p>
* If {@code markpos} is {@code -1}
* (no mark has been set or the mark has been
* invalidated), an {@code IOException}
* is thrown. Otherwise, {@code pos} is
* set equal to {@code markpos}.
*
* @throws IOException if this stream has not been marked or,
* if the mark has been invalidated, or the stream
* has been closed by invoking its {@link #close()}
* method, or an I/O error occurs.
* @see java.io.BufferedInputStream#mark(int)
*/
public synchronized void reset() throws IOException {
ensureOpen();
if (markpos < 0)
throw new IOException("Resetting to invalid mark");
pos = markpos;
}
/**
* Tests if this input stream supports the {@code mark}
* and {@code reset} methods. The {@code markSupported}
* method of {@code BufferedInputStream} returns
* {@code true}.
*
* @return a {@code boolean} indicating if this stream type supports
* the {@code mark} and {@code reset} methods.
* @see java.io.InputStream#mark(int)
* @see java.io.InputStream#reset()
*/
public boolean markSupported() {
return true;
}
/**
* Closes this input stream and releases any system resources
* associated with the stream.
* Once the stream has been closed, further read(), available(), reset(),
* or skip() invocations will throw an IOException.
* Closing a previously closed stream has no effect.
*
* @throws IOException if an I/O error occurs.
*/
public void close() throws IOException {
byte[] buffer;
while ( (buffer = buf) != null) {
if (U.compareAndSetReference(this, BUF_OFFSET, buffer, null)) {
InputStream input = in;
in = null;
if (input != null)
input.close();
return;
}
// Else retry in case a new buf was CASed in fill()
}
}
@Override
public synchronized long transferTo(OutputStream out) throws IOException {
Objects.requireNonNull(out, "out");
if (getClass() == BufferedInputStream.class && markpos == -1) {
int avail = count - pos;
if (avail > 0) {
if (isTrusted(out)) {
out.write(getBufIfOpen(), pos, avail);
} else {
// Prevent poisoning and leaking of buf
byte[] buffer = Arrays.copyOfRange(getBufIfOpen(), pos, count);
out.write(buffer);
}
pos = count;
}
try {
return Math.addExact(avail, getInIfOpen().transferTo(out));
} catch (ArithmeticException ignore) {
return Long.MAX_VALUE;
}
} else {
return super.transferTo(out);
}
}
/**
* Returns true if this class satisfies the following conditions:
* <ul>
* <li>does not retain a reference to the {@code byte[]}</li>
* <li>does not leak a reference to the {@code byte[]} to non-trusted classes</li>
* <li>does not modify the contents of the {@code byte[]}</li>
* <li>{@code write()} method does not read the contents outside of the offset/length bounds</li>
* </ul>
*
* @return true if this class is trusted
*/
private static boolean isTrusted(OutputStream os) {
var clazz = os.getClass();
return clazz == ByteArrayOutputStream.class
|| clazz == FileOutputStream.class
|| clazz == PipedOutputStream.class;
}
}