Skip to content

Commit bbc62dc

Browse files
authored
add query metrics for vsf mode (apache#18727)
changes: * added `canLoadSegments()` method to `SegmentManager` and `SegmentCacheManager` * added 'query/load/batch/time' metric for 'wall time' spent waiting on all segments to load for a query * added 'query/load/time/avg' metric for average per segment load time for a query * added 'query/load/time/max' metric for max per segment load time for a query * added 'query/load/wait/avg' metric for average per segment wait time to begin loading for a query * added 'query/load/wait/max' metric for max per segment wait time to begin loading for a query * added 'query/load/count' metric for number of segments loaded on demand for a query * added 'query/load/bytes/total' metric for amount of data loaded on demand for a query
1 parent 38e5e3e commit bbc62dc

File tree

16 files changed

+492
-68
lines changed

16 files changed

+492
-68
lines changed

embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@
2424
import org.apache.druid.indexer.TaskState;
2525
import org.apache.druid.java.util.common.HumanReadableBytes;
2626
import org.apache.druid.java.util.common.StringUtils;
27+
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
2728
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
29+
import org.apache.druid.query.DefaultQueryMetrics;
2830
import org.apache.druid.query.DruidProcessingConfigTest;
31+
import org.apache.druid.server.metrics.LatchableEmitter;
2932
import org.apache.druid.sql.calcite.planner.Calcites;
3033
import org.apache.druid.testing.embedded.EmbeddedBroker;
3134
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
@@ -43,10 +46,12 @@
4346
import org.junit.jupiter.api.Test;
4447
import org.testcontainers.shaded.com.google.common.io.ByteStreams;
4548

49+
import javax.annotation.Nullable;
4650
import java.io.File;
4751
import java.io.IOException;
4852
import java.nio.file.Files;
4953
import java.util.Collections;
54+
import java.util.List;
5055
import java.util.concurrent.ThreadLocalRandom;
5156

5257
/**
@@ -140,6 +145,7 @@ void testQueryPartials()
140145
// "2015-09-12T08:00:00Z/2025-09-12T14:00:00Z"
141146
// "2015-09-12T14:00:00Z/2025-09-12T19:00:00Z"
142147
// "2015-09-12T19:00:00Z/2025-09-13T00:00:00Z"
148+
143149
final String[] queries = new String[]{
144150
"select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 00:00:00' and __time < TIMESTAMP '2015-09-12 08:00:00'",
145151
"select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 08:00:00' and __time < TIMESTAMP '2015-09-12 14:00:00'",
@@ -154,13 +160,79 @@ void testQueryPartials()
154160
};
155161

156162
Assertions.assertEquals(expectedResults[0], Long.parseLong(cluster.runSql(queries[0], dataSource)));
163+
assertMetrics(1, 8L);
157164
Assertions.assertEquals(expectedResults[1], Long.parseLong(cluster.runSql(queries[1], dataSource)));
165+
assertMetrics(2, 6L);
158166
Assertions.assertEquals(expectedResults[2], Long.parseLong(cluster.runSql(queries[2], dataSource)));
167+
assertMetrics(3, 5L);
159168
Assertions.assertEquals(expectedResults[3], Long.parseLong(cluster.runSql(queries[3], dataSource)));
169+
assertMetrics(4, 5L);
160170

161171
for (int i = 0; i < 1000; i++) {
162172
int nextQuery = ThreadLocalRandom.current().nextInt(queries.length);
163173
Assertions.assertEquals(expectedResults[nextQuery], Long.parseLong(cluster.runSql(queries[nextQuery], dataSource)));
174+
assertMetrics(i + 5, null);
175+
}
176+
}
177+
178+
private void assertMetrics(int expectedEventCount, @Nullable Long expectedLoadCount)
179+
{
180+
LatchableEmitter emitter = historical.latchableEmitter();
181+
final int lastIndex = expectedEventCount - 1;
182+
183+
List<ServiceMetricEvent> countEvents = emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_COUNT);
184+
Assertions.assertEquals(expectedEventCount, countEvents.size());
185+
if (expectedLoadCount != null) {
186+
Assertions.assertEquals(expectedLoadCount, countEvents.get(lastIndex).getValue());
187+
}
188+
boolean hasLoads = countEvents.get(lastIndex).getValue().longValue() > 0;
189+
190+
List<ServiceMetricEvent> timeEvents = emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BATCH_TIME);
191+
Assertions.assertEquals(expectedEventCount, timeEvents.size());
192+
if (hasLoads) {
193+
Assertions.assertTrue(timeEvents.get(lastIndex).getValue().longValue() > 0);
194+
} else {
195+
Assertions.assertEquals(0, timeEvents.get(lastIndex).getValue().longValue());
196+
}
197+
198+
List<ServiceMetricEvent> timeMaxEvents = emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_MAX);
199+
Assertions.assertEquals(expectedEventCount, timeMaxEvents.size());
200+
if (hasLoads) {
201+
Assertions.assertTrue(timeMaxEvents.get(lastIndex).getValue().longValue() > 0);
202+
} else {
203+
Assertions.assertEquals(0, timeMaxEvents.get(lastIndex).getValue().longValue());
204+
}
205+
206+
List<ServiceMetricEvent> timeAvgEvents = emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_AVG);
207+
Assertions.assertEquals(expectedEventCount, timeAvgEvents.size());
208+
if (hasLoads) {
209+
Assertions.assertTrue(timeAvgEvents.get(lastIndex).getValue().longValue() > 0);
210+
} else {
211+
Assertions.assertEquals(0, timeAvgEvents.get(lastIndex).getValue().longValue());
212+
}
213+
214+
List<ServiceMetricEvent> waitMaxEvents = emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_MAX);
215+
Assertions.assertEquals(expectedEventCount, waitMaxEvents.size());
216+
if (hasLoads) {
217+
Assertions.assertTrue(waitMaxEvents.get(lastIndex).getValue().longValue() >= 0);
218+
} else {
219+
Assertions.assertEquals(0, waitMaxEvents.get(lastIndex).getValue().longValue());
220+
}
221+
222+
List<ServiceMetricEvent> waitAvgEvents = emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_AVG);
223+
Assertions.assertEquals(expectedEventCount, waitAvgEvents.size());
224+
if (hasLoads) {
225+
Assertions.assertTrue(waitAvgEvents.get(lastIndex).getValue().longValue() >= 0);
226+
} else {
227+
Assertions.assertEquals(0, waitAvgEvents.get(lastIndex).getValue().longValue());
228+
}
229+
230+
List<ServiceMetricEvent> loadSizeEvents = emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BYTES);
231+
Assertions.assertEquals(expectedEventCount, loadSizeEvents.size());
232+
if (hasLoads) {
233+
Assertions.assertTrue(loadSizeEvents.get(lastIndex).getValue().longValue() > 0);
234+
} else {
235+
Assertions.assertEquals(0, loadSizeEvents.get(lastIndex).getValue().longValue());
164236
}
165237
}
166238

processing/src/main/java/org/apache/druid/query/DataSegmentAndDescriptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@
2525

2626
public class DataSegmentAndDescriptor
2727
{
28+
public static DataSegmentAndDescriptor missing(SegmentDescriptor descriptor)
29+
{
30+
return new DataSegmentAndDescriptor(null, descriptor);
31+
}
32+
2833
@Nullable
2934
private final DataSegment dataSegment;
3035
private final SegmentDescriptor descriptor;

processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,20 @@
4242
*/
4343
public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMetrics<QueryType>
4444
{
45+
public static final String QUERY_TIME = "query/time";
46+
public static final String QUERY_BYTES = "query/bytes";
47+
public static final String QUERY_CPU_TIME = "query/cpu/time";
4548
public static final String QUERY_WAIT_TIME = "query/wait/time";
4649
public static final String QUERY_SEGMENT_TIME = "query/segment/time";
4750
public static final String QUERY_SEGMENT_AND_CACHE_TIME = "query/segmentAndCache/time";
4851
public static final String QUERY_RESULT_CACHE_HIT = "query/resultCache/hit";
52+
public static final String QUERY_ON_DEMAND_LOAD_BATCH_TIME = "query/load/batch/time";
53+
public static final String QUERY_ON_DEMAND_LOAD_TIME_AVG = "query/load/time/avg";
54+
public static final String QUERY_ON_DEMAND_LOAD_TIME_MAX = "query/load/time/max";
55+
public static final String QUERY_ON_DEMAND_WAIT_TIME_AVG = "query/load/wait/avg";
56+
public static final String QUERY_ON_DEMAND_WAIT_TIME_MAX = "query/load/wait/max";
57+
public static final String QUERY_ON_DEMAND_LOAD_COUNT = "query/load/count";
58+
public static final String QUERY_ON_DEMAND_LOAD_BYTES = "query/load/bytes/total";
4959

5060
protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
5161
protected final Map<String, Number> metrics = new HashMap<>();
@@ -240,13 +250,13 @@ public BitmapResultFactory<?> makeBitmapResultFactory(BitmapFactory factory)
240250
@Override
241251
public QueryMetrics<QueryType> reportQueryTime(long timeNs)
242252
{
243-
return reportMillisTimeMetric("query/time", timeNs);
253+
return reportMillisTimeMetric(QUERY_TIME, timeNs);
244254
}
245255

246256
@Override
247257
public QueryMetrics<QueryType> reportQueryBytes(long byteCount)
248258
{
249-
return reportMetric("query/bytes", byteCount);
259+
return reportMetric(QUERY_BYTES, byteCount);
250260
}
251261

252262
@Override
@@ -267,6 +277,48 @@ public QueryMetrics<QueryType> reportSegmentAndCacheTime(long timeNs)
267277
return reportMillisTimeMetric(QUERY_SEGMENT_AND_CACHE_TIME, timeNs);
268278
}
269279

280+
@Override
281+
public QueryMetrics<QueryType> reportSegmentOnDemandLoadTime(long timeNs)
282+
{
283+
return reportMillisTimeMetric(QUERY_ON_DEMAND_LOAD_BATCH_TIME, timeNs);
284+
}
285+
286+
@Override
287+
public QueryMetrics<QueryType> reportSegmentOnDemandLoadTimeAvg(long timeNs)
288+
{
289+
return reportMillisTimeMetric(QUERY_ON_DEMAND_LOAD_TIME_AVG, timeNs);
290+
}
291+
292+
@Override
293+
public QueryMetrics<QueryType> reportSegmentOnDemandLoadWaitTimeMax(long timeNs)
294+
{
295+
return reportMillisTimeMetric(QUERY_ON_DEMAND_WAIT_TIME_MAX, timeNs);
296+
}
297+
298+
@Override
299+
public QueryMetrics<QueryType> reportSegmentOnDemandLoadWaitTimeAvg(long timeNs)
300+
{
301+
return reportMillisTimeMetric(QUERY_ON_DEMAND_WAIT_TIME_AVG, timeNs);
302+
}
303+
304+
@Override
305+
public QueryMetrics<QueryType> reportSegmentOnDemandLoadTimeMax(long timeNs)
306+
{
307+
return reportMillisTimeMetric(QUERY_ON_DEMAND_LOAD_TIME_MAX, timeNs);
308+
}
309+
310+
@Override
311+
public QueryMetrics<QueryType> reportSegmentOnDemandLoadBytes(long byteCount)
312+
{
313+
return reportMetric(QUERY_ON_DEMAND_LOAD_BYTES, byteCount);
314+
}
315+
316+
@Override
317+
public QueryMetrics<QueryType> reportSegmentOnDemandLoadCount(long count)
318+
{
319+
return reportMetric(QUERY_ON_DEMAND_LOAD_COUNT, count);
320+
}
321+
270322
@Override
271323
public QueryMetrics<QueryType> reportResultCachePoll(boolean hit)
272324
{
@@ -276,7 +328,7 @@ public QueryMetrics<QueryType> reportResultCachePoll(boolean hit)
276328
@Override
277329
public QueryMetrics<QueryType> reportCpuTime(long timeNs)
278330
{
279-
return reportMetric("query/cpu/time", TimeUnit.NANOSECONDS.toMicros(timeNs));
331+
return reportMetric(QUERY_CPU_TIME, TimeUnit.NANOSECONDS.toMicros(timeNs));
280332
}
281333

282334
@Override

processing/src/main/java/org/apache/druid/query/QueryMetrics.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,52 @@ default void filterBundle(FilterBundle.BundleInfo bundleInfo)
361361
*/
362362
QueryMetrics<QueryType> reportSegmentAndCacheTime(long timeNs);
363363

364+
365+
/**
366+
* Registers the apparent time spent loading segments on demand, before queing up for processing. This measurement is
367+
* wall-clock time to when the last segment is finished loading and ready for processing.
368+
* <p>
369+
* Emitted once per query
370+
*/
371+
QueryMetrics<QueryType> reportSegmentOnDemandLoadTime(long timeNs);
372+
/**
373+
* Registers the average time spent loading segments on demand across threads.
374+
* <p>
375+
* Emitted once per query
376+
*/
377+
QueryMetrics<QueryType> reportSegmentOnDemandLoadTimeAvg(long timeNs);
378+
/**
379+
* Registers the maximum time spent loading segments on demand across all load threads.
380+
* <p>
381+
* Emitted once per query
382+
*/
383+
QueryMetrics<QueryType> reportSegmentOnDemandLoadWaitTimeMax(long timeNs);
384+
/**
385+
* Registers the average time spent waiting for a thread to start loading segments on demand across threads.
386+
* <p>
387+
* Emitted once per query
388+
*/
389+
QueryMetrics<QueryType> reportSegmentOnDemandLoadWaitTimeAvg(long timeNs);
390+
/**
391+
* Registers the maximum time spent waiting for a thread to start loading segments on demand across all load threads.
392+
* <p>
393+
* Emitted once per query
394+
*/
395+
QueryMetrics<QueryType> reportSegmentOnDemandLoadTimeMax(long timeNs);
396+
/**
397+
* Registers the total number of bytes added to the cache when loading segments on demand, summing the sizes loaded by
398+
* individual segement load threads.
399+
* <p>
400+
* Emitted once per query
401+
*/
402+
QueryMetrics<QueryType> reportSegmentOnDemandLoadBytes(long byteCount);
403+
/**
404+
* Registers the total numer of segments loaded on demand.
405+
* <p>
406+
* Emitted once per query
407+
*/
408+
QueryMetrics<QueryType> reportSegmentOnDemandLoadCount(long count);
409+
364410
/**
365411
* Emits iff a given query polled the result-level cache and the success of that operation.
366412
*/

processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,48 @@ public QueryMetrics reportSegmentAndCacheTime(long timeNs)
223223
return delegateQueryMetrics.reportSegmentAndCacheTime(timeNs);
224224
}
225225

226+
@Override
227+
public QueryMetrics reportSegmentOnDemandLoadTime(long timeNs)
228+
{
229+
return delegateQueryMetrics.reportSegmentOnDemandLoadTime(timeNs);
230+
}
231+
232+
@Override
233+
public QueryMetrics reportSegmentOnDemandLoadTimeAvg(long timeNs)
234+
{
235+
return delegateQueryMetrics.reportSegmentOnDemandLoadTimeAvg(timeNs);
236+
}
237+
238+
@Override
239+
public QueryMetrics reportSegmentOnDemandLoadWaitTimeMax(long timeNs)
240+
{
241+
return delegateQueryMetrics.reportSegmentOnDemandLoadWaitTimeMax(timeNs);
242+
}
243+
244+
@Override
245+
public QueryMetrics reportSegmentOnDemandLoadWaitTimeAvg(long timeNs)
246+
{
247+
return delegateQueryMetrics.reportSegmentOnDemandLoadWaitTimeAvg(timeNs);
248+
}
249+
250+
@Override
251+
public QueryMetrics reportSegmentOnDemandLoadTimeMax(long timeNs)
252+
{
253+
return delegateQueryMetrics.reportSegmentOnDemandLoadTimeMax(timeNs);
254+
}
255+
256+
@Override
257+
public QueryMetrics reportSegmentOnDemandLoadBytes(long byteCount)
258+
{
259+
return delegateQueryMetrics.reportSegmentOnDemandLoadBytes(byteCount);
260+
}
261+
262+
@Override
263+
public QueryMetrics reportSegmentOnDemandLoadCount(long count)
264+
{
265+
return delegateQueryMetrics.reportSegmentOnDemandLoadCount(count);
266+
}
267+
226268
@Override
227269
public QueryMetrics reportResultCachePoll(boolean hit)
228270
{

processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentAction.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import javax.annotation.Nullable;
2929
import java.io.Closeable;
3030
import java.io.IOException;
31-
import java.util.Optional;
3231
import java.util.concurrent.atomic.AtomicBoolean;
3332
import java.util.function.Supplier;
3433

@@ -53,16 +52,16 @@ public class AcquireSegmentAction implements Closeable
5352
{
5453
public static AcquireSegmentAction missingSegment()
5554
{
56-
return new AcquireSegmentAction(() -> Futures.immediateFuture(Optional::empty), null);
55+
return new AcquireSegmentAction(() -> Futures.immediateFuture(AcquireSegmentResult.empty()), null);
5756
}
5857

59-
private final Supplier<ListenableFuture<ReferenceCountedObjectProvider<Segment>>> segmentFutureSupplier;
58+
private final Supplier<ListenableFuture<AcquireSegmentResult>> segmentFutureSupplier;
6059
@Nullable
6160
private final Closeable loadCleanup;
6261
private final AtomicBoolean closed = new AtomicBoolean(false);
6362

6463
public AcquireSegmentAction(
65-
Supplier<ListenableFuture<ReferenceCountedObjectProvider<Segment>>> segmentFutureSupplier,
64+
Supplier<ListenableFuture<AcquireSegmentResult>> segmentFutureSupplier,
6665
@Nullable Closeable loadCleanup
6766
)
6867
{
@@ -76,7 +75,7 @@ public AcquireSegmentAction(
7675
* either as an immediate future if the segment already exists in cache. The 'action' to fetch the segment and return
7776
* the reference provider is not initiated until this method is called.
7877
*/
79-
public ListenableFuture<ReferenceCountedObjectProvider<Segment>> getSegmentFuture()
78+
public ListenableFuture<AcquireSegmentResult> getSegmentFuture()
8079
{
8180
return segmentFutureSupplier.get();
8281
}

0 commit comments

Comments
 (0)