Merged
Changes from 17 commits
Commits
76 commits
d8490b0
meatadata store bits part 1
capistrant Dec 13, 2025
3d2d423
annotate segments with compaction fingerprint before persist
capistrant Dec 13, 2025
48854f4
Add ability to generate compaction state fingerprint
capistrant Dec 13, 2025
c6a3367
add fingerprint to task context and make legacy last compaction state…
capistrant Dec 13, 2025
f3b706e
update embedded tests for compaction supervisors to flex fingerprints
capistrant Dec 13, 2025
46fb807
checkpoint with persisting compaction states
capistrant Dec 13, 2025
0fef358
add duty to clean up unused compaction states
capistrant Dec 14, 2025
edeaf30
take fingerprints into account in CompactionStatus
capistrant Dec 14, 2025
97daf3f
Add and improve tests
capistrant Dec 15, 2025
dbcdfcf
get rid of some todo comments
capistrant Dec 15, 2025
38f6d15
fix checkstyle
capistrant Dec 15, 2025
4cf1197
cleanup some more TODO
capistrant Dec 15, 2025
ba269bd
Add some docs
capistrant Dec 15, 2025
f168bc9
update web console
capistrant Dec 15, 2025
2292b15
make cache size configurable and fix some spelling
capistrant Dec 15, 2025
74c8ebc
fixup use of deprecated builder
capistrant Dec 15, 2025
adac5ec
fix checktyle
capistrant Dec 15, 2025
4fb3a9c
fix coordinator compactsegments duty and respond to self review comments
capistrant Dec 15, 2025
708c6f8
fix spellchecker
capistrant Dec 15, 2025
03bb14a
predates is a word
capistrant Dec 16, 2025
a262f79
improve some javadocs
capistrant Dec 16, 2025
6126e22
simplify some test assertions based on review
capistrant Dec 16, 2025
b78ec13
better naming
capistrant Dec 16, 2025
78f115e
controller impl cleanup
capistrant Dec 16, 2025
f06d715
For compaction supervisors, take persisting pending compaction states…
capistrant Dec 16, 2025
d571e43
use Configs.valueOrDefault helper in data segment
capistrant Dec 16, 2025
07afc2f
Refactor where fingerprinting happens and how the object mapper is wi…
capistrant Dec 16, 2025
12ea741
refactor CompactionStateManager into an interface with a persisted an…
capistrant Dec 16, 2025
f57527a
Merge branch 'master' into compaction-fingerprinting
capistrant Dec 22, 2025
858cbd3
remove fingerprinting support from the coordinator compact segments duty
capistrant Dec 22, 2025
9afab2f
Move on heap compaction state manager to test sources
capistrant Dec 22, 2025
34a8a11
CompactionStateManager is now overlord only
capistrant Dec 22, 2025
7214418
Refactor how the compaction state fingerprint cache is wired up
capistrant Jan 2, 2026
6a9743a
Merge branch 'master' into compaction-fingerprinting
capistrant Jan 2, 2026
cd55b0e
prettify
capistrant Jan 4, 2026
58724cc
small changes after self-review
capistrant Jan 4, 2026
0c1c6ed
Cleanup CompactionStateCache per review
capistrant Jan 9, 2026
6bd1875
compactionstatemanager to compactionstatestorage plus refactor
capistrant Jan 9, 2026
79e3a54
Add compaction state added and deleted metrics
capistrant Jan 9, 2026
6e23adc
improve queries for compaction state cache sync
capistrant Jan 9, 2026
58149e6
clean up doc wording
capistrant Jan 9, 2026
b5568d2
Miscl. cleanup from review
capistrant Jan 9, 2026
0fab9f9
some metadata store code cleanup
capistrant Jan 9, 2026
6f40f9f
refactor id out of the compaction states table as it is superflous
capistrant Jan 9, 2026
8713760
Some CompactionStatus cleanup
capistrant Jan 10, 2026
a4bf6c8
Migrate the location of creating a compaction state from config
capistrant Jan 10, 2026
21a10e6
More refactoring per review
capistrant Jan 10, 2026
5ffb59e
refactor to remove duplicate fingerprint generator code
capistrant Jan 10, 2026
0c38c76
Do some consolidation of fingerprint related classes to clean up code
capistrant Jan 10, 2026
f62e2ea
minor cleanup
capistrant Jan 10, 2026
c81f242
fix fobidden api use
capistrant Jan 11, 2026
ed2c06c
Improvements and cleanup to the fingerprint and state persist + cache
capistrant Jan 12, 2026
f0632f9
Refactor where in the code compaction fingerprints are generated
capistrant Jan 12, 2026
b081727
Formalize unique constraint exception check in sqlmetadataconnector a…
capistrant Jan 12, 2026
0103d8e
some naming cleanup
capistrant Jan 12, 2026
7aefe38
Migrate the compaction state cleanup duty to the overlord
capistrant Jan 13, 2026
5c4fd98
Blow up the compaction supervisor scheduler if incremental caching is…
capistrant Jan 13, 2026
74cb69b
add some strict input sanitization in upserting compaction fingerprints
capistrant Jan 13, 2026
fa72c38
cleanup test class
capistrant Jan 13, 2026
c467a39
Add pending flag to compaction state to prevent potentially destructi…
capistrant Jan 13, 2026
79ac0c5
Refactor database naming to use indexingState instead of compactionState
capistrant Jan 13, 2026
0d7c73f
Refactor naming to IndexingState for the metadata cleanup duty
capistrant Jan 14, 2026
d5105f3
refresh some docs
capistrant Jan 14, 2026
5793cb0
fixup tests
capistrant Jan 14, 2026
f87f99f
Refactoring name of CompactionStateCache to IndexingStateCache
capistrant Jan 14, 2026
5af3a5b
Rename CompactionStateStorage to IndexingStateStorage
capistrant Jan 14, 2026
acde92b
Refactor compactionStateFingerprint out of the code in favor of index…
capistrant Jan 14, 2026
5c2050c
Refactor FingerprintMapper name to remove compaction for indexing state
capistrant Jan 14, 2026
ff2949f
refactorings after self review
capistrant Jan 14, 2026
389e8da
Merge branch 'master' into compaction-fingerprinting
capistrant Jan 14, 2026
c034d05
fixup a few things post merge with master
capistrant Jan 14, 2026
3c2ed1a
Merge branch 'master' into compaction-fingerprinting
capistrant Jan 15, 2026
0cdbb37
Cleanup and refactor after code review round
capistrant Jan 16, 2026
7768a8a
cleanup
capistrant Jan 16, 2026
ec668a8
use effective state for dimspec and indexspec for reindexing fingerpr…
capistrant Jan 20, 2026
e582667
Only call into running checks if there are unknown states to check
capistrant Jan 21, 2026
@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.metadata.HeapMemoryCompactionStateManager;
import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
import org.apache.druid.server.compaction.CompactionSegmentIterator;
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
@@ -135,7 +136,8 @@ public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
policy,
compactionConfigs,
dataSources,
Collections.emptyMap()
Collections.emptyMap(),
new HeapMemoryCompactionStateManager()
);
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
blackhole.consume(iterator.next());
1 change: 1 addition & 0 deletions docs/api-reference/automatic-compaction-api.md
@@ -889,6 +889,7 @@ This includes the following fields:
|`compactionPolicy`|Policy to choose intervals for compaction. Currently, the only supported policy is [Newest segment first](#compaction-policy-newestsegmentfirst).|Newest segment first|
|`useSupervisors`|Whether compaction should be run on Overlord using supervisors instead of Coordinator duties.|false|
|`engine`|Engine used for running compaction tasks, unless overridden in the datasource-level compaction config. Possible values are `native` and `msq`. `msq` engine can be used for compaction only if `useSupervisors` is `true`.|`native`|
|`legacyPersistLastCompactionStateInSegments`|Whether to persist the full compaction state in segment metadata. When `true` (default), the compaction state is stored in both the segment metadata and the compaction states table, which is how Druid has historically worked. When `false`, only a fingerprint reference is stored in the segment metadata, which reduces storage overhead in the segments table; the full compaction state is stored in the compaction states table and can be looked up by that fingerprint. This configuration will eventually be removed, at which point all compaction will use the fingerprint method only. It exists so that operators can opt into the future behavior early. **WARNING: if you set this to `false` and then compact data, rolling back to a Druid version that predates compaction state fingerprinting (earlier than Druid 36) will result in missing compaction states and trigger compaction on segments that may already be compacted.**|`true`|

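The two modes described for `legacyPersistLastCompactionStateInSegments` can be pictured with a small in-memory model. This is only a sketch under stated assumptions: `CompactionStatePersistenceSketch`, `SegmentMetadata`, `persist`, and `resolveState` are hypothetical names made up for illustration, the "table" is a plain map, and none of this is Druid's actual storage code.

```java
import java.util.HashMap;
import java.util.Map;

final class CompactionStatePersistenceSketch
{
  // Hypothetical per-segment metadata: the full state may be null, the fingerprint is always set.
  record SegmentMetadata(String lastCompactionState, String fingerprint) {}

  // Hypothetical stand-in for the compaction states table, keyed by fingerprint.
  private final Map<String, String> statesTable = new HashMap<>();

  SegmentMetadata persist(boolean legacyPersistInSegments, String fingerprint, String state)
  {
    // In both modes the full state goes into the states table.
    statesTable.put(fingerprint, state);
    // Only the legacy mode also copies the full state into the segment metadata.
    return new SegmentMetadata(legacyPersistInSegments ? state : null, fingerprint);
  }

  String resolveState(SegmentMetadata metadata)
  {
    // A reader that knows about fingerprints can always recover the state from the table.
    if (metadata.lastCompactionState() != null) {
      return metadata.lastCompactionState();
    }
    return statesTable.get(metadata.fingerprint());
  }

  public static void main(String[] args)
  {
    CompactionStatePersistenceSketch store = new CompactionStatePersistenceSketch();
    SegmentMetadata lean = store.persist(false, "fp-1", "{\"granularity\":\"MONTH\"}");
    System.out.println(lean.lastCompactionState());
    System.out.println(store.resolveState(lean));
  }
}
```

In this model, a reader that only looks at `lastCompactionState` and has no notion of the states table sees `null` for segments written with the flag set to `false`, which is the situation the rollback warning describes.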
#### Compaction policy `newestSegmentFirst`

1 change: 1 addition & 0 deletions docs/configuration/index.md
@@ -389,6 +389,7 @@ These properties specify the JDBC connection and other configuration around the
|`druid.metadata.storage.tables.segments`|The table to use to look for segments.|`druid_segments`|
|`druid.metadata.storage.tables.rules`|The table to use to look for segment load/drop rules.|`druid_rules`|
|`druid.metadata.storage.tables.config`|The table to use to look for configs.|`druid_config`|
|`druid.metadata.storage.tables.compactionStates`|The table to use to store compaction state fingerprints.|`druid_compactionStates`|
|`druid.metadata.storage.tables.tasks`|Used by the indexing service to store tasks.|`druid_tasks`|
|`druid.metadata.storage.tables.taskLog`|Used by the indexing service to store task logs.|`druid_tasklogs`|
|`druid.metadata.storage.tables.taskLock`|Used by the indexing service to store task locks.|`druid_tasklocks`|
@@ -1508,7 +1508,7 @@ public void testAutoCompactionDutyWithDimensionsSpec(CompactionEngine engine) th
@ParameterizedTest(name = "useSupervisors={0}")
public void testAutoCompactionDutyWithFilter(boolean useSupervisors) throws Exception
{
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null));
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null, true));

loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -1552,7 +1552,7 @@ public void testAutoCompactionDutyWithFilter(boolean useSupervisors) throws Exce
@ParameterizedTest(name = "useSupervisors={0}")
public void testAutoCompationDutyWithMetricsSpec(boolean useSupervisors) throws Exception
{
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null));
updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null, true));

loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -1854,7 +1854,7 @@ private void forceTriggerAutoCompaction(
).collect(Collectors.toList())
);
updateClusterConfig(
new ClusterCompactionConfig(0.5, intervals.size(), policy, true, null)
new ClusterCompactionConfig(0.5, intervals.size(), policy, true, null, true)
);

// Wait for scheduler to pick up the compaction job
@@ -1864,7 +1864,7 @@ private void forceTriggerAutoCompaction(

// Disable all compaction
updateClusterConfig(
new ClusterCompactionConfig(0.5, intervals.size(), COMPACT_NOTHING_POLICY, true, null)
new ClusterCompactionConfig(0.5, intervals.size(), COMPACT_NOTHING_POLICY, true, null, true)
);
} else {
forceTriggerAutoCompaction(numExpectedSegmentsAfterCompaction);
@@ -1956,7 +1956,8 @@ private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int maxCom
maxCompactionTaskSlots,
oldConfig.getCompactionPolicy(),
oldConfig.isUseSupervisors(),
oldConfig.getEngine()
oldConfig.getEngine(),
oldConfig.isLegacyPersistLastCompactionStateInSegments()
)
);

@@ -41,6 +41,7 @@
import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -50,6 +51,7 @@
import org.apache.druid.testing.embedded.EmbeddedRouter;
import org.apache.druid.testing.embedded.indexing.MoreResources;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.druid.timeline.CompactionState;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Period;
@@ -111,14 +113,14 @@ public EmbeddedDruidCluster createCluster()
private void configureCompaction(CompactionEngine compactionEngine)
{
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 100, null, true, compactionEngine))
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 100, null, true, compactionEngine, true))
);
Assertions.assertTrue(updateResponse.isSuccess());
}

@MethodSource("getEngine")
@ParameterizedTest(name = "compactionEngine={0}")
public void test_ingestDayGranularity_andCompactToMonthGranularity_withInlineConfig(CompactionEngine compactionEngine)
public void test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToYearGranularity_withInlineConfig(CompactionEngine compactionEngine)
{
configureCompaction(compactionEngine);

@@ -132,7 +134,7 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity_withInlineCon
Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));

// Create a compaction config with MONTH granularity
InlineSchemaDataSourceCompactionConfig compactionConfig =
InlineSchemaDataSourceCompactionConfig monthGranConfig =
Review comment (Contributor), suggested change:
InlineSchemaDataSourceCompactionConfig monthGranConfig =
InlineSchemaDataSourceCompactionConfig monthGranularityConfig =

InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
@@ -165,11 +167,170 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity_withInlineCon
)
.build();

runCompactionWithSpec(compactionConfig);
runCompactionWithSpec(monthGranConfig);
waitForAllCompactionTasksToFinish();

Assertions.assertEquals(0, getNumSegmentsWith(Granularities.DAY));
Assertions.assertEquals(1, getNumSegmentsWith(Granularities.MONTH));

verifyCompactedSegmentsHaveFingerprints(monthGranConfig);

InlineSchemaDataSourceCompactionConfig yearGranConfig =
InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
.withSkipOffsetFromLatest(Period.seconds(0))
.withGranularitySpec(
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null)
)
.withTuningConfig(
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DimensionRangePartitionsSpec(null, 5000, List.of("item"), false),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
)
.build();

overlord.latchableEmitter().flush(); // flush events so that waiting on events works correctly for the next round of compaction
runCompactionWithSpec(yearGranConfig);
waitForAllCompactionTasksToFinish();

Assertions.assertEquals(0, getNumSegmentsWith(Granularities.DAY));
Assertions.assertEquals(0, getNumSegmentsWith(Granularities.MONTH));
Assertions.assertEquals(1, getNumSegmentsWith(Granularities.YEAR));

verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
}

@MethodSource("getEngine")
@ParameterizedTest(name = "compactionEngine={0}")
public void test_compaction_withPersistLastCompactionStateFalse_storesOnlyFingerprint(CompactionEngine compactionEngine)
throws InterruptedException
{
// Configure cluster with legacyPersistLastCompactionStateInSegments=false
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(
new ClusterCompactionConfig(1.0, 100, null, true, compactionEngine, false)
)
);
Assertions.assertTrue(updateResponse.isSuccess());

// Ingest data at DAY granularity
runIngestionAtGranularity(
"DAY",
"2025-06-01T00:00:00.000Z,shirt,105\n"
+ "2025-06-02T00:00:00.000Z,trousers,210"
);
Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));

// Create compaction config to compact to MONTH granularity
InlineSchemaDataSourceCompactionConfig monthConfig =
InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
.withSkipOffsetFromLatest(Period.seconds(0))
.withGranularitySpec(
new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null)
)
.withTuningConfig(
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DimensionRangePartitionsSpec(1000, null, List.of("item"), false),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
)
.build();

runCompactionWithSpec(monthConfig);
waitForAllCompactionTasksToFinish();

verifySegmentsHaveNullLastCompactionStateAndNonNullFingerprint();
}

private void verifySegmentsHaveNullLastCompactionStateAndNonNullFingerprint()
{
overlord
.bindings()
.segmentsMetadataStorage()
.retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
.forEach(segment -> {
Assertions.assertNull(
segment.getLastCompactionState(),
"Segment " + segment.getId() + " should have null lastCompactionState"
);
Assertions.assertNotNull(
segment.getCompactionStateFingerprint(),
"Segment " + segment.getId() + " should have non-null compactionStateFingerprint"
);
});
}

private void verifyCompactedSegmentsHaveFingerprints(DataSourceCompactionConfig compactionConfig)
{
String expectedFingerprint = CompactionState.generateCompactionStateFingerprint(
CompactSegments.createCompactionStateFromConfig(compactionConfig),
dataSource
);

overlord
.bindings()
.segmentsMetadataStorage()
.retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
.forEach(segment -> {
String fingerprint = segment.getCompactionStateFingerprint();
Assertions.assertNotNull(
fingerprint,
"Segment " + segment.getId() + " should have a compaction state fingerprint"
);
Assertions.assertFalse(
fingerprint.isEmpty(),
"Segment " + segment.getId() + " fingerprint should not be empty"
);
// SHA-256 fingerprints should be 64 hex characters
Assertions.assertEquals(
64,
fingerprint.length(),
"Segment " + segment.getId() + " fingerprint should be 64 characters (SHA-256)"
);
Review comment (Contributor): These three assertions can be omitted; the last one alone should suffice.

Assertions.assertEquals(
expectedFingerprint,
fingerprint,
"Segment " + segment.getId() + " fingerprint should match expected fingerprint"
);
});
}

private void runCompactionWithSpec(DataSourceCompactionConfig config)
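
The 64-character check in `verifyCompactedSegmentsHaveFingerprints` follows from SHA-256 producing a 32-byte digest, which renders as 64 hex characters. The sketch below illustrates that property with the JDK's `MessageDigest`. It is not the PR's `CompactionState.generateCompactionStateFingerprint` implementation, which this diff does not show: the `fingerprint` helper, its inputs (a datasource name plus an already-serialized state string), and the separator byte are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class Sha256FingerprintSketch
{
  // Hypothetical helper: hashes the datasource name together with a serialized state string.
  static String fingerprint(String dataSource, String serializedState)
  {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      digest.update(dataSource.getBytes(StandardCharsets.UTF_8));
      // Separator byte (an assumption of this sketch) so ("ab", "c") and ("a", "bc") differ.
      digest.update((byte) 0);
      digest.update(serializedState.getBytes(StandardCharsets.UTF_8));
      byte[] hash = digest.digest(); // SHA-256 always yields 32 bytes

      StringBuilder hex = new StringBuilder(hash.length * 2);
      for (byte b : hash) {
        // Mask to an unsigned value so each byte becomes exactly two hex digits.
        hex.append(String.format("%02x", b & 0xff));
      }
      return hex.toString();
    }
    catch (NoSuchAlgorithmException e) {
      // Every Java platform implementation is required to provide SHA-256.
      throw new IllegalStateException("SHA-256 not available", e);
    }
  }

  public static void main(String[] args)
  {
    String value = fingerprint("wiki", "{\"segmentGranularity\":\"MONTH\"}");
    System.out.println(value + " (" + value.length() + " characters)");
  }
}
```

Because the length is a property of the hash function, a test that already compares against an expected fingerprint implies the non-null, non-empty, and length checks.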
@@ -212,7 +212,7 @@ public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkip
);

final ClusterCompactionConfig updatedCompactionConfig
= new ClusterCompactionConfig(1.0, 10, null, true, null);
= new ClusterCompactionConfig(1.0, 10, null, true, null, null);
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(updatedCompactionConfig)
);
@@ -323,7 +323,7 @@ public void test_ingestClusterMetrics_compactionSkipsLockedIntervals()
);

final ClusterCompactionConfig updatedCompactionConfig
= new ClusterCompactionConfig(1.0, 10, null, true, null);
= new ClusterCompactionConfig(1.0, 10, null, true, null, null);
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(updatedCompactionConfig)
);
@@ -164,7 +164,7 @@ private IndexTask createIndexTaskForInlineData(String taskId)
private void enableCompactionSupervisor()
{
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 10, null, true, null))
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 10, null, true, null, null))
);
Assertions.assertTrue(updateResponse.isSuccess());
}
@@ -99,6 +99,7 @@ public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
null,
null,
null,
null,
null
);
}
@@ -641,6 +641,21 @@ public static boolean isGuaranteedRollup(
return tuningConfig.isForceGuaranteedRollup();
}

public static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateFingerprintToSegments(
String compactionStateFingerprint
)
{
if (compactionStateFingerprint != null) {
return segments -> segments.stream()
.map(
segment -> segment.withCompactionStateFingerprint(compactionStateFingerprint)
)
.collect(Collectors.toSet());
} else {
return Function.identity();
}
}

public static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToSegments(
boolean storeCompactionState,
TaskToolbox toolbox,
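
The `addCompactionStateFingerprintToSegments` helper in the hunk above returns `Function.identity()` when no fingerprint is supplied, which lets callers chain it unconditionally with `andThen`. A minimal standalone sketch of that pattern follows. The `Segment` record, `addFingerprint`, and `earlierStep` are hypothetical stand-ins for `DataSegment`, the real helper, and `addCompactionStateToSegments`; they are assumptions for illustration, not Druid classes.

```java
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

final class FingerprintAnnotationSketch
{
  // Hypothetical stand-in for DataSegment: an id and an optional fingerprint.
  record Segment(String id, String fingerprint)
  {
    Segment withFingerprint(String newFingerprint)
    {
      return new Segment(id, newFingerprint);
    }
  }

  // Same shape as the helper in the diff: a null fingerprint yields the identity
  // function, so segments pass through unchanged.
  static Function<Set<Segment>, Set<Segment>> addFingerprint(String fingerprint)
  {
    if (fingerprint == null) {
      return Function.identity();
    }
    return segments -> segments.stream()
                               .map(segment -> segment.withFingerprint(fingerprint))
                               .collect(Collectors.toSet());
  }

  // Hypothetical earlier annotation step; a no-op here to keep the sketch small.
  static Function<Set<Segment>, Set<Segment>> earlierStep()
  {
    return Function.identity();
  }

  public static void main(String[] args)
  {
    Set<Segment> segments = Set.of(new Segment("seg-1", null), new Segment("seg-2", null));

    // andThen applies the earlier step first and the fingerprint step second.
    Set<Segment> annotated = earlierStep().andThen(addFingerprint("abc123")).apply(segments);
    Set<Segment> untouched = earlierStep().andThen(addFingerprint(null)).apply(segments);

    System.out.println(annotated);
    System.out.println(untouched.equals(segments));
  }
}
```

Returning the identity function instead of `null` keeps the call site free of branching: the task can always compose the two annotation steps, whether or not a fingerprint was placed in the task context.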
@@ -902,11 +902,19 @@ private TaskStatus generateAndPublishSegments(
Tasks.STORE_COMPACTION_STATE_KEY,
Tasks.DEFAULT_STORE_COMPACTION_STATE
);

final String compactionStateFingerprint = getContextValue(
Tasks.COMPACTION_STATE_FINGERPRINT_KEY,
null
);

final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction =
addCompactionStateToSegments(
storeCompactionState,
toolbox,
ingestionSchema
).andThen(
addCompactionStateFingerprintToSegments(compactionStateFingerprint)
);

Set<DataSegment> tombStones = Collections.emptySet();
@@ -68,4 +68,13 @@ public class Tasks
static {
Verify.verify(STORE_COMPACTION_STATE_KEY.equals(CompactSegments.STORE_COMPACTION_STATE_KEY));
}

/**
* Context key-value pair that holds the fingerprint of the compaction state to be stored with the segment.
*/
public static final String COMPACTION_STATE_FINGERPRINT_KEY = "compactionStateFingerprint";

static {
Verify.verify(COMPACTION_STATE_FINGERPRINT_KEY.equals(CompactSegments.COMPACTION_STATE_FINGERPRINT_KEY));
}
}
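
The static block in `Tasks` guards against the same context key being declared with different values in two places. A rough standalone sketch of that guard is shown below. It uses a plain exception instead of Guava's `Verify`, and `ServerSideKeys`, `TaskSideKeys`, `requireSame`, and `key()` are hypothetical names introduced only for this example.

```java
final class ContextKeySyncSketch
{
  // Hypothetical stand-in for the class that first declares the key.
  static final class ServerSideKeys
  {
    static final String COMPACTION_STATE_FINGERPRINT_KEY = "compactionStateFingerprint";
  }

  // Hypothetical stand-in for the class that redeclares the key.
  static final class TaskSideKeys
  {
    static final String COMPACTION_STATE_FINGERPRINT_KEY = "compactionStateFingerprint";

    static {
      // Runs once when this class is initialized; fails fast if the declarations diverge.
      requireSame(COMPACTION_STATE_FINGERPRINT_KEY, ServerSideKeys.COMPACTION_STATE_FINGERPRINT_KEY);
    }

    // Calling a static method triggers class initialization. Reading the constant
    // directly would not, because compile-time String constants are inlined.
    static String key()
    {
      return COMPACTION_STATE_FINGERPRINT_KEY;
    }
  }

  static void requireSame(String actual, String expected)
  {
    if (!actual.equals(expected)) {
      throw new IllegalStateException("Context keys diverged: " + actual + " vs " + expected);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(TaskSideKeys.key());
  }
}
```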