Skip to content

Commit 7aec41b

Browse files
authored
Minor clean up of LatchableEmitter / StubServiceEmitter (#18255)
Follow up to #18249 Changes: - Maintain a List of processed events in `LatchableEmitter`. This is an improvement over the current flow where a copy of events is created upon receiving every new event. - When a new condition is registered, evaluate all past events upfront, then add it to the set of wait conditions - Evaluate each new event as it is received Other changes: - Hide the internal queue implementation of `StubServiceEmitter` from tests and sub-classes - Reduce the usage of `StubServiceEmitter.getEvents()`. Use the inbuilt `verifyValue` methods instead.
1 parent 01dd8d1 commit 7aec41b

File tree

34 files changed

+270
-363
lines changed

34 files changed

+270
-363
lines changed

extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ void test_launchPeonJobAndWaitForStart()
9393
Pod peonPod = instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, TimeUnit.SECONDS);
9494

9595
Assertions.assertNotNull(peonPod);
96-
Assertions.assertEquals(1, serviceEmitter.getEvents().size());
96+
Assertions.assertEquals(1, serviceEmitter.getNumEmittedEvents());
9797
}
9898

9999
@Test

indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.druid.java.util.common.Intervals;
3333
import org.apache.druid.java.util.common.granularity.Granularities;
3434
import org.apache.druid.java.util.common.guava.Comparators;
35+
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
3536
import org.apache.druid.java.util.metrics.StubServiceEmitter;
3637
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
3738
import org.apache.druid.metadata.UnusedSegmentKillerConfig;
@@ -52,7 +53,6 @@
5253
import org.junit.Test;
5354

5455
import java.util.List;
55-
import java.util.Queue;
5656
import java.util.Set;
5757
import java.util.stream.Collectors;
5858

@@ -329,8 +329,7 @@ public void test_run_prioritizesOlderIntervals()
329329
emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);
330330

331331
// Verify that the kill intervals are sorted with the oldest interval first
332-
final Queue<StubServiceEmitter.ServiceMetricEventSnapshot> events =
333-
emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION);
332+
final List<ServiceMetricEvent> events = emitter.getMetricEvents(TaskMetrics.RUN_DURATION);
334333
final List<Interval> killIntervals = events.stream().map(event -> {
335334
final String taskId = (String) event.getUserDims().get(DruidMetrics.TASK_ID);
336335
String[] splits = taskId.split("_");

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -780,8 +780,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc
780780
Assert.assertEquals(2, taskCountAfterScaleOut);
781781
Assert.assertTrue(
782782
dynamicActionEmitter
783-
.getMetricEvents()
784-
.get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
783+
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
785784
.stream()
786785
.map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
787786
.filter(Objects::nonNull)
@@ -840,8 +839,7 @@ public int getActiveTaskGroupsCount()
840839

841840
Assert.assertTrue(
842841
dynamicActionEmitter
843-
.getMetricEvents()
844-
.get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
842+
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
845843
.stream()
846844
.map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
847845
.filter(Objects::nonNull)
@@ -1103,8 +1101,7 @@ public int getActiveTaskGroupsCount()
11031101

11041102
Assert.assertTrue(
11051103
dynamicActionEmitter
1106-
.getMetricEvents()
1107-
.get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
1104+
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
11081105
.stream()
11091106
.map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
11101107
.filter(Objects::nonNull)

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2521,7 +2521,7 @@ public void testStaleOffsetsNegativeLagNotEmitted() throws Exception
25212521
latch.await();
25222522

25232523
supervisor.emitLag();
2524-
Assert.assertEquals(0, emitter.getEvents().size());
2524+
Assert.assertEquals(0, emitter.getNumEmittedEvents());
25252525
}
25262526

25272527
private void validateSupervisorStateAfterResetOffsets(

indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,12 @@
2121

2222
import com.google.common.collect.ImmutableMap;
2323
import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
24-
import org.apache.druid.java.util.emitter.core.Event;
25-
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
2624
import org.apache.druid.java.util.metrics.StubServiceEmitter;
2725
import org.junit.Assert;
2826
import org.junit.Test;
2927
import org.mockito.Mockito;
3028

31-
import java.util.List;
29+
import java.util.Map;
3230

3331
public class ShuffleMonitorTest
3432
{
@@ -46,23 +44,16 @@ public void testDoMonitor()
4644
final ShuffleMonitor monitor = new ShuffleMonitor();
4745
monitor.setShuffleMetrics(shuffleMetrics);
4846
Assert.assertTrue(monitor.doMonitor(emitter));
49-
final List<Event> events = emitter.getEvents();
50-
Assert.assertEquals(2, events.size());
51-
Assert.assertSame(ServiceMetricEvent.class, events.get(0).getClass());
52-
ServiceMetricEvent event = (ServiceMetricEvent) events.get(0);
53-
Assert.assertEquals(ShuffleMonitor.SHUFFLE_BYTES_KEY, event.getMetric());
54-
Assert.assertEquals(310L, event.getValue());
55-
Assert.assertEquals(
56-
ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
57-
event.getUserDims()
47+
Assert.assertEquals(2, emitter.getNumEmittedEvents());
48+
emitter.verifyValue(
49+
ShuffleMonitor.SHUFFLE_BYTES_KEY,
50+
Map.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
51+
310L
5852
);
59-
Assert.assertSame(ServiceMetricEvent.class, events.get(1).getClass());
60-
event = (ServiceMetricEvent) events.get(1);
61-
Assert.assertEquals(ShuffleMonitor.SHUFFLE_REQUESTS_KEY, event.getMetric());
62-
Assert.assertEquals(3, event.getValue());
63-
Assert.assertEquals(
64-
ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
65-
event.getUserDims()
53+
emitter.verifyValue(
54+
ShuffleMonitor.SHUFFLE_REQUESTS_KEY,
55+
Map.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
56+
3
6657
);
6758
}
6859
}

processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,14 @@ public EventMap toMap()
125125
.build();
126126
}
127127

128+
/**
129+
* Creates an immutable copy of this metric event. This is used only in tests.
130+
*/
131+
public ServiceMetricEvent copy()
132+
{
133+
return new ServiceMetricEvent(createdTime, serviceDims, Map.copyOf(userDims), feed, metric, value);
134+
}
135+
128136
/**
129137
* Builder for a {@link ServiceMetricEvent}. This builder can be used for
130138
* building only one event.

processing/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
public class ServiceMetricEventTest
3636
{
3737
@Test
38-
public void testStupidTest()
38+
public void testBuilder()
3939
{
4040
ServiceMetricEvent builderEvent = new ServiceMetricEvent.Builder()
4141
.setDimension("user1", "a")
@@ -317,4 +317,31 @@ public void testSetDimensionIfNotNullShouldNotSetNullDimension()
317317
Assert.assertTrue(target.getUserDims().isEmpty());
318318
Assert.assertNull(target.getUserDims().get("userDimMap"));
319319
}
320+
321+
@Test
322+
public void test_copy_returnsAnImmutableInstance()
323+
{
324+
final ServiceMetricEvent.Builder eventBuilder = ServiceMetricEvent
325+
.builder()
326+
.setDimension("dim1", "v1")
327+
.setMetric("m1", 100);
328+
329+
final ServiceMetricEvent event1 = eventBuilder.build("coordinator", "localhost");
330+
final ServiceMetricEvent event1Copy = event1.copy();
331+
332+
Assert.assertEquals(Map.of("dim1", "v1"), event1.getUserDims());
333+
Assert.assertEquals(Map.of("dim1", "v1"), event1Copy.getUserDims());
334+
335+
final ServiceMetricEvent event2 = eventBuilder
336+
.setDimension("dim2", "v2")
337+
.setMetric("m2", 200)
338+
.build("coordinator", "localhost");
339+
340+
// Verify that the original event gets changed dimensions
341+
Assert.assertEquals(Map.of("dim1", "v1", "dim2", "v2"), event2.getUserDims());
342+
Assert.assertEquals(Map.of("dim1", "v1", "dim2", "v2"), event1.getUserDims());
343+
344+
// But the event copy still has the original dimensions
345+
Assert.assertEquals(Map.of("dim1", "v1"), event1Copy.getUserDims());
346+
}
320347
}

processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import com.google.common.collect.ImmutableMap;
2323
import org.apache.druid.java.util.common.FileUtils;
24-
import org.apache.druid.java.util.emitter.core.Event;
2524
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
2625
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer;
2726
import org.apache.druid.java.util.metrics.cgroups.TestUtils;
@@ -34,8 +33,6 @@
3433

3534
import java.io.File;
3635
import java.io.IOException;
37-
import java.util.List;
38-
import java.util.Map;
3936

4037
public class CgroupCpuSetMonitorTest
4138
{
@@ -72,19 +69,11 @@ public void testMonitor()
7269
final CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(discoverer, ImmutableMap.of(), "some_feed");
7370
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
7471
Assert.assertTrue(monitor.doMonitor(emitter));
75-
final List<Event> actualEvents = emitter.getEvents();
76-
Assert.assertEquals(4, actualEvents.size());
77-
final Map<String, Object> cpusEvent = actualEvents.get(0).toMap();
78-
final Map<String, Object> effectiveCpusEvent = actualEvents.get(1).toMap();
79-
final Map<String, Object> memsEvent = actualEvents.get(2).toMap();
80-
final Map<String, Object> effectiveMemsEvent = actualEvents.get(3).toMap();
81-
Assert.assertEquals("cgroup/cpuset/cpu_count", cpusEvent.get("metric"));
82-
Assert.assertEquals(8, cpusEvent.get("value"));
83-
Assert.assertEquals("cgroup/cpuset/effective_cpu_count", effectiveCpusEvent.get("metric"));
84-
Assert.assertEquals(7, effectiveCpusEvent.get("value"));
85-
Assert.assertEquals("cgroup/cpuset/mems_count", memsEvent.get("metric"));
86-
Assert.assertEquals(4, memsEvent.get("value"));
87-
Assert.assertEquals("cgroup/cpuset/effective_mems_count", effectiveMemsEvent.get("metric"));
88-
Assert.assertEquals(1, effectiveMemsEvent.get("value"));
72+
Assert.assertEquals(4, emitter.getNumEmittedEvents());
73+
74+
emitter.verifyValue("cgroup/cpuset/cpu_count", 8);
75+
emitter.verifyValue("cgroup/cpuset/effective_cpu_count", 7);
76+
emitter.verifyValue("cgroup/cpuset/mems_count", 4);
77+
emitter.verifyValue("cgroup/cpuset/effective_mems_count", 1);
8978
}
9079
}

processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,13 @@ public void testMonitor() throws IOException
6868
final CgroupDiskMonitor monitor = new CgroupDiskMonitor(discoverer, ImmutableMap.of(), "some_feed");
6969
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
7070
Assert.assertTrue(monitor.doMonitor(emitter));
71-
Assert.assertEquals(0, emitter.getEvents().size());
71+
Assert.assertEquals(0, emitter.getNumEmittedEvents());
7272

7373
TestUtils.copyOrReplaceResource("/blkio.throttle.io_service_bytes-2", serviceBytesFile);
7474
TestUtils.copyOrReplaceResource("/blkio.throttle.io_serviced-2", servicedFile);
7575

7676
Assert.assertTrue(monitor.doMonitor(emitter));
77-
Assert.assertEquals(8, emitter.getEvents().size());
77+
Assert.assertEquals(8, emitter.getNumEmittedEvents());
7878
Assert.assertTrue(
7979
emitter
8080
.getEvents()

processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import com.google.common.collect.ImmutableMap;
2323
import org.apache.druid.java.util.common.FileUtils;
24-
import org.apache.druid.java.util.emitter.core.Event;
2524
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
2625
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer;
2726
import org.apache.druid.java.util.metrics.cgroups.TestUtils;
@@ -34,7 +33,6 @@
3433

3534
import java.io.File;
3635
import java.io.IOException;
37-
import java.util.List;
3836

3937
public class CgroupMemoryMonitorTest
4038
{
@@ -71,7 +69,6 @@ public void testMonitor()
7169
final CgroupMemoryMonitor monitor = new CgroupMemoryMonitor(discoverer, ImmutableMap.of(), "some_feed");
7270
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
7371
Assert.assertTrue(monitor.doMonitor(emitter));
74-
final List<Event> actualEvents = emitter.getEvents();
75-
Assert.assertEquals(46, actualEvents.size());
72+
Assert.assertEquals(46, emitter.getNumEmittedEvents());
7673
}
7774
}

0 commit comments

Comments
 (0)