@@ -650,13 +650,14 @@ private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, int wri
650
650
// no need to allocate a new capturing lambda if the offset isn't adjusted
651
651
return writer ;
652
652
}
653
- return (channel , channelPos , streamFactory , relativePos , len , progressUpdater ) -> writer .fillCacheRange (
653
+ return (channel , channelPos , streamFactory , relativePos , len , progressUpdater , completionListener ) -> writer .fillCacheRange (
654
654
channel ,
655
655
channelPos ,
656
656
streamFactory ,
657
657
relativePos - writeOffset ,
658
658
len ,
659
- progressUpdater
659
+ progressUpdater ,
660
+ completionListener
660
661
);
661
662
}
662
663
@@ -991,16 +992,17 @@ void populateAndRead(
991
992
executor .execute (fillGapRunnable (gap , writer , null , refs .acquireListener ()));
992
993
}
993
994
} else {
994
- final List <AbstractRunnable > gapFillingTasks = gaps .stream ()
995
- .map (gap -> fillGapRunnable (gap , writer , streamFactory , refs .acquireListener ()))
996
- .toList ();
997
- executor .execute (() -> {
998
- try (streamFactory ) {
995
+ var gapFillingListener = refs .acquireListener ();
996
+ try (var gfRefs = new RefCountingRunnable (ActionRunnable .run (gapFillingListener , streamFactory ::close ))) {
997
+ final List <Runnable > gapFillingTasks = gaps .stream ()
998
+ .map (gap -> fillGapRunnable (gap , writer , streamFactory , gfRefs .acquireListener ()))
999
+ .toList ();
1000
+ executor .execute (() -> {
999
1001
// Fill the gaps in order. If a gap fails to fill for whatever reason, the task for filling the next
1000
1002
// gap will still be executed.
1001
1003
gapFillingTasks .forEach (Runnable ::run );
1002
- }
1003
- });
1004
+ });
1005
+ }
1004
1006
}
1005
1007
}
1006
1008
}
@@ -1009,13 +1011,13 @@ void populateAndRead(
1009
1011
}
1010
1012
}
1011
1013
1012
- private AbstractRunnable fillGapRunnable (
1014
+ private Runnable fillGapRunnable (
1013
1015
SparseFileTracker .Gap gap ,
1014
1016
RangeMissingHandler writer ,
1015
1017
@ Nullable SourceInputStreamFactory streamFactory ,
1016
1018
ActionListener <Void > listener
1017
1019
) {
1018
- return ActionRunnable . run ( listener . delegateResponse (( l , e ) -> failGapAndListener ( gap , l , e )), () -> {
1020
+ return ( ) -> ActionListener . run ( listener , l -> {
1019
1021
var ioRef = io ;
1020
1022
assert regionOwners .get (ioRef ) == CacheFileRegion .this ;
1021
1023
assert CacheFileRegion .this .hasReferences () : CacheFileRegion .this ;
@@ -1026,10 +1028,15 @@ private AbstractRunnable fillGapRunnable(
1026
1028
streamFactory ,
1027
1029
start ,
1028
1030
Math .toIntExact (gap .end () - start ),
1029
- progress -> gap .onProgress (start + progress )
1031
+ progress -> gap .onProgress (start + progress ),
1032
+ l .<Void >map (unused -> {
1033
+ assert regionOwners .get (ioRef ) == CacheFileRegion .this ;
1034
+ assert CacheFileRegion .this .hasReferences () : CacheFileRegion .this ;
1035
+ writeCount .increment ();
1036
+ gap .onCompletion ();
1037
+ return null ;
1038
+ }).delegateResponse ((delegate , e ) -> failGapAndListener (gap , delegate , e ))
1030
1039
);
1031
- writeCount .increment ();
1032
- gap .onCompletion ();
1033
1040
});
1034
1041
}
1035
1042
@@ -1117,12 +1124,23 @@ public void fillCacheRange(
1117
1124
SourceInputStreamFactory streamFactory ,
1118
1125
int relativePos ,
1119
1126
int length ,
1120
- IntConsumer progressUpdater
1127
+ IntConsumer progressUpdater ,
1128
+ ActionListener <Void > completionListener
1121
1129
) throws IOException {
1122
- writer .fillCacheRange (channel , channelPos , streamFactory , relativePos , length , progressUpdater );
1123
- var elapsedTime = TimeUnit .NANOSECONDS .toMillis (relativeTimeInNanosSupplier .getAsLong () - startTime );
1124
- SharedBlobCacheService .this .blobCacheMetrics .getCacheMissLoadTimes ().record (elapsedTime );
1125
- SharedBlobCacheService .this .blobCacheMetrics .getCacheMissCounter ().increment ();
1130
+ writer .fillCacheRange (
1131
+ channel ,
1132
+ channelPos ,
1133
+ streamFactory ,
1134
+ relativePos ,
1135
+ length ,
1136
+ progressUpdater ,
1137
+ completionListener .map (unused -> {
1138
+ var elapsedTime = TimeUnit .NANOSECONDS .toMillis (relativeTimeInNanosSupplier .getAsLong () - startTime );
1139
+ blobCacheMetrics .getCacheMissLoadTimes ().record (elapsedTime );
1140
+ blobCacheMetrics .getCacheMissCounter ().increment ();
1141
+ return null ;
1142
+ })
1143
+ );
1126
1144
}
1127
1145
};
1128
1146
if (rangeToRead .isEmpty ()) {
@@ -1215,9 +1233,18 @@ public void fillCacheRange(
1215
1233
SourceInputStreamFactory streamFactory ,
1216
1234
int relativePos ,
1217
1235
int len ,
1218
- IntConsumer progressUpdater
1236
+ IntConsumer progressUpdater ,
1237
+ ActionListener <Void > completionListener
1219
1238
) throws IOException {
1220
- delegate .fillCacheRange (channel , channelPos , streamFactory , relativePos - writeOffset , len , progressUpdater );
1239
+ delegate .fillCacheRange (
1240
+ channel ,
1241
+ channelPos ,
1242
+ streamFactory ,
1243
+ relativePos - writeOffset ,
1244
+ len ,
1245
+ progressUpdater ,
1246
+ completionListener
1247
+ );
1221
1248
}
1222
1249
};
1223
1250
}
@@ -1230,14 +1257,25 @@ public void fillCacheRange(
1230
1257
SourceInputStreamFactory streamFactory ,
1231
1258
int relativePos ,
1232
1259
int len ,
1233
- IntConsumer progressUpdater
1260
+ IntConsumer progressUpdater ,
1261
+ ActionListener <Void > completionListener
1234
1262
) throws IOException {
1235
1263
assert assertValidRegionAndLength (fileRegion , channelPos , len );
1236
- delegate .fillCacheRange (channel , channelPos , streamFactory , relativePos , len , progressUpdater );
1237
- assert regionOwners .get (fileRegion .io ) == fileRegion
1238
- : "File chunk [" + fileRegion .regionKey + "] no longer owns IO [" + fileRegion .io + "]" ;
1264
+ delegate .fillCacheRange (
1265
+ channel ,
1266
+ channelPos ,
1267
+ streamFactory ,
1268
+ relativePos ,
1269
+ len ,
1270
+ progressUpdater ,
1271
+ Assertions .ENABLED ? ActionListener .runBefore (completionListener , () -> {
1272
+ assert regionOwners .get (fileRegion .io ) == fileRegion
1273
+ : "File chunk [" + fileRegion .regionKey + "] no longer owns IO [" + fileRegion .io + "]" ;
1274
+ }) : completionListener
1275
+ );
1239
1276
}
1240
1277
};
1278
+
1241
1279
}
1242
1280
return adjustedWriter ;
1243
1281
}
@@ -1324,14 +1362,16 @@ default SourceInputStreamFactory sharedInputStreamFactory(List<SparseFileTracker
1324
1362
* @param length of data to fetch
1325
1363
* @param progressUpdater consumer to invoke with the number of copied bytes as they are written in cache.
1326
1364
* This is used to notify waiting readers that data become available in cache.
1365
+ * @param completionListener listener that has to be called when the callback method completes
1327
1366
*/
1328
1367
void fillCacheRange (
1329
1368
SharedBytes .IO channel ,
1330
1369
int channelPos ,
1331
1370
@ Nullable SourceInputStreamFactory streamFactory ,
1332
1371
int relativePos ,
1333
1372
int length ,
1334
- IntConsumer progressUpdater
1373
+ IntConsumer progressUpdater ,
1374
+ ActionListener <Void > completionListener
1335
1375
) throws IOException ;
1336
1376
}
1337
1377
@@ -1343,9 +1383,9 @@ public interface SourceInputStreamFactory extends Releasable {
1343
1383
/**
1344
1384
* Create the input stream at the specified position.
1345
1385
* @param relativePos the relative position in the remote storage to read from.
1346
- * @return the input stream ready to be read from.
1386
+ * @param listener listener for the input stream ready to be read from.
1347
1387
*/
1348
- InputStream create (int relativePos ) throws IOException ;
1388
+ void create (int relativePos , ActionListener < InputStream > listener ) throws IOException ;
1349
1389
}
1350
1390
1351
1391
private abstract static class DelegatingRangeMissingHandler implements RangeMissingHandler {
@@ -1367,9 +1407,10 @@ public void fillCacheRange(
1367
1407
SourceInputStreamFactory streamFactory ,
1368
1408
int relativePos ,
1369
1409
int length ,
1370
- IntConsumer progressUpdater
1410
+ IntConsumer progressUpdater ,
1411
+ ActionListener <Void > completionListener
1371
1412
) throws IOException {
1372
- delegate .fillCacheRange (channel , channelPos , streamFactory , relativePos , length , progressUpdater );
1413
+ delegate .fillCacheRange (channel , channelPos , streamFactory , relativePos , length , progressUpdater , completionListener );
1373
1414
}
1374
1415
}
1375
1416
0 commit comments