18
18
19
19
import java .io .File ;
20
20
import java .io .IOException ;
21
+ import java .io .RandomAccessFile ;
21
22
import java .nio .ByteBuffer ;
22
23
import java .nio .channels .ClosedByInterruptException ;
23
24
import java .nio .channels .ClosedChannelException ;
@@ -39,22 +40,22 @@ class FileDataBlock implements CloseableDataBlock {
39
40
40
41
private static final DebugLogger debug = DebugLogger .get (FileDataBlock .class );
41
42
42
- static Tracker tracker ;
43
+ static Tracker tracker = Tracker . NONE ;
43
44
44
- private final ManagedFileChannel channel ;
45
+ private final FileAccess fileAccess ;
45
46
46
47
private final long offset ;
47
48
48
49
private final long size ;
49
50
50
51
FileDataBlock (Path path ) throws IOException {
51
- this .channel = new ManagedFileChannel (path );
52
+ this .fileAccess = new FileAccess (path );
52
53
this .offset = 0 ;
53
54
this .size = Files .size (path );
54
55
}
55
56
56
- FileDataBlock (ManagedFileChannel channel , long offset , long size ) {
57
- this .channel = channel ;
57
+ FileDataBlock (FileAccess fileAccess , long offset , long size ) {
58
+ this .fileAccess = fileAccess ;
58
59
this .offset = offset ;
59
60
this .size = size ;
60
61
}
@@ -79,7 +80,7 @@ public int read(ByteBuffer dst, long pos) throws IOException {
79
80
originalDestinationLimit = dst .limit ();
80
81
dst .limit (dst .position () + remaining );
81
82
}
82
- int result = this .channel .read (dst , this .offset + pos );
83
+ int result = this .fileAccess .read (dst , this .offset + pos );
83
84
if (originalDestinationLimit != -1 ) {
84
85
dst .limit (originalDestinationLimit );
85
86
}
@@ -92,7 +93,7 @@ public int read(ByteBuffer dst, long pos) throws IOException {
92
93
* @throws IOException on I/O error
93
94
*/
94
95
void open () throws IOException {
95
- this .channel .open ();
96
+ this .fileAccess .open ();
96
97
}
97
98
98
99
/**
@@ -102,7 +103,7 @@ void open() throws IOException {
102
103
*/
103
104
@ Override
104
105
public void close () throws IOException {
105
- this .channel .close ();
106
+ this .fileAccess .close ();
106
107
}
107
108
108
109
/**
@@ -112,7 +113,7 @@ public void close() throws IOException {
112
113
* @throws E if the channel is closed
113
114
*/
114
115
<E extends Exception > void ensureOpen (Supplier <E > exceptionSupplier ) throws E {
115
- this .channel .ensureOpen (exceptionSupplier );
116
+ this .fileAccess .ensureOpen (exceptionSupplier );
116
117
}
117
118
118
119
/**
@@ -145,14 +146,14 @@ FileDataBlock slice(long offset, long size) {
145
146
if (size < 0 || offset + size > this .size ) {
146
147
throw new IllegalArgumentException ("Size must not be negative and must be within bounds" );
147
148
}
148
- debug .log ("Slicing %s at %s with size %s" , this .channel , offset , size );
149
- return new FileDataBlock (this .channel , this .offset + offset , size );
149
+ debug .log ("Slicing %s at %s with size %s" , this .fileAccess , offset , size );
150
+ return new FileDataBlock (this .fileAccess , this .offset + offset , size );
150
151
}
151
152
152
153
/**
153
154
* Manages access to underlying {@link FileChannel}.
154
155
*/
155
- static class ManagedFileChannel {
156
+ static class FileAccess {
156
157
157
158
static final int BUFFER_SIZE = 1024 * 10 ;
158
159
@@ -162,6 +163,10 @@ static class ManagedFileChannel {
162
163
163
164
private FileChannel fileChannel ;
164
165
166
+ private boolean fileChannelInterrupted ;
167
+
168
+ private RandomAccessFile randomAccessFile ;
169
+
165
170
private ByteBuffer buffer ;
166
171
167
172
private long bufferPosition = -1 ;
@@ -170,7 +175,7 @@ static class ManagedFileChannel {
170
175
171
176
private final Object lock = new Object ();
172
177
173
- ManagedFileChannel (Path path ) {
178
+ FileAccess (Path path ) {
174
179
if (!Files .isRegularFile (path )) {
175
180
throw new IllegalArgumentException (path + " must be a regular file" );
176
181
}
@@ -194,34 +199,45 @@ int read(ByteBuffer dst, long position) throws IOException {
194
199
}
195
200
196
201
private void fillBuffer (long position ) throws IOException {
197
- for (int i = 0 ; i < 10 ; i ++) {
198
- boolean interrupted = (i != 0 ) ? Thread .interrupted () : false ;
199
- try {
200
- this .buffer .clear ();
201
- this .bufferSize = this .fileChannel .read (this .buffer , position );
202
- this .bufferPosition = position ;
203
- return ;
204
- }
205
- catch (ClosedByInterruptException ex ) {
202
+ if (Thread .currentThread ().isInterrupted ()) {
203
+ fillBufferUsingRandomAccessFile (position );
204
+ return ;
205
+ }
206
+ try {
207
+ if (this .fileChannelInterrupted ) {
206
208
repairFileChannel ();
209
+ this .fileChannelInterrupted = false ;
207
210
}
208
- finally {
209
- if (interrupted ) {
210
- Thread .currentThread ().interrupt ();
211
- }
212
- }
211
+ this .buffer .clear ();
212
+ this .bufferSize = this .fileChannel .read (this .buffer , position );
213
+ this .bufferPosition = position ;
214
+ }
215
+ catch (ClosedByInterruptException ex ) {
216
+ this .fileChannelInterrupted = true ;
217
+ fillBufferUsingRandomAccessFile (position );
213
218
}
214
- throw new ClosedByInterruptException ();
215
219
}
216
220
217
- private void repairFileChannel () throws IOException {
218
- if (tracker != null ) {
219
- tracker .closedFileChannel (this .path , this .fileChannel );
221
+ private void fillBufferUsingRandomAccessFile (long position ) throws IOException {
222
+ if (this .randomAccessFile == null ) {
223
+ this .randomAccessFile = new RandomAccessFile (this .path .toFile (), "r" );
224
+ tracker .openedFileChannel (this .path );
220
225
}
221
- this .fileChannel = FileChannel .open (this .path , StandardOpenOption .READ );
222
- if (tracker != null ) {
223
- tracker .openedFileChannel (this .path , this .fileChannel );
226
+ byte [] bytes = new byte [BUFFER_SIZE ];
227
+ this .randomAccessFile .seek (position );
228
+ int len = this .randomAccessFile .read (bytes );
229
+ this .buffer .clear ();
230
+ if (len > 0 ) {
231
+ this .buffer .put (bytes , 0 , len );
224
232
}
233
+ this .bufferSize = len ;
234
+ this .bufferPosition = position ;
235
+ }
236
+
237
+ private void repairFileChannel () throws IOException {
238
+ tracker .closedFileChannel (this .path );
239
+ this .fileChannel = FileChannel .open (this .path , StandardOpenOption .READ );
240
+ tracker .openedFileChannel (this .path );
225
241
}
226
242
227
243
void open () throws IOException {
@@ -230,9 +246,7 @@ void open() throws IOException {
230
246
debug .log ("Opening '%s'" , this .path );
231
247
this .fileChannel = FileChannel .open (this .path , StandardOpenOption .READ );
232
248
this .buffer = ByteBuffer .allocateDirect (BUFFER_SIZE );
233
- if (tracker != null ) {
234
- tracker .openedFileChannel (this .path , this .fileChannel );
235
- }
249
+ tracker .openedFileChannel (this .path );
236
250
}
237
251
this .referenceCount ++;
238
252
debug .log ("Reference count for '%s' incremented to %s" , this .path , this .referenceCount );
@@ -251,18 +265,21 @@ void close() throws IOException {
251
265
this .bufferPosition = -1 ;
252
266
this .bufferSize = 0 ;
253
267
this .fileChannel .close ();
254
- if (tracker != null ) {
255
- tracker .closedFileChannel (this .path , this .fileChannel );
256
- }
268
+ tracker .closedFileChannel (this .path );
257
269
this .fileChannel = null ;
270
+ if (this .randomAccessFile != null ) {
271
+ this .randomAccessFile .close ();
272
+ tracker .closedFileChannel (this .path );
273
+ this .randomAccessFile = null ;
274
+ }
258
275
}
259
276
debug .log ("Reference count for '%s' decremented to %s" , this .path , this .referenceCount );
260
277
}
261
278
}
262
279
263
280
<E extends Exception > void ensureOpen (Supplier <E > exceptionSupplier ) throws E {
264
281
synchronized (this .lock ) {
265
- if (this .referenceCount == 0 || ! this . fileChannel . isOpen () ) {
282
+ if (this .referenceCount == 0 ) {
266
283
throw exceptionSupplier .get ();
267
284
}
268
285
}
@@ -280,9 +297,21 @@ public String toString() {
280
297
*/
281
298
interface Tracker {
282
299
283
- void openedFileChannel (Path path , FileChannel fileChannel );
300
+ Tracker NONE = new Tracker () {
301
+
302
+ @ Override
303
+ public void openedFileChannel (Path path ) {
304
+ }
305
+
306
+ @ Override
307
+ public void closedFileChannel (Path path ) {
308
+ }
309
+
310
+ };
311
+
312
+ void openedFileChannel (Path path );
284
313
285
- void closedFileChannel (Path path , FileChannel fileChannel );
314
+ void closedFileChannel (Path path );
286
315
287
316
}
288
317
0 commit comments