Skip to content

Commit ae252d9

Browse files
capistrant and kfaraz authored
Fix concurrent append to interval with unused segments (#18230) (#18275)
This is a better approach to the fix in #18216. Changes: - When allocating the first segment in an interval which already contains an unused segment, use a fresh version rather than reusing the old (now unused) version. Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
1 parent 2e2574c commit ae252d9

File tree

6 files changed

+357
-14
lines changed

6 files changed

+357
-14
lines changed

indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.google.common.collect.ImmutableList;
2424
import com.google.common.collect.Iterables;
2525
import com.google.common.collect.Sets;
26+
import org.apache.druid.error.DruidExceptionMatcher;
27+
import org.apache.druid.error.ExceptionMatcher;
2628
import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
2729
import org.apache.druid.indexing.common.TaskLock;
2830
import org.apache.druid.indexing.common.TaskStorageDirTracker;
@@ -63,6 +65,7 @@
6365
import org.apache.druid.timeline.DataSegment;
6466
import org.apache.druid.timeline.SegmentId;
6567
import org.apache.druid.timeline.partition.NumberedShardSpec;
68+
import org.hamcrest.MatcherAssert;
6669
import org.joda.time.Interval;
6770
import org.joda.time.Period;
6871
import org.junit.After;
@@ -1120,7 +1123,7 @@ public void testLockAllocateDayReplaceMonthAllocateAppend()
11201123
}
11211124

11221125
@Test
1123-
public void test_concurrentAppend_toIntervalWithUnusedSegments()
1126+
public void test_concurrentAppend_toIntervalWithUnusedAppendSegment_createsFreshVersion()
11241127
{
11251128
// Allocate and commit an APPEND segment
11261129
final SegmentIdWithShardSpec pendingSegment
@@ -1141,8 +1144,10 @@ public void test_concurrentAppend_toIntervalWithUnusedSegments()
11411144
// Allocate and commit another APPEND segment
11421145
final SegmentIdWithShardSpec pendingSegment2
11431146
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
1144-
Assert.assertEquals(SEGMENT_V0, pendingSegment2.getVersion());
1145-
Assert.assertEquals(1, pendingSegment2.getShardSpec().getPartitionNum());
1147+
1148+
// Verify that the new segment gets a different version
1149+
Assert.assertEquals(SEGMENT_V0 + "S", pendingSegment2.getVersion());
1150+
Assert.assertEquals(0, pendingSegment2.getShardSpec().getPartitionNum());
11461151

11471152
final DataSegment segmentV02 = asSegment(pendingSegment2);
11481153
appendTask.commitAppendSegments(segmentV02);
@@ -1152,6 +1157,54 @@ public void test_concurrentAppend_toIntervalWithUnusedSegments()
11521157
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV02);
11531158
}
11541159

1160+
@Test
1161+
public void test_allocateCommitDelete_createsFreshVersion_uptoMaxAllowedRetries()
1162+
{
1163+
final int maxAllowedAppends = 10;
1164+
final int expectedParitionNum = 0;
1165+
String expectedVersion = SEGMENT_V0;
1166+
1167+
// Allocate, commit, delete, repeat
1168+
for (int i = 0; i < maxAllowedAppends; ++i, expectedVersion += "S") {
1169+
// Allocate a segment and verify its version and partition number
1170+
final SegmentIdWithShardSpec pendingSegment
1171+
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
1172+
1173+
Assert.assertEquals(expectedVersion, pendingSegment.getVersion());
1174+
Assert.assertEquals(expectedParitionNum, pendingSegment.getShardSpec().getPartitionNum());
1175+
1176+
// Commit the segment and verify its version and partition number
1177+
final DataSegment segment = asSegment(pendingSegment);
1178+
appendTask.commitAppendSegments(segment);
1179+
1180+
Assert.assertEquals(expectedVersion, segment.getVersion());
1181+
Assert.assertEquals(expectedParitionNum, segment.getShardSpec().getPartitionNum());
1182+
1183+
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segment);
1184+
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segment);
1185+
1186+
// Mark the segment as unused
1187+
getStorageCoordinator().markAllSegmentsAsUnused(appendTask.getDataSource());
1188+
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23);
1189+
}
1190+
1191+
// Verify that the next attempt fails
1192+
MatcherAssert.assertThat(
1193+
Assert.assertThrows(
1194+
ISE.class,
1195+
() -> appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY)
1196+
),
1197+
ExceptionMatcher.of(ISE.class).expectRootCause(
1198+
DruidExceptionMatcher.internalServerError().expectMessageIs(
1199+
"Could not allocate segment"
1200+
+ "[wiki_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_1970-01-01T00:00:00.000Z]"
1201+
+ " as there are too many clashing unused versions(upto [1970-01-01T00:00:00.000ZSSSSSSSSSS])"
1202+
+ " in the interval. Kill the old unused versions to proceed."
1203+
)
1204+
)
1205+
);
1206+
}
1207+
11551208
@Nullable
11561209
private DataSegment findSegmentWith(String version, Map<String, Object> loadSpec, Set<DataSegment> segments)
11571210
{

processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.druid.error;
2121

22+
import com.google.common.base.Throwables;
2223
import org.apache.druid.matchers.DruidMatchers;
2324
import org.hamcrest.Description;
2425
import org.hamcrest.DiagnosingMatcher;
@@ -75,6 +76,12 @@ public ExceptionMatcher expectCause(Matcher<Throwable> causeMatcher)
7576
return this;
7677
}
7778

79+
public ExceptionMatcher expectRootCause(Matcher<Throwable> causeMatcher)
80+
{
81+
matcherList.add(0, DruidMatchers.fn("rootCause", Throwables::getRootCause, causeMatcher));
82+
return this;
83+
}
84+
7885
@Override
7986
protected boolean matches(Object item, Description mismatchDescription)
8087
{

server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java

Lines changed: 90 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.inject.Inject;
3434
import org.apache.druid.common.utils.IdUtils;
3535
import org.apache.druid.error.DruidException;
36+
import org.apache.druid.error.InternalServerError;
3637
import org.apache.druid.error.InvalidInput;
3738
import org.apache.druid.indexing.overlord.DataSourceMetadata;
3839
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -1419,7 +1420,7 @@ private PendingSegmentRecord createNewPendingSegment(
14191420
version,
14201421
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
14211422
);
1422-
pendingSegmentId = getTrueAllocatedId(transaction, pendingSegmentId);
1423+
pendingSegmentId = getUniqueIdForPrimaryAllocation(transaction, pendingSegmentId);
14231424
return PendingSegmentRecord.create(
14241425
pendingSegmentId,
14251426
request.getSequenceName(),
@@ -1457,7 +1458,7 @@ private PendingSegmentRecord createNewPendingSegment(
14571458
)
14581459
);
14591460
return PendingSegmentRecord.create(
1460-
getTrueAllocatedId(transaction, pendingSegmentId),
1461+
getUniqueIdForSecondaryAllocation(transaction, pendingSegmentId),
14611462
request.getSequenceName(),
14621463
request.getPreviousSegmentId(),
14631464
null,
@@ -1562,7 +1563,7 @@ private SegmentIdWithShardSpec createNewPendingSegment(
15621563
version,
15631564
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
15641565
);
1565-
return getTrueAllocatedId(transaction, allocatedId);
1566+
return getUniqueIdForPrimaryAllocation(transaction, allocatedId);
15661567
} else if (!overallMaxId.getInterval().equals(interval)) {
15671568
log.warn(
15681569
"Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].",
@@ -1590,18 +1591,90 @@ private SegmentIdWithShardSpec createNewPendingSegment(
15901591
committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()
15911592
)
15921593
);
1593-
return getTrueAllocatedId(transaction, allocatedId);
1594+
return getUniqueIdForSecondaryAllocation(transaction, allocatedId);
15941595
}
15951596
}
15961597

15971598
/**
1598-
* Verifies that the allocated id doesn't already exist in the druid_segments table.
1599-
* If yes, try to get the max unallocated id considering the unused segments for the datasource, version and interval
1600-
* Otherwise, use the same id.
1601-
* @param allocatedId The segment allcoted on the basis of used and pending segments
1602-
* @return a segment id that isn't already used by other unused segments
1599+
* Returns a unique {@link SegmentIdWithShardSpec} which does not clash with
1600+
* any existing unused segment. If an unused segment already exists that matches
1601+
* the interval and version of the given {@code allocatedId}, a fresh version
1602+
* is created by appending one or more {@link PendingSegmentRecord#CONCURRENT_APPEND_VERSION_SUFFIX} suffixes.
1603+
* Such a conflict can happen only if all the segments in this interval created
1604+
* by a prior APPEND task were marked as unused.
1605+
* <p>
1606+
* This method should be called only when allocating the first segment in an interval.
1607+
*/
1608+
private SegmentIdWithShardSpec getUniqueIdForPrimaryAllocation(
1609+
SegmentMetadataTransaction transaction,
1610+
SegmentIdWithShardSpec allocatedId
1611+
)
1612+
{
1613+
// Get all the unused segment versions for this datasource and interval
1614+
final Set<String> unusedSegmentVersions = transaction.noCacheSql().retrieveUnusedSegmentVersionsWithInterval(
1615+
allocatedId.getDataSource(),
1616+
allocatedId.getInterval()
1617+
);
1618+
1619+
final String allocatedVersion = allocatedId.getVersion();
1620+
if (!unusedSegmentVersions.contains(allocatedVersion)) {
1621+
// Nothing to do, this version is new
1622+
return allocatedId;
1623+
} else if (!PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND.equals(allocatedVersion)) {
1624+
// Version clash should never happen for non-APPEND locks
1625+
throw DruidException.defensive(
1626+
"Cannot allocate segment[%s] as there are already some unused segments"
1627+
+ " for version[%s] in this interval.",
1628+
allocatedId, allocatedVersion
1629+
);
1630+
}
1631+
1632+
// Iterate until a new non-clashing version is found
1633+
boolean foundFreshVersion = false;
1634+
StringBuilder candidateVersion = new StringBuilder(allocatedId.getVersion());
1635+
for (int i = 0; i < 10; ++i) {
1636+
if (unusedSegmentVersions.contains(candidateVersion.toString())) {
1637+
candidateVersion.append(PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX);
1638+
} else {
1639+
foundFreshVersion = true;
1640+
break;
1641+
}
1642+
}
1643+
1644+
if (foundFreshVersion) {
1645+
final SegmentIdWithShardSpec uniqueId = new SegmentIdWithShardSpec(
1646+
allocatedId.getDataSource(),
1647+
allocatedId.getInterval(),
1648+
candidateVersion.toString(),
1649+
allocatedId.getShardSpec()
1650+
);
1651+
log.info(
1652+
"Created new unique pending segment ID[%s] with version[%s] for originally allocated ID[%s].",
1653+
uniqueId, candidateVersion.toString(), allocatedId
1654+
);
1655+
1656+
return uniqueId;
1657+
} else {
1658+
throw InternalServerError.exception(
1659+
"Could not allocate segment[%s] as there are too many clashing unused"
1660+
+ " versions(upto [%s]) in the interval. Kill the old unused versions to proceed.",
1661+
allocatedId, candidateVersion.toString()
1662+
);
1663+
}
1664+
}
1665+
1666+
/**
1667+
* Returns a unique {@link SegmentIdWithShardSpec} which does not clash with
1668+
* any existing unused segment. If an unused segment already exists that matches
1669+
* the interval, version and partition number of the given {@code allocatedId},
1670+
* a higher partition number is used. Such a conflict can happen only if some
1671+
* segments of the underlying version have been marked as unused while others
1672+
* are still used.
1673+
* <p>
1674+
* This method should not be called when allocating the first segment in an
1675+
* interval.
16031676
*/
1604-
private SegmentIdWithShardSpec getTrueAllocatedId(
1677+
private SegmentIdWithShardSpec getUniqueIdForSecondaryAllocation(
16051678
SegmentMetadataTransaction transaction,
16061679
SegmentIdWithShardSpec allocatedId
16071680
)
@@ -1631,7 +1704,7 @@ private SegmentIdWithShardSpec getTrueAllocatedId(
16311704
allocatedId.getShardSpec().getPartitionNum(),
16321705
unusedMaxId.getPartitionNum() + 1
16331706
);
1634-
return new SegmentIdWithShardSpec(
1707+
final SegmentIdWithShardSpec uniqueId = new SegmentIdWithShardSpec(
16351708
allocatedId.getDataSource(),
16361709
allocatedId.getInterval(),
16371710
allocatedId.getVersion(),
@@ -1640,6 +1713,12 @@ private SegmentIdWithShardSpec getTrueAllocatedId(
16401713
allocatedId.getShardSpec().getNumCorePartitions()
16411714
)
16421715
);
1716+
log.info(
1717+
"Created new unique pending segment ID[%s] with partition number[%s] for originally allocated ID[%s].",
1718+
uniqueId, maxPartitionNum, allocatedId
1719+
);
1720+
1721+
return uniqueId;
16431722
}
16441723

16451724
@Override

server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,19 @@
5252
*/
5353
public class PendingSegmentRecord
5454
{
55+
/**
56+
* Default lock version used by concurrent APPEND tasks.
57+
*/
58+
public static final String DEFAULT_VERSION_FOR_CONCURRENT_APPEND = DateTimes.EPOCH.toString();
59+
60+
/**
61+
* Suffix to use to construct fresh segment versions in the event of a clash.
62+
* The chosen character {@code S} is just for visual ease so that two versions
63+
* are not easily confused for each other.
64+
* For example, {@code 1970-01-01T00:00:00.000Z_1} vs {@code 1970-01-01T00:00:00.000ZS_1}.
65+
*/
66+
public static final String CONCURRENT_APPEND_VERSION_SUFFIX = "S";
67+
5568
private final SegmentIdWithShardSpec id;
5669
private final String sequenceName;
5770
private final String sequencePrevId;

server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,6 +1128,29 @@ public List<DataSegment> retrieveUnusedSegmentsWithExactInterval(
11281128
return segments.stream().filter(Objects::nonNull).collect(Collectors.toList());
11291129
}
11301130

1131+
/**
1132+
* Retrieves the versions of unused segments which are perfectly aligned with
1133+
* the given interval.
1134+
*/
1135+
public Set<String> retrieveUnusedSegmentVersionsWithInterval(String dataSource, Interval interval)
1136+
{
1137+
final String sql = StringUtils.format(
1138+
"SELECT DISTINCT(version) FROM %1$s"
1139+
+ " WHERE dataSource = :dataSource AND used = false"
1140+
+ " AND %2$send%2$s = :end AND start = :start",
1141+
dbTables.getSegmentsTable(),
1142+
connector.getQuoteString()
1143+
);
1144+
return Set.copyOf(
1145+
handle.createQuery(sql)
1146+
.bind("dataSource", dataSource)
1147+
.bind("start", interval.getStart().toString())
1148+
.bind("end", interval.getEnd().toString())
1149+
.mapTo(String.class)
1150+
.list()
1151+
);
1152+
}
1153+
11311154
/**
11321155
* Retrieve the used segment for a given id if it exists in the metadata store and null otherwise
11331156
*/

0 commit comments

Comments
 (0)