Skip to content
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
21004b4
Add byte buffer tracking for underlying hash tables
GWphua Nov 7, 2025
c935ea6
Byte buffer tracking for underlying offset handlers
GWphua Nov 7, 2025
c781910
Fix tests
GWphua Nov 7, 2025
7063d09
Fix quidem tests
GWphua Nov 10, 2025
19f6bc3
Documentation
GWphua Nov 10, 2025
0fcb6a0
bytesUsed naming
GWphua Nov 11, 2025
25f10d2
Add max metrics
GWphua Nov 24, 2025
b6ad3c2
Add missing calculation in BufferHashGrouper
GWphua Nov 24, 2025
28719eb
Checkstyle
GWphua Nov 24, 2025
59fe03c
Checkstyle
GWphua Nov 24, 2025
e6020a6
Merge remote-tracking branch 'origin/master' into group-by-query
GWphua Dec 31, 2025
507eecd
GroupByStatsProvider javadocs
GWphua Dec 31, 2025
9623e3a
Fix GroupByStatsProviderTest comments
GWphua Jan 12, 2026
ae40900
Fix doc order for GroupByStatsProvider metrics
GWphua Jan 12, 2026
400d0f4
Fix test for GroupByStatsMonitorTest
GWphua Jan 12, 2026
8f7b218
Update server/src/test/java/org/apache/druid/server/metrics/GroupBySt…
GWphua Jan 14, 2026
df3bf70
Revert stylistic changes in BufferHashGrouper
GWphua Jan 14, 2026
ac71a63
Rename mergeBufferUsage to mergeBufferUsedBytes
GWphua Jan 14, 2026
003da9c
Order of maxAcquisitionTimeNs
GWphua Jan 14, 2026
e416867
Track the open addressing hash table
GWphua Jan 14, 2026
cd38a05
Merge branch 'master' into group-by-query
GWphua Jan 16, 2026
a26c40a
Remove max metrics, push them in another PR...
GWphua Jan 21, 2026
9725532
Remove max metrics in GroupByStatsProviderTest
GWphua Jan 21, 2026
1455712
LimitedBufferHashGrouper to use parent method to report maxTableBuffe…
GWphua Jan 21, 2026
5db69c5
Standardised merge buffer names
GWphua Jan 21, 2026
9ce074a
Tests for buffer hash grouper
GWphua Jan 21, 2026
4f4a10a
Address multiplication cast
GWphua Jan 22, 2026
e92357d
Javadocs for getMergeBufferUsedBytes
GWphua Jan 22, 2026
d55e402
Remix comments in test for peak calculations
GWphua Jan 23, 2026
34eb62d
Merge branch 'master' into group-by-query
GWphua Jan 26, 2026
988de09
Clean up after merging conflicts
GWphua Jan 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,14 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|

### Historical

Expand All @@ -113,9 +118,14 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|

### Real-time

Expand All @@ -140,9 +150,14 @@ to represent the task ID are deprecated and will be removed in a future release.
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|

### Jetty

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import java.util.concurrent.atomic.AtomicLong;

/**
* Metrics collector for groupBy queries like spilled bytes, merge buffer acquisition time, dictionary size.
* Collects groupBy query metrics (spilled bytes, merge buffer usage, dictionary size) per-query, then
* aggregates them when queries complete. Stats are retrieved and reset periodically via {@link #getStatsSince()}.
*/
@LazySingleton
public class GroupByStatsProvider
Expand Down Expand Up @@ -60,34 +61,67 @@ public synchronized void closeQuery(QueryResourceId resourceId)

public synchronized AggregateStats getStatsSince()
{
return aggregateStatsContainer.reset();
AggregateStats aggregateStats = new AggregateStats(aggregateStatsContainer);
aggregateStatsContainer.reset();
return aggregateStats;
}

public static class AggregateStats
{
private long mergeBufferQueries = 0;
private long mergeBufferAcquisitionTimeNs = 0;
private long totalMergeBufferUsedBytes = 0;
private long maxMergeBufferAcquisitionTimeNs = 0;
private long maxMergeBufferUsedBytes = 0;
private long spilledQueries = 0;
private long spilledBytes = 0;
private long maxSpilledBytes = 0;
private long mergeDictionarySize = 0;
private long maxMergeDictionarySize = 0;

/**
 * Creates an empty instance. No field is assigned here, so every counter keeps its
 * field-initializer value of zero.
 */
public AggregateStats()
{
}

/**
 * Copy constructor: creates a new instance holding the current values of every counter in
 * {@code aggregateStats}. The copy is independent, so later changes to the source (for example a
 * reset) do not affect it.
 *
 * @param aggregateStats the instance whose counters are copied
 */
public AggregateStats(AggregateStats aggregateStats)
{
// All arguments are longs, so the order below must match the all-args constructor's parameter
// order exactly; a mismatch would compile but silently swap metrics.
this(
aggregateStats.mergeBufferQueries,
aggregateStats.mergeBufferAcquisitionTimeNs,
aggregateStats.totalMergeBufferUsedBytes,
aggregateStats.maxMergeBufferAcquisitionTimeNs,
aggregateStats.maxMergeBufferUsedBytes,
aggregateStats.spilledQueries,
aggregateStats.spilledBytes,
aggregateStats.maxSpilledBytes,
aggregateStats.mergeDictionarySize,
aggregateStats.maxMergeDictionarySize
);
}

public AggregateStats(
long mergeBufferQueries,
long mergeBufferAcquisitionTimeNs,
long totalMergeBufferUsedBytes,
long maxMergeBufferAcquisitionTimeNs,
long maxMergeBufferUsedBytes,
long spilledQueries,
long spilledBytes,
long mergeDictionarySize
long maxSpilledBytes,
long mergeDictionarySize,
long maxMergeDictionarySize
)
{
this.mergeBufferQueries = mergeBufferQueries;
this.mergeBufferAcquisitionTimeNs = mergeBufferAcquisitionTimeNs;
this.totalMergeBufferUsedBytes = totalMergeBufferUsedBytes;
this.maxMergeBufferAcquisitionTimeNs = maxMergeBufferAcquisitionTimeNs;
this.maxMergeBufferUsedBytes = maxMergeBufferUsedBytes;
this.spilledQueries = spilledQueries;
this.spilledBytes = spilledBytes;
this.maxSpilledBytes = maxSpilledBytes;
this.mergeDictionarySize = mergeDictionarySize;
this.maxMergeDictionarySize = maxMergeDictionarySize;
}

public long getMergeBufferQueries()
Expand All @@ -100,6 +134,21 @@ public long getMergeBufferAcquisitionTimeNs()
return mergeBufferAcquisitionTimeNs;
}

/**
 * @return the largest merge buffer acquisition time, in nanoseconds, recorded for a single query
 */
public long getMaxMergeBufferAcquisitionTimeNs()
{
  return this.maxMergeBufferAcquisitionTimeNs;
}

/**
 * @return the sum of the per-query merge buffer used bytes accumulated into this instance
 */
public long getTotalMergeBufferUsedBytes()
{
  return this.totalMergeBufferUsedBytes;
}

/**
 * @return the largest merge buffer used-bytes value recorded for a single query
 */
public long getMaxMergeBufferUsedBytes()
{
  return this.maxMergeBufferUsedBytes;
}

public long getSpilledQueries()
{
return spilledQueries;
Expand All @@ -110,50 +159,63 @@ public long getSpilledBytes()
return spilledBytes;
}

/**
 * @return the largest number of spilled bytes recorded for a single query
 */
public long getMaxSpilledBytes()
{
  return this.maxSpilledBytes;
}

/**
 * @return the sum of the per-query merge dictionary sizes accumulated into this instance
 */
public long getMergeDictionarySize()
{
  return this.mergeDictionarySize;
}

/**
 * @return the largest merge dictionary size recorded for a single query
 */
public long getMaxMergeDictionarySize()
{
  return this.maxMergeDictionarySize;
}

/**
 * Folds the statistics of one query into this aggregate.
 * <ul>
 *   <li>Merge buffer counters (query count, total/max acquisition time, total/max used bytes) are
 *       updated only when the query's acquisition time is greater than zero.</li>
 *   <li>Spill counters (query count, total/max spilled bytes) are updated only when the query
 *       spilled more than zero bytes.</li>
 *   <li>Merge dictionary size (total and max) is always updated.</li>
 * </ul>
 *
 * @param perQueryStats the per-query statistics to add
 */
public void addQueryStats(PerQueryStats perQueryStats)
{
  // Each per-query value is AtomicLong-backed, so two separate reads are not guaranteed to agree.
  // Read every value exactly once so the condition, the running total and the max are all derived
  // from the same number.
  final long acquisitionTimeNs = perQueryStats.getMergeBufferAcquisitionTimeNs();
  if (acquisitionTimeNs > 0) {
    final long usedBytes = perQueryStats.getMergeBufferTotalUsedBytes();

    mergeBufferQueries++;
    mergeBufferAcquisitionTimeNs += acquisitionTimeNs;
    maxMergeBufferAcquisitionTimeNs = Math.max(maxMergeBufferAcquisitionTimeNs, acquisitionTimeNs);
    totalMergeBufferUsedBytes += usedBytes;
    maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, usedBytes);
  }

  final long querySpilledBytes = perQueryStats.getSpilledBytes();
  if (querySpilledBytes > 0) {
    spilledQueries++;
    spilledBytes += querySpilledBytes;
    maxSpilledBytes = Math.max(maxSpilledBytes, querySpilledBytes);
  }

  final long queryDictionarySize = perQueryStats.getMergeDictionarySize();
  mergeDictionarySize += queryDictionarySize;
  maxMergeDictionarySize = Math.max(maxMergeDictionarySize, queryDictionarySize);
}

public AggregateStats reset()
public void reset()
{
AggregateStats aggregateStats =
new AggregateStats(
mergeBufferQueries,
mergeBufferAcquisitionTimeNs,
spilledQueries,
spilledBytes,
mergeDictionarySize
);

this.mergeBufferQueries = 0;
this.mergeBufferAcquisitionTimeNs = 0;
this.maxMergeBufferAcquisitionTimeNs = 0;
this.totalMergeBufferUsedBytes = 0;
this.maxMergeBufferUsedBytes = 0;
this.spilledQueries = 0;
this.spilledBytes = 0;
this.maxSpilledBytes = 0;
this.mergeDictionarySize = 0;

return aggregateStats;
this.maxMergeDictionarySize = 0;
}
}

public static class PerQueryStats
{
private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0);
private final AtomicLong mergeBufferTotalUsedBytes = new AtomicLong(0);
private final AtomicLong spilledBytes = new AtomicLong(0);
private final AtomicLong mergeDictionarySize = new AtomicLong(0);

Expand All @@ -162,6 +224,11 @@ public void mergeBufferAcquisitionTime(long delay)
mergeBufferAcquisitionTimeNs.addAndGet(delay);
}

/**
 * Adds {@code bytes} to this query's running total of merge buffer bytes used.
 *
 * @param bytes number of bytes to add
 */
public void mergeBufferTotalUsedBytes(long bytes)
{
  this.mergeBufferTotalUsedBytes.addAndGet(bytes);
}

public void spilledBytes(long bytes)
{
spilledBytes.addAndGet(bytes);
Expand All @@ -177,6 +244,11 @@ public long getMergeBufferAcquisitionTimeNs()
return mergeBufferAcquisitionTimeNs.get();
}

/**
 * @return the current value of this query's merge buffer used-bytes counter
 */
public long getMergeBufferTotalUsedBytes()
{
  return this.mergeBufferTotalUsedBytes.get();
}

public long getSpilledBytes()
{
return spilledBytes.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ public void close()
aggregators.reset();
}

/**
 * Reports the peak merge buffer usage of this grouper, taken from
 * {@code hashTable.getMaxTableBufferUsedBytes()}. The value is intended only for monitoring the
 * size of the merge buffers used.
 *
 * @return the highest number of bytes the hash table has reported using
 */
public long getMergeBufferUsedBytes()
{
  final long peakHashTableBytes = hashTable.getMaxTableBufferUsedBytes();
  return peakHashTableBytes;
}

/**
* Populate a {@link ReusableEntry} with values from a particular bucket.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.AbstractList;
import java.util.Collections;
Expand All @@ -50,7 +49,6 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
// to get a comparator that uses the ordering defined by the OrderByColumnSpec of a query.
private final boolean useDefaultSorting;

@Nullable
private ByteBufferIntList offsetList;

public BufferHashGrouper(
Expand Down Expand Up @@ -154,6 +152,18 @@ public void reset()
aggregators.reset();
}

/**
 * Reports the combined peak usage of the hash table and the offset list.
 *
 * @return zero while the grouper is not initialized, otherwise the sum of the two peak values
 */
@Override
public long getMergeBufferUsedBytes()
{
  if (initialized) {
    final long tableBytes = hashTable.getMaxTableBufferUsedBytes();
    final long offsetListBytes = offsetList.getMaxMergeBufferUsedBytes();
    return tableBytes + offsetListBytes;
  }

  // An uninitialized grouper reports no usage.
  return 0L;
}

@Override
public CloseableIterator<Entry<KeyType>> iterator(boolean sorted)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@

import java.nio.ByteBuffer;

/**
* A fixed-width, open-addressing hash table that lives inside a caller-provided byte buffer.
* <p>
* The table uses a contiguous slice of the merge buffer as its backing store. Each bucket holds at most one entry,
* and occupies {@code bucketSizeWithHash} number of bytes. Collisions are resolved by continuously probing the
* next bucket to find an empty bucket to slot the new entry. The current table view is maintained as a
* {@link ByteBuffer} slice that moves and grows within the arena as the table expands.
*/
public class ByteBufferHashTable
{
public static int calculateTableArenaSizeWithPerBucketAdditionalSize(
Expand Down Expand Up @@ -79,6 +87,9 @@ public static int calculateTableArenaSizeWithFixedAdditionalSize(
@Nullable
protected BucketUpdateHandler bucketUpdateHandler;

// Tracks the peak number of bytes used by entries of this table (size * bucketSizeWithHash) in the merge buffer.
protected long maxTableBufferUsedBytes;

public ByteBufferHashTable(
float maxLoadFactor,
int initialBuckets,
Expand All @@ -97,6 +108,7 @@ public ByteBufferHashTable(
this.maxSizeForTesting = maxSizeForTesting;
this.tableArenaSize = buffer.capacity();
this.bucketUpdateHandler = bucketUpdateHandler;
this.maxTableBufferUsedBytes = 0;
}

public void reset()
Expand Down Expand Up @@ -139,6 +151,7 @@ public void reset()
bufferDup.position(tableStart);
bufferDup.limit(tableStart + maxBuckets * bucketSizeWithHash);
tableBuffer = bufferDup.slice();
updateMaxTableBufferUsedBytes();

// Clear used bits of new table
for (int i = 0; i < maxBuckets; i++) {
Expand Down Expand Up @@ -245,6 +258,7 @@ protected void initializeNewBucketKey(
tableBuffer.putInt(Groupers.getUsedFlag(keyHash));
tableBuffer.put(keyBuffer);
size++;
updateMaxTableBufferUsedBytes();

if (bucketUpdateHandler != null) {
bucketUpdateHandler.handleNewBucket(offset);
Expand Down Expand Up @@ -381,6 +395,16 @@ public int getGrowthCount()
return growthCount;
}

/**
 * Raises the recorded peak to the current entry footprint, {@code size * bucketSizeWithHash}, when
 * that footprint is larger than the peak seen so far. The product is computed as a {@code long}.
 */
protected void updateMaxTableBufferUsedBytes()
{
  final long currentUsedBytes = (long) size * bucketSizeWithHash;
  if (currentUsedBytes > maxTableBufferUsedBytes) {
    maxTableBufferUsedBytes = currentUsedBytes;
  }
}

/**
 * @return the largest {@code size * bucketSizeWithHash} value recorded by
 *         {@code updateMaxTableBufferUsedBytes()} so far
 */
public long getMaxTableBufferUsedBytes()
{
  return this.maxTableBufferUsedBytes;
}

public interface BucketUpdateHandler
{
void handleNewBucket(int bucketOffset);
Expand Down
Loading
Loading