Implement a fingerprinting mechanism to track compaction states in a more efficient manner#18844
Conversation
… storage configurable
...ervice/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
Outdated
Show resolved
Hide resolved
processing/src/main/java/org/apache/druid/timeline/CompactionState.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java
Outdated
Show resolved
Hide resolved
| @LifecycleStop | ||
| public void stop() | ||
| { | ||
| fingerprintCache.invalidateAll(); |
There was a problem hiding this comment.
does this cache object need any other lifecycle cleanup?
server/src/main/java/org/apache/druid/segment/metadata/IndexingStateStorage.java
Show resolved
Hide resolved
There was a problem hiding this comment.
what about if the operator has create tables disabled and does not properly create the table before upgrading?
server/src/main/java/org/apache/druid/segment/metadata/CompactionStateManager.java
Outdated
Show resolved
Hide resolved
kfaraz
left a comment
There was a problem hiding this comment.
Thanks for the feature, @capistrant !
I have started going through the PR, leaving a partial review here.
I am yet to go through several changes, such as the ones made in CompactionStatus, DatasourceCompactibleSegmentIterator, etc.
| * <p> | ||
| * Useful for simulations and unit tests where database persistence is not needed. | ||
| */ | ||
| public class HeapMemoryCompactionStateManager extends CompactionStateManager |
There was a problem hiding this comment.
Might be cleaner to let CompactionStateManager be an interface, and let both the heap-based and the concrete class implement it.
| * In-memory implementation of {@link CompactionStateManager} that stores | ||
| * compaction state fingerprints in heap memory without requiring a database. | ||
| * <p> | ||
| * Useful for simulations and unit tests where database persistence is not needed. |
There was a problem hiding this comment.
If this is used only in tests, we should probably put it in the test source root src/test/java.
There was a problem hiding this comment.
That is where I originally put it, but then I tried to use it in a simulation class which is in the app code, not test. Let me review this though, maybe I am mistaken on how it is all working with the simulations
There was a problem hiding this comment.
Oh, I see. Are you referring to CoordinatorSimulationBuilder or some other class?
There was a problem hiding this comment.
no CompactionRunSimulator, https://github.com/apache/druid/pull/18844/files#diff-b8a4fdf52e09ff26fa6f5610c021d196b9fa99673b83051de794ed07257be13b ... It creates CompactSegments instance, which as of now requires a CompactionStateManager. But I guess if we go the route of not supporting fingerprinting in the coordinator duty led compaction, this may not be a problem and it can be moved to the test space.
docs/configuration/index.md
Outdated
| |`druid.manager.compactionState.cacheSize`|The maximum number of compaction state fingerprints to cache in memory on the coordinator and overlord. Compaction state fingerprints are used to track the compaction configuration applied to segments. Consider increasing this value if you have a large number of datasources with compaction configurations.|`100`| | ||
| |`druid.manager.compactionState.prewarmSize`|The number of most recently used compaction state fingerprints to load into cache on Coordinator startup. This pre-warms the cache to improve performance immediately after startup.|`100`| |
There was a problem hiding this comment.
Both Coordinator and Overlord (with segment metadata caching enabled) already keep all used segments in memory, including the respective (interned) CompactionState objects as well.
I don't think the number of distinct CompactState objects that we keep in memory will increase after this patch.
Do we still need to worry about the cache size of these objects?
Does a cache miss trigger a fetch from metadata store?
| { | ||
|
|
||
| /** | ||
| * Lazy initialization holder for deterministic ObjectMapper. |
There was a problem hiding this comment.
I wonder if we shouldn't just inject this mapper annotated with @Sorted or @Deterministic as a lazy singleton. It may be injected into CompactionStateManager and fingerprints will always be created by that class rather than using a static utility method.
processing/src/main/java/org/apache/druid/timeline/DataSegment.java
Outdated
Show resolved
Hide resolved
| if (segmentIterator.hasNext()) { | ||
| // If we are going to create compaction jobs for this compaction state, we need to persist the fingerprint -> state | ||
| // mapping so compacted segments from these jobs can reference a valid compaction state. | ||
| params.getCompactionStateManager().persistCompactionState( |
There was a problem hiding this comment.
The templates should only perform lightweight (i.e. non-IO) read-only operations as createCompactionJobs may be called frequently.
We should not do any persistence here.
Instead, the params can hold some mapping where we can add this compaction state and perform persistence on-demand (perhaps in the CompactionJobQueue).
There was a problem hiding this comment.
thank you for the guidance. Will work on how to get this out of hot path
| } | ||
| } | ||
|
|
||
| private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateFingerprintToSegments(String compactionStateFingerprint) |
There was a problem hiding this comment.
Let's re-use the static function from AbstractTask itself?
There was a problem hiding this comment.
sure! I didn't know if it was bad form to reach into that class from MSQ. But I like having just one impl
There was a problem hiding this comment.
I think it is fine to use AbstractTask in the MSQ code. Alternatively, you can put the method in IndexTaskUtils too.
| Tasks.DEFAULT_STORE_COMPACTION_STATE | ||
| ); | ||
|
|
||
| String compactionStateFingerprint = querySpec.getContext() |
There was a problem hiding this comment.
| String compactionStateFingerprint = querySpec.getContext() | |
| final String compactionStateFingerprint = querySpec.getContext() |
website/.spelling
Outdated
| pre-compute | ||
| pre-computed | ||
| pre-computing | ||
| pre-dates |
There was a problem hiding this comment.
predates need not be hyphenated.
There was a problem hiding this comment.
sometimes my inability to spell, compounded by my inability to google how to spell, is embarrassing. this is one of those times. will fix
| * </p> | ||
| */ | ||
| @ManageLifecycle | ||
| public class CompactionStateManager |
There was a problem hiding this comment.
I don't feel that pre-warming the cache is really necessary. The fingerprint needs to be retrieved only when running the CompactionJobQueue on Overlord or CompactSegments on Coordinator.
- Let's always keep all the compaction states in memory. We are already keeping all the used segments in memory. The distinct
CompactionStateobjects will be fairly small in number and size. - The states can be cached in
HeapMemorySegmentMetadataCachewhich already serves as a cache for used segments, pending segments and schemas. - If possible, let's support the fingerprint flow only with compaction supervisors and not the Coordinator-based
CompactSegmentsduty. That would simplify the new flow and be another motivation for users to migrate to using compaction supervisors.
There was a problem hiding this comment.
If possible, let's support the fingerprint flow only with compaction supervisors and not the Coordinator-based CompactSegments duty. That would simplify the new flow and be another motivation for users to migrate to using compaction supervisors.
would we want to deprecate CompactSegments compaction on the coordinator in this case? so we aren't forever supporting compaction without fingerprints + compaction with fingerprints?
There was a problem hiding this comment.
Yes, the plan was to deprecate CompactSegments once compaction supervisors took off. I don't fully recall if compaction supervisors is already marked GA or not. They would also have to be made the default, if we want to start deprecation of CompactSegments.
But I feel all of this should be out of scope for the current PR.
If supporting the fingerprint logic in CompactSegments is not additional work and does not complicate the flow, we can leave it as is.
My only concern is that there should be just one service that is responsible for persisting new fingerprints. I would prefer that to be the Overlord, so that it always has a consistent cache state. So we either just don't support fingerprints on the Coordinator or we handle persistence by calling an Overlord API.
(I am yet to go through the whole PR to identify all the call sites that may persist a compaction state. I have only found the one in CompactionConfigBasedJobTemplate so far.)
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java
Fixed
Show fixed
Hide fixed
Yes, it makes sense to track the pending state in a separate column. I was about to suggest the same. 🙂
I should think that the cleanup duty would be only a fallback mechanism for cleaning up pending compaction states. |
Hm, since a compaction state is associated with a fingerprint and not an individual task, I think having the task that fails be able to delete it is potentially dangerous. Assuming most datasources under compaction have multiple compaction candidates, multiple tasks could be associated with the same state and one erroneous failure that does a delete could lead to missing state for other compacted segments |
| Map<ByteBuffer, WireTransferable.Deserializer> wtDeserializers | ||
| ) | ||
| { | ||
| final ObjectMapper sortedMapper = new DefaultObjectMapper(); |
There was a problem hiding this comment.
is this cool? as in, like does it matter that this will this be missing all of the jackson modules that get registered with the normal jsonMapper?
also, it seems like we inject it places so that we can make a DefaultIndexingStateFingerprintMapper, should this just be an internal implementation detail of DefaultIndexingStateFingerprintMapper? I would imagine in the future we would want to just get the fingerprint mapper from like the supervisor? (if it is configurable per datasource) or some fingerpint factory or something (if system wide) instead of this special object mapper used for the default impl in the future once this is made more pluggable unless i'm missing something
There was a problem hiding this comment.
is this cool? as in, like does it matter that this will this be missing all of the jackson modules that get registered with the normal jsonMapper?
Oh, wouldn't invoking setupJackson take care of that?
IIUC, the only thing we would miss is the DefaultObjectMapper being initialized with service name (as done in JcaksonModule for the other mappers). Please correct me if I am missing something.
also, it seems like we inject it places so that we can make a DefaultIndexingStateFingerprintMapper, should this just be an internal implementation detail of DefaultIndexingStateFingerprintMapper?
Yeah, I suppose this would be okay too. Although, we would still need to pass in the default @Json ObjectMapper and then make a copy inside the fingerprint mapper. But I agree that it would be less error prone.
I would imagine in the future we would want to just get the fingerprint mapper from like the supervisor? (if it is configurable per datasource) or some fingerpint factory or something (if system wide) instead of this special object mapper used for the default impl in the future once this is made more pluggable unless i'm missing something
Hmm, I am not sure. The logic to generate a fingerprint for a given indexing state and to store and retrieve the state/fingerprint would continue to remain core Druid logic. Supervisors provided by extensions may just have their custom (serializable) implementations of the CompactionState class.
There was a problem hiding this comment.
Oh, wouldn't invoking setupJackson take care of that?
I think that would setup the injectable values for @JacksonInject, but do we not need to register the jackson modules from all the druid modules with it like here? https://github.com/apache/druid/blob/master/server/src/main/java/org/apache/druid/guice/DruidInjectorBuilder.java#L223
It seems weird to add this mapper there though. Maybe this doesn't matter all that much because we never deserialize with this mapper, just make some bytes out of the compaction state... so it doesn't really matter as long as it serializes to a stable set of bytes?
There was a problem hiding this comment.
Appreciate the discussion here. I think, from my reading of this and re-look at the code, that the best path forward is going to be to bury the deterministic mapper in the fingerprint mapper. That will be seeded from the json mapper that the CompactionJobQueue already was having prior to this PR. I think this gives us most logical current state and hopefully future flexibility if things expand beyond the default mapper
| // Mark compaction state fingerprints as active after successful publish | ||
| if (result.isSuccess()) { | ||
| markIndexingStateFingerprintsAsActive(result.getSegments()); | ||
| } |
There was a problem hiding this comment.
should this be done as part of the same transaction that does the other stuff? same question for other similar calls in this file. i guess it probably doesn't matter much in practice....
There was a problem hiding this comment.
+1, it would be cleaner if done in the same transaction, especially since the underlying task action is meant to be performed within a single transaction.
@capistrant , is there any specific reason for doing it outside the transaction?
There was a problem hiding this comment.
My thinking was as follows:
- I don't think we want marking as as active failing to impact the insert. so even if we did add this to the first transaction, we'd still want to ignore the result which feels weird to me.
- The IndexingStateStorage interface would have one method that takes a txn handle while the rest do whatever is necessary in their impl. And our test/simulation storage implementation is memory only so accepting a txn handle there is unusual.
I'm not closed off to the idea though if there are some upside I'm not taking into account
| public boolean isUniqueConstraintViolation(Throwable t) | ||
| { | ||
| return false; | ||
| } |
There was a problem hiding this comment.
should there be a default implementation for this? based on the usage, it looks like not implementing it means something will explodes instead of eating a (possibly expected?) exception
There was a problem hiding this comment.
I don't think the default impl would be universally applicable anyway.
So, we either keep this method abstract and force extensions to implement it, or let the transaction fail (as is currently being done in the patch).
Also, looking at the way we currently insert segments in IndexerSQLMetadataConnector, we just check once if a segment ID already exists and then skip the insert. Otherwise, we proceed with the insert and let the transaction fail if already inserted by a competing transaction. It is up to the caller to retry or swallow the exception in such cases.
I think it would be fine to do the same here too to keep things simple for the time being.
@capistrant , @clintropolis , thoughts?
There was a problem hiding this comment.
I lean towards dropping the default impl. tbh, idk why, when I was breaking this out by database I didn't make this abstract.
| columns.add("indexing_state_fingerprint VARCHAR(255)"); | ||
| columns.add("upgraded_from_segment_id VARCHAR(255)"); | ||
|
|
||
| if (centralizedDatasourceSchemaConfig.isEnabled()) { |
There was a problem hiding this comment.
i know this isn't new or yours, but it feels weird that we conditionally define the schema based on some config....
There was a problem hiding this comment.
Yeah, I agree. It should be okay to always have these columns in the schema since they are anyway nullable.
We can do it in a separate PR, though, and ensure that we are not breaking any weird assumptions related to this.
| throw new ISE( | ||
| "Cannot start Druid as table[%s] has an incompatible schema." | ||
| + " Reason: One or all of these columns [used_status_last_updated, schema_fingerprint, num_rows] does not exist in table." | ||
| + " Reason: One or all of these columns [used_status_last_updated, schema_fingerprint, num_rows, indexing_state_fingerprint] does not exist in table." |
There was a problem hiding this comment.
nit: might be about time to break this down and have separate messages per problem
| * | ||
| * @return Number of deleted pending metadata entries | ||
| */ | ||
| protected abstract int cleanupEntriesCreatedBeforePendingDurationToRetain(DateTime minCreatedTime); |
There was a problem hiding this comment.
same comment as ^, cleanupPendingEntriesCreatedBefore or cleanupPendingEntriesOlderThan
| */ | ||
| protected abstract int cleanupEntriesCreatedBeforePendingDurationToRetain(DateTime minCreatedTime); | ||
|
|
||
| protected DateTime getCurrentTime() |
There was a problem hiding this comment.
this is pretty wierd and maybe should have javadocs to indicate that it is to make testing controllable
| @Override | ||
| protected int cleanupEntriesCreatedBeforeDurationToRetain(DateTime minCreatedTime) | ||
| { | ||
| // 1: Mark unreferenced states as unused |
There was a problem hiding this comment.
current comments seem sort of redundant with the code, but it does seem like it would be useful to instead summarize why we are doing this other stuff before deleting unused
There was a problem hiding this comment.
I nuked the comments and added a javadoc for the impl to provide context to what is going on with prepping the data before looking for rows to delete.
|
|
||
| import java.util.Objects; | ||
|
|
||
| public class OverlordMetadataCleanupConfig |
There was a problem hiding this comment.
i guess this is MetadataCleanupConfig with extra pending duration? maybe worth javadoc to explain when you might want to use one or the other? (i think this one is for when track pending state in same table as the thing itself?) Also having trouble what is specific to the overlord about this re: naming
There was a problem hiding this comment.
+1, @capistrant , when we migrate other cleanup duties to the Overlord, we will just use the MetadataCleanupConfig itself.
For the current purposes, let's add a new class IndexingStateCleanupConfig which extends MetadataCleanupConfig.
Would that work?
There was a problem hiding this comment.
Ya, this started as me basically stealing the config for the OL duties and keeping it same, and then adding the pending config thing once we decided we needed that flag in the database. I think Kashif's suggestion will work for this, reducing the duplication.
| for (String fingerprint : fingerprints) { | ||
| try { | ||
| int rowsUpdated = indexingStateStorage.markIndexingStatesAsActive(fingerprint); |
There was a problem hiding this comment.
any reason not to set these all active in a single call?
There was a problem hiding this comment.
+1, let's batch this if possible
There was a problem hiding this comment.
ah yes, good idea
|
Ah, that makes sense. Thanks for the response, @capistrant ! |
|
|
||
| case EXISTS_AND_UNUSED: | ||
| // Fingerprint exists but is marked as unused - update the used flag | ||
| log.info( |
There was a problem hiding this comment.
Can we move the branches into separate methods?
markIndexingStateAsUsedinsertIndexingState.
kfaraz
left a comment
There was a problem hiding this comment.
Side note: We may want to rename the CompactionState class in a follow up PR too.
Or at least have an IndexingState interface which is used in all the new fingerprinting logic and have the old CompactionState class implement this interface.
|
|
||
| @Inject | ||
| public SqlIndexingStateStorage( | ||
| @Nonnull MetadataStorageTablesConfig dbTables, |
There was a problem hiding this comment.
Nit: Do we need the @Nonnull annotations? I don't think we use them in injected constructors anyway.
| @Inject | ||
| public SqlIndexingStateStorage( | ||
| @Nonnull MetadataStorageTablesConfig dbTables, | ||
| @Nonnull ObjectMapper jsonMapper, |
There was a problem hiding this comment.
| @Nonnull ObjectMapper jsonMapper, | |
| @Nonnull @Json ObjectMapper jsonMapper, |
| public boolean isUniqueConstraintViolation(Throwable t) | ||
| { | ||
| return false; | ||
| } |
There was a problem hiding this comment.
I don't think the default impl would be universally applicable anyway.
So, we either keep this method abstract and force extensions to implement it, or let the transaction fail (as is currently being done in the patch).
Also, looking at the way we currently insert segments in IndexerSQLMetadataConnector, we just check once if a segment ID already exists and then skip the insert. Otherwise, we proceed with the insert and let the transaction fail if already inserted by a competing transaction. It is up to the caller to retry or swallow the exception in such cases.
I think it would be fine to do the same here too to keep things simple for the time being.
@capistrant , @clintropolis , thoughts?
| * | ||
| * @return Number of deleted metadata entries | ||
| */ | ||
| protected abstract int cleanupEntriesCreatedBeforeDurationToRetain(DateTime minCreatedTime); |
| * | ||
| * @return Number of deleted pending metadata entries | ||
| */ | ||
| protected abstract int cleanupEntriesCreatedBeforePendingDurationToRetain(DateTime minCreatedTime); |
|
|
||
| import java.util.Objects; | ||
|
|
||
| public class OverlordMetadataCleanupConfig |
There was a problem hiding this comment.
+1, @capistrant , when we migrate other cleanup duties to the Overlord, we will just use the MetadataCleanupConfig itself.
For the current purposes, let's add a new class IndexingStateCleanupConfig which extends MetadataCleanupConfig.
Would that work?
| for (String fingerprint : fingerprints) { | ||
| try { | ||
| int rowsUpdated = indexingStateStorage.markIndexingStatesAsActive(fingerprint); |
There was a problem hiding this comment.
+1, let's batch this if possible
Batch marking of indexing states as active to avoid chained updates where only one is needed Build segments table missing columns error column by column refactor how we are configuring and executing the ol metadata cleanup duties. fix missed naming refactor Improve readability of upsertIndexingState Fixup SqlIndexingStateStorage constructor drop default impl of isUniqueConstraintViolation Refactor how the deterministic mapper is handled for reindexing
...ing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
Outdated
Show resolved
Hide resolved
|
Thank you @clintropolis and @kfaraz for helping get this change reviewed and ready for use. @kfaraz if you end up circling back and have thoughts on follow ups, please let me know and I will assess/implement. |
* Implement a fingerprinting mechanism to track compaction states in a more efficient manner (apache#18844) * meatadata store bits part 1 * annotate segments with compaction fingerprint before persist * Add ability to generate compaction state fingerprint * add fingerprint to task context and make legacy last compaction state storage configurable * update embedded tests for compaction supervisors to flex fingerprints * checkpoint with persisting compaction states * add duty to clean up unused compaction states * take fingerprints into account in CompactionStatus * Add and improve tests * get rid of some todo comments * fix checkstyle * cleanup some more TODO * Add some docs * update web console * make cache size configurable and fix some spelling * fixup use of deprecated builder * fix checktyle * fix coordinator compactsegments duty and respond to self review comments * fix spellchecker * predates is a word * improve some javadocs * simplify some test assertions based on review * better naming * controller impl cleanup * For compaction supervisors, take persisting pending compaction states out of hot path * use Configs.valueOrDefault helper in data segment * Refactor where fingerprinting happens and how the object mapper is wired up * refactor CompactionStateManager into an interface with a persisted and heap impl * remove fingerprinting support from the coordinator compact segments duty * Move on heap compaction state manager to test sources * CompactionStateManager is now overlord only * Refactor how the compaction state fingerprint cache is wired up * prettify * small changes after self-review * Cleanup CompactionStateCache per review * compactionstatemanager to compactionstatestorage plus refactor * Add compaction state added and deleted metrics * improve queries for compaction state cache sync * clean up doc wording * Miscl. cleanup from review * some metadata store code cleanup * refactor id out of the compaction states table as it is superflous * Some CompactionStatus cleanup * Migrate the location of creating a compaction state from config * More refactoring per review * refactor to remove duplicate fingerprint generator code * Do some consolidation of fingerprint related classes to clean up code * minor cleanup * fix fobidden api use * Improvements and cleanup to the fingerprint and state persist + cache * Refactor where in the code compaction fingerprints are generated * Formalize unique constraint exception check in sqlmetadataconnector and db specific impls * some naming cleanup * Migrate the compaction state cleanup duty to the overlord * Blow up the compaction supervisor scheduler if incremental caching is disabled * add some strict input sanitization in upserting compaction fingerprints * cleanup test class * Add pending flag to compaction state to prevent potentially destructive early cleanup * Refactor database naming to use indexingState instead of compactionState * Refactor naming to IndexingState for the metadata cleanup duty * refresh some docs * fixup tests * Refactoring name of CompactionStateCache to IndexingStateCache * Rename CompactionStateStorage to IndexingStateStorage * Refactor compactionStateFingerprint out of the code in favor of indexingStateFingerprint * Refactor FingerprintMapper name to remove compaction for indexing state * refactorings after self review * fixup a few things post merge with master * Cleanup and refactor after code review round Batch marking of indexing states as active to avoid chained updates where only one is needed Build segments table missing columns error column by column refactor how we are configuring and executing the ol metadata cleanup duties. fix missed naming refactor Improve readability of upsertIndexingState Fixup SqlIndexingStateStorage constructor drop default impl of isUniqueConstraintViolation Refactor how the deterministic mapper is handled for reindexing * cleanup * use effective state for dimspec and indexspec for reindexing fingerprinting * Only call into running checks if there are unknown states to check * Update milestone on PR close and ensure they are visible for the originally desired milestone (apache#18935) * SegmentLocalCacheManagerConcurrencyTest: Use tempDir for temp files. (apache#18937) The tests should use temporary directories rather than the current working directory. * Update to testcontainers 2.x and update various images. (apache#18945) This patch updates to testcontainers 2.x, which improves compatibility with newer versions of Docker. It also updates most images to the latest versions available. PostgreSQL and MariaDB remain on 16 and 11, however. * Max metrics for group by queries (apache#18934) Added metrics mergeBuffer/maxAcquisitionTimeNs, groupBy/maxSpilledBytes and groupBy/maxMergeDictionarySize to track peak resource usage per query. * fix json column isNumeric check to properly consider array element selector types (apache#18948) * Add sys.queries table. (apache#18923) The sys.queries table provides insight into currently-running queries. It provides the same information as the /druid/v2/sql/queries API. As such, it currently only works with Dart. In this patch the table is documented, but off by default. It can be enabled by setting druid.sql.planner.enableSysQueriesTable = true. This patch additionally adds an "includeComplete" parameter to /druid/v2/sql/queries, which is used by the implementation of the sys.queries table, to allow it to show information for recently-completed queries. * Bump kubernetes-client to latest and level vertx with what kubernetes-client uses (apache#18947) * Adjust cost-based autoscaler algorithm (apache#18936) * use includeComplete (apache#18940) * Add configurable option to scale-down during task run time for cost-based autoscaler (apache#18958) * Add configurable option to scale-down during task run time for cost-based autoscaler * Docs * Address review comments, compress tests a bit * remove custom json serde for DataNodeService (apache#18961) * Faster bucket search in ByteBufferHashTable (apache#18952) Adds hash code comparison for large enough keys to ByteBufferHashTable#findBucket(). Also, changes key comparison to use long/int/byte instead of byte-only comparison (thus, the comparison is now closer to HashTableUtils#memoryEquals() used in MemoryOpenHashTable). These changes are aimed to speed-up bucket search in ByteBufferHashTable, especially in high-collision cases. * Allow failing on residual for Iceberg filters on non-partition cols (apache#18953) Currently Iceberg ingest extension may ingest more data than is necessary due to residual data occurring from an Iceberg filter on non-partition columns. This adds an option to ignore + log a warning or fail on filters that result in residual so users are aware of this extra data and can action on it. * Rely on `taskCountMin` in `computeValidTaskCounts`; correct the embedded test for cost-based-autoscaler (apache#18963) This patch fixes a behaviour where computeValidTaskCounts took care of upper bound (taskCountMax), but did not care about taskCountMin. Also it fixes a flaky embedded test. * Web console: Server props dialog (apache#18960) * Init server props table * Add trim starts * reformat * Update `TableInputSpec` to be able to handle specific segments. (apache#18922) * input * format and deprecate * allow non-complete segments * test * SQL: Add rule for merging nested Aggregates. (apache#18498) The rule is adapted from Calcite's AggregateMergeRule, with two changes: 1) Includes a workaround for https://issues.apache.org/jira/browse/CALCITE-7162 2) Includes the ability to merge two Aggregate with a Project between them, by pushing the Project below the new merged Aggregate. * Speed up TopNQueryRunnerTest. (apache#18955) Takes the runtime from ~3 minutes to 10 seconds by reducing the number of test runs by 32x. There are two changes: 1) Instead of parameterizing for every possible combination of monomorphic specialization flags, only parameterize for all-on and all-off. The specializations handle different cases anyway, so they wouldn't trigger on the same queries. Reduces number of test runs by 16x. 2) Remove the parameterization on duplicateSingleAggregatorQueries. Only a handful of tests used it. Instead of parameterizing the entire suite, that handful of tests is expanded to include _duplicateAggregators versions. Reduces number of test runs by 2x. * Fix Hadoop multi-value string null value handling to match native batch (apache#18944) Doing some more digging, I found another unfortunate data difference between native batch (on-cluster) and Hadoop batch ingest. Ingesting a multi-value string ["a","b",null] with Hadoop is treated as ["a","b","null"] and in native batch, this correctly ingests to ["a","b",null]. This is difference appears to be a bug in all Druid versions(even latest). While this will not affect the current null handling migration, this will affect the future Hadoop -> native batch ingestion migration that will also need to take place. Hadoop doesn't allow for all-null columns in segments, it simply excludes them from the segment. I've updated the Hadoop job to support running druid.indexer.task.storeEmptyColumns=true, which allows us to store all NULL columns (how native/streaming ingest work today). BREAKING CHANGES 1. Hadoop ingests will now process multi-value string inputs like ["a","b",null] -> ["a","b",null] instead of ["a","b","null"] to match native batch ingestion. 2. Hadoop ingests will now by default keep columns with all NULL values, instead of excluding them from the segment. useStringValueOfNullInLists parameter in RowBasedColumnSelectorFactory.java has been removed. * modify ExprEvalBindingVector to use current vector size instead of array length when coercing values, cache coercion arrays (apache#18967) * modify ExprEvalBindingVector to use current vector size instead of array length when coercing values, cache coercsion arrays expression vector binding improvements changes: * split ExpressionEvalBindingVector into ExpressionEvalNumericBindingVector and ExpressionEvalObjectBindingVector * modify ExpressionEvalNumericBindingVector and ExpressionEvalObjectBindingVector to use current vector size instead of input array size when coercing values * modify ExpressionEvalNumericBindingVector and ExpressionEvalObjectBindingVector to use externally managed object array caches for value coercion instead of recreating each time * benchmarks * SQL: Use specialized virtual columns for expression filters. (apache#18965) This patch adjusts planning for expression filters to use specialized virtual columns when they exist. This allows them to take advantage of optimizations, such as the ones that are available for JSON_VALUE, even when the overall expression is complicated. * add tier/storage/capacity metric to make actual tier disk size metrics available for historicals in vsf mode (apache#18962) * Adjust costs for burst scaleup during heavy lag for cost-based autoscaler (apache#18969) * udpate copyright year to 2026 (apache#18972) * Bump diff from 4.0.1 to 4.0.4 in /web-console (apache#18933) * docs: add docs for projections (apache#18056) * Better query error classification for user errors (apache#18949) This change checks instanceof before casting RexLiteral.value() to Number in SQL aggregators. When users pass invalid queries (e.g., a string literal '99.99' where numeric literals are expected), InvalidSqlInput exception is thrown, which returns 400 (USER/INVALID_INPUT) instead of 500 (ADMIN/UNCATEGORIZED). This improves error diagnostics for invalid queries. * changes related to 36 release (apache#18975) * add vsf AcquireSegmentResult metrics to ChannelCounters (apache#18971) * Migrate query integration tests to embedded framework (apache#18978) Changes --------- - Move `ITBroadcastJoinQueryTest` to embedded framework - Remove `ITWikipediaQueryTest` - Add `QueryLaningTest` which was the only useful assertion being done in the wikipedia test * Upgrade compiler version to JDK 17 (apache#18977) Upgrade compiler version to JDK 17. This removes compiler compatibility for indexing-hadoop (no longer supported extension). * add storage_size to sys.servers (apache#18979) * bugfix: Fix bug that could lead to illegal k8s label ending in non-alphanumeric (apache#18981) * Remove experimental flag from multi-supervisor docs (apache#18983) Multi-supervisor support has been in 2 major versions (with v36 being the 3rd). I think the implementation is stable enough for marking as non-experimental. * Add groupby max metrics to prometheus config (apache#18970) * Add metrics and improve logging for row signature flapping. (apache#18966) Add segment/schemaCache/rowSignature/changed and segment/schemaCache/rowSignature/column/count metrics to get visibility into when the Broker's segment metadata cache's row signature for each datasource is initialized and updated. The rationale for these metrics and logging enhancements is that we noticed row signatures flapping (columns reordered spuriously) that can cause SQL queries to be translated to incorrect native queries because the signatures flapped. This can cause sporadic missing data when the queries are incorrectly planned and is noticeable in environments with high QPS. * bugfix: Create tombstones when needed while doing REPLACE mode with range partitioning plus parallel indexing (apache#18938) * Create tombstones for range and hashed partitioning when everything has been filtered out * MSQ compaction doesn't support hash partitioning * cleanup test file * Cleanup verbose comments in test code * Hashed partitioning doesn't actually need the special handling * fix checkstyle * test coverage * fix vsf load time to be actual load time and not include wait time (apache#18988) * Update guice to 6.0.0 (apache#18986) * Update surefire to 3.5.4 ; upgrade NestedDataScanQueryTest to use junit5 (apache#18847) * Add optional plugins to basic cost function in CostBasedAutoScaler (apache#18976) Changes: - separate the logic of pure cost function, making all additional logic opt-in in config; - `scaleDownBarrier` has been changed to `minScaleDownDelay`, which is now `Duration`; - changes to high lag fast scaleup: logarithmic scaling formula for idle decay on high lag and task boundaries. Details: This change replaces the sqrt-based scaling formula with a logarithmic formula that provides more aggressive emergency recovery at low task counts and millions of lag. Idle decay: ` ln(lagSeverity) / ln(maxSeverity)`. Less aggressive, scales well with lag growth. Formula `K = P/(6.4*sqrt(C))` means small task counts get massive K values (emergency recovery), while large task counts get smaller K values (stability). * docs: update zookeeper version (apache#18836) * docs: update zookeeper version * add link to zk release page * Fix MSQ compaction state and native interval locking, add test coverage (apache#18950) * MSQ compaction runner run test * fix test * fix test 2 * lock input interval * test * test coverage * allowNonAlignedInterval and forceDropExisting * fix test * Update indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java Co-authored-by: Lucas Capistrant <capistrant@users.noreply.github.com> * Update indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java Co-authored-by: Lucas Capistrant <capistrant@users.noreply.github.com> * update * style * drop-existing * Apply suggestion from @kfaraz Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com> * format * aligned * build * mis-aligned * format * test * lock-interval * lock * test * force drop existing, revert non-aligned, deprecated allowNonAlignedInterval * revert THREE_HOUR * revert format change * test * comment * use-queue * reduce test * batchSegmentAllocation --------- Co-authored-by: Lucas Capistrant <capistrant@users.noreply.github.com> Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com> * Update assertj-core for CVE-2026-24400 (apache#18994) Co-authored-by: Ashwin Tumma <ashwin.tumma@salesforce.com> --------- Co-authored-by: Lucas Capistrant <capistrant@users.noreply.github.com> Co-authored-by: Gian Merlino <gianmerlino@gmail.com> Co-authored-by: Virushade <phuaguanwei99@gmail.com> Co-authored-by: Clint Wylie <cwylie@apache.org> Co-authored-by: Sasha Syrotenko <alexander.syrotenko@imply.io> Co-authored-by: Vadim Ogievetsky <vadim@ogievetsky.com> Co-authored-by: Andrei Pechkurov <37772591+puzpuzpuz@users.noreply.github.com> Co-authored-by: jtuglu1 <jtuglu@netflix.com> Co-authored-by: Cece Mei <yingqian.mei@gmail.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com> Co-authored-by: mshahid6 <maryam.shahid1299@gmail.com> Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com> Co-authored-by: aho135 <andrewho135@gmail.com> Co-authored-by: Abhishek Radhakrishnan <abhishek.rb19@gmail.com> Co-authored-by: Ashwin Tumma <ashwin.tumma23@gmail.com> Co-authored-by: Ashwin Tumma <ashwin.tumma@salesforce.com>
disclaimer: This PR begins the migration away from always using the term compaction. It may create some short term friction/frustration. But the goal is that longer term we refer to things that are not pure compaction in a more ambiguous way (indexing state versus compaction state, for example)
Description
Add new functionality to Compaction Supervisors. Instead of storing compaction state for segments individually, centralize the state storage in a new metadata table. Individual segments will store a computed fingerprint that references an indexing state (aka compaction state) in the new metadata table. Since many segments will eventually end up sharing common indexing states, this should greatly reduce duplication in metadata storage.
note: This applies only to Compaction Supervisors. Scheduled compaction on the coordinator will not use fingerprinting. This is intentional as a part of the roadmap to sunsetting coordinator duty compaction in the future.
Indexing State Fingerprinting
Instead of storing serialized
CompactionStateobjects as thelastCompactionStatefield in every compacted segment, generate a fingerprint for aCompactionStateand attach that to compacted segments. Add new centralized storage for serializedCompactionStateobjects where individual states can be looked up by the aforementioned fingerprint. Since it is common for many segments in a data source to share a singleCompactionState, this greatly reduces the metadata storage overhead for storing indexing states in the metadata store.Metadata Store Changes
druid_segmentsAdd new column
indexing_state_fingerprintthat stores the fingerprint representation of the segments currentCompactionState. It can benullif no compaction has taken place.druid_indexingStatesNew metadata table that stores the full serialized
CompactionStateassociated with a fingerprint. Segments can look up their detailed state here by using theindexing_state_fingerprintthat they are associated with to pull the full state payload.IndexingStateStorageSqlIndexingStateStorageThe
IndexingStateStorageinterface + implementation(s) is responsible for managing the persistence and lifecycle of indexing states. It stores unique compaction configurations (identified by fingerprints) in the metadata database. The manager tracks which compaction states are actively referenced by segments, marking unreferenced states as unused and periodically cleaning up old unused states. This fingerprinting approach allows Druid to efficiently store and retrieve compaction metadata without duplicating identical compaction configurations across multiple segments.HeapMemoryIndexingStateStorageMeant to serve as a mechanism for testing and simulations where metadata persistence may not be available/needed
IndexingStateCacheIndexingStateCacheis a new component of theHeapMemorySegmentMetadataCache. It is modeled strongly after the existing datasource schema cache. This is where the existing indexing states are cached for reference by compaction supervisors.CompactSegments Coordinator Duty Roadmap
This PR does not add support for indexing state fingerprinting to the coordinator based scheduled compaction that is carried out by the CompactSegments coordinator duty. This is because the Druid roadmap is to move all scheduled compaction to compaction supervisors. Making the decision to forgo indexing state fingerprint support for the legacy duty based compaction is a conscious choice we are making to help drive usage of supervisors and limit changes to the legacy duty based compaction code. Another PR should be spun up to officially deprecate legacy scheduled compaction on the coordinator.
Legacy
lastCompactionStateRoadmapThis PR implements no automatic transition to fingerprints for segments who are compacted and store
CompactionStatein theirlastCompactionStatefield. Instead this PR aims to continue supportinglastCompactionStatein Compaction decision making for segments compacted before fingerprinting. This means that legacy segments will not have to be re-compacted simply because they are not fingerprinted, as long as they have the properCompactionStateas specified by the compaction configuration for the data source in question.This PR also writes both the new fingerprint as well as the legacy
lastCompactionStateto aDataSegmentby default. This allows normal rolling upgrade order as well as Druid version rollback without un-needed re-compaction. An operator can disable writinglastCompactionStateby updating the cluster compaction config, after the Druid upgrade completes. Eventually, Druid code base will cease writinglastCompactionStateat all and instead force using fingerprinting going forward. I think this should be done in the Druid version following the first version that this new feature is seen in. Even at this point,lastCompactionStatewill need to continue to be supported for already written segments, unless we want to devise an automated migration plan that can run in the background of a cluster to get all compacted segments migrated to fingerprinting.Follow ups needed
CompactionStatereferences in the app code to IndexingStatelastCompactionStatein data segments and metastore which will need to be carried onwards.Release note
Improvements to automatic compaction supervisors. Instead of individual compacted segments having to store their full compaction state in the metadata store, states will now be stored in a central location in the metadata store ( a new
indexingStatestable). Individual segments will only need to store a unique reference (indexing_state_fingerprint) to their full compaction state. Since it is typical that many segments in a single datasource share the same underlying compaction state, this will end up greatly reducing metadata storage requirements for automatic compaction.For backward compatibility purposes, detailed compaction state will continue to be persisted in each segment until a future date. An operator can opt out of this by setting
storeCompactionStatePerSegmenttofalsein the cluster compaction config. This should only be done with the understanding that rolling back to an earlier version of Druid would trigger re-compaction on any segments who had been compacted since updating that config.It must also be noted that using Automatic Compaction Supervisors now requires incremental segment metadata caching to be enabled on the Overlord and Coordinator via runtime properties.
Upgrade Note
Automatic Compaction Supervisors now requires incremental segment metadata caching to be enabled on the Overlord and Coordinator via runtime properties.
druid.manager.segments.useIncrementalCachemust not benever. See configuration docs for details.Metadata store changes are required for this upgrade. If you already have
druid.metadata.storage.connector.createTablesset totrueno action is needed. If you have this feature disabled, you will need to alter thesegmentstable and create thecompactionStatestable. Postgres DDL is provided below as a guide. You will have to adapt the syntax to your metadata store backend as well as use proper table naming depending on your configured table prefix and database.Key changed/added classes in this PR
CompactionStatusCompactionConfigBasedJobTemplateCompactionStateSQLMetadataConnectorIndexingStateStorageSqlIndexingStateStorageIndexingStateCacheCompactSegmentsKillUnreferencedIndexingStateThis PR has: