Skip to content

Commit acd3b29

Browse files
authored
Fix concurrent append to interval with only unused segments (#18216)
Bug: Concurrent append uses lock of type APPEND which always uses a lock version of epoch 1970-01-01. This can cause data loss in a flow as follows: - Ingest data using an APPEND task to an empty interval - Mark all the segments as unused - Re-run the APPEND task - Data is not visible since old segment IDs (now unused) are allocated again Fix: In segment allocation, do not reuse an old segment ID, used or unused. This fix was already done for some cases back in #16380 . An embedded test for this has been included in #18207
1 parent f2a95fa commit acd3b29

File tree

2 files changed

+35
-1
lines changed

2 files changed

+35
-1
lines changed

indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1119,6 +1119,38 @@ public void testLockAllocateDayReplaceMonthAllocateAppend()
11191119
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11, segmentV12);
11201120
}
11211121

1122+
@Test
1123+
public void test_concurrentAppend_toIntervalWithUnusedSegments()
1124+
{
1125+
// Allocate and commit an APPEND segment
1126+
final SegmentIdWithShardSpec pendingSegment
1127+
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
1128+
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
1129+
Assert.assertEquals(0, pendingSegment.getShardSpec().getPartitionNum());
1130+
1131+
final DataSegment segmentV01 = asSegment(pendingSegment);
1132+
appendTask.commitAppendSegments(segmentV01);
1133+
1134+
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
1135+
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
1136+
1137+
// Mark it as unused
1138+
getStorageCoordinator().markAllSegmentsAsUnused(appendTask.getDataSource());
1139+
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23);
1140+
1141+
// Allocate and commit another APPEND segment
1142+
final SegmentIdWithShardSpec pendingSegment2
1143+
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
1144+
Assert.assertEquals(SEGMENT_V0, pendingSegment2.getVersion());
1145+
Assert.assertEquals(1, pendingSegment2.getShardSpec().getPartitionNum());
1146+
1147+
final DataSegment segmentV02 = asSegment(pendingSegment2);
1148+
appendTask.commitAppendSegments(segmentV02);
1149+
Assert.assertNotEquals(segmentV01, segmentV02);
1150+
1151+
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV02);
1152+
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV02);
1153+
}
11221154

11231155
@Nullable
11241156
private DataSegment findSegmentWith(String version, Map<String, Object> loadSpec, Set<DataSegment> segments)

server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1419,6 +1419,7 @@ private PendingSegmentRecord createNewPendingSegment(
14191419
version,
14201420
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
14211421
);
1422+
pendingSegmentId = getTrueAllocatedId(transaction, pendingSegmentId);
14221423
return PendingSegmentRecord.create(
14231424
pendingSegmentId,
14241425
request.getSequenceName(),
@@ -1555,12 +1556,13 @@ private SegmentIdWithShardSpec createNewPendingSegment(
15551556
? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
15561557
: PartitionIds.ROOT_GEN_START_PARTITION_ID;
15571558
String version = newSegmentVersion == null ? existingVersion : newSegmentVersion;
1558-
return new SegmentIdWithShardSpec(
1559+
SegmentIdWithShardSpec allocatedId = new SegmentIdWithShardSpec(
15591560
dataSource,
15601561
interval,
15611562
version,
15621563
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
15631564
);
1565+
return getTrueAllocatedId(transaction, allocatedId);
15641566
} else if (!overallMaxId.getInterval().equals(interval)) {
15651567
log.warn(
15661568
"Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].",

0 commit comments

Comments
 (0)