diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index 76cbe2106110..d052fd654898 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -1119,6 +1119,38 @@ public void testLockAllocateDayReplaceMonthAllocateAppend() verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11, segmentV12); } + @Test + public void test_concurrentAppend_toIntervalWithUnusedSegments() + { + // Allocate and commit an APPEND segment + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + Assert.assertEquals(0, pendingSegment.getShardSpec().getPartitionNum()); + + final DataSegment segmentV01 = asSegment(pendingSegment); + appendTask.commitAppendSegments(segmentV01); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01); + + // Mark it as unused + getStorageCoordinator().markAllSegmentsAsUnused(appendTask.getDataSource()); + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23); + + // Allocate and commit another APPEND segment + final SegmentIdWithShardSpec pendingSegment2 + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(SEGMENT_V0, pendingSegment2.getVersion()); + Assert.assertEquals(1, pendingSegment2.getShardSpec().getPartitionNum()); + + final DataSegment segmentV02 = asSegment(pendingSegment2); + appendTask.commitAppendSegments(segmentV02); + Assert.assertNotEquals(segmentV01, segmentV02); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV02); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV02); + } @Nullable private DataSegment findSegmentWith(String version, Map loadSpec, Set segments) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 5f6f817313eb..dade16686490 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1419,6 +1419,7 @@ private PendingSegmentRecord createNewPendingSegment( version, partialShardSpec.complete(jsonMapper, newPartitionId, 0) ); + pendingSegmentId = getTrueAllocatedId(transaction, pendingSegmentId); return PendingSegmentRecord.create( pendingSegmentId, request.getSequenceName(), @@ -1555,12 +1556,13 @@ private SegmentIdWithShardSpec createNewPendingSegment( ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID : PartitionIds.ROOT_GEN_START_PARTITION_ID; String version = newSegmentVersion == null ? existingVersion : newSegmentVersion; - return new SegmentIdWithShardSpec( + SegmentIdWithShardSpec allocatedId = new SegmentIdWithShardSpec( dataSource, interval, version, partialShardSpec.complete(jsonMapper, newPartitionId, 0) ); + return getTrueAllocatedId(transaction, allocatedId); } else if (!overallMaxId.getInterval().equals(interval)) { log.warn( "Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].",