Skip to content

Commit db6a2b9

Browse files
committed
Store thrownAwayByReason in RowIngestionMetersTotals
1 parent 77078fe commit db6a2b9

24 files changed

+323
-240
lines changed

indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@
2121

2222
import com.codahale.metrics.Meter;
2323
import com.codahale.metrics.MetricRegistry;
24+
import org.apache.druid.segment.incremental.InputRowThrownAwayReason;
2425
import org.apache.druid.segment.incremental.RowIngestionMeters;
2526
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
26-
import org.apache.druid.segment.incremental.ThrownAwayReason;
2727

28+
import java.util.Arrays;
2829
import java.util.EnumMap;
2930
import java.util.HashMap;
3031
import java.util.Map;
@@ -35,13 +36,12 @@ public class DropwizardRowIngestionMeters implements RowIngestionMeters
3536
public static final String FIVE_MINUTE_NAME = "5m";
3637
public static final String FIFTEEN_MINUTE_NAME = "15m";
3738

38-
private static final int NUM_THROWN_AWAY_REASONS = ThrownAwayReason.values().length;
39+
private static final int NUM_THROWN_AWAY_REASONS = InputRowThrownAwayReason.values().length;
3940

4041
private final Meter processed;
4142
private final Meter processedBytes;
4243
private final Meter processedWithError;
4344
private final Meter unparseable;
44-
private final Meter thrownAway;
4545
private final Meter[] thrownAwayByReason = new Meter[NUM_THROWN_AWAY_REASONS];
4646

4747
public DropwizardRowIngestionMeters()
@@ -51,8 +51,7 @@ public DropwizardRowIngestionMeters()
5151
this.processedBytes = metricRegistry.meter(PROCESSED_BYTES);
5252
this.processedWithError = metricRegistry.meter(PROCESSED_WITH_ERROR);
5353
this.unparseable = metricRegistry.meter(UNPARSEABLE);
54-
this.thrownAway = metricRegistry.meter(THROWN_AWAY);
55-
for (ThrownAwayReason reason : ThrownAwayReason.values()) {
54+
for (InputRowThrownAwayReason reason : InputRowThrownAwayReason.values()) {
5655
this.thrownAwayByReason[reason.ordinal()] = metricRegistry.meter(THROWN_AWAY + "_" + reason.name());
5756
}
5857
}
@@ -108,21 +107,24 @@ public void incrementUnparseable()
108107
@Override
109108
public long getThrownAway()
110109
{
111-
return thrownAway.getCount();
110+
long totalThrownAway = 0;
111+
for (Meter meter : thrownAwayByReason) {
112+
totalThrownAway += meter.getCount();
113+
}
114+
return totalThrownAway;
112115
}
113116

114117
@Override
115-
public void incrementThrownAway(ThrownAwayReason reason)
118+
public void incrementThrownAway(InputRowThrownAwayReason reason)
116119
{
117-
thrownAway.mark();
118120
thrownAwayByReason[reason.ordinal()].mark();
119121
}
120122

121123
@Override
122-
public Map<ThrownAwayReason, Long> getThrownAwayByReason()
124+
public Map<InputRowThrownAwayReason, Long> getThrownAwayByReason()
123125
{
124-
EnumMap<ThrownAwayReason, Long> result = new EnumMap<>(ThrownAwayReason.class);
125-
for (ThrownAwayReason reason : ThrownAwayReason.values()) {
126+
EnumMap<InputRowThrownAwayReason, Long> result = new EnumMap<>(InputRowThrownAwayReason.class);
127+
for (InputRowThrownAwayReason reason : InputRowThrownAwayReason.values()) {
126128
result.put(reason, thrownAwayByReason[reason.ordinal()].getCount());
127129
}
128130
return result;
@@ -135,7 +137,7 @@ public RowIngestionMetersTotals getTotals()
135137
processed.getCount(),
136138
processedBytes.getCount(),
137139
processedWithError.getCount(),
138-
thrownAway.getCount(),
140+
getThrownAwayByReason(),
139141
unparseable.getCount()
140142
);
141143
}
@@ -150,21 +152,21 @@ public Map<String, Object> getMovingAverages()
150152
oneMinute.put(PROCESSED_BYTES, processedBytes.getOneMinuteRate());
151153
oneMinute.put(PROCESSED_WITH_ERROR, processedWithError.getOneMinuteRate());
152154
oneMinute.put(UNPARSEABLE, unparseable.getOneMinuteRate());
153-
oneMinute.put(THROWN_AWAY, thrownAway.getOneMinuteRate());
155+
oneMinute.put(THROWN_AWAY, Arrays.stream(thrownAwayByReason).map(Meter::getOneMinuteRate).reduce(0.0, Double::sum));
154156

155157
Map<String, Object> fiveMinute = new HashMap<>();
156158
fiveMinute.put(PROCESSED, processed.getFiveMinuteRate());
157159
fiveMinute.put(PROCESSED_BYTES, processedBytes.getFiveMinuteRate());
158160
fiveMinute.put(PROCESSED_WITH_ERROR, processedWithError.getFiveMinuteRate());
159161
fiveMinute.put(UNPARSEABLE, unparseable.getFiveMinuteRate());
160-
fiveMinute.put(THROWN_AWAY, thrownAway.getFiveMinuteRate());
162+
fiveMinute.put(THROWN_AWAY, Arrays.stream(thrownAwayByReason).map(Meter::getFiveMinuteRate).reduce(0.0, Double::sum));
161163

162164
Map<String, Object> fifteenMinute = new HashMap<>();
163165
fifteenMinute.put(PROCESSED, processed.getFifteenMinuteRate());
164166
fifteenMinute.put(PROCESSED_BYTES, processedBytes.getFifteenMinuteRate());
165167
fifteenMinute.put(PROCESSED_WITH_ERROR, processedWithError.getFifteenMinuteRate());
166168
fifteenMinute.put(UNPARSEABLE, unparseable.getFifteenMinuteRate());
167-
fifteenMinute.put(THROWN_AWAY, thrownAway.getFifteenMinuteRate());
169+
fifteenMinute.put(THROWN_AWAY, Arrays.stream(thrownAwayByReason).map(Meter::getFifteenMinuteRate).reduce(0.0, Double::sum));
168170

169171
movingAverages.put(ONE_MINUTE_NAME, oneMinute);
170172
movingAverages.put(FIVE_MINUTE_NAME, fiveMinute);

indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,11 @@
2323
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
2424
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
2525
import org.apache.druid.java.util.metrics.AbstractMonitor;
26+
import org.apache.druid.segment.incremental.InputRowThrownAwayReason;
2627
import org.apache.druid.segment.incremental.RowIngestionMeters;
2728
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
28-
import org.apache.druid.segment.incremental.ThrownAwayReason;
2929
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
3030

31-
import java.util.EnumMap;
3231
import java.util.Map;
3332

3433
/**
@@ -45,7 +44,6 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
4544

4645
private SegmentGenerationMetrics previousSegmentGenerationMetrics;
4746
private RowIngestionMetersTotals previousRowIngestionMetersTotals;
48-
private Map<ThrownAwayReason, Long> previousThrownAwayByReason;
4947

5048
public TaskRealtimeMetricsMonitor(
5149
SegmentGenerationMetrics segmentGenerationMetrics,
@@ -57,8 +55,7 @@ public TaskRealtimeMetricsMonitor(
5755
this.rowIngestionMeters = rowIngestionMeters;
5856
this.builder = metricEventBuilder;
5957
previousSegmentGenerationMetrics = new SegmentGenerationMetrics();
60-
previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0, 0);
61-
previousThrownAwayByReason = new EnumMap<>(ThrownAwayReason.class);
58+
previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, Map.of(), 0);
6259
}
6360

6461
@Override
@@ -68,21 +65,21 @@ public boolean doMonitor(ServiceEmitter emitter)
6865
RowIngestionMetersTotals rowIngestionMetersTotals = rowIngestionMeters.getTotals();
6966

7067
// Emit per-reason metrics with the reason dimension
71-
final Map<ThrownAwayReason, Long> currentThrownAwayByReason = rowIngestionMeters.getThrownAwayByReason();
68+
final Map<InputRowThrownAwayReason, Long> currentThrownAwayByReason = rowIngestionMetersTotals.getThrownAwayByReason();
69+
final Map<InputRowThrownAwayReason, Long> previousThrownAwayByReason = previousRowIngestionMetersTotals.getThrownAwayByReason();
7270
long totalThrownAway = 0;
73-
for (ThrownAwayReason reason : ThrownAwayReason.values()) {
71+
for (InputRowThrownAwayReason reason : InputRowThrownAwayReason.values()) {
7472
final long currentCount = currentThrownAwayByReason.getOrDefault(reason, 0L);
7573
final long previousCount = previousThrownAwayByReason.getOrDefault(reason, 0L);
7674
final long delta = currentCount - previousCount;
7775
if (delta > 0) {
7876
totalThrownAway += delta;
7977
emitter.emit(
80-
builder.setDimension(REASON_DIMENSION, reason.getMetricValue())
78+
builder.setDimension(REASON_DIMENSION, reason.getReason())
8179
.setMetric("ingest/events/thrownAway", delta)
8280
);
8381
}
8482
}
85-
previousThrownAwayByReason = currentThrownAwayByReason;
8683
if (totalThrownAway > 0) {
8784
log.warn(
8885
"[%,d] events thrown away. Possible causes: null events, events filtered out by transformSpec, or events outside earlyMessageRejectionPeriod / lateMessageRejectionPeriod.",

indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ public static FilteringCloseableInputRowIterator inputSourceReader(
230230
);
231231
return new FilteringCloseableInputRowIterator(
232232
inputSourceReader.read(ingestionMeters),
233-
RowFilter.fromPredicate(rowFilter),
233+
InputRowFilter.fromPredicate(rowFilter),
234234
ingestionMeters,
235235
parseExceptionHandler
236236
);

indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,30 +22,30 @@
2222
import org.apache.druid.data.input.InputRow;
2323
import org.apache.druid.java.util.common.parsers.CloseableIterator;
2424
import org.apache.druid.java.util.common.parsers.ParseException;
25+
import org.apache.druid.segment.incremental.InputRowThrownAwayReason;
2526
import org.apache.druid.segment.incremental.ParseExceptionHandler;
2627
import org.apache.druid.segment.incremental.RowIngestionMeters;
27-
import org.apache.druid.segment.incremental.ThrownAwayReason;
2828

2929
import java.io.IOException;
3030
import java.util.NoSuchElementException;
3131

3232
/**
3333
* An {@link InputRow} iterator used by ingestion {@link Task}s. It can filter out rows which do not satisfy the given
34-
* {@link RowFilter} or throw {@link ParseException} while parsing them. The relevant metric should be counted whenever
34+
* {@link InputRowFilter} or throw {@link ParseException} while parsing them. The relevant metric should be counted whenever
3535
* it filters out rows based on the filter. ParseException handling is delegated to {@link ParseExceptionHandler}.
3636
*/
3737
public class FilteringCloseableInputRowIterator implements CloseableIterator<InputRow>
3838
{
3939
private final CloseableIterator<InputRow> delegate;
40-
private final RowFilter rowFilter;
40+
private final InputRowFilter rowFilter;
4141
private final RowIngestionMeters rowIngestionMeters;
4242
private final ParseExceptionHandler parseExceptionHandler;
4343

4444
private InputRow next;
4545

4646
public FilteringCloseableInputRowIterator(
4747
CloseableIterator<InputRow> delegate,
48-
RowFilter rowFilter,
48+
InputRowFilter rowFilter,
4949
RowIngestionMeters rowIngestionMeters,
5050
ParseExceptionHandler parseExceptionHandler
5151
)
@@ -67,7 +67,7 @@ public boolean hasNext()
6767
// delegate.next() can throw ParseException
6868
final InputRow row = delegate.next();
6969
// rowFilter.test() can throw ParseException, returns null if accepted, or reason if rejected
70-
final ThrownAwayReason rejectionReason = rowFilter.test(row);
70+
final InputRowThrownAwayReason rejectionReason = rowFilter.test(row);
7171
if (rejectionReason == null) {
7272
next = row;
7373
} else {

indexing-service/src/main/java/org/apache/druid/indexing/common/task/RowFilter.java renamed to indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputRowFilter.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.apache.druid.indexing.common.task;
2121

2222
import org.apache.druid.data.input.InputRow;
23-
import org.apache.druid.segment.incremental.ThrownAwayReason;
23+
import org.apache.druid.segment.incremental.InputRowThrownAwayReason;
2424

2525
import javax.annotation.Nullable;
2626
import java.util.function.Predicate;
@@ -30,30 +30,30 @@
3030
* This is similar to {@link Predicate} but returns the rejection reason instead of just a boolean.
3131
*/
3232
@FunctionalInterface
33-
public interface RowFilter
33+
public interface InputRowFilter
3434
{
3535
/**
3636
* Tests whether the given row should be accepted.
3737
*
3838
* @param row the input row to test
39-
* @return null if the row should be accepted, or the {@link ThrownAwayReason} if the row should be rejected
39+
* @return null if the row should be accepted, or the {@link InputRowThrownAwayReason} if the row should be rejected
4040
*/
4141
@Nullable
42-
ThrownAwayReason test(InputRow row);
42+
InputRowThrownAwayReason test(InputRow row);
4343

4444
/**
45-
* Creates a {@link RowFilter} from a Predicate. When the predicate returns false,
46-
* the rejection reason will be {@link ThrownAwayReason#FILTERED}.
45+
* Creates an {@link InputRowFilter} from a Predicate. When the predicate returns false,
46+
* the rejection reason will be {@link InputRowThrownAwayReason#FILTERED}.
4747
*/
48-
static RowFilter fromPredicate(Predicate<InputRow> predicate)
48+
static InputRowFilter fromPredicate(Predicate<InputRow> predicate)
4949
{
50-
return row -> predicate.test(row) ? null : ThrownAwayReason.FILTERED;
50+
return row -> predicate.test(row) ? null : InputRowThrownAwayReason.FILTERED;
5151
}
5252

5353
/**
54-
* Fully-permissive {@link RowFilter} used mainly for tests.
54+
* Fully-permissive {@link InputRowFilter} used mainly for tests.
5555
*/
56-
static RowFilter allow()
56+
static InputRowFilter allowAll()
5757
{
5858
return row -> null;
5959
}
@@ -62,10 +62,10 @@ static RowFilter allow()
6262
* Combines this filter with another filter. A row is rejected if either filter rejects it.
6363
* The rejection reason from the first rejecting filter (this filter first) is returned.
6464
*/
65-
default RowFilter and(RowFilter other)
65+
default InputRowFilter and(InputRowFilter other)
6666
{
6767
return row -> {
68-
ThrownAwayReason reason = this.test(row);
68+
InputRowThrownAwayReason reason = this.test(row);
6969
if (reason != null) {
7070
return reason;
7171
}

indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.apache.druid.rpc.HttpResponseException;
7070
import org.apache.druid.rpc.indexing.OverlordClient;
7171
import org.apache.druid.segment.SegmentSchemaMapping;
72+
import org.apache.druid.segment.incremental.InputRowThrownAwayReason;
7273
import org.apache.druid.segment.incremental.ParseExceptionReport;
7374
import org.apache.druid.segment.incremental.RowIngestionMeters;
7475
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
@@ -112,6 +113,7 @@
112113
import java.util.Collection;
113114
import java.util.Collections;
114115
import java.util.Comparator;
116+
import java.util.EnumMap;
115117
import java.util.HashMap;
116118
import java.util.HashSet;
117119
import java.util.List;
@@ -1602,11 +1604,25 @@ private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object build
16021604
return (RowIngestionMetersTotals) buildSegmentsRowStats;
16031605
} else if (buildSegmentsRowStats instanceof Map) {
16041606
Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
1607+
1608+
// Convert the thrownAwayByReason map from String keys to InputRowThrownAwayReason enum keys
1609+
Map<InputRowThrownAwayReason, Long> thrownAwayByReason = null;
1610+
Object rawThrownAwayByReason = buildSegmentsRowStatsMap.get("thrownAwayByReason");
1611+
if (rawThrownAwayByReason instanceof Map) {
1612+
thrownAwayByReason = new EnumMap<>(InputRowThrownAwayReason.class);
1613+
Map<?, ?> rawMap = (Map<?, ?>) rawThrownAwayByReason;
1614+
for (Map.Entry<?, ?> entry : rawMap.entrySet()) {
1615+
InputRowThrownAwayReason reason = InputRowThrownAwayReason.valueOf(entry.getKey().toString());
1616+
thrownAwayByReason.put(reason, ((Number) entry.getValue()).longValue());
1617+
}
1618+
}
1619+
16051620
return new RowIngestionMetersTotals(
16061621
((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
16071622
((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
16081623
((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
16091624
((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
1625+
thrownAwayByReason,
16101626
((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
16111627
);
16121628
} else {

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
import org.apache.druid.indexing.common.actions.TaskLocks;
6767
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
6868
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
69-
import org.apache.druid.indexing.common.task.RowFilter;
69+
import org.apache.druid.indexing.common.task.InputRowFilter;
7070
import org.apache.druid.indexing.input.InputRowSchemas;
7171
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
7272
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
@@ -79,10 +79,10 @@
7979
import org.apache.druid.java.util.common.concurrent.Execs;
8080
import org.apache.druid.java.util.emitter.EmittingLogger;
8181
import org.apache.druid.metadata.PendingSegmentRecord;
82+
import org.apache.druid.segment.incremental.InputRowThrownAwayReason;
8283
import org.apache.druid.segment.incremental.ParseExceptionHandler;
8384
import org.apache.druid.segment.incremental.ParseExceptionReport;
8485
import org.apache.druid.segment.incremental.RowIngestionMeters;
85-
import org.apache.druid.segment.incremental.ThrownAwayReason;
8686
import org.apache.druid.segment.realtime.ChatHandler;
8787
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
8888
import org.apache.druid.segment.realtime.appenderator.Appenderator;
@@ -421,7 +421,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
421421
inputRowSchema,
422422
task.getDataSchema().getTransformSpec(),
423423
toolbox.getIndexingTmpDir(),
424-
this::getRowRejectionReason,
424+
this::ensureRowIsNonNullAndWithinMessageTimeBounds,
425425
rowIngestionMeters,
426426
parseExceptionHandler
427427
);
@@ -2148,33 +2148,31 @@ private void refreshMinMaxMessageTime()
21482148

21492149
/**
21502150
* Returns the rejection reason for a row, or null if the row should be accepted.
2151-
* This method is used as a {@link RowFilter} for the {@link StreamChunkParser}.
2151+
* This method is used as an {@link InputRowFilter} for the {@link StreamChunkParser}.
21522152
*/
21532153
@Nullable
2154-
ThrownAwayReason getRowRejectionReason(final InputRow row)
2154+
InputRowThrownAwayReason ensureRowIsNonNullAndWithinMessageTimeBounds(@Nullable InputRow row)
21552155
{
21562156
if (row == null) {
2157-
return ThrownAwayReason.NULL;
2158-
}
2159-
if (minMessageTime.isAfter(row.getTimestamp())) {
2157+
return InputRowThrownAwayReason.NULL_OR_EMPTY_RECORD;
2158+
} else if (minMessageTime.isAfter(row.getTimestamp())) {
21602159
if (log.isDebugEnabled()) {
21612160
log.debug(
2162-
"CurrentTimeStamp[%s] is before MinimumMessageTime[%s]",
2161+
"CurrentTimeStamp[%s] is before minimumMessageTime[%s]",
21632162
row.getTimestamp(),
21642163
minMessageTime
21652164
);
21662165
}
2167-
return ThrownAwayReason.BEFORE_MIN_MESSAGE_TIME;
2168-
}
2169-
if (maxMessageTime.isBefore(row.getTimestamp())) {
2166+
return InputRowThrownAwayReason.BEFORE_MIN_MESSAGE_TIME;
2167+
} else if (maxMessageTime.isBefore(row.getTimestamp())) {
21702168
if (log.isDebugEnabled()) {
21712169
log.debug(
2172-
"CurrentTimeStamp[%s] is after MaximumMessageTime[%s]",
2170+
"CurrentTimeStamp[%s] is after maximumMessageTime[%s]",
21732171
row.getTimestamp(),
21742172
maxMessageTime
21752173
);
21762174
}
2177-
return ThrownAwayReason.AFTER_MAX_MESSAGE_TIME;
2175+
return InputRowThrownAwayReason.AFTER_MAX_MESSAGE_TIME;
21782176
}
21792177
return null;
21802178
}

0 commit comments

Comments
 (0)