Skip to content

Commit 3587895

Browse files
authored
improve segment cache mount/unmount resilience if interrupted (#18833)
1 parent 22faad7 commit 3587895

File tree

2 files changed

+128
-8
lines changed

2 files changed

+128
-8
lines changed

server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,7 @@ public void mount(StorageLocation mountLocation) throws SegmentLoadingException
10031003
);
10041004
atomicMoveAndDeleteCacheEntryDirectory(storageDir);
10051005
} else {
1006-
needsLoad = false;
1006+
needsLoad = referenceProvider != null;
10071007
}
10081008
}
10091009
if (needsLoad) {
@@ -1069,14 +1069,17 @@ public void unmount()
10691069
try {
10701070
synchronized (this) {
10711071
if (referenceProvider != null) {
1072-
referenceProvider.close();
1072+
ReferenceCountedSegmentProvider provider = referenceProvider;
10731073
referenceProvider = null;
1074+
provider.close();
10741075
}
10751076
if (!config.isDeleteOnRemove()) {
10761077
return;
10771078
}
10781079
if (storageDir != null) {
1079-
atomicMoveAndDeleteCacheEntryDirectory(storageDir);
1080+
if (storageDir.exists()) {
1081+
atomicMoveAndDeleteCacheEntryDirectory(storageDir);
1082+
}
10801083
storageDir = null;
10811084
location = null;
10821085
}

server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java

Lines changed: 122 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ public void testAcquireSegmentOnDemand() throws IOException
319319
makeSegmentsToLoad(segmentCount, localStorageFolder, interval, segmentsToWeakLoad);
320320

321321
for (boolean sleepy : new boolean[]{true, false}) {
322-
testWeakLoad(iterations, segmentCount, concurrentReads, false, sleepy, false);
322+
testWeakLoad(iterations, segmentCount, concurrentReads, false, sleepy, false, false);
323323
}
324324
}
325325

@@ -337,7 +337,7 @@ public void testAcquireSegmentOnDemandRandomSegment() throws IOException
337337
makeSegmentsToLoad(segmentCount, localStorageFolder, interval, segmentsToWeakLoad);
338338

339339
for (boolean sleepy : new boolean[]{true, false}) {
340-
testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy, true);
340+
testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy, true, false);
341341
}
342342
}
343343

@@ -355,7 +355,7 @@ public void testAcquireSegmentOnDemandRandomSegmentHighHitRate() throws IOExcept
355355
makeSegmentsToLoad(segmentCount, localStorageFolder, interval, segmentsToWeakLoad);
356356

357357
for (boolean sleepy : new boolean[]{true, false}) {
358-
testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy, true);
358+
testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy, true, false);
359359
}
360360
}
361361

@@ -373,7 +373,7 @@ public void testAcquireSegmentOnDemandRandomSegmentNoEvictions() throws IOExcept
373373

374374
for (boolean sleepy : new boolean[]{true, false}) {
375375
// use different segments for each run, otherwise the 2nd run is all cache hits
376-
testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy, true);
376+
testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy, true, true);
377377
}
378378
}
379379

@@ -576,7 +576,8 @@ private void testWeakLoad(
576576
int concurrentReads,
577577
boolean random,
578578
boolean sleepy,
579-
boolean expectHits
579+
boolean expectHits,
580+
boolean expectNoFailures
580581
)
581582
{
582583
int totalSuccess = 0;
@@ -634,6 +635,10 @@ private void testWeakLoad(
634635
Assertions.assertTrue(totalFailures <= location.getWeakStats().getRejectCount() + location2.getWeakStats()
635636
.getRejectCount());
636637

638+
if (expectNoFailures) {
639+
Assertions.assertEquals(0, totalFailures);
640+
Assertions.assertEquals(iterations, totalSuccess);
641+
}
637642
if (expectHits) {
638643
Assertions.assertTrue(location.getWeakStats().getHitCount() >= 0);
639644
Assertions.assertTrue(location2.getWeakStats().getHitCount() >= 0);
@@ -708,7 +713,82 @@ private BatchResult testWeakBatch(int iteration, List<DataSegment> currentBatch,
708713
return new BatchResult(exceptions, success, rows);
709714
}
710715

716+
@Test
717+
public void testAcquireSegmentOnDemandRandomSegmentWithInterrupt() throws IOException, InterruptedException
718+
{
719+
final int segmentCount = 8;
720+
final int iterations = 2000;
721+
final int concurrentReads = 10;
722+
final File localStorageFolder = new File(tempDir, "local_storage_folder");
723+
724+
final Interval interval = Intervals.of("2019-01-01/P1D");
725+
726+
makeSegmentsToLoad(segmentCount, localStorageFolder, interval, segmentsToWeakLoad);
727+
728+
final List<DataSegment> currentBatch = new ArrayList<>();
729+
for (int i = 0; i < iterations; i++) {
730+
currentBatch.add(segmentsToWeakLoad.get(ThreadLocalRandom.current().nextInt(segmentCount)));
731+
// process batches of 10 requests at a time
732+
if (currentBatch.size() == concurrentReads) {
733+
final List<InterruptedLoad> weakLoads = currentBatch
734+
.stream()
735+
.map(segment -> new InterruptedLoad(virtualStorageManager, segment))
736+
.collect(Collectors.toList());
737+
final List<Future<Integer>> futures = new ArrayList<>();
738+
for (InterruptedLoad weakLoad : weakLoads) {
739+
futures.add(executorService.submit(weakLoad));
740+
}
741+
for (Future<Integer> future : futures) {
742+
try {
743+
future.get(20L, TimeUnit.MILLISECONDS);
744+
}
745+
catch (Throwable t) {
746+
}
747+
}
748+
while (true) {
749+
boolean allDone = true;
750+
for (Future<?> f : futures) {
751+
allDone = allDone && f.isDone();
752+
}
753+
if (allDone) {
754+
break;
755+
}
756+
Thread.sleep(5);
757+
}
758+
Assertions.assertEquals(0, location.getActiveWeakHolds());
759+
Assertions.assertEquals(0, location2.getActiveWeakHolds());
760+
currentBatch.clear();
761+
}
762+
}
763+
764+
Assertions.assertTrue(location.getWeakStats().getHitCount() >= 0);
765+
Assertions.assertTrue(location2.getWeakStats().getHitCount() >= 0);
766+
767+
// now ensure that we can successfully do stuff after all those interrupts
768+
int totalSuccess = 0;
769+
int totalFailures = 0;
770+
for (int i = 0; i < iterations; i++) {
771+
int segment = ThreadLocalRandom.current().nextInt(segmentCount);
772+
currentBatch.add(segmentsToWeakLoad.get(segment));
773+
// process batches of 10 requests at a time
774+
if (currentBatch.size() == concurrentReads) {
711775

776+
BatchResult result = testWeakBatch(i, currentBatch, false);
777+
totalSuccess += result.success;
778+
totalFailures += result.exceptions.size();
779+
currentBatch.clear();
780+
}
781+
}
782+
Assertions.assertEquals(iterations, totalSuccess);
783+
Assertions.assertEquals(0, totalFailures);
784+
Assertions.assertEquals(0, location.getActiveWeakHolds());
785+
Assertions.assertEquals(0, location2.getActiveWeakHolds());
786+
Assertions.assertTrue(4 >= location.getWeakEntryCount());
787+
Assertions.assertTrue(4 >= location2.getWeakEntryCount());
788+
// 5 because __drop path
789+
Assertions.assertTrue(5 >= location.getPath().listFiles().length);
790+
Assertions.assertTrue(5 >= location2.getPath().listFiles().length);
791+
}
712792

713793
private void assertNoLooseEnds()
714794
{
@@ -925,4 +1005,41 @@ public Integer call()
9251005
}
9261006
}
9271007
}
1008+
1009+
private static class InterruptedLoad implements Callable<Integer>
1010+
{
1011+
private final SegmentLocalCacheManager segmentManager;
1012+
private final DataSegment segment;
1013+
1014+
private InterruptedLoad(
1015+
SegmentLocalCacheManager segmentManager,
1016+
DataSegment segment
1017+
)
1018+
{
1019+
this.segmentManager = segmentManager;
1020+
this.segment = segment;
1021+
}
1022+
1023+
@Override
1024+
public Integer call() throws SegmentLoadingException
1025+
{
1026+
final Closer closer = Closer.create();
1027+
final AcquireSegmentAction action = closer.register(
1028+
segmentManager.acquireSegment(segment)
1029+
);
1030+
try {
1031+
final Future<AcquireSegmentResult> result = action.getSegmentFuture();
1032+
Thread.sleep(ThreadLocalRandom.current().nextInt(50));
1033+
result.cancel(true);
1034+
Thread.currentThread().interrupt();
1035+
}
1036+
catch (Throwable t) {
1037+
throw new RuntimeException(t);
1038+
}
1039+
finally {
1040+
CloseableUtils.closeAndWrapExceptions(closer);
1041+
}
1042+
return null;
1043+
}
1044+
}
9281045
}

0 commit comments

Comments
 (0)