Skip to content

Commit 71fbba6

Browse files
authored
Emit supervisorId dimension with streaming task metrics (#18803)
Changes: - Add dimension `supervisorId` to all metrics emitted by streaming tasks - Simplify constructor of `TaskRealtimeMetricsMonitor` - Remove `TaskRealtimeMetricsMonitorBuilder`
1 parent bbaaa6a commit 71fbba6

File tree

14 files changed

+77
-92
lines changed

14 files changed

+77
-92
lines changed

embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.junit.jupiter.api.Test;
5252
import org.junit.jupiter.api.Timeout;
5353

54-
import java.util.List;
5554
import java.util.Map;
5655
import java.util.Set;
5756
import java.util.stream.Collectors;
@@ -154,7 +153,8 @@ public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues()
154153
// Wait for segments to be handed off
155154
indexer.latchableEmitter().waitForEventAggregate(
156155
event -> event.hasMetricName("ingest/handoff/count")
157-
.hasDimension(DruidMetrics.DATASOURCE, List.of(dataSource)),
156+
.hasDimension(DruidMetrics.DATASOURCE, dataSource)
157+
.hasDimension(DruidMetrics.SUPERVISOR_ID, supervisorId),
158158
agg -> agg.hasSumAtLeast(expectedSegmentsHandedOff)
159159
);
160160

embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ protected void setupLookups() throws Exception
145145
// Wait for it to be loaded.
146146
indexer.latchableEmitter().waitForEventAggregate(
147147
event -> event.hasMetricName("ingest/events/processed")
148-
.hasDimension(DruidMetrics.DATASOURCE, Collections.singletonList(dataSource)),
148+
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
149149
agg -> agg.hasSumAtLeast(totalRows)
150150
);
151151
broker.latchableEmitter().waitForEvent(

embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeUnnestQueryTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.junit.jupiter.api.Test;
3838
import org.junit.jupiter.api.Timeout;
3939

40-
import java.util.Collections;
4140
import java.util.List;
4241

4342
public class EmbeddedMSQRealtimeUnnestQueryTest extends BaseRealtimeQueryTest
@@ -98,7 +97,7 @@ void setupAll()
9897
// Wait for it to be loaded.
9998
indexer.latchableEmitter().waitForEventAggregate(
10099
event -> event.hasMetricName("ingest/events/processed")
101-
.hasDimension(DruidMetrics.DATASOURCE, Collections.singletonList(dataSource)),
100+
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
102101
agg -> agg.hasSumAtLeast(totalRows)
103102
);
104103
}

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void test_runKafkaSupervisor()
127127
Assertions.assertTrue(supervisorStatus.isSuspended());
128128
indexer.latchableEmitter().waitForEventAggregate(
129129
event -> event.hasMetricName("ingest/handoff/count")
130-
.hasDimension(DruidMetrics.DATASOURCE, List.of(dataSource)),
130+
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
131131
agg -> agg.hasSumAtLeast(expectedSegments)
132132
);
133133
overlord.latchableEmitter().waitForEventAggregate(

indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java

Lines changed: 0 additions & 53 deletions
This file was deleted.

indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,14 @@
1919

2020
package org.apache.druid.indexing.common.stats;
2121

22-
import com.google.common.collect.ImmutableMap;
2322
import org.apache.druid.java.util.emitter.EmittingLogger;
2423
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
2524
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
2625
import org.apache.druid.java.util.metrics.AbstractMonitor;
27-
import org.apache.druid.java.util.metrics.MonitorUtils;
28-
import org.apache.druid.query.DruidMetrics;
2926
import org.apache.druid.segment.incremental.RowIngestionMeters;
3027
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
3128
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
3229

33-
import javax.annotation.Nullable;
34-
import java.util.Map;
35-
3630
/**
3731
* Emits metrics from {@link SegmentGenerationMetrics} and {@link RowIngestionMeters}.
3832
*/
@@ -42,24 +36,20 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
4236

4337
private final SegmentGenerationMetrics segmentGenerationMetrics;
4438
private final RowIngestionMeters rowIngestionMeters;
45-
private final Map<String, String[]> dimensions;
46-
@Nullable
47-
private final Map<String, Object> metricTags;
39+
private final ServiceMetricEvent.Builder builder;
4840

4941
private SegmentGenerationMetrics previousSegmentGenerationMetrics;
5042
private RowIngestionMetersTotals previousRowIngestionMetersTotals;
5143

5244
public TaskRealtimeMetricsMonitor(
5345
SegmentGenerationMetrics segmentGenerationMetrics,
5446
RowIngestionMeters rowIngestionMeters,
55-
Map<String, String[]> dimensions,
56-
@Nullable Map<String, Object> metricTags
47+
ServiceMetricEvent.Builder metricEventBuilder
5748
)
5849
{
5950
this.segmentGenerationMetrics = segmentGenerationMetrics;
6051
this.rowIngestionMeters = rowIngestionMeters;
61-
this.dimensions = ImmutableMap.copyOf(dimensions);
62-
this.metricTags = metricTags;
52+
this.builder = metricEventBuilder;
6353
previousSegmentGenerationMetrics = new SegmentGenerationMetrics();
6454
previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0, 0);
6555
}
@@ -70,17 +60,13 @@ public boolean doMonitor(ServiceEmitter emitter)
7060
SegmentGenerationMetrics metrics = segmentGenerationMetrics.snapshot();
7161
RowIngestionMetersTotals rowIngestionMetersTotals = rowIngestionMeters.getTotals();
7262

73-
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
74-
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
75-
7663
final long thrownAway = rowIngestionMetersTotals.getThrownAway() - previousRowIngestionMetersTotals.getThrownAway();
7764
if (thrownAway > 0) {
7865
log.warn(
7966
"[%,d] events thrown away. Possible causes: null events, events filtered out by transformSpec, or events outside earlyMessageRejectionPeriod / lateMessageRejectionPeriod.",
8067
thrownAway
8168
);
8269
}
83-
builder.setDimensionIfNotNull(DruidMetrics.TAGS, metricTags);
8470
emitter.emit(builder.setMetric("ingest/events/thrownAway", thrownAway));
8571

8672
final long unparseable = rowIngestionMetersTotals.getUnparseable()

indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,6 @@ public static IngestionMode fromString(String name)
101101
private File reportsFile;
102102
private File statusFile;
103103

104-
private final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
105-
106104
private volatile CountDownLatch cleanupCompletionLatch;
107105

108106
protected AbstractTask(String id, String dataSource, Map<String, Object> context, IngestionMode ingestionMode)
@@ -131,7 +129,6 @@ protected AbstractTask(
131129
// Copy the given context into a new mutable map because the Druid indexing service can add some internal contexts.
132130
this.context = context == null ? new HashMap<>() : new HashMap<>(context);
133131
this.ingestionMode = ingestionMode;
134-
IndexTaskUtils.setTaskDimensions(metricBuilder, this);
135132
}
136133

137134
protected AbstractTask(
@@ -385,8 +382,14 @@ public Map<String, Object> getContext()
385382
return context;
386383
}
387384

388-
protected ServiceMetricEvent.Builder getMetricBuilder()
385+
/**
386+
* @return A fresh instance of {@link ServiceMetricEvent.Builder} that can be
387+
* used to emit metrics for this task.
388+
*/
389+
public ServiceMetricEvent.Builder getMetricBuilder()
389390
{
391+
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
392+
IndexTaskUtils.setTaskDimensions(metricBuilder, this);
390393
return metricBuilder;
391394
}
392395

@@ -426,13 +429,15 @@ private static IngestionMode computeIngestionMode(boolean isAppendToExisting, bo
426429
+ "Either dropExisting or appendToExisting should be set to false");
427430
}
428431

432+
/**
433+
* Emits a metric for this task using the {@link #getMetricBuilder() metric builder}.
434+
*/
429435
public void emitMetric(
430436
ServiceEmitter emitter,
431437
String metric,
432438
Number value
433439
)
434440
{
435-
436441
if (emitter == null || metric == null || value == null) {
437442
return;
438443
}

indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import org.apache.druid.indexer.partitions.PartitionsSpec;
4949
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
5050
import org.apache.druid.indexer.report.TaskReport;
51-
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
5251
import org.apache.druid.indexing.common.TaskToolbox;
5352
import org.apache.druid.indexing.common.actions.TaskActionClient;
5453
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
@@ -818,7 +817,7 @@ private TaskStatus generateAndPublishSegments(
818817
{
819818
final SegmentGenerationMetrics buildSegmentsSegmentGenerationMetrics = new SegmentGenerationMetrics();
820819
final TaskRealtimeMetricsMonitor metricsMonitor =
821-
TaskRealtimeMetricsMonitorBuilder.build(this, buildSegmentsSegmentGenerationMetrics, buildSegmentsMeters);
820+
new TaskRealtimeMetricsMonitor(buildSegmentsSegmentGenerationMetrics, buildSegmentsMeters, getMetricBuilder());
822821
toolbox.addMonitor(metricsMonitor);
823822

824823
final PartitionsSpec partitionsSpec = partitionAnalysis.getPartitionsSpec();

indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.druid.indexer.TaskStatus;
2626
import org.apache.druid.indexer.partitions.PartitionsSpec;
2727
import org.apache.druid.indexer.report.TaskReport;
28-
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
2928
import org.apache.druid.indexing.common.TaskToolbox;
3029
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
3130
import org.apache.druid.indexing.common.task.BatchAppenderators;
@@ -169,7 +168,7 @@ private List<DataSegment> generateSegments(
169168
final SegmentGenerationMetrics segmentGenerationMetrics = new SegmentGenerationMetrics();
170169
buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
171170
final TaskRealtimeMetricsMonitor metricsMonitor =
172-
TaskRealtimeMetricsMonitorBuilder.build(this, segmentGenerationMetrics, buildSegmentsMeters);
171+
new TaskRealtimeMetricsMonitor(segmentGenerationMetrics, buildSegmentsMeters, getMetricBuilder());
173172
toolbox.addMonitor(metricsMonitor);
174173

175174
final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();

indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.druid.indexer.granularity.GranularitySpec;
3535
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
3636
import org.apache.druid.indexer.report.TaskReport;
37-
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
3837
import org.apache.druid.indexing.common.TaskToolbox;
3938
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
4039
import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -362,7 +361,7 @@ private DataSegmentsWithSchemas generateAndPushSegments(
362361
final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
363362
final SegmentGenerationMetrics segmentGenerationMetrics = new SegmentGenerationMetrics();
364363
final TaskRealtimeMetricsMonitor metricsMonitor =
365-
TaskRealtimeMetricsMonitorBuilder.build(this, segmentGenerationMetrics, rowIngestionMeters);
364+
new TaskRealtimeMetricsMonitor(segmentGenerationMetrics, rowIngestionMeters, getMetricBuilder());
366365
toolbox.addMonitor(metricsMonitor);
367366

368367
final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();

0 commit comments

Comments
 (0)