
Commit f3ecfc7

Allow large partition numbers in VersionedIntervalTimeline. (#18777)
* Allow large partition numbers in VersionedIntervalTimeline. The experimental segment locking feature (forceTimeChunkLock: false) reserves the first 32768 partition numbers for the "root generation", and then uses an "atomic update groups" scheme to replace root segment ranges with new sets of segments during reindexing operations. OvershadowableManager, which manages this atomic update scheme, imposes a limit of 32768 segments per time chunk. Previously, this applied even to people that are not using segment locking. In this patch, the class is now only used when segment locking is actually in play, meaning that the limit is not imposed under normal conditions.
* Use the correct annotation.
* Add embedded test, and config property to support it.
* Do HighPartitionNumberTest with a different approach.
* Changes from review.
* Align behavior with tests.
* Remove extraneous newline.
* Fix comment.
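
The first bullet above describes the core change: a time chunk is only handed over to OvershadowableManager once segment locking has actually produced a non-root segment. A minimal sketch of that idea follows, assuming SimplePartitionHolderContents offers a no-argument constructor and lives next to PartitionHolderContents in org.apache.druid.timeline.partition (those two types, OvershadowableManager.fromSimple, and addChunk appear in the diff below); the holder class and its add method here are hypothetical and are not Druid's actual PartitionHolder implementation.

import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.partition.OvershadowableManager;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolderContents;
import org.apache.druid.timeline.partition.SimplePartitionHolderContents;

// Hypothetical holder, not the actual PartitionHolder#add implementation.
class UpgradeOnDemandHolder<T extends Overshadowable<T>>
{
  // Start with the simple container, which has no 32768-partitions-per-time-chunk limit.
  private PartitionHolderContents<T> contents = new SimplePartitionHolderContents<>(); // assumed no-arg constructor

  boolean add(PartitionChunk<T> chunk)
  {
    // A non-zero minor version means segment locking has created a new segment generation,
    // so the atomic-update-group bookkeeping of OvershadowableManager is actually needed.
    if (chunk.getObject().getMinorVersion() != 0 && contents instanceof SimplePartitionHolderContents) {
      contents = OvershadowableManager.fromSimple((SimplePartitionHolderContents<T>) contents);
    }
    return contents.addChunk(chunk);
  }
}

With this arrangement, time chunks that never see a minor version other than zero stay on the simple container, so the 32768-partition limit enforced by OvershadowableManager never applies to them.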
1 parent 60ad692 commit f3ecfc7

7 files changed: +541 additions, -47 deletions
New file HighPartitionNumberTest.java: 181 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.testing.embedded.msq;

import com.google.common.collect.Iterables;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedHistorical;
import org.apache.druid.testing.embedded.EmbeddedIndexer;
import org.apache.druid.testing.embedded.EmbeddedOverlord;
import org.apache.druid.testing.embedded.indexing.MoreResources;
import org.apache.druid.testing.embedded.indexing.Resources;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Set;

/**
 * Test to verify that high partition numbers (above the limit of {@link PartitionIds#ROOT_GEN_END_PARTITION_ID})
 * work correctly when segment locking is not in play.
 */
public class HighPartitionNumberTest extends EmbeddedClusterTestBase
{
  /**
   * Expected number of rows for three copies of {@link Resources.DataFile#tinyWiki1Json()}.
   */
  private static final int EXPECTED_TOTAL_ROWS = 9;

  private final EmbeddedBroker broker = new EmbeddedBroker();
  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
  private final EmbeddedIndexer indexer = new EmbeddedIndexer()
      .setServerMemory(300_000_000L)
      .addProperty("druid.worker.capacity", "2");

  private EmbeddedMSQApis msqApis;

  @Override
  protected EmbeddedDruidCluster createCluster()
  {
    return EmbeddedDruidCluster
        .withEmbeddedDerbyAndZookeeper()
        .useLatchableEmitter()
        .useDefaultTimeoutForLatchableEmitter(180_000)
        .addServer(overlord)
        .addServer(coordinator)
        .addServer(indexer)
        .addServer(broker)
        .addServer(new EmbeddedHistorical());
  }

  @BeforeAll
  public void initTestClient()
  {
    msqApis = new EmbeddedMSQApis(cluster, overlord);
  }

  @Test
  public void testHighPartitionNumbers()
  {
    insertFirstSegment();
    insertSecondSegment();
    insertLastSegments();

    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

    // Verify that we have segments with partition numbers above the old limit
    final int maxPartitionNum = Integer.parseInt(cluster.runSql(
        "SELECT MAX(partition_num) FROM sys.segments WHERE datasource=%s",
        Calcites.escapeStringLiteral(dataSource)
    ).trim());

    Assertions.assertEquals(32769 /* larger than Short.MAX_VALUE */, maxPartitionNum);

    // Verify that all data is queryable
    cluster.callApi().verifySqlQuery(
        "SELECT COUNT(*) FROM %s",
        dataSource,
        String.valueOf(EXPECTED_TOTAL_ROWS)
    );
  }

  /**
   * Inserts {@link Resources.DataFile#tinyWiki1Json()} with partition number zero, using SQL.
   */
  private void insertFirstSegment()
  {
    // Insert tinyWiki1Json in 1 segment (it's a 3 line file).
    String queryLocal = StringUtils.format(
        MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
        dataSource,
        Resources.DataFile.tinyWiki1Json()
    );

    Map<String, Object> context = Map.of(
        MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
        ClusterStatisticsMergeMode.PARALLEL
    );

    cluster.callApi().waitForTaskToSucceed(msqApis.submitTaskSql(context, queryLocal).getTaskId(), overlord);
  }

  /**
   * Reinserts the segment from {@link #insertFirstSegment()} directly into metadata storage,
   * with partition number 32766.
   */
  private void insertSecondSegment()
  {
    // Get the segment and reinsert it with a higher partition number.
    final DataSegment firstSegment =
        Iterables.getOnlyElement(
            overlord.bindings()
                    .segmentsMetadataStorage()
                    .retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
        );

    overlord.bindings().segmentsMetadataStorage().commitSegments(
        Set.of(
            firstSegment.withShardSpec(
                new NumberedShardSpec(
                    Short.MAX_VALUE - 1,
                    firstSegment.getShardSpec().getNumCorePartitions()
                )
            )
        ),
        null
    );
  }

  /**
   * Inserts {@link Resources.DataFile#tinyWiki1Json()} with SQL into three segments, starting at
   * {@link Short#MAX_VALUE} and going to {@link Short#MAX_VALUE} + 2.
   */
  private void insertLastSegments()
  {
    // Insert tinyWiki1Json in 3 segment (it's a 3 line file).
    String queryLocal = StringUtils.format(
        MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
        dataSource,
        Resources.DataFile.tinyWiki1Json()
    );

    Map<String, Object> context = Map.of(
        MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
        ClusterStatisticsMergeMode.PARALLEL,
        MultiStageQueryContext.CTX_ROWS_PER_SEGMENT,
        1
    );

    cluster.callApi().waitForTaskToSucceed(msqApis.submitTaskSql(context, queryLocal).getTaskId(), overlord);
  }
}

processing/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java: 49 additions & 15 deletions
@@ -37,6 +37,7 @@
 import it.unimi.dsi.fastutil.shorts.ShortComparators;
 import it.unimi.dsi.fastutil.shorts.ShortSortedSet;
 import it.unimi.dsi.fastutil.shorts.ShortSortedSets;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.timeline.Overshadowable;
@@ -63,9 +64,12 @@
  * In {@link org.apache.druid.timeline.VersionedIntervalTimeline}, this class is used to manage segments in the same
  * timeChunk.
  *
+ * OvershadowableManager is only used when segment locking is in play, and segments with minor version != 0 have
+ * been created. See {@link PartitionHolder#add(PartitionChunk)}.
+ *
  * This class is not thread-safe.
  */
-class OvershadowableManager<T extends Overshadowable<T>>
+public class OvershadowableManager<T extends Overshadowable<T>> implements PartitionHolderContents<T>
 {
   /**
    * There are 3 states for atomicUpdateGroups.
@@ -97,18 +101,31 @@ enum State
   private final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> visibleGroupPerRange;
   private final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> overshadowedGroups;

-  OvershadowableManager()
+  public OvershadowableManager()
   {
     this.knownPartitionChunks = new HashMap<>();
     this.standbyGroups = new TreeMap<>();
     this.visibleGroupPerRange = new TreeMap<>();
     this.overshadowedGroups = new TreeMap<>();
   }

-  public static <T extends Overshadowable<T>> OvershadowableManager<T> copyVisible(OvershadowableManager<T> original)
+  public static <T extends Overshadowable<T>> OvershadowableManager<T> fromSimple(
+      final SimplePartitionHolderContents<T> contents
+  )
+  {
+    final OvershadowableManager<T> retVal = new OvershadowableManager<>();
+    final Iterator<PartitionChunk<T>> iter = contents.visibleChunksIterator();
+    while (iter.hasNext()) {
+      retVal.addChunk(iter.next());
+    }
+    return retVal;
+  }
+
+  @Override
+  public OvershadowableManager<T> copyVisible()
   {
     final OvershadowableManager<T> copy = new OvershadowableManager<>();
-    original.visibleGroupPerRange.forEach((partitionRange, versionToGroups) -> {
+    visibleGroupPerRange.forEach((partitionRange, versionToGroups) -> {
       // There should be only one group per partition range
       final AtomicUpdateGroup<T> group = versionToGroups.values().iterator().next();
       group.getChunks().forEach(chunk -> copy.knownPartitionChunks.put(chunk.getChunkNumber(), chunk));
@@ -121,10 +138,11 @@ public static <T extends Overshadowable<T>> OvershadowableManager<T> copyVisible
     return copy;
   }

-  public static <T extends Overshadowable<T>> OvershadowableManager<T> deepCopy(OvershadowableManager<T> original)
+  @Override
+  public OvershadowableManager<T> deepCopy()
   {
-    final OvershadowableManager<T> copy = copyVisible(original);
-    original.overshadowedGroups.forEach((partitionRange, versionToGroups) -> {
+    final OvershadowableManager<T> copy = copyVisible();
+    overshadowedGroups.forEach((partitionRange, versionToGroups) -> {
       // There should be only one group per partition range
       final AtomicUpdateGroup<T> group = versionToGroups.values().iterator().next();
       group.getChunks().forEach(chunk -> copy.knownPartitionChunks.put(chunk.getChunkNumber(), chunk));
@@ -134,7 +152,7 @@ public static <T extends Overshadowable<T>> OvershadowableManager<T> deepCopy(Ov
           new SingleEntryShort2ObjectSortedMap<>(group.getMinorVersion(), AtomicUpdateGroup.copy(group))
       );
     });
-    original.standbyGroups.forEach((partitionRange, versionToGroups) -> {
+    standbyGroups.forEach((partitionRange, versionToGroups) -> {
       // There should be only one group per partition range
       final AtomicUpdateGroup<T> group = versionToGroups.values().iterator().next();
       group.getChunks().forEach(chunk -> copy.knownPartitionChunks.put(chunk.getChunkNumber(), chunk));
@@ -635,13 +653,23 @@ private void addAtomicUpdateGroupWithState(AtomicUpdateGroup<T> aug, State state
     }
   }

-  boolean addChunk(PartitionChunk<T> chunk)
+  @Override
+  public boolean addChunk(PartitionChunk<T> chunk)
   {
+    // Chunks with minor version zero need to have restrained partition numbers with OvershadowableManager.
+    if (chunk.getObject().getMinorVersion() == 0 && chunk.getChunkNumber() >= PartitionIds.ROOT_GEN_END_PARTITION_ID) {
+      throw new ISE(
+          "PartitionId[%d] must be in the range [0, 32767] when using segment locking. "
+          + "Try compacting the interval to reduce the segment count, or use time chunk locking.",
+          chunk.getChunkNumber()
+      );
+    }
+
     // Sanity check. ExistingChunk should be usually null.
     final PartitionChunk<T> existingChunk = knownPartitionChunks.put(chunk.getChunkNumber(), chunk);
     if (existingChunk != null) {
       if (!existingChunk.equals(chunk)) {
-        throw new ISE(
+        throw DruidException.defensive(
            "existingChunk[%s] is different from newChunk[%s] for partitionId[%d]",
            existingChunk,
            chunk,
@@ -894,8 +922,9 @@ private void removeFrom(AtomicUpdateGroup<T> aug, State state)
     }
   }

+  @Override
   @Nullable
-  PartitionChunk<T> removeChunk(PartitionChunk<T> partitionChunk)
+  public PartitionChunk<T> removeChunk(PartitionChunk<T> partitionChunk)
   {
     final PartitionChunk<T> knownChunk = knownPartitionChunks.get(partitionChunk.getChunkNumber());
     if (knownChunk == null) {
@@ -926,12 +955,14 @@ PartitionChunk<T> removeChunk(PartitionChunk<T> partitionChunk)
     return knownPartitionChunks.remove(partitionChunk.getChunkNumber());
   }

+  @Override
   public boolean isEmpty()
   {
     return visibleGroupPerRange.isEmpty();
   }

-  public boolean isComplete()
+  @Override
+  public boolean areVisibleChunksConsistent()
   {
     return Iterators.all(
         visibleGroupPerRange.values().iterator(),
@@ -945,7 +976,8 @@ public boolean isComplete()
   }

   @Nullable
-  PartitionChunk<T> getChunk(int partitionId)
+  @Override
+  public PartitionChunk<T> getChunk(int partitionId)
   {
     final PartitionChunk<T> chunk = knownPartitionChunks.get(partitionId);
     if (chunk == null) {
@@ -964,7 +996,8 @@ PartitionChunk<T> getChunk(int partitionId)
     }
   }

-  Iterator<PartitionChunk<T>> visibleChunksIterator()
+  @Override
+  public Iterator<PartitionChunk<T>> visibleChunksIterator()
   {
     final FluentIterable<Short2ObjectSortedMap<AtomicUpdateGroup<T>>> versionToGroupIterable = FluentIterable.from(
         visibleGroupPerRange.values()
@@ -978,7 +1011,8 @@ Iterator<PartitionChunk<T>> visibleChunksIterator()
     }).iterator();
   }

-  List<PartitionChunk<T>> getOvershadowedChunks()
+  @Override
+  public List<PartitionChunk<T>> getOvershadowedChunks()
   {
     return getAllChunks(overshadowedGroups);
   }

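For reference, the guard added at the top of addChunk() above boils down to a small predicate. The restatement below is illustrative only; RootGenerationLimit and fitsRootGenerationRange are hypothetical names, while PartitionIds.ROOT_GEN_END_PARTITION_ID, getChunkNumber(), and getObject().getMinorVersion() are the calls used in the diff.

import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionIds;

// Hypothetical helper restating the check added to OvershadowableManager.addChunk().
final class RootGenerationLimit
{
  private RootGenerationLimit() {}

  // Root-generation chunks (minor version 0) must keep partition ids below
  // PartitionIds.ROOT_GEN_END_PARTITION_ID (32768 per the commit message);
  // chunks created by segment locking (minor version != 0) are not constrained here.
  static <T extends Overshadowable<T>> boolean fitsRootGenerationRange(PartitionChunk<T> chunk)
  {
    return chunk.getObject().getMinorVersion() != 0
           || chunk.getChunkNumber() < PartitionIds.ROOT_GEN_END_PARTITION_ID;
  }
}

In other words, once OvershadowableManager is in use for a time chunk, a root-generation chunk whose partition id falls outside the reserved range now fails fast with the ISE shown in the diff.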