
Conversation

@kfaraz (Contributor) commented Dec 3, 2025

Description

Druid currently supports two policies for prioritizing intervals for compaction.

  • newestSegmentFirst: prioritizes compaction of intervals with the latest data
    • Assumes that newer intervals are queried more frequently and thus should be picked first for the partitioning improvements provided by compaction.
    • ⚠️ Can get into a state where it keeps compacting the latest interval if new data keeps coming in or if the compaction job keeps failing
    • ⚠️ Tries to compact future intervals
    • ⚠️ Does not account for cluster stability which is a function of the number of used segments in the cluster
  • fixedIntervalOrder: explicitly specifies the datasources and intervals which should be compacted
    • 🔧 Meant to be a test-only policy; running it in production is impractical, as it requires significant manual intervention

Proposed policy

mostFragmentedFirst - prioritizes compaction of the most fragmented intervals

  • Prioritizes intervals that are expected to provide the greatest reduction in segment count.
  • Allows specifying thresholds which must be met in an interval to trigger compaction.
    An interval is eligible for compaction if and only if:
    • The number of uncompacted segments in the interval is at least minUncompactedCount.
    • The total uncompacted bytes in the interval is at least minUncompactedBytes.
    • The average size of uncompacted segments is at most maxAverageUncompactedBytesPerSegment.

Prioritizes an interval based on its "fragmentation index", which is computed as follows:

// The first term is simply the number of uncompacted segments in the interval
segmentCountTerm = numUncompactedSegments

// The second term accounts for the smallness of the segments in the interval
// Smaller segments are preferred for compaction as they are expected to provide a greater reduction
// in segment count
smallnessRatio = (maxAverageUncompactedBytesPerSegment / averageSizeOfUncompactedSegments)
smallnessTerm = minUncompactedCount * smallnessRatio

fragmentationIndex = segmentCountTerm + smallnessTerm
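
For illustration, a minimal Java sketch of the computation above (the method and parameter names here are mine, not necessarily those in the PR):

// Illustrative sketch of the fragmentation index formula described above.
static double computeFragmentationIndex(
    long numUncompactedSegments,
    long uncompactedBytes,
    int minUncompactedCount,
    long maxAverageUncompactedBytesPerSegment
)
{
  // Average size of an uncompacted segment; guard against division by zero
  final long avgUncompactedBytesPerSegment =
      Math.max(1, uncompactedBytes / Math.max(1, numUncompactedSegments));

  // First term: the raw number of uncompacted segments
  final double segmentCountTerm = numUncompactedSegments;

  // Second term: rewards intervals whose segments are small relative to the
  // configured maximum average size
  final double smallnessRatio =
      (double) maxAverageUncompactedBytesPerSegment / avgUncompactedBytesPerSegment;
  final double smallnessTerm = minUncompactedCount * smallnessRatio;

  return segmentCountTerm + smallnessTerm;
}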

Changes

  • Update CompactionStatus.compute() to track the number of compacted and uncompacted segments
  • Add new compaction policy mostFragmentedFirst which prioritizes intervals with the most small uncompacted segments

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@JsonCreator
public MostFragmentedIntervalFirstPolicy(
@JsonProperty("minUncompactedCount") @Nullable Integer minUncompactedCount,
@JsonProperty("minUncompactedBytes") @Nullable Long minUncompactedBytes,

Contributor:

Use HumanReadableBytes?

Contributor Author:

Sure, will do.
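
(For reference, a sketch of what the suggested change might look like, assuming Druid's HumanReadableBytes class and its getBytes() accessor; the constructor shape is abbreviated and the default constant is hypothetical:)

@JsonCreator
public MostFragmentedIntervalFirstPolicy(
    @JsonProperty("minUncompactedCount") @Nullable Integer minUncompactedCount,
    @JsonProperty("minUncompactedBytes") @Nullable HumanReadableBytes minUncompactedBytes
    // remaining parameters omitted in this sketch
)
{
  // HumanReadableBytes accepts values like "100MiB" in JSON specs and
  // converts to a raw byte count via getBytes()
  this.minUncompactedBytes = minUncompactedBytes == null
                             ? DEFAULT_MIN_UNCOMPACTED_BYTES  // hypothetical default
                             : minUncompactedBytes.getBytes();
}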

* compaction. Default value is {@link #SIZE_2_GB}.
*/
@JsonProperty
public long getMaxUncompactedSize()

Contributor:

The names of minUncompactedBytes and maxUncompactedSize are confusing, because they look very similar but one refers to total and one refers to average. How about minUncompactedBytes and maxAverageUncompactedBytesPerSegment?

Contributor:

Also, I wish this could be specified in terms of row counts rather than byte counts. The target segment size rowsPerSegment is specified as a number of rows, so it's most natural to specify the max average uncompacted size in terms of rows as well. Like, I might say that the target is 3M rows per segment but don't bother compacting if the segments average 2.5M already.

I recognize this requires additional metadata to be available that may not currently be available, so it doesn't have to be done in this PR. I'm just having a thought.

Contributor Author:

Yeah, I agree that average number of rows per segment would be a better parameter for this function.
I have used the average bytes here as a proxy until we have the row count for each segment available in the metadata.

final long avgUncompactedSize = Math.max(1, uncompacted.getTotalBytes() / uncompacted.getNumSegments());

// Priority increases as size decreases and number increases
final double normalizingFactor = 1000f;

Contributor:

What's the purpose of this factor? It seems like it wouldn't affect the results.

Contributor Author (@kfaraz), Dec 3, 2025:

Yes, I think this doesn't serve any purpose any more.

I had started out keeping the priority as a long and used a multiplication factor in an effort to get whole number values. But I later realized that it would be difficult to have a universal factor that works for all cases, so best to use a double instead.
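
(For the record, a one-line illustration of why a constant positive factor cannot change which interval wins; the values here are hypothetical:)

// Ordering is invariant under multiplication by any constant k > 0:
// if p1 > p2, then k * p1 > k * p2, so the factor never changes the ranking.
double p1 = 4.0, p2 = 2.5, k = 1000.0;
assert (p1 > p2) == (k * p1 > k * p2);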


/**
* Computes the priority of the given compaction candidate by checking the
* total number and average size of uncompacted segments.

Contributor:

Intuitively what does the priority "mean"? It's helpful to add that to the javadocs.

Reading through, it seems like the priority boils down to pow(uncompacted.getNumSegments, 2) / uncompacted.getTotalBytes. Why that particular formula?

Contributor:

Unit tests for this function would be helpful, to illustrate its behavior in various situations.
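
(For illustration, the kind of test this might look like; the computePriority helper and the test name here are hypothetical, not the PR's actual API:)

@Test
public void testManySmallSegmentsOutrankFewLargeOnes()
{
  // Both hypothetical intervals hold 1 GB of uncompacted data, but the first
  // spreads it across 100 segments and the second across only 10
  final double manySmall = computePriority(100, 1_000_000_000L);
  final double fewLarge = computePriority(10, 1_000_000_000L);
  Assert.assertTrue(manySmall > fewLarge);
}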

Contributor Author:

Sure, will update the javadocs.

I wanted it to be directly proportional to the number of uncompacted segments and inversely proportional to the size of uncompacted segments.

So

priority
= uncompactedCount / avgUncompactedSize
= uncompactedCount * (uncompactedCount / uncompactedBytes)
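
Worked through with hypothetical numbers, using priority = count² / bytes from the derivation above:

// Both intervals hold 1 GB of uncompacted data:
// A: 100 segments -> priority = 100 * 100 / 1e9 = 1.0e-5
// B:  10 segments -> priority =  10 *  10 / 1e9 = 1.0e-7
// A is picked first: the same bytes spread over more, smaller segments.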

Contributor Author:

The other alternative would be to have something like

priority = factor1 * uncompactedCount + factor2 * avgUncompactedSize

but that seemed like it would be difficult to tune as no set of factors 1 and 2 would apply universally to all cases.

Contributor Author (@kfaraz), Dec 3, 2025:

> Intuitively what does the priority "mean"? It's helpful to add that to the javadocs.

Would it also make sense to rename the method as computeFragmentationIndex(), since essentially that is what we are trying to achieve here?

Contributor Author:

Updated the javadocs. Also updated the fragmentation index formula. Details in the PR description.

Contributor @gianm left a review:

Looks like a good place to start. We can evolve it based on experience in the real world.

@gianm merged commit 36f3413 into apache:master on Dec 5, 2025 (57 checks passed).
@kfaraz deleted the new_compact_policy branch on December 5, 2025 at 04:52.
@kfaraz (Contributor Author) commented Dec 5, 2025

Thanks for the review, @gianm!
