@@ -183,8 +183,8 @@ private void readHeader() throws IOException {
183
183
elementCount = readInt (buffer , 4 );
184
184
int firstOffset = readInt (buffer , 8 );
185
185
int lastOffset = readInt (buffer , 12 );
186
- first = readElement (firstOffset );
187
- last = readElement (lastOffset );
186
+ first = readElement (firstOffset , raf );
187
+ last = readElement (lastOffset , raf );
188
188
});
189
189
}
190
190
@@ -194,26 +194,23 @@ private void readHeader() throws IOException {
194
194
* update the class member variables *after* this call succeeds. Assumes segment writes are atomic
195
195
* in the underlying file system.
196
196
*/
197
- private void writeHeader (int fileLength , int elementCount , int firstPosition , int lastPosition )
197
+ private void writeHeader (int fileLength , int elementCount , int firstPosition , int lastPosition ,
198
+ RandomAccessFile raf )
198
199
throws IOException {
199
200
writeInts (buffer , fileLength , elementCount , firstPosition , lastPosition );
200
- openAndExecute (raf -> {
201
- raf .seek (0 );
202
- raf .write (buffer );
203
- });
201
+ raf .seek (0 );
202
+ raf .write (buffer );
204
203
}
205
204
206
205
/**
207
206
* Returns the Element for the given offset.
208
207
*/
209
- private Element readElement (int position ) throws IOException {
208
+ private Element readElement (int position , RandomAccessFile raf ) throws IOException {
210
209
if (position == 0 ) {
211
210
return Element .NULL ;
212
211
}
213
- try (RandomAccessFile raf = open (rafFile )) {
214
- raf .seek (position );
215
- return new Element (position , raf .readInt ());
216
- }
212
+ raf .seek (position );
213
+ return new Element (position , raf .readInt ());
217
214
}
218
215
219
216
/**
@@ -266,23 +263,23 @@ private int wrapPosition(int position) {
266
263
* @param position in file to write to
267
264
* @param buffer to write from
268
265
* @param count # of bytes to write
266
+ * @param raf scoped underlying file
269
267
*/
270
- private void ringWrite (int position , byte [] buffer , int offset , int count ) throws IOException {
271
- final int wrappedPosition = wrapPosition (position );
272
- openAndExecute (raf -> {
273
- if (wrappedPosition + count <= fileLength ) {
274
- raf .seek (wrappedPosition );
275
- raf .write (buffer , offset , count );
276
- } else {
277
- // The write overlaps the EOF.
278
- // # of bytes to write before the EOF.
279
- int beforeEof = fileLength - wrappedPosition ;
280
- raf .seek (wrappedPosition );
281
- raf .write (buffer , offset , beforeEof );
282
- raf .seek (HEADER_LENGTH );
283
- raf .write (buffer , offset + beforeEof , count - beforeEof );
284
- }
285
- });
268
+ private void ringWrite (int position , byte [] buffer , int offset , int count , RandomAccessFile raf )
269
+ throws IOException {
270
+ position = wrapPosition (position );
271
+ if (position + count <= fileLength ) {
272
+ raf .seek (position );
273
+ raf .write (buffer , offset , count );
274
+ } else {
275
+ // The write overlaps the EOF.
276
+ // # of bytes to write before the EOF.
277
+ int beforeEof = fileLength - position ;
278
+ raf .seek (position );
279
+ raf .write (buffer , offset , beforeEof );
280
+ raf .seek (HEADER_LENGTH );
281
+ raf .write (buffer , offset + beforeEof , count - beforeEof );
282
+ }
286
283
}
287
284
288
285
/**
@@ -291,23 +288,23 @@ private void ringWrite(int position, byte[] buffer, int offset, int count) throw
291
288
* @param position in file to read from
292
289
* @param buffer to read into
293
290
* @param count # of bytes to read
291
+ * @param raf scoped underlying file
294
292
*/
295
- private void ringRead (int position , byte [] buffer , int offset , int count ) throws IOException {
296
- final int wrappedPosition = wrapPosition (position );
297
- openAndExecute (raf -> {
298
- if (wrappedPosition + count <= fileLength ) {
299
- raf .seek (wrappedPosition );
300
- raf .readFully (buffer , offset , count );
301
- } else {
302
- // The read overlaps the EOF.
303
- // # of bytes to read before the EOF.
304
- int beforeEof = fileLength - wrappedPosition ;
305
- raf .seek (wrappedPosition );
306
- raf .readFully (buffer , offset , beforeEof );
307
- raf .seek (HEADER_LENGTH );
308
- raf .readFully (buffer , offset + beforeEof , count - beforeEof );
309
- }
310
- });
293
+ private void ringRead (int position , byte [] buffer , int offset , int count , RandomAccessFile raf )
294
+ throws IOException {
295
+ position = wrapPosition (position );
296
+ if (position + count <= fileLength ) {
297
+ raf .seek (position );
298
+ raf .readFully (buffer , offset , count );
299
+ } else {
300
+ // The read overlaps the EOF.
301
+ // # of bytes to read before the EOF.
302
+ int beforeEof = fileLength - position ;
303
+ raf .seek (position );
304
+ raf .readFully (buffer , offset , beforeEof );
305
+ raf .seek (HEADER_LENGTH );
306
+ raf .readFully (buffer , offset + beforeEof , count - beforeEof );
307
+ }
311
308
}
312
309
313
310
/**
@@ -334,31 +331,32 @@ public synchronized void add(byte[] data, int offset, int count) throws IOExcept
334
331
throw new IndexOutOfBoundsException ();
335
332
}
336
333
337
- expandIfNecessary (count );
338
-
339
- // Insert a new element after the current last element.
340
- boolean wasEmpty = isEmpty ();
341
- int position =
342
- wasEmpty
343
- ? HEADER_LENGTH
344
- : wrapPosition (last .position + Element .HEADER_LENGTH + last .length );
345
- Element newLast = new Element (position , count );
346
-
347
- // Write length.
348
- writeInt (buffer , 0 , count );
349
- ringWrite (newLast .position , buffer , 0 , Element .HEADER_LENGTH );
350
-
351
- // Write data.
352
- ringWrite (newLast .position + Element .HEADER_LENGTH , data , offset , count );
353
-
354
- // Commit the addition. If wasEmpty, first == last.
355
- int firstPosition = wasEmpty ? newLast .position : first .position ;
356
- writeHeader (fileLength , elementCount + 1 , firstPosition , newLast .position );
357
- last = newLast ;
358
- elementCount ++;
359
- if (wasEmpty ) {
360
- first = last ; // first element
361
- }
334
+ openAndExecute (raf -> {
335
+ expandIfNecessary (count , raf );
336
+
337
+ // Insert a new element after the current last element.
338
+ boolean wasEmpty = isEmpty ();
339
+ int position =
340
+ wasEmpty
341
+ ? HEADER_LENGTH
342
+ : wrapPosition (last .position + Element .HEADER_LENGTH + last .length );
343
+ Element newLast = new Element (position , count );
344
+
345
+ // Write length.
346
+ writeInt (buffer , 0 , count );
347
+ ringWrite (newLast .position , buffer , 0 , Element .HEADER_LENGTH , raf );
348
+
349
+ // Write data.
350
+ ringWrite (newLast .position + Element .HEADER_LENGTH , data , offset , count , raf );
351
+ // Commit the addition. If wasEmpty, first == last.
352
+ int firstPosition = wasEmpty ? newLast .position : first .position ;
353
+ writeHeader (fileLength , elementCount + 1 , firstPosition , newLast .position , raf );
354
+ last = newLast ;
355
+ elementCount ++;
356
+ if (wasEmpty ) {
357
+ first = last ; // first element
358
+ }
359
+ });
362
360
}
363
361
364
362
/**
@@ -404,7 +402,7 @@ public synchronized boolean isEmpty() {
404
402
*
405
403
* @param dataLength length of data being added
406
404
*/
407
- private void expandIfNecessary (int dataLength ) throws IOException {
405
+ private void expandIfNecessary (int dataLength , RandomAccessFile raf ) throws IOException {
408
406
int elementLength = Element .HEADER_LENGTH + dataLength ;
409
407
int remainingBytes = remainingBytes ();
410
408
if (remainingBytes >= elementLength ) {
@@ -428,23 +426,21 @@ private void expandIfNecessary(int dataLength) throws IOException {
428
426
429
427
// If the buffer is split, we need to make it contiguous
430
428
if (endOfLastElement < first .position ) {
431
- openAndExecute (raf -> {
432
- FileChannel channel = raf .getChannel ();
433
- channel .position (fileLength ); // destination position
434
- int count = endOfLastElement - Element .HEADER_LENGTH ;
435
- if (channel .transferTo (HEADER_LENGTH , count , channel ) != count ) {
436
- throw new AssertionError ("Copied insufficient number of bytes!" );
437
- }
438
- });
429
+ FileChannel channel = raf .getChannel ();
430
+ channel .position (fileLength ); // destination position
431
+ int count = endOfLastElement - Element .HEADER_LENGTH ;
432
+ if (channel .transferTo (HEADER_LENGTH , count , channel ) != count ) {
433
+ throw new AssertionError ("Copied insufficient number of bytes!" );
434
+ }
439
435
}
440
436
441
437
// Commit the expansion.
442
438
if (last .position < first .position ) {
443
439
int newLastPosition = fileLength + last .position - HEADER_LENGTH ;
444
- writeHeader (newLength , elementCount , first .position , newLastPosition );
440
+ writeHeader (newLength , elementCount , first .position , newLastPosition , raf );
445
441
last = new Element (newLastPosition , last .length );
446
442
} else {
447
- writeHeader (newLength , elementCount , first .position , last .position );
443
+ writeHeader (newLength , elementCount , first .position , last .position , raf );
448
444
}
449
445
450
446
fileLength = newLength ;
@@ -470,7 +466,7 @@ public synchronized byte[] peek() throws IOException {
470
466
}
471
467
int length = first .length ;
472
468
byte [] data = new byte [length ];
473
- ringRead (first .position + Element .HEADER_LENGTH , data , 0 , length );
469
+ openAndExecute ( raf -> ringRead (first .position + Element .HEADER_LENGTH , data , 0 , length , raf ) );
474
470
return data ;
475
471
}
476
472
@@ -479,7 +475,7 @@ public synchronized byte[] peek() throws IOException {
479
475
*/
480
476
public synchronized void peek (ElementReader reader ) throws IOException {
481
477
if (elementCount > 0 ) {
482
- reader .read (new ElementInputStream (first ), first .length );
478
+ openAndExecute ( raf -> reader .read (new ElementInputStream (first , raf ), first .length ) );
483
479
}
484
480
}
485
481
@@ -488,12 +484,14 @@ public synchronized void peek(ElementReader reader) throws IOException {
488
484
* added.
489
485
*/
490
486
public synchronized void forEach (ElementReader reader ) throws IOException {
491
- int position = first .position ;
492
- for (int i = 0 ; i < elementCount ; i ++) {
493
- Element current = readElement (position );
494
- reader .read (new ElementInputStream (current ), current .length );
495
- position = wrapPosition (current .position + Element .HEADER_LENGTH + current .length );
496
- }
487
+ openAndExecute (raf -> {
488
+ int position = first .position ;
489
+ for (int i = 0 ; i < elementCount ; i ++) {
490
+ Element current = readElement (position , raf );
491
+ reader .read (new ElementInputStream (current , raf ), current .length );
492
+ position = wrapPosition (current .position + Element .HEADER_LENGTH + current .length );
493
+ }
494
+ });
497
495
}
498
496
499
497
/**
@@ -515,10 +513,12 @@ private final class ElementInputStream extends InputStream {
515
513
516
514
private int position ;
517
515
private int remaining ;
516
+ private final RandomAccessFile raf ;
518
517
519
- private ElementInputStream (Element element ) {
518
+ private ElementInputStream (Element element , RandomAccessFile raf ) {
520
519
position = wrapPosition (element .position + Element .HEADER_LENGTH );
521
520
remaining = element .length ;
521
+ this .raf = raf ;
522
522
}
523
523
524
524
@ Override
@@ -531,7 +531,7 @@ public int read(byte[] buffer, int offset, int length) throws IOException {
531
531
if (length > remaining ) {
532
532
length = remaining ;
533
533
}
534
- ringRead (position , buffer , offset , length );
534
+ ringRead (position , buffer , offset , length , raf );
535
535
position = wrapPosition (position + length );
536
536
remaining -= length ;
537
537
return length ;
@@ -546,10 +546,8 @@ public int read() throws IOException {
546
546
return -1 ;
547
547
}
548
548
int readByte ;
549
- try (RandomAccessFile raf = open (rafFile )) {
550
- raf .seek (position );
551
- readByte = raf .read ();
552
- }
549
+ raf .seek (position );
550
+ readByte = raf .read ();
553
551
position = wrapPosition (position + 1 );
554
552
remaining --;
555
553
return readByte ;
@@ -572,24 +570,26 @@ public synchronized void remove() throws IOException {
572
570
if (isEmpty ()) {
573
571
throw new NoSuchElementException ();
574
572
}
575
- if (elementCount == 1 ) {
576
- clear ();
577
- } else {
578
- // assert elementCount > 1
579
- int newFirstPosition = wrapPosition (first .position + Element .HEADER_LENGTH + first .length );
580
- ringRead (newFirstPosition , buffer , 0 , Element .HEADER_LENGTH );
581
- int length = readInt (buffer , 0 );
582
- writeHeader (fileLength , elementCount - 1 , newFirstPosition , last .position );
583
- elementCount --;
584
- first = new Element (newFirstPosition , length );
585
- }
573
+ openAndExecute (raf -> {
574
+ if (elementCount == 1 ) {
575
+ clear (raf );
576
+ } else {
577
+ // assert elementCount > 1
578
+ int newFirstPosition = wrapPosition (first .position + Element .HEADER_LENGTH + first .length );
579
+ ringRead (newFirstPosition , buffer , 0 , Element .HEADER_LENGTH , raf );
580
+ int length = readInt (buffer , 0 );
581
+ writeHeader (fileLength , elementCount - 1 , newFirstPosition , last .position , raf );
582
+ elementCount --;
583
+ first = new Element (newFirstPosition , length );
584
+ }
585
+ });
586
586
}
587
587
588
588
/**
589
589
* Clears this queue. Truncates the file to the initial size.
590
590
*/
591
- public synchronized void clear () throws IOException {
592
- writeHeader (INITIAL_LENGTH , 0 , 0 , 0 );
591
+ public synchronized void clear (RandomAccessFile raf ) throws IOException {
592
+ writeHeader (INITIAL_LENGTH , 0 , 0 , 0 , raf );
593
593
elementCount = 0 ;
594
594
first = Element .NULL ;
595
595
last = Element .NULL ;
0 commit comments