77import java .nio .charset .CharacterCodingException ;
88import java .nio .charset .CharsetDecoder ;
99import java .nio .charset .CoderResult ;
10- import java .util .ArrayList ;
11- import java .util .List ;
1210
1311/**
1412 * A {@link Buffer} implementation backed by multiple {@link ByteBuffer}s,
2220 */
2321class MultiBuffer implements Buffer {
2422
25- /** Maximum size per underlying chunk. */
26- static final int CHUNK_SIZE = Integer .MAX_VALUE / 2 ;
23+ /** Default maximum size per underlying chunk. */
24+ static final int DEFAULT_CHUNK_SIZE = Integer .MAX_VALUE / 2 ;
2725
2826 final ByteBuffer [] buffers ;
27+ private final int chunkSize ;
2928 private final long capacity ;
3029
3130 private long position = 0 ;
@@ -38,36 +37,59 @@ class MultiBuffer implements Buffer {
3837 * @param capacity the total capacity in bytes
3938 */
4039 public MultiBuffer (long capacity ) {
40+ this (capacity , DEFAULT_CHUNK_SIZE );
41+ }
42+
43+ /**
44+ * Creates a new {@code MultiBuffer} backed by the given
45+ * {@link ByteBuffer}s.
46+ *
47+ * <p>The total capacity and limit are set to the sum of the
48+ * buffer capacities.
49+ *
50+ * @param buffers the backing buffers (cloned into an internal array)
51+ * @param chunkSize the size of each buffer chunk
52+ */
53+ private MultiBuffer (ByteBuffer [] buffers , int chunkSize ) {
54+ this .buffers = buffers .clone ();
55+ long capacity = 0 ;
56+ for (ByteBuffer buffer : buffers ) {
57+ capacity += buffer .capacity ();
58+ }
59+ this .capacity = capacity ;
60+ this .limit = capacity ;
61+ this .chunkSize = chunkSize ;
62+ }
63+
64+ /**
65+ * Creates a new {@code MultiBuffer} with the given capacity, backed by
66+ * heap-allocated {@link ByteBuffer}s with the given chunk size.
67+ *
68+ * @param capacity the total capacity in bytes
69+ * @param chunkSize the size of each buffer chunk
70+ */
71+ MultiBuffer (long capacity , int chunkSize ) {
4172 if (capacity <= 0 ) {
4273 throw new IllegalArgumentException ("Capacity must be positive" );
4374 }
4475 this .capacity = capacity ;
4576 this .limit = capacity ;
77+ this .chunkSize = chunkSize ;
4678
47- int fullChunks = (int ) (capacity / CHUNK_SIZE );
48- int remainder = (int ) (capacity % CHUNK_SIZE );
79+ int fullChunks = (int ) (capacity / chunkSize );
80+ int remainder = (int ) (capacity % chunkSize );
4981 int totalChunks = fullChunks + (remainder > 0 ? 1 : 0 );
5082
5183 this .buffers = new ByteBuffer [totalChunks ];
5284
5385 for (int i = 0 ; i < fullChunks ; i ++) {
54- buffers [i ] = ByteBuffer .allocateDirect (CHUNK_SIZE );
86+ buffers [i ] = ByteBuffer .allocateDirect (chunkSize );
5587 }
5688 if (remainder > 0 ) {
5789 buffers [totalChunks - 1 ] = ByteBuffer .allocateDirect (remainder );
5890 }
5991 }
6092
61- private MultiBuffer (ByteBuffer [] buffers ) {
62- this .buffers = buffers .clone ();
63- long capacity = 0 ;
64- for (ByteBuffer buffer : buffers ) {
65- capacity += buffer .capacity ();
66- }
67- this .capacity = capacity ;
68- this .limit = capacity ;
69- }
70-
7193 /** {@inheritDoc} */
7294 @ Override
7395 public long capacity () {
@@ -131,8 +153,8 @@ public Buffer get(byte[] dst) {
131153 int offset = 0 ;
132154 int length = dst .length ;
133155 while (length > 0 ) {
134- int bufIndex = (int ) (pos / CHUNK_SIZE );
135- int bufOffset = (int ) (pos % CHUNK_SIZE );
156+ int bufIndex = (int ) (pos / this . chunkSize );
157+ int bufOffset = (int ) (pos % this . chunkSize );
136158 ByteBuffer buf = buffers [bufIndex ];
137159 buf .position (bufOffset );
138160 int toRead = Math .min (buf .remaining (), length );
@@ -151,16 +173,16 @@ public byte get(long index) {
151173 if (index < 0 || index >= limit ) {
152174 throw new IndexOutOfBoundsException ("Index: " + index );
153175 }
154- int bufIndex = (int ) (index / CHUNK_SIZE );
155- int offset = (int ) (index % CHUNK_SIZE );
176+ int bufIndex = (int ) (index / this . chunkSize );
177+ int offset = (int ) (index % this . chunkSize );
156178 return buffers [bufIndex ].get (offset );
157179 }
158180
159181 /** {@inheritDoc} */
160182 @ Override
161183 public double getDouble () {
162- int bufIndex = (int ) (position / CHUNK_SIZE );
163- int off = (int ) (position % CHUNK_SIZE );
184+ int bufIndex = (int ) (position / this . chunkSize );
185+ int off = (int ) (position % this . chunkSize );
164186 ByteBuffer buf = buffers [bufIndex ];
165187 buf .position (off );
166188 if (buf .remaining () >= 8 ) {
@@ -177,8 +199,8 @@ public double getDouble() {
177199 /** {@inheritDoc} */
178200 @ Override
179201 public float getFloat () {
180- int bufIndex = (int ) (position / CHUNK_SIZE );
181- int off = (int ) (position % CHUNK_SIZE );
202+ int bufIndex = (int ) (position / this . chunkSize );
203+ int off = (int ) (position % this . chunkSize );
182204 ByteBuffer buf = buffers [bufIndex ];
183205 buf .position (off );
184206 if (buf .remaining () >= 4 ) {
@@ -195,7 +217,7 @@ public float getFloat() {
195217 /** {@inheritDoc} */
196218 @ Override
197219 public Buffer duplicate () {
198- MultiBuffer copy = new MultiBuffer (capacity );
220+ MultiBuffer copy = new MultiBuffer (capacity , chunkSize );
199221 for (int i = 0 ; i < buffers .length ; i ++) {
200222 copy .buffers [i ] = buffers [i ].duplicate ();
201223 }
@@ -207,11 +229,24 @@ public Buffer duplicate() {
207229 /** {@inheritDoc} */
208230 @ Override
209231 public long readFrom (FileChannel channel ) throws IOException {
232+ return this .readFrom (channel , DEFAULT_CHUNK_SIZE );
233+ }
234+
235+ /**
236+ * Reads data from the given channel into this buffer starting at the
237+ * current position.
238+ *
239+ * @param channel the file channel
240+ * @param chunkSize the chunk size to use for positioning reads
241+ * @return the number of bytes read
242+ * @throws IOException if an I/O error occurs
243+ */
244+ long readFrom (FileChannel channel , int chunkSize ) throws IOException {
210245 long totalRead = 0 ;
211246 long pos = position ;
212- for (int i = (int ) (pos / CHUNK_SIZE ); i < buffers .length ; i ++) {
247+ for (int i = (int ) (pos / chunkSize ); i < buffers .length ; i ++) {
213248 ByteBuffer buf = buffers [i ];
214- buf .position ((int ) (pos % CHUNK_SIZE ));
249+ buf .position ((int ) (pos % chunkSize ));
215250 int read = channel .read (buf );
216251 if (read == -1 ) {
217252 break ;
@@ -244,8 +279,8 @@ public String decode(CharsetDecoder decoder)
244279
245280 while (remainingBytes > 0 ) {
246281 // Locate which underlying buffer we are in
247- int bufIndex = (int ) (pos / CHUNK_SIZE );
248- int bufOffset = (int ) (pos % CHUNK_SIZE );
282+ int bufIndex = (int ) (pos / this . chunkSize );
283+ int bufOffset = (int ) (pos % this . chunkSize );
249284
250285 ByteBuffer srcView = buffers [bufIndex ].duplicate ();
251286 srcView .position (bufOffset );
@@ -272,7 +307,7 @@ public String decode(CharsetDecoder decoder)
272307 /**
273308 * Wraps the given byte arrays in a new {@code MultiBuffer}.
274309 *
275- * <p>If any array exceeds {@link #CHUNK_SIZE }, it will be split across multiple
310+ * <p>If any array exceeds {@link #DEFAULT_CHUNK_SIZE }, it will be split across multiple
276311 * underlying {@link ByteBuffer}s. The data is copied into new buffers so the
277312 * returned {@code MultiBuffer} is fully independent.
278313 *
@@ -282,7 +317,7 @@ public String decode(CharsetDecoder decoder)
282317 public static MultiBuffer wrap (ByteBuffer [] chunks ) {
283318 for (int i = 0 ; i < chunks .length ; i ++) {
284319 ByteBuffer chunk = chunks [i ];
285- if (chunk .capacity () == CHUNK_SIZE ) {
320+ if (chunk .capacity () == DEFAULT_CHUNK_SIZE ) {
286321 continue ;
287322 }
288323 if (i == chunks .length - 1 ) {
@@ -293,7 +328,7 @@ public static MultiBuffer wrap(ByteBuffer[] chunks) {
293328 + " is smaller than expected chunk size" );
294329 }
295330
296- return new MultiBuffer (chunks );
331+ return new MultiBuffer (chunks , DEFAULT_CHUNK_SIZE );
297332 }
298333
299334 /**
@@ -314,8 +349,8 @@ public static MultiBuffer mapFromChannel(FileChannel channel) throws IOException
314349 long remaining = size ;
315350
316351 for (int i = 0 ; i < buf .buffers .length ; i ++) {
317- long chunkPos = (long ) i * CHUNK_SIZE ;
318- long chunkSize = Math .min (CHUNK_SIZE , remaining );
352+ long chunkPos = (long ) i * DEFAULT_CHUNK_SIZE ;
353+ long chunkSize = Math .min (DEFAULT_CHUNK_SIZE , remaining );
319354 ByteBuffer mapped = channel .map (
320355 FileChannel .MapMode .READ_ONLY ,
321356 chunkPos ,
0 commit comments