Skip to content

Commit 22990f2

Browse files
authored
fix leaky segment reference on segment drop (#18782)
1 parent f3ecfc7 commit 22990f2

File tree

4 files changed

+45
-4
lines changed

4 files changed

+45
-4
lines changed

processing/src/test/java/org/apache/druid/segment/TestSegmentUtils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,12 @@ public List<OrderBy> getOrdering()
126126
{
127127
return Cursors.ascendingTimeOrder();
128128
}
129+
130+
@Override
131+
public int getNumRows()
132+
{
133+
return 1234;
134+
}
129135
};
130136

131137
public static class SegmentForTesting extends QueryableIndexSegment
@@ -178,6 +184,8 @@ public <T> T as(@Nonnull Class<T> clazz)
178184
return (T) INDEX;
179185
} else if (clazz.equals(CursorFactory.class)) {
180186
return (T) new QueryableIndexCursorFactory(INDEX);
187+
} else if (clazz.equals(PhysicalSegmentInspector.class)) {
188+
return (T) new QueryableIndexPhysicalSegmentInspector(INDEX);
181189
}
182190
return null;
183191
}

server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,25 @@ boolean isSegmentCached(final DataSegment segment)
648648
return false;
649649
}
650650

651+
/**
652+
* Testing use only please, any callers that want to do stuff with segments should use
653+
* {@link #acquireCachedSegment(DataSegment)} or {@link #acquireSegment(DataSegment)} instead. Does not hold locks
654+
* and so is not really safe to use while the cache manager is active
655+
*/
656+
@VisibleForTesting
657+
@Nullable
658+
public ReferenceCountedSegmentProvider getSegmentReferenceProvider(DataSegment segment)
659+
{
660+
final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment);
661+
for (StorageLocation location : locations) {
662+
final SegmentCacheEntry entry = location.getCacheEntry(cacheEntry.id);
663+
if (entry != null) {
664+
return entry.referenceProvider;
665+
}
666+
}
667+
return null;
668+
}
669+
651670
/**
652671
* Returns the effective segment info directory based on the configuration settings.
653672
* The directory is selected based on the following configurations injected into this class:

server/src/main/java/org/apache/druid/server/SegmentManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,11 +380,11 @@ public void dropSegment(final DataSegment dataSegment)
380380
try (final Closer closer = Closer.create()) {
381381
final Optional<Segment> oldSegment = cacheManager.acquireCachedSegment(oldSegmentRef);
382382
long numberOfRows = oldSegment.map(segment -> {
383+
closer.register(segment);
383384
final PhysicalSegmentInspector countInspector = segment.as(PhysicalSegmentInspector.class);
384385
if (countInspector != null) {
385386
return countInspector.getNumRows();
386387
}
387-
CloseableUtils.closeAndWrapExceptions(segment);
388388
return 0;
389389
}).orElse(0);
390390

server/src/test/java/org/apache/druid/server/SegmentManagerTest.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.druid.query.expression.TestExprMacroTable;
3636
import org.apache.druid.segment.IndexIO;
3737
import org.apache.druid.segment.IndexSpec;
38+
import org.apache.druid.segment.ReferenceCountedSegmentProvider;
3839
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
3940
import org.apache.druid.segment.SegmentMapFunction;
4041
import org.apache.druid.segment.TestHelper;
@@ -45,7 +46,6 @@
4546
import org.apache.druid.segment.loading.LeastBytesUsedStorageLocationSelectorStrategy;
4647
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
4748
import org.apache.druid.segment.loading.LocalLoadSpec;
48-
import org.apache.druid.segment.loading.SegmentCacheManager;
4949
import org.apache.druid.segment.loading.SegmentLoaderConfig;
5050
import org.apache.druid.segment.loading.SegmentLoadingException;
5151
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
@@ -90,7 +90,9 @@ public class SegmentManagerTest extends InitializedNullHandlingTest
9090
);
9191

9292
private ExecutorService executor;
93+
private SegmentLocalCacheManager cacheManager;
9394
private SegmentManager segmentManager;
95+
private SegmentLocalCacheManager virtualCacheManager;
9496
private SegmentManager virtualSegmentManager;
9597

9698
@Rule
@@ -160,7 +162,7 @@ public boolean isVirtualStorage()
160162
);
161163

162164
final List<StorageLocation> storageLocations = loaderConfig.toStorageLocations();
163-
final SegmentLocalCacheManager cacheManager = new SegmentLocalCacheManager(
165+
cacheManager = new SegmentLocalCacheManager(
164166
storageLocations,
165167
loaderConfig,
166168
new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations),
@@ -170,7 +172,7 @@ public boolean isVirtualStorage()
170172
segmentManager = new SegmentManager(cacheManager);
171173

172174
final List<StorageLocation> virtualStorageLocations = virtualLoaderConfig.toStorageLocations();
173-
final SegmentCacheManager virtualCacheManager = new SegmentLocalCacheManager(
175+
virtualCacheManager = new SegmentLocalCacheManager(
174176
virtualStorageLocations,
175177
virtualLoaderConfig,
176178
new LeastBytesUsedStorageLocationSelectorStrategy(virtualStorageLocations),
@@ -238,8 +240,12 @@ public void testLoadBootstrapSegment() throws ExecutionException, InterruptedExc
238240
@Test
239241
public void testDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException, IOException
240242
{
243+
List<ReferenceCountedSegmentProvider> referenceProviders = new ArrayList<>();
241244
for (DataSegment eachSegment : SEGMENTS) {
242245
segmentManager.loadSegment(eachSegment);
246+
ReferenceCountedSegmentProvider refProvider = cacheManager.getSegmentReferenceProvider(eachSegment);
247+
referenceProviders.add(refProvider);
248+
Assert.assertFalse(refProvider.isClosed());
243249
}
244250

245251
final List<Future<Void>> futures = ImmutableList.of(SEGMENTS.get(0), SEGMENTS.get(2)).stream()
@@ -260,6 +266,14 @@ public void testDropSegment() throws SegmentLoadingException, ExecutionException
260266
assertResult(
261267
ImmutableList.of(SEGMENTS.get(1), SEGMENTS.get(3), SEGMENTS.get(4))
262268
);
269+
for (int i = 0; i < SEGMENTS.size(); i++) {
270+
Assert.assertEquals(0, referenceProviders.get(i).getNumReferences());
271+
if (i == 0 || i == 2) {
272+
Assert.assertTrue(referenceProviders.get(i).isClosed());
273+
} else {
274+
Assert.assertFalse(referenceProviders.get(i).isClosed());
275+
}
276+
}
263277
}
264278

265279
private Void loadSegmentOrFail(DataSegment segment)

0 commit comments

Comments
 (0)