Changes from 15 commits

Commits (31 total)
21004b4
Add byte buffer tracking for underlying hash tables
GWphua Nov 7, 2025
c935ea6
Byte buffer tracking for underlying offset handlers
GWphua Nov 7, 2025
c781910
Fix tests
GWphua Nov 7, 2025
7063d09
Fix quidem tests
GWphua Nov 10, 2025
19f6bc3
Documentation
GWphua Nov 10, 2025
0fcb6a0
bytesUsed naming
GWphua Nov 11, 2025
25f10d2
Add max metrics
GWphua Nov 24, 2025
b6ad3c2
Add missing calculation in BufferHashGrouper
GWphua Nov 24, 2025
28719eb
Checkstyle
GWphua Nov 24, 2025
59fe03c
Checkstyle
GWphua Nov 24, 2025
e6020a6
Merge remote-tracking branch 'origin/master' into group-by-query
GWphua Dec 31, 2025
507eecd
GroupByStatsProvider javadocs
GWphua Dec 31, 2025
9623e3a
Fix GroupByStatsProviderTest comments
GWphua Jan 12, 2026
ae40900
Fix doc order for GroupByStatsProvider metrics
GWphua Jan 12, 2026
400d0f4
Fix test for GroupByStatsMonitorTest
GWphua Jan 12, 2026
8f7b218
Update server/src/test/java/org/apache/druid/server/metrics/GroupBySt…
GWphua Jan 14, 2026
df3bf70
Revert stylistic changes in BufferHashGrouper
GWphua Jan 14, 2026
ac71a63
Rename mergeBufferUsage to mergeBufferUsedBytes
GWphua Jan 14, 2026
003da9c
Order of maxAcquisitionTimeNs
GWphua Jan 14, 2026
e416867
Track the open addressing hash table
GWphua Jan 14, 2026
cd38a05
Merge branch 'master' into group-by-query
GWphua Jan 16, 2026
a26c40a
Remove max metrics, push them in another PR...
GWphua Jan 21, 2026
9725532
Remove max metrics in GroupByStatsProviderTest
GWphua Jan 21, 2026
1455712
LimitedBufferHashGrouper to use parent method to report maxTableBuffe…
GWphua Jan 21, 2026
5db69c5
Standardised merge buffer names
GWphua Jan 21, 2026
9ce074a
Tests for buffer hash grouper
GWphua Jan 21, 2026
4f4a10a
Address multiplication cast
GWphua Jan 22, 2026
e92357d
Javadocs for getMergeBufferUsedBytes
GWphua Jan 22, 2026
d55e402
Remix comments in test for peak calculations
GWphua Jan 23, 2026
34eb62d
Merge branch 'master' into group-by-query
GWphua Jan 26, 2026
988de09
Clean up after merging conflicts
GWphua Jan 26, 2026
15 changes: 15 additions & 0 deletions docs/operations/metrics.md
@@ -89,9 +89,14 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
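As the availability column notes, every row above requires the `GroupByStatsMonitor` module to be loaded. A minimal sketch of enabling it, assuming the standard `druid.monitor.monitors` runtime property (adjust the monitor list to match your deployment):

```properties
druid.monitor.monitors=["org.apache.druid.server.metrics.GroupByStatsMonitor"]
```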

### Historical

@@ -113,9 +118,14 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|

### Real-time

@@ -140,9 +150,14 @@ to represent the task ID are deprecated and will be removed in a future release.
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|

### Jetty

@@ -27,7 +27,8 @@
import java.util.concurrent.atomic.AtomicLong;

/**
* Metrics collector for groupBy queries like spilled bytes, merge buffer acquistion time, dictionary size.
* Collects groupBy query metrics (spilled bytes, merge buffer usage, dictionary size) per-query, then
* aggregates them when queries complete. Stats are retrieved and reset periodically via {@link #getStatsSince()}.
*/
@LazySingleton
public class GroupByStatsProvider
@@ -60,34 +61,67 @@ public synchronized void closeQuery(QueryResourceId resourceId)

public synchronized AggregateStats getStatsSince()
{
return aggregateStatsContainer.reset();
AggregateStats aggregateStats = new AggregateStats(aggregateStatsContainer);
aggregateStatsContainer.reset();
return aggregateStats;
}
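The copy-then-reset shape of `getStatsSince()` above is a snapshot-and-reset pattern: the accumulator is copied into a fresh object, zeroed under the same lock, and the copy is returned so callers never hold a reference to live mutable state. A minimal standalone sketch of the idea (illustrative names, not the actual Druid classes):

```java
// Snapshot-and-reset: copy accumulated state, clear it, return the copy.
// Both operations happen under the same monitor so no updates are lost
// between the copy and the reset.
public class SnapshotCounter
{
  private long total = 0;

  public synchronized void add(long delta)
  {
    total += delta;
  }

  public synchronized long snapshotAndReset()
  {
    long snapshot = total; // copy before clearing
    total = 0;             // next emission period starts from zero
    return snapshot;
  }
}
```

Returning a copy (rather than the live container, as the pre-PR `reset()` did) keeps the emitted stats immutable once handed to the monitor.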

public static class AggregateStats
{
private long mergeBufferQueries = 0;
private long mergeBufferAcquisitionTimeNs = 0;
private long mergeBufferTotalUsage = 0;
private long maxMergeBufferAcquisitionTimeNs = 0;
private long maxMergeBufferUsage = 0;
private long spilledQueries = 0;
private long spilledBytes = 0;
private long maxSpilledBytes = 0;
private long mergeDictionarySize = 0;
private long maxMergeDictionarySize = 0;

public AggregateStats()
{
}

public AggregateStats(AggregateStats aggregateStats)
{
this(
aggregateStats.mergeBufferQueries,
aggregateStats.mergeBufferAcquisitionTimeNs,
aggregateStats.mergeBufferTotalUsage,
aggregateStats.maxMergeBufferAcquisitionTimeNs,
aggregateStats.maxMergeBufferUsage,
aggregateStats.spilledQueries,
aggregateStats.spilledBytes,
aggregateStats.maxSpilledBytes,
aggregateStats.mergeDictionarySize,
aggregateStats.maxMergeDictionarySize
);
}

public AggregateStats(
long mergeBufferQueries,
long mergeBufferAcquisitionTimeNs,
long mergeBufferTotalUsage,
long maxMergeBufferAcquisitionTimeNs,
long maxMergeBufferUsage,
long spilledQueries,
long spilledBytes,
long mergeDictionarySize
long maxSpilledBytes,
long mergeDictionarySize,
long maxMergeDictionarySize
)
{
this.mergeBufferQueries = mergeBufferQueries;
this.mergeBufferAcquisitionTimeNs = mergeBufferAcquisitionTimeNs;
this.mergeBufferTotalUsage = mergeBufferTotalUsage;
this.maxMergeBufferAcquisitionTimeNs = maxMergeBufferAcquisitionTimeNs;
this.maxMergeBufferUsage = maxMergeBufferUsage;
this.spilledQueries = spilledQueries;
this.spilledBytes = spilledBytes;
this.maxSpilledBytes = maxSpilledBytes;
this.mergeDictionarySize = mergeDictionarySize;
this.maxMergeDictionarySize = maxMergeDictionarySize;
}

public long getMergeBufferQueries()
@@ -100,6 +134,21 @@ public long getMergeBufferAcquisitionTimeNs()
return mergeBufferAcquisitionTimeNs;
}

public long getMergeBufferTotalUsage()
{
return mergeBufferTotalUsage;
}

public long getMaxMergeBufferAcquisitionTimeNs()
{
return maxMergeBufferAcquisitionTimeNs;
}

public long getMaxMergeBufferUsage()
{
return maxMergeBufferUsage;
}

public long getSpilledQueries()
{
return spilledQueries;
@@ -110,50 +159,63 @@ public long getSpilledBytes()
return spilledBytes;
}

public long getMaxSpilledBytes()
{
return maxSpilledBytes;
}

public long getMergeDictionarySize()
{
return mergeDictionarySize;
}

public long getMaxMergeDictionarySize()
{
return maxMergeDictionarySize;
}

public void addQueryStats(PerQueryStats perQueryStats)
{
if (perQueryStats.getMergeBufferAcquisitionTimeNs() > 0) {
mergeBufferQueries++;
mergeBufferAcquisitionTimeNs += perQueryStats.getMergeBufferAcquisitionTimeNs();
maxMergeBufferAcquisitionTimeNs = Math.max(
maxMergeBufferAcquisitionTimeNs,
perQueryStats.getMergeBufferAcquisitionTimeNs()
);
mergeBufferTotalUsage += perQueryStats.getMergeBufferTotalUsage();
Contributor:

@GWphua Instead of summing here, what do you think about taking the max? Then the metric emitted would be the max merge buffer usage of a single query in that emission period. This would be a good signal for operators on whether they need to tweak the mergeBuffer size.

Contributor Author:

I was thinking about this.

There are other metrics where taking MAX will also make sense --
spilledBytes --> How much storage would be good to configure?
dictionarySize --> How large can the merge dictionary size get?

I am considering adding another metric (maxSpilledBytes, maxDictionarySize, maxSpilledBytes). What do you think?

Contributor:

Yeah agreed. I do think it makes sense to have it for those 3 metrics

Even for mergeBuffer/acquisitionTimeNs I think there's value in having the max, as it gives operators a signal on whether to increase numMergeBuffers
maxMergeBufferUsage = Math.max(maxMergeBufferUsage, perQueryStats.getMergeBufferTotalUsage());
}

if (perQueryStats.getSpilledBytes() > 0) {
spilledQueries++;
spilledBytes += perQueryStats.getSpilledBytes();
maxSpilledBytes = Math.max(maxSpilledBytes, perQueryStats.getSpilledBytes());
}

mergeDictionarySize += perQueryStats.getMergeDictionarySize();
maxMergeDictionarySize = Math.max(maxMergeDictionarySize, perQueryStats.getMergeDictionarySize());
}
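`addQueryStats()` above combines two views of each per-query value: a running total across the emission period and the single-query maximum. A minimal standalone sketch of that pattern (illustrative names, not the Druid classes):

```java
// Combines a period total with a per-query maximum, mirroring how
// addQueryStats() treats merge buffer usage: the sum backs the
// bytesUsed-style metric, the max backs the maxBytesUsed-style metric.
public class PeriodAggregator
{
  private long totalBytes = 0;
  private long maxBytes = 0;

  public void record(long perQueryBytes)
  {
    totalBytes += perQueryBytes;                  // period total
    maxBytes = Math.max(maxBytes, perQueryBytes); // worst single query
  }

  public long getTotalBytes()
  {
    return totalBytes;
  }

  public long getMaxBytes()
  {
    return maxBytes;
  }
}
```

As the review thread notes, the max is the more actionable signal for sizing decisions, while the total captures aggregate load.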

public AggregateStats reset()
public void reset()
{
AggregateStats aggregateStats =
new AggregateStats(
mergeBufferQueries,
mergeBufferAcquisitionTimeNs,
spilledQueries,
spilledBytes,
mergeDictionarySize
);

this.mergeBufferQueries = 0;
this.mergeBufferAcquisitionTimeNs = 0;
this.mergeBufferTotalUsage = 0;
this.maxMergeBufferAcquisitionTimeNs = 0;
this.maxMergeBufferUsage = 0;
this.spilledQueries = 0;
this.spilledBytes = 0;
this.maxSpilledBytes = 0;
this.mergeDictionarySize = 0;

return aggregateStats;
this.maxMergeDictionarySize = 0;
}
}

public static class PerQueryStats
{
private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0);
private final AtomicLong mergeBufferTotalUsage = new AtomicLong(0);
private final AtomicLong spilledBytes = new AtomicLong(0);
private final AtomicLong mergeDictionarySize = new AtomicLong(0);

@@ -162,6 +224,11 @@ public void mergeBufferAcquisitionTime(long delay)
mergeBufferAcquisitionTimeNs.addAndGet(delay);
}

public void mergeBufferTotalUsage(long bytes)
{
mergeBufferTotalUsage.addAndGet(bytes);
}

public void spilledBytes(long bytes)
{
spilledBytes.addAndGet(bytes);
@@ -177,6 +244,11 @@ public long getMergeBufferAcquisitionTimeNs()
return mergeBufferAcquisitionTimeNs.get();
}

public long getMergeBufferTotalUsage()
{
return mergeBufferTotalUsage.get();
}

public long getSpilledBytes()
{
return spilledBytes.get();
@@ -173,6 +173,15 @@ public void close()
aggregators.reset();
}

/**
* This method is implemented to return the highest memory value claimed by the Grouper. This is only
* used for monitoring the size of the merge buffers used.
*/
public long getMergeBufferUsage()
{
return hashTable.getMaxTableBufferUsage();
}
Contributor:

please move this to the Grouper interface and have a default implementation for it. Override it in all the concrete classes as needed:

  default long getMergeBufferUsage()
  {
    return 0l;
  }

Contributor:

Also, getMergeBufferUsage() seems a bit vague. What do you think about getMaxMergeBufferUsedBytes()? Once decided on a name, please use the name across the board for consistency

Contributor (@abhishekrb19, Jan 13, 2026):

Some of these comments related to merge buffer usage may be stale. Please see #18731 (comment)

Contributor Author:

the new name getMaxMergeBufferUsedBytes() sounds great, used it.

For the Groupers interface, please see my above comments about AbstractBufferHashGrouper in SpillingGrouper
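The default-method suggestion from this thread can be sketched end to end. The interface and method names below follow the discussion; the real `Grouper` interface carries many more methods:

```java
// Reviewer's suggestion: declare the accessor on the interface with a
// 0-byte default, and override it only in groupers that actually track
// buffer usage. Simplified sketch, not the real Druid interface.
interface Grouper
{
  default long getMaxMergeBufferUsedBytes()
  {
    return 0L;
  }
}

class BufferTrackingGrouper implements Grouper
{
  private final long maxUsedBytes;

  BufferTrackingGrouper(long maxUsedBytes)
  {
    this.maxUsedBytes = maxUsedBytes; // tracked elsewhere in a real grouper
  }

  @Override
  public long getMaxMergeBufferUsedBytes()
  {
    return maxUsedBytes;
  }
}
```

The default keeps non-buffer-backed groupers from having to stub the method while still letting callers sum usage uniformly.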


/**
* Populate a {@link ReusableEntry} with values from a particular bucket.
*/
@@ -26,7 +26,6 @@
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.AbstractList;
import java.util.Collections;
@@ -50,7 +49,6 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
// to get a comparator that uses the ordering defined by the OrderByColumnSpec of a query.
private final boolean useDefaultSorting;

@Nullable
private ByteBufferIntList offsetList;

public BufferHashGrouper(
@@ -154,6 +152,18 @@ public void reset()
aggregators.reset();
}

@Override
public long getMergeBufferUsage()
{
if (!initialized) {
return 0L;
}

long hashTableUsage = hashTable.getMaxTableBufferUsage();
long offSetListUsage = offsetList.getMaxMergeBufferUsageBytes();
return hashTableUsage + offSetListUsage;
Contributor:

Same comment, I think this would just more or less tell us configured size rather than actual buffer usage. (more or less because of offset list tracking)

}

@Override
public CloseableIterator<Entry<KeyType>> iterator(boolean sorted)
{
@@ -199,18 +209,15 @@ public int size()
}

// Sort offsets in-place.
Collections.sort(
wrappedOffsets,
(lhs, rhs) -> {
final ByteBuffer tableBuffer = hashTable.getTableBuffer();
return comparator.compare(
tableBuffer,
tableBuffer,
lhs + HASH_SIZE,
rhs + HASH_SIZE
);
}
);
wrappedOffsets.sort((lhs, rhs) -> {
final ByteBuffer tableBuffer = hashTable.getTableBuffer();
return comparator.compare(
tableBuffer,
tableBuffer,
lhs + HASH_SIZE,
rhs + HASH_SIZE
);
});

return new CloseableIterator<>()
{
@@ -79,6 +79,9 @@ public static int calculateTableArenaSizeWithFixedAdditionalSize(
@Nullable
protected BucketUpdateHandler bucketUpdateHandler;

// Keeps track of the maximum number of bytes used in the merge buffer.
protected long maxTableBufferUsage;

public ByteBufferHashTable(
float maxLoadFactor,
int initialBuckets,
@@ -97,6 +100,7 @@ public ByteBufferHashTable(
this.maxSizeForTesting = maxSizeForTesting;
this.tableArenaSize = buffer.capacity();
this.bucketUpdateHandler = bucketUpdateHandler;
this.maxTableBufferUsage = 0;
}

public void reset()
@@ -139,6 +143,7 @@ public void reset()
bufferDup.position(tableStart);
bufferDup.limit(tableStart + maxBuckets * bucketSizeWithHash);
tableBuffer = bufferDup.slice();
updateMaxTableBufferUsage();

// Clear used bits of new table
for (int i = 0; i < maxBuckets; i++) {
@@ -225,6 +230,7 @@ public void adjustTableWhenFull()
maxBuckets = newBuckets;
regrowthThreshold = newMaxSize;
tableBuffer = newTableBuffer;
updateMaxTableBufferUsage();
tableStart = newTableStart;

growthCount++;
@@ -381,6 +387,16 @@ public int getGrowthCount()
return growthCount;
}

protected void updateMaxTableBufferUsage()
{
maxTableBufferUsage = Math.max(maxTableBufferUsage, tableBuffer.capacity());
Contributor:

Unless I'm missing something, the issue #17902 was created to actually get some visibility into actual merge buffer usage, but this metric would just tell us how much was actually configured instead?

tableBuffer.capacity() would just indicate the capacity of the buffers, so more or less what was configured via druid.processing.buffer.sizeBytes.

Contributor Author:

You are right about the current metric reporting the allocation.

I relooked the implementation, and found that maybe we can better estimate the ByteBufferHashTable. Since the ByteBufferHashTable is an open-addressing hash table, we can use the number of elements in table (size) * space taken up per bucket (bucketSizeWithHash) to estimate the usage in bytes.

Contributor Author:

Made the AlternatingByteBufferHashTable inherit the max metrics reporting from the superclass. Should now accurately report the usage :)

}

public long getMaxTableBufferUsage()
{
return maxTableBufferUsage;
}
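The estimate discussed in the thread above, occupied buckets times per-bucket footprint rather than buffer capacity, can be sketched as follows. Field names follow the discussion (`size`, `bucketSizeWithHash`, `maxBuckets`); this is not the actual class:

```java
// Distinguishes allocated capacity (what druid.processing.buffer.sizeBytes
// effectively configures) from estimated occupancy of an open-addressing
// hash table (occupied buckets * bytes per bucket). Illustrative only.
public class HashTableUsageEstimate
{
  public static long allocatedBytes(int maxBuckets, int bucketSizeWithHash)
  {
    // Roughly what tableBuffer.capacity() reports: configuration, not usage.
    return (long) maxBuckets * bucketSizeWithHash;
  }

  public static long estimatedUsedBytes(int size, int bucketSizeWithHash)
  {
    // size = number of occupied buckets; approximates actual usage.
    return (long) size * bucketSizeWithHash;
  }
}
```

Casting to `long` before multiplying avoids the int-overflow issue the "Address multiplication cast" commit in this PR fixes.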

public interface BucketUpdateHandler
{
void handleNewBucket(int bucketOffset);