Skip to content

Commit 9cea875

Browse files
authored
Add task dimensions to ServiceEmitter (#18876)
All task metrics are now emitted with the following dimensions: taskId, dataSource, taskType, groupId, and id (kept for backward compatibility; id will be removed in favor of the taskId dimension in a future release). This is achieved by wiring the TaskHolder into the ServiceEmitter and reading the task dimensions from it when the emitter starts (in its lifecycle-managed start() method rather than in the constructor, to avoid cyclic dependencies), so individual monitors no longer need to be passed the task holder in order to retrieve and append task dimensions on every monitor call. Some monitors were previously missing these dimensions; this change ensures that all task metrics are emitted with them by default and in a consistent format.
1 parent 150a457 commit 9cea875

File tree

68 files changed

+723
-446
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

68 files changed

+723
-446
lines changed

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.commons.io.FileUtils;
4040
import org.apache.curator.test.TestingCluster;
4141
import org.apache.druid.cli.CliPeon;
42+
import org.apache.druid.cli.CliPeonTest;
4243
import org.apache.druid.cli.PeonLoadSpecHolder;
4344
import org.apache.druid.cli.PeonTaskHolder;
4445
import org.apache.druid.data.input.InputEntity;
@@ -88,6 +89,12 @@
8889
import org.apache.druid.java.util.common.guava.Sequences;
8990
import org.apache.druid.java.util.common.parsers.CloseableIterator;
9091
import org.apache.druid.java.util.common.parsers.ParseException;
92+
import org.apache.druid.java.util.emitter.core.Emitter;
93+
import org.apache.druid.java.util.emitter.core.Event;
94+
import org.apache.druid.java.util.emitter.core.EventMap;
95+
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
96+
import org.apache.druid.java.util.metrics.StubServiceEmitter;
97+
import org.apache.druid.java.util.metrics.TaskHolder;
9198
import org.apache.druid.math.expr.ExprMacroTable;
9299
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
93100
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
@@ -121,7 +128,6 @@
121128
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
122129
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
123130
import org.apache.druid.server.metrics.LoadSpecHolder;
124-
import org.apache.druid.server.metrics.TaskHolder;
125131
import org.apache.druid.server.security.Action;
126132
import org.apache.druid.server.security.Resource;
127133
import org.apache.druid.server.security.ResourceAction;
@@ -3487,6 +3493,64 @@ public void testTaskWithTransformSpecDoesNotCauseCliPeonCyclicDependency()
34873493
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
34883494
}
34893495

3496+
@Test(timeout = 60_000L)
3497+
public void testKafkaTaskContainsAllTaskDimensions()
3498+
throws IOException, ExecutionException, InterruptedException
3499+
{
3500+
insertData();
3501+
3502+
final KafkaIndexTask task = createTask(
3503+
"index_kafka_test_id1",
3504+
new KafkaIndexTaskIOConfig(
3505+
0,
3506+
"sequence0",
3507+
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 2L), ImmutableSet.of()),
3508+
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 5L)),
3509+
kafkaServer.consumerProperties(),
3510+
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
3511+
true,
3512+
null,
3513+
null,
3514+
INPUT_FORMAT,
3515+
null,
3516+
Duration.standardHours(2).getStandardMinutes()
3517+
)
3518+
);
3519+
3520+
Injector peonInjector = CliPeonTest.makePeonInjectorWithStubEmitter(task, temporaryFolder, OBJECT_MAPPER);
3521+
Emitter peonEmitter = peonInjector.getInstance(Emitter.class);
3522+
Assert.assertTrue(peonEmitter instanceof StubServiceEmitter);
3523+
emitter = (StubServiceEmitter) peonEmitter;
3524+
emitter.start();
3525+
makeToolboxFactory();
3526+
3527+
final ListenableFuture<TaskStatus> future = runTask(task);
3528+
3529+
// Wait for task to exit
3530+
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
3531+
Assert.assertTrue(emitter.getNumEmittedEvents() > 0);
3532+
3533+
// Check published metadata & segments in deep storage
3534+
final List<SegmentDescriptor> publishedDescriptors = publishedDescriptors();
3535+
Assert.assertEquals(
3536+
new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 5L))),
3537+
newDataSchemaMetadata()
3538+
);
3539+
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", publishedDescriptors.get(0)));
3540+
3541+
Assert.assertTrue(emitter.getNumEmittedEvents() > 0);
3542+
for (Event event : emitter.getEvents()) {
3543+
if (event instanceof ServiceMetricEvent) {
3544+
EventMap observedEvent = event.toMap();
3545+
Assert.assertEquals("test_ds", observedEvent.get("dataSource"));
3546+
Assert.assertEquals("index_kafka_test_id1", observedEvent.get("id"));
3547+
Assert.assertEquals("index_kafka_test_id1", observedEvent.get("taskId"));
3548+
Assert.assertEquals("index_kafka", observedEvent.get("taskType"));
3549+
Assert.assertEquals("index_kafka_test_ds", observedEvent.get("groupId"));
3550+
}
3551+
}
3552+
}
3553+
34903554
public static class TestKafkaInputFormat implements InputFormat
34913555
{
34923556
final InputFormat baseInputFormat;

processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceEmitter.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,39 +25,56 @@
2525
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
2626
import org.apache.druid.java.util.emitter.core.Emitter;
2727
import org.apache.druid.java.util.emitter.core.Event;
28+
import org.apache.druid.java.util.metrics.NoopTaskHolder;
29+
import org.apache.druid.java.util.metrics.TaskHolder;
2830

2931
import java.io.IOException;
3032

3133
public class ServiceEmitter implements Emitter
3234
{
33-
private final ImmutableMap<String, String> serviceDimensions;
3435
private final Emitter emitter;
36+
private final String service;
37+
private final ImmutableMap<String, String> otherServiceDimensions;
38+
private final String host;
39+
private final TaskHolder taskHolder;
40+
41+
/**
42+
* This is initialized in {@link #start()} rather than in the constructor, since calling {@link TaskHolder#getMetricDimensions()}
43+
* may introduce cyclic dependencies. Initialization is therefore deferred until {@link #start()}, which is managed via {@link LifecycleStart}.
44+
*/
45+
private ImmutableMap<String, String> serviceDimensions;
3546

3647
public ServiceEmitter(String service, String host, Emitter emitter)
3748
{
38-
this(service, host, emitter, ImmutableMap.of());
49+
this(service, host, emitter, ImmutableMap.of(), new NoopTaskHolder());
3950
}
4051

4152
public ServiceEmitter(
4253
String service,
4354
String host,
4455
Emitter emitter,
45-
ImmutableMap<String, String> otherServiceDimensions
56+
ImmutableMap<String, String> otherServiceDimensions,
57+
TaskHolder taskHolder
4658
)
4759
{
48-
this.serviceDimensions = ImmutableMap
49-
.<String, String>builder()
50-
.put("service", Preconditions.checkNotNull(service, "service should be non-null"))
51-
.put("host", Preconditions.checkNotNull(host, "host should be non-null"))
52-
.putAll(otherServiceDimensions)
53-
.build();
60+
this.service = Preconditions.checkNotNull(service, "service should be non-null");
61+
this.host = Preconditions.checkNotNull(host, "host should be non-null");
62+
this.otherServiceDimensions = otherServiceDimensions;
5463
this.emitter = emitter;
64+
this.taskHolder = taskHolder;
5565
}
5666

5767
@Override
5868
@LifecycleStart
5969
public void start()
6070
{
71+
serviceDimensions = ImmutableMap
72+
.<String, String>builder()
73+
.put(Event.SERVICE, service)
74+
.put(Event.HOST, host)
75+
.putAll(otherServiceDimensions)
76+
.putAll(taskHolder.getMetricDimensions())
77+
.build();
6178
emitter.start();
6279
}
6380

processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,21 @@ public class CgroupCpuMonitor extends FeedDefiningMonitor
4040
private static final Logger LOG = new Logger(CgroupCpuMonitor.class);
4141
private static final Long DEFAULT_USER_HZ = 100L;
4242
final CgroupDiscoverer cgroupDiscoverer;
43-
final Map<String, String[]> dimensions;
4443
private Long userHz;
4544
private final KeyedDiff jiffies = new KeyedDiff();
4645
private long prevJiffiesSnapshotAt = 0;
4746
private final boolean isRunningOnCgroupsV2;
4847
private final CgroupV2CpuMonitor cgroupV2CpuMonitor;
4948

50-
public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
49+
public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, String feed)
5150
{
5251
super(feed);
5352
this.cgroupDiscoverer = cgroupDiscoverer;
54-
this.dimensions = dimensions;
55-
53+
5654
// Check if we're running on cgroups v2
5755
this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2);
5856
if (isRunningOnCgroupsV2) {
59-
this.cgroupV2CpuMonitor = new CgroupV2CpuMonitor(cgroupDiscoverer, dimensions, feed);
57+
this.cgroupV2CpuMonitor = new CgroupV2CpuMonitor(cgroupDiscoverer, feed);
6058
LOG.info("Detected cgroups v2, using CgroupV2CpuMonitor behavior for accurate metrics");
6159
} else {
6260
this.cgroupV2CpuMonitor = null;
@@ -65,19 +63,14 @@ public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, Str
6563

6664
}
6765

68-
public CgroupCpuMonitor(final Map<String, String[]> dimensions, String feed)
69-
{
70-
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed);
71-
}
72-
73-
public CgroupCpuMonitor(final Map<String, String[]> dimensions)
66+
public CgroupCpuMonitor(String feed)
7467
{
75-
this(dimensions, DEFAULT_METRICS_FEED);
68+
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), feed);
7669
}
7770

7871
public CgroupCpuMonitor()
7972
{
80-
this(ImmutableMap.of());
73+
this(DEFAULT_METRICS_FEED);
8174
}
8275

8376
@Override
@@ -96,7 +89,6 @@ private boolean doMonitorV1(ServiceEmitter emitter)
9689
long now = Instant.now().getEpochSecond();
9790

9891
final ServiceMetricEvent.Builder builder = builder();
99-
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
10092
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name());
10193

10294
emitter.emit(builder.setMetric("cgroup/cpu/shares", cpuSnapshot.getShares()));

processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,52 +19,41 @@
1919

2020
package org.apache.druid.java.util.metrics;
2121

22-
import com.google.common.collect.ImmutableMap;
2322
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
2423
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
2524
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
2625
import org.apache.druid.java.util.metrics.cgroups.CpuSet;
2726
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;
2827

29-
import java.util.Map;
30-
3128
/**
3229
* Monitor that reports CPU set metrics from cgroups both v1 and v2.
3330
*/
3431

3532
public class CgroupCpuSetMonitor extends FeedDefiningMonitor
3633
{
3734
final CgroupDiscoverer cgroupDiscoverer;
38-
final Map<String, String[]> dimensions;
3935

40-
public CgroupCpuSetMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
36+
public CgroupCpuSetMonitor(CgroupDiscoverer cgroupDiscoverer, String feed)
4137
{
4238
super(feed);
4339
this.cgroupDiscoverer = cgroupDiscoverer;
44-
this.dimensions = dimensions;
45-
}
46-
47-
public CgroupCpuSetMonitor(final Map<String, String[]> dimensions, String feed)
48-
{
49-
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed);
5040
}
5141

52-
public CgroupCpuSetMonitor(final Map<String, String[]> dimensions)
42+
public CgroupCpuSetMonitor(String feed)
5343
{
54-
this(dimensions, DEFAULT_METRICS_FEED);
44+
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), feed);
5545
}
5646

5747
public CgroupCpuSetMonitor()
5848
{
59-
this(ImmutableMap.of());
49+
this(DEFAULT_METRICS_FEED);
6050
}
6151

6252
@Override
6353
public boolean doMonitor(ServiceEmitter emitter)
6454
{
6555
final CpuSet.CpuSetMetric cpusetSnapshot = cgroupDiscoverer.getCpuSetMetrics();
6656
final ServiceMetricEvent.Builder builder = builder();
67-
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
6857
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name());
6958
emitter.emit(builder.setMetric(
7059
"cgroup/cpuset/cpu_count",

processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,41 +33,34 @@
3333
public class CgroupDiskMonitor extends FeedDefiningMonitor
3434
{
3535
private static final Logger LOG = new Logger(CgroupDiskMonitor.class);
36-
final CgroupDiscoverer cgroupDiscoverer;
37-
final Map<String, String[]> dimensions;
36+
private final CgroupDiscoverer cgroupDiscoverer;
3837
private final KeyedDiff diff = new KeyedDiff();
3938
private final boolean isRunningOnCgroupsV2;
4039
private final CgroupV2DiskMonitor cgroupV2DiskMonitor;
4140

42-
public CgroupDiskMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
41+
public CgroupDiskMonitor(CgroupDiscoverer cgroupDiscoverer, String feed)
4342
{
4443
super(feed);
4544
this.cgroupDiscoverer = cgroupDiscoverer;
46-
this.dimensions = dimensions;
47-
45+
4846
// Check if we're running on cgroups v2
4947
this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2);
5048
if (isRunningOnCgroupsV2) {
51-
this.cgroupV2DiskMonitor = new CgroupV2DiskMonitor(cgroupDiscoverer, dimensions, feed);
49+
this.cgroupV2DiskMonitor = new CgroupV2DiskMonitor(cgroupDiscoverer, feed);
5250
LOG.info("Detected cgroups v2, using CgroupV2DiskMonitor behavior for accurate metrics");
5351
} else {
5452
this.cgroupV2DiskMonitor = null;
5553
}
5654
}
5755

58-
public CgroupDiskMonitor(final Map<String, String[]> dimensions, String feed)
59-
{
60-
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed);
61-
}
62-
63-
public CgroupDiskMonitor(final Map<String, String[]> dimensions)
56+
public CgroupDiskMonitor(String feed)
6457
{
65-
this(dimensions, DEFAULT_METRICS_FEED);
58+
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), feed);
6659
}
6760

6861
public CgroupDiskMonitor()
6962
{
70-
this(ImmutableMap.of());
63+
this(DEFAULT_METRICS_FEED);
7164
}
7265

7366
@Override
@@ -97,7 +90,6 @@ private boolean doMonitorV1(ServiceEmitter emitter)
9790
if (stats != null) {
9891
final ServiceMetricEvent.Builder builder = builder()
9992
.setDimension("diskName", entry.getValue().getDiskName());
100-
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
10193
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion());
10294
for (Map.Entry<String, Long> stat : stats.entrySet()) {
10395
emitter.emit(builder.setMetric(stat.getKey(), stat.getValue()));

0 commit comments

Comments
 (0)