Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,14 @@

package org.apache.druid.indexing.common.stats;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.apache.druid.java.util.metrics.MonitorUtils;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;

import javax.annotation.Nullable;
import java.util.Map;

/**
* Emits metrics from {@link SegmentGenerationMetrics} and {@link RowIngestionMeters}.
*/
Expand All @@ -42,24 +36,20 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor

private final SegmentGenerationMetrics segmentGenerationMetrics;
private final RowIngestionMeters rowIngestionMeters;
private final Map<String, String[]> dimensions;
@Nullable
private final Map<String, Object> metricTags;
private final ServiceMetricEvent.Builder builder;

private SegmentGenerationMetrics previousSegmentGenerationMetrics;
private RowIngestionMetersTotals previousRowIngestionMetersTotals;

public TaskRealtimeMetricsMonitor(
SegmentGenerationMetrics segmentGenerationMetrics,
RowIngestionMeters rowIngestionMeters,
Map<String, String[]> dimensions,
@Nullable Map<String, Object> metricTags
ServiceMetricEvent.Builder metricEventBuilder
)
{
this.segmentGenerationMetrics = segmentGenerationMetrics;
this.rowIngestionMeters = rowIngestionMeters;
this.dimensions = ImmutableMap.copyOf(dimensions);
this.metricTags = metricTags;
this.builder = metricEventBuilder;
previousSegmentGenerationMetrics = new SegmentGenerationMetrics();
previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0, 0);
}
Expand All @@ -70,17 +60,13 @@ public boolean doMonitor(ServiceEmitter emitter)
SegmentGenerationMetrics metrics = segmentGenerationMetrics.snapshot();
RowIngestionMetersTotals rowIngestionMetersTotals = rowIngestionMeters.getTotals();

final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);

final long thrownAway = rowIngestionMetersTotals.getThrownAway() - previousRowIngestionMetersTotals.getThrownAway();
if (thrownAway > 0) {
log.warn(
"[%,d] events thrown away. Possible causes: null events, events filtered out by transformSpec, or events outside earlyMessageRejectionPeriod / lateMessageRejectionPeriod.",
thrownAway
);
}
builder.setDimensionIfNotNull(DruidMetrics.TAGS, metricTags);
emitter.emit(builder.setMetric("ingest/events/thrownAway", thrownAway));

final long unparseable = rowIngestionMetersTotals.getUnparseable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ public static IngestionMode fromString(String name)
private File reportsFile;
private File statusFile;

private final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();

private volatile CountDownLatch cleanupCompletionLatch;

protected AbstractTask(String id, String dataSource, Map<String, Object> context, IngestionMode ingestionMode)
Expand Down Expand Up @@ -131,7 +129,6 @@ protected AbstractTask(
// Copy the given context into a new mutable map because the Druid indexing service can add some internal contexts.
this.context = context == null ? new HashMap<>() : new HashMap<>(context);
this.ingestionMode = ingestionMode;
IndexTaskUtils.setTaskDimensions(metricBuilder, this);
}

protected AbstractTask(
Expand Down Expand Up @@ -385,8 +382,14 @@ public Map<String, Object> getContext()
return context;
}

protected ServiceMetricEvent.Builder getMetricBuilder()
/**
* @return A fresh instance of {@link ServiceMetricEvent.Builder} that can be
* used to emit metrics for this task.
*/
public ServiceMetricEvent.Builder getMetricBuilder()
{
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, this);
return metricBuilder;
}

Expand Down Expand Up @@ -426,13 +429,15 @@ private static IngestionMode computeIngestionMode(boolean isAppendToExisting, bo
+ "Either dropExisting or appendToExisting should be set to false");
}

/**
* Emits a metric for this task using the {@link #getMetricBuilder() metric builder}.
*/
public void emitMetric(
ServiceEmitter emitter,
String metric,
Number value
)
{

if (emitter == null || metric == null || value == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
Expand Down Expand Up @@ -818,7 +817,7 @@ private TaskStatus generateAndPublishSegments(
{
final SegmentGenerationMetrics buildSegmentsSegmentGenerationMetrics = new SegmentGenerationMetrics();
final TaskRealtimeMetricsMonitor metricsMonitor =
TaskRealtimeMetricsMonitorBuilder.build(this, buildSegmentsSegmentGenerationMetrics, buildSegmentsMeters);
new TaskRealtimeMetricsMonitor(buildSegmentsSegmentGenerationMetrics, buildSegmentsMeters, getMetricBuilder());
toolbox.addMonitor(metricsMonitor);

final PartitionsSpec partitionsSpec = partitionAnalysis.getPartitionsSpec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.BatchAppenderators;
Expand Down Expand Up @@ -169,7 +168,7 @@ private List<DataSegment> generateSegments(
final SegmentGenerationMetrics segmentGenerationMetrics = new SegmentGenerationMetrics();
buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
final TaskRealtimeMetricsMonitor metricsMonitor =
TaskRealtimeMetricsMonitorBuilder.build(this, segmentGenerationMetrics, buildSegmentsMeters);
new TaskRealtimeMetricsMonitor(segmentGenerationMetrics, buildSegmentsMeters, getMetricBuilder());
toolbox.addMonitor(metricsMonitor);

final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.druid.indexer.granularity.GranularitySpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
Expand Down Expand Up @@ -362,7 +361,7 @@ private DataSegmentsWithSchemas generateAndPushSegments(
final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
final SegmentGenerationMetrics segmentGenerationMetrics = new SegmentGenerationMetrics();
final TaskRealtimeMetricsMonitor metricsMonitor =
TaskRealtimeMetricsMonitorBuilder.build(this, segmentGenerationMetrics, rowIngestionMeters);
new TaskRealtimeMetricsMonitor(segmentGenerationMetrics, rowIngestionMeters, getMetricBuilder());
toolbox.addMonitor(metricsMonitor);

final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
Expand Down Expand Up @@ -197,6 +199,13 @@ public String getCurrentRunnerStatus()
return (status != null) ? status.toString() : null;
}

@Override
public ServiceMetricEvent.Builder getMetricBuilder()
{
return super.getMetricBuilder()
.setDimensionIfNotNull(DruidMetrics.SUPERVISOR_ID, supervisorId);
}

public Appenderator newAppenderator(
TaskToolbox toolbox,
SegmentGenerationMetrics metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
Expand Down Expand Up @@ -431,7 +430,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
// Set up SegmentGenerationMetrics
this.segmentGenerationMetrics = new SegmentGenerationMetrics();
final TaskRealtimeMetricsMonitor metricsMonitor =
TaskRealtimeMetricsMonitorBuilder.build(task, segmentGenerationMetrics, rowIngestionMeters);
new TaskRealtimeMetricsMonitor(segmentGenerationMetrics, rowIngestionMeters, task.getMetricBuilder());
toolbox.addMonitor(metricsMonitor);

final String lookupTier = task.getContextValue(CTX_KEY_LOOKUP_TIER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.MonitorUtils;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
Expand Down Expand Up @@ -74,7 +75,11 @@ public void setUp()
return null;
})
.when(emitter).emit(ArgumentMatchers.any(Event.class));
target = new TaskRealtimeMetricsMonitor(segmentGenerationMetrics, rowIngestionMeters, DIMENSIONS, TAGS);
target = new TaskRealtimeMetricsMonitor(
segmentGenerationMetrics,
rowIngestionMeters,
createMetricEventBuilder()
);
}

@Test
Expand All @@ -89,7 +94,11 @@ public void testdoMonitorShouldEmitUserProvidedTags()
@Test
public void testdoMonitorWithoutTagsShouldNotEmitTags()
{
target = new TaskRealtimeMetricsMonitor(segmentGenerationMetrics, rowIngestionMeters, DIMENSIONS, null);
target = new TaskRealtimeMetricsMonitor(
segmentGenerationMetrics,
rowIngestionMeters,
createMetricEventBuilder()
);
for (ServiceMetricEvent sme : emittedEvents.values()) {
Assert.assertFalse(sme.getUserDims().containsKey(DruidMetrics.TAGS));
}
Expand All @@ -98,7 +107,11 @@ public void testdoMonitorWithoutTagsShouldNotEmitTags()
@Test
public void testMessageGapAggStats()
{
target = new TaskRealtimeMetricsMonitor(segmentGenerationMetrics, rowIngestionMeters, DIMENSIONS, null);
target = new TaskRealtimeMetricsMonitor(
segmentGenerationMetrics,
rowIngestionMeters,
createMetricEventBuilder()
);

target.doMonitor(emitter);
Assert.assertFalse(emittedEvents.containsKey("ingest/events/minMessageGap"));
Expand All @@ -113,4 +126,12 @@ public void testMessageGapAggStats()
Assert.assertTrue(emittedEvents.containsKey("ingest/events/maxMessageGap"));
Assert.assertTrue(emittedEvents.containsKey("ingest/events/avgMessageGap"));
}

private ServiceMetricEvent.Builder createMetricEventBuilder()
{
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
MonitorUtils.addDimensionsToBuilder(builder, DIMENSIONS);
builder.setDimensionIfNotNull(DruidMetrics.TAGS, TAGS);
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.UpdateStatusAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.junit.Assert;
Expand Down Expand Up @@ -309,4 +311,24 @@ public void testBatchIOConfigNone()
Assert.assertEquals(AbstractTask.IngestionMode.NONE, ingestionMode);
}

@Test
public void test_getMetricBuilder_hasAllTaskDimensions()
{
final AbstractTask task = NoopTask.create();
final ServiceMetricEvent.Builder builder = task.getMetricBuilder();
Assert.assertEquals(task.getId(), builder.getDimension(DruidMetrics.TASK_ID));
Assert.assertEquals(task.getGroupId(), builder.getDimension(DruidMetrics.GROUP_ID));
Assert.assertEquals(task.getDataSource(), builder.getDimension(DruidMetrics.DATASOURCE));
Assert.assertEquals(task.getType(), builder.getDimension(DruidMetrics.TASK_TYPE));
}

@Test
public void test_getMetricBuilder_returnsFreshInstance()
{
final AbstractTask task = NoopTask.create();
final ServiceMetricEvent.Builder builder1 = task.getMetricBuilder();
final ServiceMetricEvent.Builder builder2 = task.getMetricBuilder();

Assert.assertNotSame(builder1, builder2);
}
}
Loading