Skip to content

Commit df9a4cf

Browse files
顾鹏顾鹏
authored andcommitted
Add a property to disable prefetch read
1 parent df03c63 commit df9a4cf

File tree

3 files changed

+84
-6
lines changed

3 files changed

+84
-6
lines changed

core/client/fs/src/main/java/alluxio/client/block/stream/BlockInStream.java

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public enum BlockInStreamSource {
7979

8080
private boolean mClosed = false;
8181
private boolean mEOF = false;
82+
private boolean mEnablePrefetchRead = true;
8283

8384
/**
8485
* Creates a {@link BlockInStream}.
@@ -120,6 +121,7 @@ public static BlockInStream create(FileSystemContext context, BlockInfo info,
120121
alluxioConf.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_PREFERRED);
121122
boolean sourceSupportsDomainSocket = NettyUtils.isDomainSocketSupported(dataSource);
122123
boolean sourceIsLocal = dataSourceType == BlockInStreamSource.NODE_LOCAL;
124+
boolean enablePrefetchRead = alluxioConf.getBoolean(PropertyKey.USER_PREFETCH_READ_ENABLED);
123125

124126
// Short circuit is enabled when
125127
// 1. data source is local node
@@ -141,9 +143,9 @@ public static BlockInStream create(FileSystemContext context, BlockInfo info,
141143
// gRPC
142144
LOG.debug("Creating gRPC input stream for block {} @ {} from client {} reading through {} ("
143145
+ "data locates in the local worker {}, shortCircuitEnabled {}, "
144-
+ "shortCircuitPreferred {}, sourceSupportDomainSocket {})",
146+
+ "shortCircuitPreferred {}, sourceSupportDomainSocket {}, enablePrefetchRead {})",
145147
blockId, dataSource, NetworkAddressUtils.getClientHostName(alluxioConf), dataSource,
146-
sourceIsLocal, shortCircuit, shortCircuitPreferred, sourceSupportsDomainSocket);
148+
sourceIsLocal, shortCircuit, shortCircuitPreferred, sourceSupportsDomainSocket, enablePrefetchRead);
147149
return createGrpcBlockInStream(context, dataSource, dataSourceType, blockId,
148150
blockSize, options);
149151
}
@@ -208,6 +210,8 @@ private static BlockInStream createGrpcBlockInStream(FileSystemContext context,
208210
AlluxioConfiguration conf = context.getClusterConf();
209211
long chunkSize = conf.getBytes(
210212
PropertyKey.USER_STREAMING_READER_CHUNK_SIZE_BYTES);
213+
boolean enablePrefetchRead = conf.getBoolean(
214+
PropertyKey.USER_PREFETCH_READ_ENABLED);
211215
// Construct the partial read request
212216
ReadRequest.Builder builder = ReadRequest.newBuilder()
213217
.setBlockId(blockId)
@@ -224,7 +228,7 @@ private static BlockInStream createGrpcBlockInStream(FileSystemContext context,
224228
} else {
225229
factory = new GrpcDataReader.Factory(context, address, builder);
226230
}
227-
return new BlockInStream(factory, address, blockSource, blockId, blockSize);
231+
return new BlockInStream(factory, address, blockSource, blockId, blockSize, enablePrefetchRead);
228232
}
229233

230234
/**
@@ -246,11 +250,13 @@ public static BlockInStream createRemoteBlockInStream(FileSystemContext context,
246250
AlluxioConfiguration conf = context.getClusterConf();
247251
long chunkSize = conf.getBytes(
248252
PropertyKey.USER_STREAMING_READER_CHUNK_SIZE_BYTES);
253+
boolean enablePrefetchRead = conf.getBoolean(
254+
PropertyKey.USER_PREFETCH_READ_ENABLED);
249255
ReadRequest readRequest = ReadRequest.newBuilder().setBlockId(blockId)
250256
.setOpenUfsBlockOptions(ufsOptions).setChunkSize(chunkSize).buildPartial();
251257
DataReader.Factory factory = new GrpcDataReader.Factory(context, address,
252258
readRequest.toBuilder());
253-
return new BlockInStream(factory, address, blockSource, blockId, blockSize);
259+
return new BlockInStream(factory, address, blockSource, blockId, blockSize, enablePrefetchRead);
254260
}
255261

256262
/**
@@ -272,6 +278,23 @@ protected BlockInStream(DataReader.Factory dataReaderFactory,
272278
mLength = length;
273279
}
274280

281+
/**
282+
* Creates an instance of {@link BlockInStream}.
283+
*
284+
* @param dataReaderFactory the data reader factory
285+
* @param address the address of the gRPC data server
286+
* @param blockSource the source location of the block
287+
* @param id the ID (either block ID or UFS file ID)
288+
* @param length the length
289+
* @param enablePrefetchRead whether to enable prefetch
290+
*/
291+
@VisibleForTesting
292+
protected BlockInStream(DataReader.Factory dataReaderFactory,
293+
WorkerNetAddress address, BlockInStreamSource blockSource, long id, long length, boolean enablePrefetchRead) {
294+
this(dataReaderFactory, address, blockSource, id, length);
295+
mEnablePrefetchRead = enablePrefetchRead;
296+
}
297+
275298
@Override
276299
public long getPos() {
277300
return mPos;
@@ -318,7 +341,11 @@ public int read(ByteBuffer byteBuffer, int off, int len) throws IOException {
318341
if (mPos == mLength) {
319342
return -1;
320343
}
321-
readChunk();
344+
if (mEnablePrefetchRead) {
345+
readChunk();
346+
} else {
347+
readLengthChunk(len);
348+
}
322349
if (mCurrentChunk == null) {
323350
mEOF = true;
324351
}
@@ -498,6 +525,21 @@ private void readChunk() throws IOException {
498525
}
499526
}
500527

528+
/**
529+
* Reads a new length from the channel into current chunk.
530+
*/
531+
private void readLengthChunk(int length) throws IOException {
532+
if (mDataReader == null) {
533+
mDataReader = mDataReaderFactory.create(mPos, length);
534+
} else if (mCurrentChunk != null && mCurrentChunk.readableBytes() == 0) {
535+
closeDataReader();
536+
mDataReader = mDataReaderFactory.create(mPos, length);
537+
}
538+
if (mCurrentChunk == null) {
539+
mCurrentChunk = mDataReader.readChunk();
540+
}
541+
}
542+
501543
/**
502544
* Close the current data reader.
503545
*/

core/client/fs/src/test/java/alluxio/client/block/stream/BlockInStreamTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,34 @@ public void closeReaderAfterReadingAllData() throws Exception {
118118
assertTrue(reader.isClosed());
119119
}
120120

121+
@Test
122+
public void disablePrefetchReadTest() throws Exception {
123+
int chunkSize = 512;
124+
TestDataReader.Factory factory = new TestDataReader.Factory(
125+
chunkSize, BufferUtils.getIncreasingByteArray(2 * chunkSize));
126+
BlockInStream stream = new BlockInStream(factory, new WorkerNetAddress(),
127+
BlockInStream.BlockInStreamSource.REMOTE, -1, 1024, false);
128+
byte[] res = new byte[chunkSize];
129+
int read;
130+
read = stream.read(res);
131+
TestDataReader reader = factory.getDataReader();
132+
assertEquals(chunkSize, read);
133+
assertEquals(stream.mCurrentChunk.getLength(), res.length);
134+
assertNotNull(reader);
135+
assertFalse(reader.isClosed());
136+
137+
read = stream.read(res, 0, chunkSize);
138+
assertEquals(chunkSize, read);
139+
assertTrue(reader.isClosed());
140+
141+
read = stream.read(res, 0, chunkSize);
142+
assertEquals(-1, read);
143+
assertTrue(reader.isClosed());
144+
145+
stream.close();
146+
assertTrue(reader.isClosed());
147+
}
148+
121149
@Test
122150
public void createShortCircuit() throws Exception {
123151
WorkerNetAddress dataSource = new WorkerNetAddress();

core/common/src/main/java/alluxio/conf/PropertyKey.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7090,7 +7090,14 @@ public String toString() {
70907090
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
70917091
.setScope(Scope.CLIENT)
70927092
.build();
7093-
7093+
public static final PropertyKey USER_PREFETCH_READ_ENABLED =
7094+
booleanBuilder(Name.USER_PREFETCH_READ_ENABLED)
7095+
.setDefaultValue(true)
7096+
.setDescription("Whether to enable the data prefetch, when remote read, "
7097+
+ "prevents the worker from sending the entire block of data to the client.")
7098+
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
7099+
.setScope(Scope.CLIENT)
7100+
.build();
70947101
//
70957102
// FUSE integration related properties
70967103
//
@@ -9288,6 +9295,7 @@ public static final class Name {
92889295
"alluxio.user.short.circuit.preferred";
92899296
public static final String USER_WORKER_LIST_REFRESH_INTERVAL =
92909297
"alluxio.user.worker.list.refresh.interval";
9298+
public static final String USER_PREFETCH_READ_ENABLED = "alluxio.user.prefetch.read.enabled";
92919299

92929300
//
92939301
// FUSE integration related properties

0 commit comments

Comments
 (0)