Skip to content

Commit bab9d42

Browse files
committed
Add missing streaming appendator metrics.
- Adds the missing streaming appendator metrics for ingest/merge/time, ingest/merge/cpu and ingest/persists/cpu
1 parent e518840 commit bab9d42

File tree

4 files changed

+61
-16
lines changed

4 files changed

+61
-16
lines changed

server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,21 @@ public void reportMaxSegmentHandoffTime(long maxSegmentHandoffTime)
186186
}
187187
}
188188

189+
public void setMergeTime(long elapsedMergeTimeMillis)
190+
{
191+
mergeTimeMillis.set(elapsedMergeTimeMillis);
192+
}
193+
194+
public void setMergeCpuTime(long elapsedCpuTimeNanos)
195+
{
196+
mergeCpuTime.set(elapsedCpuTimeNanos);
197+
}
198+
199+
public void setPersistCpuTime(long elpasedCpuTimeNanos)
200+
{
201+
persistCpuTime.set(elpasedCpuTimeNanos);
202+
}
203+
189204
public void markProcessingDone()
190205
{
191206
this.processingDone.set(true);

server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.google.common.base.Function;
2525
import com.google.common.base.Joiner;
2626
import com.google.common.base.Preconditions;
27-
import com.google.common.base.Stopwatch;
2827
import com.google.common.base.Supplier;
2928
import com.google.common.collect.ImmutableList;
3029
import com.google.common.collect.Iterables;
@@ -46,6 +45,7 @@
4645
import org.apache.druid.java.util.common.ISE;
4746
import org.apache.druid.java.util.common.Pair;
4847
import org.apache.druid.java.util.common.RE;
48+
import org.apache.druid.java.util.common.Stopwatch;
4949
import org.apache.druid.java.util.common.StringUtils;
5050
import org.apache.druid.java.util.common.concurrent.Execs;
5151
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
@@ -81,6 +81,7 @@
8181
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
8282
import org.apache.druid.timeline.DataSegment;
8383
import org.apache.druid.timeline.SegmentId;
84+
import org.apache.druid.utils.JvmUtils;
8485
import org.joda.time.Interval;
8586

8687
import javax.annotation.Nullable;
@@ -682,6 +683,7 @@ public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
682683
@Override
683684
public Object call() throws IOException
684685
{
686+
final long startPersistCpuNanos = JvmUtils.safeGetThreadCpuTime();
685687
try {
686688
setTaskThreadContext();
687689
for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : indexesToPersist) {
@@ -748,14 +750,15 @@ public Object call() throws IOException
748750
}
749751
finally {
750752
metrics.incrementNumPersists();
751-
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
753+
metrics.incrementPersistTimeMillis(persistStopwatch.millisElapsed());
754+
metrics.setPersistCpuTime(JvmUtils.safeGetThreadCpuTime() - startPersistCpuNanos);
752755
persistStopwatch.stop();
753756
}
754757
}
755758
}
756759
);
757760

758-
final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
761+
final long startDelay = runExecStopwatch.millisElapsed();
759762
metrics.incrementPersistBackPressureMillis(startDelay);
760763
if (startDelay > WARN_DELAY) {
761764
log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay);
@@ -935,8 +938,9 @@ private DataSegmentWithMetadata mergeAndPush(
935938
}
936939

937940
final File mergedFile;
938-
final long mergeFinishTime;
939-
final long startTime = System.nanoTime();
941+
final Stopwatch stopwatch = Stopwatch.createStarted();
942+
final long mergeTimeMillis;
943+
final long startMergeCpuNanos = JvmUtils.safeGetThreadCpuTime();
940944
List<QueryableIndex> indexes = new ArrayList<>();
941945
Closer closer = Closer.create();
942946
try {
@@ -961,9 +965,11 @@ private DataSegmentWithMetadata mergeAndPush(
961965
tuningConfig.getMaxColumnsToMerge()
962966
);
963967

964-
mergeFinishTime = System.nanoTime();
965-
966-
log.debug("Segment[%s] built in %,dms.", identifier, (mergeFinishTime - startTime) / 1000000);
968+
mergeTimeMillis = stopwatch.millisElapsed();
969+
stopwatch.restart();
970+
metrics.setMergeTime(mergeTimeMillis);
971+
metrics.setMergeCpuTime(JvmUtils.safeGetThreadCpuTime() - startMergeCpuNanos);
972+
log.debug("Segment[%s] built in %,dms.", identifier, mergeTimeMillis);
967973
}
968974
catch (Throwable t) {
969975
throw closer.rethrow(t);
@@ -979,8 +985,7 @@ private DataSegmentWithMetadata mergeAndPush(
979985
// dataSegmentPusher retries internally when appropriate; no need for retries here.
980986
final DataSegment segment = dataSegmentPusher.push(mergedFile, segmentToPush, useUniquePath);
981987

982-
final long pushFinishTime = System.nanoTime();
983-
988+
final long pushTimeMillis = stopwatch.millisElapsed();
984989
objectMapper.writeValue(descriptorFile, segment);
985990

986991
log.info(
@@ -991,8 +996,8 @@ private DataSegmentWithMetadata mergeAndPush(
991996
identifier,
992997
segment.getSize(),
993998
indexes.size(),
994-
(mergeFinishTime - startTime) / 1000000,
995-
(pushFinishTime - mergeFinishTime) / 1000000,
999+
mergeTimeMillis,
1000+
pushTimeMillis,
9961001
objectMapper.writeValueAsString(segment.getLoadSpec())
9971002
);
9981003

server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.druid.segment.incremental.RowIngestionMeters;
5757
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
5858
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
59+
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
5960
import org.apache.druid.segment.realtime.sink.Committers;
6061
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
6162
import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -102,8 +103,10 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
102103
@Test
103104
public void testSimpleIngestion() throws Exception
104105
{
106+
final SegmentGenerationMetrics segmentGenerationMetrics = new SegmentGenerationMetrics();
105107
try (final StreamAppenderatorTester tester =
106108
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
109+
.segmentGenerationMetrics(segmentGenerationMetrics)
107110
.basePersistDirectory(temporaryFolder.newFolder())
108111
.build()) {
109112
final Appenderator appenderator = tester.getAppenderator();
@@ -183,6 +186,14 @@ public SegmentIdWithShardSpec apply(DataSegment input)
183186
);
184187
Assert.assertEquals(sorted(tester.getPushedSegments()), sorted(segmentsAndCommitMetadata.getSegments()));
185188

189+
Assert.assertEquals(2, segmentGenerationMetrics.numPersists());
190+
Assert.assertEquals(3, segmentGenerationMetrics.rowOutput());
191+
Assert.assertTrue(segmentGenerationMetrics.persistTimeMillis() > 0);
192+
Assert.assertTrue(segmentGenerationMetrics.persistCpuTime() > 0);
193+
194+
Assert.assertTrue(segmentGenerationMetrics.mergeTimeMillis() > 0);
195+
Assert.assertTrue(segmentGenerationMetrics.mergeCpuTime() > 0);
196+
186197
// clear
187198
appenderator.clear();
188199
Assert.assertTrue(appenderator.getSegments().isEmpty());
@@ -192,10 +203,12 @@ public SegmentIdWithShardSpec apply(DataSegment input)
192203
@Test
193204
public void testPushFailure() throws Exception
194205
{
206+
final SegmentGenerationMetrics segmentGenerationMetrics = new SegmentGenerationMetrics();
195207
try (final StreamAppenderatorTester tester =
196208
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
197209
.basePersistDirectory(temporaryFolder.newFolder())
198210
.enablePushFailure(true)
211+
.segmentGenerationMetrics(segmentGenerationMetrics)
199212
.build()) {
200213
final Appenderator appenderator = tester.getAppenderator();
201214
boolean thrown;
@@ -269,6 +282,8 @@ public void testPushFailure() throws Exception
269282
CoreMatchers.startsWith("Push failure test"))))
270283
);
271284
}
285+
286+
Assert.assertEquals(1, segmentGenerationMetrics.failedHandoffs());
272287
}
273288

274289
@Test

server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ public StreamAppenderatorTester(
115115
final ServiceEmitter serviceEmitter,
116116
final PolicyEnforcer policyEnforcer,
117117
final boolean releaseLocksOnHandoff,
118-
final TaskIntervalUnlocker taskIntervalUnlocker
118+
final TaskIntervalUnlocker taskIntervalUnlocker,
119+
final SegmentGenerationMetrics segmentGenerationMetrics
119120
)
120121
{
121122
objectMapper = new DefaultObjectMapper();
@@ -165,7 +166,7 @@ public StreamAppenderatorTester(
165166
releaseLocksOnHandoff
166167
);
167168

168-
metrics = new SegmentGenerationMetrics();
169+
metrics = segmentGenerationMetrics == null ? new SegmentGenerationMetrics() : segmentGenerationMetrics;
169170
queryExecutor = Execs.singleThreaded("queryExecutor(%d)");
170171

171172
IndexIO indexIO = new IndexIO(
@@ -364,6 +365,7 @@ public static class Builder
364365
private PolicyEnforcer policyEnforcer = NoopPolicyEnforcer.instance();
365366
private boolean releaseLocksOnHandoff;
366367
private TaskIntervalUnlocker taskIntervalUnlocker = interval -> {};
368+
private SegmentGenerationMetrics segmentGenerationMetrics;
367369

368370
public Builder maxRowsInMemory(final int maxRowsInMemory)
369371
{
@@ -389,6 +391,12 @@ public Builder enablePushFailure(final boolean enablePushFailure)
389391
return this;
390392
}
391393

394+
public Builder segmentGenerationMetrics(final SegmentGenerationMetrics segmentGenerationMetrics)
395+
{
396+
this.segmentGenerationMetrics = segmentGenerationMetrics;
397+
return this;
398+
}
399+
392400
public Builder rowIngestionMeters(final RowIngestionMeters rowIngestionMeters)
393401
{
394402
this.rowIngestionMeters = rowIngestionMeters;
@@ -446,7 +454,8 @@ public StreamAppenderatorTester build()
446454
serviceEmitter,
447455
policyEnforcer,
448456
releaseLocksOnHandoff,
449-
taskIntervalUnlocker
457+
taskIntervalUnlocker,
458+
segmentGenerationMetrics
450459
);
451460
}
452461

@@ -468,7 +477,8 @@ public StreamAppenderatorTester build(
468477
serviceEmitter,
469478
policyEnforcer,
470479
releaseLocksOnHandoff,
471-
taskIntervalUnlocker
480+
taskIntervalUnlocker,
481+
segmentGenerationMetrics
472482
);
473483
}
474484
}

0 commit comments

Comments
 (0)