Allow large partition numbers in VersionedIntervalTimeline. #18777
Conversation
The experimental segment locking feature (`forceTimeChunkLock: false`) reserves the first 32768 partition numbers for the "root generation", then uses an "atomic update groups" scheme to replace root segment ranges with new sets of segments during reindexing operations. `OvershadowableManager`, which manages this atomic update scheme, imposes a limit of 32768 segments per time chunk. Previously, this limit applied even to people who were not using segment locking. With this patch, the class is only used when segment locking is actually in play, so the limit is not imposed under normal conditions.
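For orientation, here is a minimal sketch of the new dispatch. The type and method names are taken from the diffs quoted below; the wrapper class, its `add()` flow, and the `addChunk` method on the contents interface are my simplification, not the PR's exact code.

```java
// Sketch only: PartitionHolderContents, SimplePartitionHolderContents, and
// OvershadowableManager.fromSimple() appear in this PR's diffs; everything
// else here is illustrative.
class PartitionHolderSketch<T extends Overshadowable<T>>
{
  // Under normal conditions (time-chunk locking), a simple implementation
  // with no 32768-chunk limit holds the chunks.
  private PartitionHolderContents<T> contents = new SimplePartitionHolderContents<>();

  void add(PartitionChunk<T> chunk)
  {
    // A nonzero minor version means segment locking's atomic update groups are
    // in play, so swap in the OvershadowableManager-backed implementation.
    if (chunk.getObject().getMinorVersion() != 0 && contents instanceof SimplePartitionHolderContents) {
      contents = OvershadowableManager.fromSimple((SimplePartitionHolderContents<T>) contents);
    }
    contents.addChunk(chunk); // assumed method on PartitionHolderContents
  }
}
```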
kfaraz left a comment:
Looks neat! It is nice to have a clear distinction between when minor versions are in use and when they are not.
Left a couple of suggestions. Let me know what you think.
```diff
- boolean added = overshadowableManager.addChunk(chunk);
+ if (chunk.getObject().getMinorVersion() != 0 && contents instanceof SimplePartitionHolderContents) {
+   // Swap simple map for an OvershadowableManager when minor versions are encountered.
+   contents = OvershadowableManager.fromSimple((SimplePartitionHolderContents<T>) contents);
```
Since this swap will bring about changes in the segment-holding capacity of the interval, should we add a log line here?
I think it'll get logged too much, since timelines are created all the time.
No, I meant log something only when we switch from the simple contents to the minor-version-enabled one. Would that be verbose too?
I think so, since timelines are created and populated as part of various operations. It would get logged semi-randomly and perhaps too frequently.
```diff
  * This class is not thread-safe.
  */
- class OvershadowableManager<T extends Overshadowable<T>>
+ class OvershadowableManager<T extends Overshadowable<T>> implements PartitionHolderContents<T>
```
Maybe rename this to indicate that it is only used when segment locking is in play.
Hmm, rather than renaming, I added a javadoc.
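The thread doesn't quote the javadoc itself; a plausible sketch, based only on facts stated elsewhere in this PR (the wording is mine, not the merged text):

```java
/**
 * Manages the "atomic update groups" overshadowing scheme used by the experimental
 * segment locking feature (forceTimeChunkLock: false).
 *
 * Only used when segment locking is actually in play, i.e. once a chunk with a
 * nonzero minor version is encountered. Otherwise {@link SimplePartitionHolderContents}
 * holds the chunks, and the 32768-segments-per-time-chunk limit does not apply.
 *
 * This class is not thread-safe.
 */
class OvershadowableManager<T extends Overshadowable<T>> implements PartitionHolderContents<T>
```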
```java
 * {@link PartitionHolder#isComplete()}. When segment locking is not being used, all chunks
 * are consistent, so this always returns true.
 */
boolean visibleChunksAreConsistent();
```
Nit:
```diff
- boolean visibleChunksAreConsistent();
+ boolean areVisibleChunksConsistent();
```
```java
 * @see SimplePartitionHolderContents implementation when segment locking is not in play
 * @see OvershadowableManager implementation when segment locking is in play
 */
public interface PartitionHolderContents<T>
```
Super Nit: "Holder" + "Contents" sound like they should cancel each other out. 😛
Should we call this class `PartitionChunks<T>` or `PartitionChunkGroup<T>`, since it is basically just a group of chunks?
The implementations could be called `SimplePartitionChunkGroup` and `PartitionChunkGroupWithMinorVersion` or something.
"Group" unfortunately means something different in OvershadowableManager (an atomic update group).
```java
private final TreeMap<PartitionChunk<T>, PartitionChunk<T>> holderMap = new TreeMap<>();

/**
 * Map of {@link PartitionChunk#getChunkNumber()} to
```
```diff
- * Map of {@link PartitionChunk#getChunkNumber()} to
+ * Map from {@link PartitionChunk#getChunkNumber()} to the actual {@link PartitionChunk}.
```
```java
}

public static <T extends Overshadowable<T>> OvershadowableManager<T> copyVisible(OvershadowableManager<T> original)

public static <T extends Overshadowable<T>> OvershadowableManager<T> fromSimple(
```
Should this method fail if the passed simple holder already has more than 32k chunks?
I just added a check to `addChunk`, so now it will.
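The exact check isn't quoted in the thread; a sketch of the kind of guard being described, under the assumption that the boundary constant mirrors `org.apache.druid.timeline.partition.PartitionIds` (the exception type and message here are illustrative, and the merged check may differ):

```java
boolean addChunk(PartitionChunk<T> chunk)
{
  // Root-generation chunks (minor version 0) must fit in the reserved id range;
  // a simple holder being converted via fromSimple() may exceed it, in which
  // case this now fails fast instead of corrupting the overshadowing state.
  if (chunk.getObject().getMinorVersion() == 0
      && chunk.getChunkNumber() >= PartitionIds.ROOT_GEN_END_PARTITION_ID) {
    throw new IllegalArgumentException(
        "Chunk number [" + chunk.getChunkNumber()
        + "] exceeds the root-generation range supported by OvershadowableManager"
    );
  }
  // ... existing insertion logic ...
  return true;
}
```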
embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/HighPartitionNumberTest.java
```java
final int maxPartitionNum = Integer.parseInt(cluster.runSql(
    "SELECT MAX(partition_num) FROM sys.segments WHERE datasource=%s",
    Calcites.escapeStringLiteral(dataSource)
).trim());
```
Check notice from Code scanning / CodeQL (since marked fixed): Missing catch of NumberFormatException (Note, test).
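The fix itself isn't quoted in the thread; one way to satisfy the alert (illustrative, not necessarily what the PR does) is to turn a parse failure into a clear test failure rather than an unhandled `NumberFormatException`:

```java
final String result = cluster.runSql(
    "SELECT MAX(partition_num) FROM sys.segments WHERE datasource=%s",
    Calcites.escapeStringLiteral(dataSource)
).trim();

final int maxPartitionNum;
try {
  maxPartitionNum = Integer.parseInt(result);
}
catch (NumberFormatException e) {
  // Fail with the offending value so the test output explains itself.
  throw new AssertionError("Expected numeric MAX(partition_num) but got [" + result + "]", e);
}
```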
```java
 * individually create each segment. Controls the initial partition number to be used for segment allocation.
 */
@JsonProperty
private int initialAllocationPartitionNumber = PartitionIds.ROOT_GEN_START_PARTITION_ID;
```
If this config is to be used only for tests, I would advise a slightly different strategy which wouldn't require touching all the production classes.
- Use the `ClusterTestingModule` extension in the embedded test and enable it on the Overlord via `druid.unsafe.cluster.testing=true`.
- Create a new class `FaultyIndexerSQLMetadataStorageCoordinator extends IndexerSQLMetadataStorageCoordinator` and override the allocation methods.
- In `ClusterTestingModule`, do `binder.bind(IndexerSQLMetadataStorageCoordinator.class).to(FaultyIndexer...);`
- Pass a test config into the faulty implementation to control the `startPartitionId`, as sketched below.
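A rough sketch of steps 2 and 3 of this suggestion. Everything here is hypothetical: the class does not exist in the PR, and the real coordinator's constructor dependencies and allocation method signatures are elided since they aren't shown in the thread.

```java
// Step 2 (sketch): test-only coordinator that starts segment allocation at a
// configurable partition number instead of the default.
public class FaultyIndexerSQLMetadataStorageCoordinator extends IndexerSQLMetadataStorageCoordinator
{
  private final int startPartitionId; // supplied via a test config, per step 4

  // ... constructor delegating to super(...) and overridden allocation methods
  //     go here; signatures omitted since they are not quoted in this thread ...
}

// Step 3 (sketch): binding inside ClusterTestingModule.
binder.bind(IndexerSQLMetadataStorageCoordinator.class)
    .to(FaultyIndexerSQLMetadataStorageCoordinator.class);
```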
I just pushed a different approach:
- First insert the test data into a single segment using SQL, with partition 0.
- Directly insert a new segment into metadata storage that is identical to the first one, but with a partition number of `Short.MAX_VALUE`.
- Run SQL to insert 3 more segments.
- Verify all are loaded and queryable.
I removed this new parameter I had added.
@kfaraz just updated this, TY for review.
```java
Set.of(
    firstSegment.withShardSpec(
        new NumberedShardSpec(
            Short.MAX_VALUE - 1,
```
Nice trick!
```java
insertSecondSegment();
insertLastSegments();


```
Nit: extra newline?