Skip to content

Commit 47d6f34

Browse files
authored
Emit missing ingest/merge/time, ingest/merge/cpu and ingest/persists/cpu metrics (#18866)
The metrics ingest/merge/time, ingest/merge/cpu and ingest/persists/cpu metrics have been documented but were previously reported as zero because they were not set in the streaming and batch appendators (it probably regressed in a refactor). This patch now correctly reports ingest/merge/time, ingest/merge/cpu and ingest/persists/cpu metrics for streaming and batch ingestion tasks. Also, cleaned up KafkaIndexTaskTest and KinesisIndexTaskTest by initializing a StubServiceEmitter as a non-static member in the base class SeekableStreamIndexTaskTestBase so it can be used by each unit test as needed.
1 parent c4b5ea6 commit 47d6f34

File tree

9 files changed

+197
-54
lines changed

9 files changed

+197
-54
lines changed

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,6 @@
8888
import org.apache.druid.java.util.common.guava.Sequences;
8989
import org.apache.druid.java.util.common.parsers.CloseableIterator;
9090
import org.apache.druid.java.util.common.parsers.ParseException;
91-
import org.apache.druid.java.util.emitter.EmittingLogger;
92-
import org.apache.druid.java.util.emitter.core.NoopEmitter;
93-
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
9491
import org.apache.druid.math.expr.ExprMacroTable;
9592
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
9693
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
@@ -118,6 +115,7 @@
118115
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
119116
import org.apache.druid.segment.incremental.RowMeters;
120117
import org.apache.druid.segment.indexing.DataSchema;
118+
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
121119
import org.apache.druid.segment.transform.ExpressionTransform;
122120
import org.apache.druid.segment.transform.TransformSpec;
123121
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
@@ -196,9 +194,7 @@ public byte[] value()
196194

197195
private static TestingCluster zkServer;
198196
private static TestBroker kafkaServer;
199-
private static ServiceEmitter emitter;
200197
private static int topicPostfix;
201-
202198
static final Module TEST_MODULE = new SimpleModule("kafkaTestModule").registerSubtypes(
203199
new NamedType(TestKafkaInputFormat.class, "testKafkaInputFormat"),
204200
new NamedType(TestKafkaFormatWithMalformedDataDetection.class, "testKafkaFormatWithMalformedDataDetection")
@@ -288,14 +284,6 @@ public KafkaIndexTaskTest(LockGranularity lockGranularity)
288284
@BeforeClass
289285
public static void setupClass() throws Exception
290286
{
291-
emitter = new ServiceEmitter(
292-
"service",
293-
"host",
294-
new NoopEmitter()
295-
);
296-
emitter.start();
297-
EmittingLogger.registerEmitter(emitter);
298-
299287
zkServer = new TestingCluster(1);
300288
zkServer.start();
301289

@@ -354,8 +342,6 @@ public static void tearDownClass() throws Exception
354342

355343
zkServer.stop();
356344
zkServer = null;
357-
358-
emitter.close();
359345
}
360346

361347
@Test(timeout = 60_000L)
@@ -388,7 +374,6 @@ public void testRunAfterDataInserted() throws Exception
388374
// Wait for task to exit
389375
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
390376
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
391-
Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());
392377

393378
// Check published metadata and segments in deep storage
394379
assertEqualsExceptVersion(
@@ -402,6 +387,12 @@ public void testRunAfterDataInserted() throws Exception
402387
new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 5L))),
403388
newDataSchemaMetadata()
404389
);
390+
391+
final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
392+
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
393+
Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
394+
Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
395+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
405396
}
406397

407398
@Test(timeout = 60_000L)
@@ -450,6 +441,12 @@ public void testIngestNullColumnAfterDataInserted() throws Exception
450441
Assert.assertEquals(dimensionsSpec.getDimensionNames().get(i), segment.getDimensions().get(i));
451442
}
452443
}
444+
445+
final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
446+
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
447+
Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
448+
Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
449+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
453450
}
454451

455452
@Test(timeout = 60_000L)
@@ -739,6 +736,13 @@ public void testIncrementalHandOff() throws Exception
739736
),
740737
newDataSchemaMetadata()
741738
);
739+
740+
final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
741+
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
742+
Assert.assertEquals(8, observedSegmentGenerationMetrics.rowOutput());
743+
Assert.assertEquals(7, observedSegmentGenerationMetrics.handOffCount());
744+
Assert.assertEquals(4, observedSegmentGenerationMetrics.numPersists());
745+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
742746
}
743747

744748
@Test(timeout = 60_000L)
@@ -1698,6 +1702,14 @@ public void testMultipleParseExceptionsSuccess() throws Exception
16981702
"{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}"
16991703
);
17001704
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
1705+
1706+
emitter.verifyValue("ingest/segments/count", 4);
1707+
1708+
final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
1709+
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
1710+
Assert.assertEquals(7, observedSegmentGenerationMetrics.rowOutput());
1711+
Assert.assertEquals(4, observedSegmentGenerationMetrics.handOffCount());
1712+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
17011713
}
17021714

17031715
@Test(timeout = 60_000L)
@@ -1775,6 +1787,14 @@ public void testMultipleParseExceptionsFailure() throws Exception
17751787

17761788
List<String> expectedInputs = Arrays.asList("", "unparseable");
17771789
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
1790+
1791+
emitter.verifyNotEmitted("ingest/segments/count");
1792+
1793+
final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
1794+
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
1795+
Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
1796+
Assert.assertEquals(1, observedSegmentGenerationMetrics.numPersists());
1797+
Assert.assertEquals(0, observedSegmentGenerationMetrics.handOffCount());
17781798
}
17791799

17801800
@Test(timeout = 60_000L)
@@ -3457,6 +3477,14 @@ public void testTaskWithTransformSpecDoesNotCauseCliPeonCyclicDependency()
34573477
// Check segments in deep storage
34583478
Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", publishedDescriptors.get(0)));
34593479
Assert.assertEquals(ImmutableList.of("bb"), readSegmentColumn("dim1t", publishedDescriptors.get(0)));
3480+
3481+
emitter.verifyValue("ingest/segments/count", 1);
3482+
3483+
final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
3484+
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
3485+
Assert.assertEquals(1, observedSegmentGenerationMetrics.rowOutput());
3486+
Assert.assertEquals(1, observedSegmentGenerationMetrics.handOffCount());
3487+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
34603488
}
34613489

34623490
public static class TestKafkaInputFormat implements InputFormat

extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,6 @@
6868
import org.apache.druid.java.util.common.DateTimes;
6969
import org.apache.druid.java.util.common.StringUtils;
7070
import org.apache.druid.java.util.common.concurrent.Execs;
71-
import org.apache.druid.java.util.emitter.EmittingLogger;
72-
import org.apache.druid.java.util.emitter.core.NoopEmitter;
73-
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
7471
import org.apache.druid.math.expr.ExprMacroTable;
7572
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
7673
import org.apache.druid.query.DruidProcessingConfigTest;
@@ -86,6 +83,7 @@
8683
import org.apache.druid.segment.incremental.RowIngestionMeters;
8784
import org.apache.druid.segment.incremental.RowMeters;
8885
import org.apache.druid.segment.indexing.DataSchema;
86+
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
8987
import org.apache.druid.segment.transform.ExpressionTransform;
9088
import org.apache.druid.segment.transform.TransformSpec;
9189
import org.apache.druid.timeline.DataSegment;
@@ -168,7 +166,6 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
168166
);
169167

170168
private static KinesisRecordSupplier recordSupplier;
171-
private static ServiceEmitter emitter;
172169

173170
@Parameterized.Parameters(name = "{0}")
174171
public static Iterable<Object[]> constructorFeeder()
@@ -194,13 +191,6 @@ public static Iterable<Object[]> constructorFeeder()
194191
@BeforeClass
195192
public static void setupClass()
196193
{
197-
emitter = new ServiceEmitter(
198-
"service",
199-
"host",
200-
new NoopEmitter()
201-
);
202-
emitter.start();
203-
EmittingLogger.registerEmitter(emitter);
204194
taskExec = MoreExecutors.listeningDecorator(
205195
Executors.newCachedThreadPool(
206196
Execs.makeThreadFactory("kinesis-task-test-%d")
@@ -252,7 +242,6 @@ public static void tearDownClass() throws Exception
252242
{
253243
taskExec.shutdown();
254244
taskExec.awaitTermination(20, TimeUnit.MINUTES);
255-
emitter.close();
256245
}
257246

258247
private void waitUntil(KinesisIndexTask task, Predicate<KinesisIndexTask> predicate)
@@ -356,6 +345,13 @@ public void testRunAfterDataInserted() throws Exception
356345
),
357346
newDataSchemaMetadata()
358347
);
348+
349+
final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
350+
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
351+
Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
352+
Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
353+
Assert.assertEquals(2, observedSegmentGenerationMetrics.numPersists());
354+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
359355
}
360356

361357
@Test(timeout = 120_000L)
@@ -753,6 +749,13 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
753749
new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, endOffsets)),
754750
newDataSchemaMetadata()
755751
);
752+
753+
final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
754+
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
755+
Assert.assertEquals(7, observedSegmentGenerationMetrics.rowOutput());
756+
Assert.assertEquals(6, observedSegmentGenerationMetrics.handOffCount());
757+
Assert.assertEquals(5, observedSegmentGenerationMetrics.numPersists());
758+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
756759
}
757760

758761

@@ -1032,6 +1035,12 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception
10321035
),
10331036
newDataSchemaMetadata()
10341037
);
1038+
final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
1039+
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
1040+
Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
1041+
Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
1042+
Assert.assertEquals(2, observedSegmentGenerationMetrics.numPersists());
1043+
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
10351044
}
10361045

10371046

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,10 @@
8383
import org.apache.druid.java.util.common.guava.Comparators;
8484
import org.apache.druid.java.util.common.logger.Logger;
8585
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
86+
import org.apache.druid.java.util.emitter.EmittingLogger;
8687
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
8788
import org.apache.druid.java.util.metrics.MonitorScheduler;
89+
import org.apache.druid.java.util.metrics.StubServiceEmitter;
8890
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
8991
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
9092
import org.apache.druid.metadata.TestDerbyConnector;
@@ -120,6 +122,7 @@
120122
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
121123
import org.apache.druid.segment.metadata.SegmentSchemaManager;
122124
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
125+
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
123126
import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
124127
import org.apache.druid.server.DruidNode;
125128
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
@@ -134,7 +137,9 @@
134137
import org.easymock.EasyMock;
135138
import org.easymock.EasyMockSupport;
136139
import org.joda.time.Interval;
140+
import org.junit.After;
137141
import org.junit.Assert;
142+
import org.junit.Before;
138143
import org.junit.Rule;
139144
import org.junit.rules.TemporaryFolder;
140145

@@ -198,6 +203,8 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport
198203

199204
protected final List<Task> runningTasks = new ArrayList<>();
200205
protected final LockGranularity lockGranularity;
206+
207+
protected StubServiceEmitter emitter;
201208
protected File directory;
202209
protected File reportsFile;
203210
protected TaskToolboxFactory toolboxFactory;
@@ -251,6 +258,20 @@ public SeekableStreamIndexTaskTestBase(
251258
this.lockGranularity = lockGranularity;
252259
}
253260

261+
@Before
262+
public void setupBase()
263+
{
264+
emitter = new StubServiceEmitter();
265+
emitter.start();
266+
EmittingLogger.registerEmitter(emitter);
267+
}
268+
269+
@After
270+
public void tearDownBase() throws IOException
271+
{
272+
emitter.close();
273+
}
274+
254275
protected static ByteEntity jb(
255276
String timestamp,
256277
String dim1,
@@ -408,6 +429,16 @@ protected SegmentDescriptor sd(final String intervalString, final int partitionN
408429
return new SegmentDescriptor(interval, "fakeVersion", partitionNum);
409430
}
410431

432+
protected void verifyPersistAndMergeTimeMetricsArePositive(SegmentGenerationMetrics observedSegmentGenerationMetrics)
433+
{
434+
Assert.assertNotNull(observedSegmentGenerationMetrics);
435+
Assert.assertTrue(observedSegmentGenerationMetrics.persistTimeMillis() > 0);
436+
Assert.assertTrue(observedSegmentGenerationMetrics.persistCpuTime() > 0);
437+
438+
Assert.assertTrue(observedSegmentGenerationMetrics.mergeTimeMillis() > 0);
439+
Assert.assertTrue(observedSegmentGenerationMetrics.mergeCpuTime() > 0);
440+
}
441+
411442
protected void assertEqualsExceptVersion(
412443
List<SegmentDescriptorAndExpectedDim1Values> expectedDescriptors,
413444
List<SegmentDescriptor> actualDescriptors

server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,21 @@ public void reportMaxSegmentHandoffTime(long maxSegmentHandoffTime)
186186
}
187187
}
188188

189+
public void setMergeTime(long elapsedMergeTimeMillis)
190+
{
191+
mergeTimeMillis.set(elapsedMergeTimeMillis);
192+
}
193+
194+
public void setMergeCpuTime(long elapsedCpuTimeNanos)
195+
{
196+
mergeCpuTime.set(elapsedCpuTimeNanos);
197+
}
198+
199+
public void setPersistCpuTime(long elpasedCpuTimeNanos)
200+
{
201+
persistCpuTime.set(elpasedCpuTimeNanos);
202+
}
203+
189204
public void markProcessingDone()
190205
{
191206
this.processingDone.set(true);

0 commit comments

Comments
 (0)