Skip to content

Commit c4b5ea6

Browse files
authored
Add drop reason dimension to ingest/events/thrownAway metric (#18855)
This adds: - Better logging in task logs indicating the breakdown of thrown away events by reason. - A `reason` dimension to the `ingest/events/thrownAway` metric for aggregating on thrown away reason. - A `thrownAwayByReason` map to row statistics task API response payload, so future consumers can make use of it should they need to. - Better interface for row filters, making it easy to add more filtering reasons in the future.
1 parent e518840 commit c4b5ea6

File tree

33 files changed

+1095
-152
lines changed

33 files changed

+1095
-152
lines changed

docs/operations/metrics.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
270270
|`ingest/events/processed`|Number of events processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
271271
|`ingest/events/processedWithError`|Number of events processed with some partial errors per emission period. Events processed with partial errors are counted towards both this metric and `ingest/events/processed`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
272272
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
273-
|`ingest/events/thrownAway`|Number of events rejected because they are null, or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, `earlyMessageRejectionPeriod`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
273+
|`ingest/events/thrownAway`|Number of events rejected because they are null, or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, `earlyMessageRejectionPeriod`. The `reason` dimension indicates why the event was thrown away.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`, `reason`|0|
274274
|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
275275
|`ingest/input/bytes`|Number of bytes read from input sources, after decompression but prior to parsing. This covers all data read, including data that does not end up being fully processed and ingested. For example, this includes data that ends up being rejected for being unparseable or filtered out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the amount of data read.|
276276
|`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`, `groupId`|Your number of events with rollup.|

extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@
126126
},
127127
"ingest/events/thrownAway": {
128128
"dimensions": [
129-
"dataSource"
129+
"dataSource",
130+
"reason"
130131
],
131132
"type": "counter"
132133
},

extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@
5454
"query/cache/total/timeouts": [],
5555
"query/cache/total/errors": [],
5656
"ingest/events/thrownAway": [
57-
"dataSource"
57+
"dataSource",
58+
"reason"
5859
],
5960
"ingest/events/unparseable": [
6061
"dataSource"

extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@
104104
"ingest/events/processed" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events successfully processed per emission period." },
105105
"ingest/events/processedWithError" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events processed with some partial errors per emission period." },
106106
"ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because the events are unparseable." },
107-
"ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because they are outside the windowPeriod."},
107+
"ingest/events/thrownAway" : { "dimensions" : ["dataSource", "reason"], "type" : "count", "help": "Number of events rejected because they are null, filtered by transformSpec, or outside the message rejection periods. The `reason` dimension indicates why the event was thrown away."},
108108
"ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because the events are duplicated."},
109109
"ingest/input/bytes" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of bytes read from input sources, after decompression but prior to parsing." },
110110
"ingest/rows/output" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of Druid rows persisted."},

extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
"query/cache/total/timeouts" : { "dimensions" : [], "type" : "gauge" },
3939
"query/cache/total/errors" : { "dimensions" : [], "type" : "gauge" },
4040

41-
"ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : "count" },
41+
"ingest/events/thrownAway" : { "dimensions" : ["dataSource", "reason"], "type" : "count" },
4242
"ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : "count" },
4343
"ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : "count" },
4444
"ingest/events/processed" : { "dimensions" : ["dataSource"], "type" : "count" },

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
114114
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
115115
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
116+
import org.apache.druid.segment.incremental.InputRowFilterResult;
116117
import org.apache.druid.segment.incremental.RowIngestionMeters;
117118
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
118119
import org.apache.druid.segment.incremental.RowMeters;
@@ -717,7 +718,7 @@ public void testIncrementalHandOff() throws Exception
717718
);
718719

719720
long totalBytes = getTotalSizeOfRecords(0, 10) + getTotalSizeOfRecords(13, 15);
720-
verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes).unparseable(3).thrownAway(1).totalProcessed(8));
721+
verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes).unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(8));
721722

722723
// Check published metadata and segments in deep storage
723724
assertEqualsExceptVersion(
@@ -851,7 +852,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
851852
);
852853

853854
long totalBytes = getTotalSizeOfRecords(0, 10) + getTotalSizeOfRecords(13, 15);
854-
verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes).unparseable(3).thrownAway(1).totalProcessed(8));
855+
verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes).unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(8));
855856

856857
// Check published metadata and segments in deep storage
857858
assertEqualsExceptVersion(
@@ -1165,7 +1166,7 @@ public void testRunWithMinimumMessageTime() throws Exception
11651166

11661167
// Wait for task to exit
11671168
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
1168-
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAway(2).totalProcessed(3));
1169+
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAwayByReason(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME, 2).totalProcessed(3));
11691170

11701171
// Check published metadata and segments in deep storage
11711172
assertEqualsExceptVersion(
@@ -1214,7 +1215,7 @@ public void testRunWithMaximumMessageTime() throws Exception
12141215

12151216
// Wait for task to exit
12161217
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
1217-
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAway(2).totalProcessed(3));
1218+
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAwayByReason(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME, 2).totalProcessed(3));
12181219

12191220
// Check published metadata and segments in deep storage
12201221
assertEqualsExceptVersion(
@@ -1272,7 +1273,7 @@ public void testRunWithTransformSpec() throws Exception
12721273

12731274
// Wait for task to exit
12741275
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
1275-
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAway(4).totalProcessed(1));
1276+
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 4).totalProcessed(1));
12761277

12771278
// Check published metadata
12781279
final List<SegmentDescriptor> publishedDescriptors = publishedDescriptors();
@@ -1642,7 +1643,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception
16421643
verifyTaskMetrics(task, RowMeters.with()
16431644
.bytes(totalRecordBytes)
16441645
.unparseable(3).errors(3)
1645-
.thrownAway(1).totalProcessed(4));
1646+
.thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(4));
16461647

16471648
// Check published metadata
16481649
assertEqualsExceptVersion(
@@ -1660,14 +1661,17 @@ public void testMultipleParseExceptionsSuccess() throws Exception
16601661
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
16611662
Assert.assertNull(reportData.getErrorMsg());
16621663

1664+
// Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
1665+
Map<String, Integer> expectedThrownAwayByReason = Map.of(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason(), 1);
16631666
Map<String, Object> expectedMetrics = ImmutableMap.of(
16641667
RowIngestionMeters.BUILD_SEGMENTS,
16651668
ImmutableMap.of(
16661669
RowIngestionMeters.PROCESSED, 4,
16671670
RowIngestionMeters.PROCESSED_BYTES, (int) totalRecordBytes,
16681671
RowIngestionMeters.PROCESSED_WITH_ERROR, 3,
16691672
RowIngestionMeters.UNPARSEABLE, 3,
1670-
RowIngestionMeters.THROWN_AWAY, 1
1673+
RowIngestionMeters.THROWN_AWAY, 1,
1674+
RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason
16711675
)
16721676
);
16731677
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
@@ -1745,14 +1749,17 @@ public void testMultipleParseExceptionsFailure() throws Exception
17451749
Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
17461750
Assert.assertNotNull(reportData.getErrorMsg());
17471751

1752+
// Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
1753+
Map<String, Integer> expectedThrownAwayByReason = Map.of();
17481754
Map<String, Object> expectedMetrics = ImmutableMap.of(
17491755
RowIngestionMeters.BUILD_SEGMENTS,
17501756
ImmutableMap.of(
17511757
RowIngestionMeters.PROCESSED, 3,
17521758
RowIngestionMeters.PROCESSED_BYTES, (int) totalBytes,
17531759
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
17541760
RowIngestionMeters.UNPARSEABLE, 3,
1755-
RowIngestionMeters.THROWN_AWAY, 0
1761+
RowIngestionMeters.THROWN_AWAY, 0,
1762+
RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason
17561763
)
17571764
);
17581765
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
@@ -1887,7 +1894,7 @@ public void testRunConflicting() throws Exception
18871894

18881895
verifyTaskMetrics(task1, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
18891896
verifyTaskMetrics(task2, RowMeters.with().bytes(getTotalSizeOfRecords(3, 10))
1890-
.unparseable(3).thrownAway(1).totalProcessed(3));
1897+
.unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(3));
18911898

18921899
// Check published segments & metadata, should all be from the first task
18931900
final List<SegmentDescriptor> publishedDescriptors = publishedDescriptors();
@@ -1961,7 +1968,7 @@ public void testRunConflictingWithoutTransactions() throws Exception
19611968

19621969
verifyTaskMetrics(task1, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
19631970
verifyTaskMetrics(task2, RowMeters.with().bytes(getTotalSizeOfRecords(3, 10))
1964-
.unparseable(3).thrownAway(1).totalProcessed(3));
1971+
.unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(3));
19651972

19661973
// Check published segments & metadata
19671974
SegmentDescriptorAndExpectedDim1Values desc3 = sdd("2011/P1D", 1, ImmutableList.of("d", "e"));
@@ -2576,7 +2583,7 @@ public void testRunTransactionModeRollback() throws Exception
25762583

25772584
long totalBytes = getTotalSizeOfRecords(0, 2) + getTotalSizeOfRecords(5, 11);
25782585
verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes)
2579-
.unparseable(3).errors(1).thrownAway(1).totalProcessed(3));
2586+
.unparseable(3).errors(1).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(3));
25802587

25812588
// Check published metadata and segments in deep storage
25822589
assertEqualsExceptVersion(
@@ -3437,7 +3444,7 @@ public void testTaskWithTransformSpecDoesNotCauseCliPeonCyclicDependency()
34373444

34383445
// Wait for task to exit
34393446
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
3440-
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAway(4).totalProcessed(1));
3447+
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 4).totalProcessed(1));
34413448

34423449
// Check published metadata
34433450
final List<SegmentDescriptor> publishedDescriptors = publishedDescriptors();

extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
8383
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
8484
import org.apache.druid.segment.TestHelper;
85+
import org.apache.druid.segment.incremental.InputRowFilterResult;
8586
import org.apache.druid.segment.incremental.RowIngestionMeters;
8687
import org.apache.druid.segment.incremental.RowMeters;
8788
import org.apache.druid.segment.indexing.DataSchema;
@@ -801,7 +802,7 @@ public void testRunWithMinimumMessageTime() throws Exception
801802
verifyAll();
802803

803804
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSize(RECORDS, 0, 5))
804-
.thrownAway(2).totalProcessed(3));
805+
.thrownAwayByReason(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME, 2).totalProcessed(3));
805806

806807
// Check published metadata
807808
assertEqualsExceptVersion(
@@ -864,7 +865,7 @@ public void testRunWithMaximumMessageTime() throws Exception
864865
verifyAll();
865866

866867
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSize(RECORDS, 0, 5))
867-
.thrownAway(2).totalProcessed(3));
868+
.thrownAwayByReason(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME, 2).totalProcessed(3));
868869

869870
// Check published metadata and segments in deep storage
870871
assertEqualsExceptVersion(
@@ -923,7 +924,7 @@ public void testRunWithTransformSpec() throws Exception
923924
verifyAll();
924925

925926
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSize(RECORDS, 0, 5))
926-
.thrownAway(4).totalProcessed(1));
927+
.thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 4).totalProcessed(1));
927928

928929
// Check published metadata
929930
assertEqualsExceptVersion(ImmutableList.of(sdd("2009/P1D", 0)), publishedDescriptors());
@@ -1194,14 +1195,17 @@ public void testMultipleParseExceptionsSuccess() throws Exception
11941195
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
11951196
Assert.assertNull(reportData.getErrorMsg());
11961197

1198+
// Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
1199+
Map<String, Integer> expectedThrownAwayByReason = Map.of();
11971200
Map<String, Object> expectedMetrics = ImmutableMap.of(
11981201
RowIngestionMeters.BUILD_SEGMENTS,
11991202
ImmutableMap.of(
12001203
RowIngestionMeters.PROCESSED, 4,
12011204
RowIngestionMeters.PROCESSED_BYTES, 763,
12021205
RowIngestionMeters.PROCESSED_WITH_ERROR, 3,
12031206
RowIngestionMeters.UNPARSEABLE, 4,
1204-
RowIngestionMeters.THROWN_AWAY, 0
1207+
RowIngestionMeters.THROWN_AWAY, 0,
1208+
RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason
12051209
)
12061210
);
12071211
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
@@ -1284,14 +1288,17 @@ public void testMultipleParseExceptionsFailure() throws Exception
12841288
Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
12851289
Assert.assertNotNull(reportData.getErrorMsg());
12861290

1291+
// Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
1292+
Map<String, Integer> expectedThrownAwayByReason = Map.of();
12871293
Map<String, Object> expectedMetrics = ImmutableMap.of(
12881294
RowIngestionMeters.BUILD_SEGMENTS,
12891295
ImmutableMap.of(
12901296
RowIngestionMeters.PROCESSED, 3,
12911297
RowIngestionMeters.PROCESSED_BYTES, (int) totalBytes,
12921298
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
12931299
RowIngestionMeters.UNPARSEABLE, 3,
1294-
RowIngestionMeters.THROWN_AWAY, 0
1300+
RowIngestionMeters.THROWN_AWAY, 0,
1301+
RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason
12951302
)
12961303
);
12971304
Assert.assertEquals(expectedMetrics, reportData.getRowStats());

0 commit comments

Comments
 (0)