Skip to content

Commit 62440f3

Browse files
committed
Comments
1 parent decc140 commit 62440f3

File tree

19 files changed

+96
-142
lines changed

19 files changed

+96
-142
lines changed

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@
129129
import org.apache.druid.server.security.ResourceAction;
130130
import org.apache.druid.server.security.ResourceType;
131131
import org.apache.druid.timeline.DataSegment;
132-
import org.apache.druid.utils.CollectionUtils;
133132
import org.apache.kafka.clients.producer.KafkaProducer;
134133
import org.apache.kafka.clients.producer.ProducerRecord;
135134
import org.apache.kafka.common.header.Header;
@@ -1663,8 +1662,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception
16631662
Assert.assertNull(reportData.getErrorMsg());
16641663

16651664
// Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
1666-
Map<String, Integer> expectedThrownAwayByReason = CollectionUtils.mapValues(InputRowFilterResult.buildRejectedCounterMap(), Long::intValue);
1667-
expectedThrownAwayByReason.put(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason(), 1);
1665+
Map<String, Integer> expectedThrownAwayByReason = Map.of(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason(), 1);
16681666
Map<String, Object> expectedMetrics = ImmutableMap.of(
16691667
RowIngestionMeters.BUILD_SEGMENTS,
16701668
ImmutableMap.of(
@@ -1752,7 +1750,7 @@ public void testMultipleParseExceptionsFailure() throws Exception
17521750
Assert.assertNotNull(reportData.getErrorMsg());
17531751

17541752
// Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
1755-
Map<String, Integer> expectedThrownAwayByReason = CollectionUtils.mapValues(InputRowFilterResult.buildRejectedCounterMap(), Long::intValue);
1753+
Map<String, Integer> expectedThrownAwayByReason = Map.of();
17561754
Map<String, Object> expectedMetrics = ImmutableMap.of(
17571755
RowIngestionMeters.BUILD_SEGMENTS,
17581756
ImmutableMap.of(

extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@
8989
import org.apache.druid.segment.transform.ExpressionTransform;
9090
import org.apache.druid.segment.transform.TransformSpec;
9191
import org.apache.druid.timeline.DataSegment;
92-
import org.apache.druid.utils.CollectionUtils;
9392
import org.easymock.EasyMock;
9493
import org.joda.time.Duration;
9594
import org.joda.time.Period;
@@ -1197,7 +1196,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception
11971196
Assert.assertNull(reportData.getErrorMsg());
11981197

11991198
// Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
1200-
Map<String, Integer> expectedThrownAwayByReason = CollectionUtils.mapValues(InputRowFilterResult.buildRejectedCounterMap(), Long::intValue);
1199+
Map<String, Integer> expectedThrownAwayByReason = Map.of();
12011200
Map<String, Object> expectedMetrics = ImmutableMap.of(
12021201
RowIngestionMeters.BUILD_SEGMENTS,
12031202
ImmutableMap.of(
@@ -1290,7 +1289,7 @@ public void testMultipleParseExceptionsFailure() throws Exception
12901289
Assert.assertNotNull(reportData.getErrorMsg());
12911290

12921291
// Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
1293-
Map<String, Integer> expectedThrownAwayByReason = CollectionUtils.mapValues(InputRowFilterResult.buildRejectedCounterMap(), Long::intValue);
1292+
Map<String, Integer> expectedThrownAwayByReason = Map.of();
12941293
Map<String, Object> expectedMetrics = ImmutableMap.of(
12951294
RowIngestionMeters.BUILD_SEGMENTS,
12961295
ImmutableMap.of(

indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class DropwizardRowIngestionMeters implements RowIngestionMeters
3939
private final Meter processedBytes;
4040
private final Meter processedWithError;
4141
private final Meter unparseable;
42-
private final Meter[] thrownAwayByReason = new Meter[InputRowFilterResult.NUM_FILTER_RESULT];
42+
private final Meter[] thrownAwayByReason = new Meter[InputRowFilterResult.numValues()];
4343

4444
public DropwizardRowIngestionMeters()
4545
{
@@ -120,9 +120,12 @@ public void incrementThrownAway(InputRowFilterResult reason)
120120
@Override
121121
public Map<String, Long> getThrownAwayByReason()
122122
{
123-
Map<String, Long> result = InputRowFilterResult.buildRejectedCounterMap();
123+
Map<String, Long> result = new HashMap<>();
124124
for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) {
125-
result.put(reason.getReason(), thrownAwayByReason[reason.ordinal()].getCount());
125+
long count = thrownAwayByReason[reason.ordinal()].getCount();
126+
if (count > 0) {
127+
result.put(reason.getReason(), count);
128+
}
126129
}
127130
return result;
128131
}

indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
2424
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
2525
import org.apache.druid.java.util.metrics.AbstractMonitor;
26+
import org.apache.druid.query.DruidMetrics;
2627
import org.apache.druid.segment.incremental.InputRowFilterResult;
2728
import org.apache.druid.segment.incremental.RowIngestionMeters;
2829
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
@@ -37,7 +38,6 @@
3738
public class TaskRealtimeMetricsMonitor extends AbstractMonitor
3839
{
3940
private static final EmittingLogger log = new EmittingLogger(TaskRealtimeMetricsMonitor.class);
40-
private static final String REASON_DIMENSION = "reason";
4141

4242
private final SegmentGenerationMetrics segmentGenerationMetrics;
4343
private final RowIngestionMeters rowIngestionMeters;
@@ -76,7 +76,7 @@ public boolean doMonitor(ServiceEmitter emitter)
7676
if (delta > 0) {
7777
deltaThrownAwayByReason.put(reason.getReason(), delta);
7878
emitter.emit(
79-
builder.setDimension(REASON_DIMENSION, reason.getReason())
79+
builder.setDimension(DruidMetrics.REASON, reason.getReason())
8080
.setMetric("ingest/events/thrownAway", delta)
8181
);
8282
}

indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputRowFilter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public interface InputRowFilter
3434
/**
3535
* Tests whether the given row should be accepted.
3636
*
37-
* @return {@link InputRowFilterResult#ACCEPTED} if the row should be accepted, or another {@link InputRowFilterResult} value if the row should be rejected
37+
* @return {@link InputRowFilterResult#ACCEPTED} only if the row should be accepted, otherwise another {@link InputRowFilterResult} value.
3838
*/
3939
InputRowFilterResult test(InputRow row);
4040

@@ -44,7 +44,7 @@ public interface InputRowFilter
4444
*/
4545
static InputRowFilter fromPredicate(Predicate<InputRow> predicate)
4646
{
47-
return row -> predicate.test(row) ? InputRowFilterResult.ACCEPTED : InputRowFilterResult.FILTERED;
47+
return row -> predicate.test(row) ? InputRowFilterResult.ACCEPTED : InputRowFilterResult.CUSTOM_FILTER;
4848
}
4949

5050
/**

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@
6666
import org.apache.druid.indexing.common.actions.TaskLocks;
6767
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
6868
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
69-
import org.apache.druid.indexing.common.task.InputRowFilter;
7069
import org.apache.druid.indexing.input.InputRowSchemas;
7170
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
7271
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
@@ -2147,10 +2146,8 @@ private void refreshMinMaxMessageTime()
21472146
}
21482147

21492148
/**
2150-
* Returns the filter result for a row.
21512149
* Returns {@link InputRowFilterResult#ACCEPTED} if the row should be accepted,
21522150
* or a rejection reason otherwise.
2153-
* This method is used as a {@link InputRowFilter} for the {@link StreamChunkParser}.
21542151
*/
21552152
InputRowFilterResult ensureRowIsNonNullAndWithinMessageTimeBounds(@Nullable InputRow row)
21562153
{

indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,10 @@ public void testThrownAwayEmitsReasonDimension()
125125
realMeters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME);
126126
realMeters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME);
127127
realMeters.incrementThrownAway(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME);
128-
realMeters.incrementThrownAway(InputRowFilterResult.FILTERED);
129-
realMeters.incrementThrownAway(InputRowFilterResult.FILTERED);
130-
realMeters.incrementThrownAway(InputRowFilterResult.FILTERED);
131-
realMeters.incrementThrownAway(InputRowFilterResult.FILTERED);
128+
realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
129+
realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
130+
realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
131+
realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
132132

133133
TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor(
134134
segmentGenerationMetrics,
@@ -145,8 +145,8 @@ public void testThrownAwayEmitsReasonDimension()
145145
}
146146

147147
Assert.assertEquals(Long.valueOf(2), thrownAwayByReason.get("null"));
148-
Assert.assertEquals(Long.valueOf(3), thrownAwayByReason.get("beforeMinMessageTime"));
149-
Assert.assertEquals(Long.valueOf(1), thrownAwayByReason.get("afterMaxMessageTime"));
148+
Assert.assertEquals(Long.valueOf(3), thrownAwayByReason.get("beforeMinimumMessageTime"));
149+
Assert.assertEquals(Long.valueOf(1), thrownAwayByReason.get("afterMaximumMessageTime"));
150150
Assert.assertEquals(Long.valueOf(4), thrownAwayByReason.get("filtered"));
151151
}
152152

@@ -155,7 +155,7 @@ public void testThrownAwayReasonDimensionOnlyEmittedWhenNonZero()
155155
{
156156
SimpleRowIngestionMeters realMeters = new SimpleRowIngestionMeters();
157157
realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
158-
realMeters.incrementThrownAway(InputRowFilterResult.FILTERED);
158+
realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
159159

160160
TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor(
161161
segmentGenerationMetrics,
@@ -175,8 +175,8 @@ public void testThrownAwayReasonDimensionOnlyEmittedWhenNonZero()
175175
Assert.assertEquals(2, thrownAwayByReason.size());
176176
Assert.assertTrue(thrownAwayByReason.containsKey("null"));
177177
Assert.assertTrue(thrownAwayByReason.containsKey("filtered"));
178-
Assert.assertFalse(thrownAwayByReason.containsKey("beforeMinMessageTime"));
179-
Assert.assertFalse(thrownAwayByReason.containsKey("afterMaxMessageTime"));
178+
Assert.assertFalse(thrownAwayByReason.containsKey("beforeMinimumMessageTime"));
179+
Assert.assertFalse(thrownAwayByReason.containsKey("afterMaximumMessageTime"));
180180
}
181181

182182
@Test
@@ -204,8 +204,8 @@ public void testThrownAwayReasonDeltaAcrossMonitorCalls()
204204

205205
emitter.flush();
206206
realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
207-
realMeters.incrementThrownAway(InputRowFilterResult.FILTERED);
208-
realMeters.incrementThrownAway(InputRowFilterResult.FILTERED);
207+
realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
208+
realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
209209
monitor.doMonitor(emitter);
210210

211211
// Find counts from second call - should be deltas only

indexing-service/src/test/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersTest.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ public void testIncrementThrownAwayWithReason()
6161
meters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD);
6262
meters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME);
6363
meters.incrementThrownAway(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME);
64-
meters.incrementThrownAway(InputRowFilterResult.FILTERED);
65-
meters.incrementThrownAway(InputRowFilterResult.FILTERED);
66-
meters.incrementThrownAway(InputRowFilterResult.FILTERED);
64+
meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
65+
meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
66+
meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
6767

6868
// Total thrownAway should be sum of all reasons
6969
Assert.assertEquals(7, meters.getThrownAway());
@@ -73,20 +73,17 @@ public void testIncrementThrownAwayWithReason()
7373
Assert.assertEquals(Long.valueOf(2), byReason.get(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason()));
7474
Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason()));
7575
Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.getReason()));
76-
Assert.assertEquals(Long.valueOf(3), byReason.get(InputRowFilterResult.FILTERED.getReason()));
76+
Assert.assertEquals(Long.valueOf(3), byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason()));
7777
}
7878

7979
@Test
8080
public void testGetThrownAwayByReasonReturnsAllReasons()
8181
{
8282
DropwizardRowIngestionMeters meters = new DropwizardRowIngestionMeters();
8383

84-
// Even with no increments, all reasons should be present with 0 counts
84+
// With no increments, all reasons should be present with 0 counts
8585
Map<String, Long> byReason = meters.getThrownAwayByReason();
86-
Assert.assertEquals(InputRowFilterResult.rejectedValues().length, byReason.size());
87-
for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) {
88-
Assert.assertEquals(Long.valueOf(0), byReason.get(reason.getReason()));
89-
}
86+
Assert.assertTrue(byReason.isEmpty());
9087
}
9188

9289
@Test
@@ -95,7 +92,7 @@ public void testMovingAverages()
9592
DropwizardRowIngestionMeters meters = new DropwizardRowIngestionMeters();
9693

9794
meters.incrementProcessed();
98-
meters.incrementThrownAway(InputRowFilterResult.FILTERED);
95+
meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER);
9996

10097
Map<String, Object> movingAverages = meters.getMovingAverages();
10198
Assert.assertNotNull(movingAverages);

indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ public void testRowFilterWithReasons()
363363
} else if (dim1 == 20) {
364364
return InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME;
365365
} else {
366-
return InputRowFilterResult.FILTERED;
366+
return InputRowFilterResult.CUSTOM_FILTER;
367367
}
368368
};
369369

@@ -388,10 +388,9 @@ public void testRowFilterWithReasons()
388388

389389
// Check per-reason counts
390390
Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason();
391-
Assert.assertEquals(Long.valueOf(0), byReason.get(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason()));
391+
Assert.assertEquals(2, byReason.size());
392392
Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason())); // dim1=20
393-
Assert.assertEquals(Long.valueOf(0), byReason.get(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.getReason()));
394-
Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.FILTERED.getReason())); // dim1=30
393+
Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason())); // dim1=30
395394
}
396395

397396
@Test
@@ -416,10 +415,8 @@ public void testRowFilterFromPredicate()
416415

417416
// All thrown away should have FILTERED reason when using fromPredicate
418417
Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason();
419-
Assert.assertEquals(Long.valueOf(2), byReason.get(InputRowFilterResult.FILTERED.getReason()));
420-
Assert.assertEquals(Long.valueOf(0), byReason.get(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason()));
421-
Assert.assertEquals(Long.valueOf(0), byReason.get(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason()));
422-
Assert.assertEquals(Long.valueOf(0), byReason.get(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.getReason()));
418+
Assert.assertEquals(1, byReason.size());
419+
Assert.assertEquals(Long.valueOf(2), byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason()));
423420
}
424421

425422
@Test
@@ -429,7 +426,7 @@ public void testRowFilterAnd()
429426
final InputRowFilter nullFilter = row -> row == null ? InputRowFilterResult.NULL_OR_EMPTY_RECORD : InputRowFilterResult.ACCEPTED;
430427

431428
// Second filter: reject if dim1 != 10
432-
final InputRowFilter valueFilter = row -> (Integer) row.getRaw("dim1") == 10 ? InputRowFilterResult.ACCEPTED : InputRowFilterResult.FILTERED;
429+
final InputRowFilter valueFilter = row -> (Integer) row.getRaw("dim1") == 10 ? InputRowFilterResult.ACCEPTED : InputRowFilterResult.CUSTOM_FILTER;
433430

434431
// Combine filters
435432
final InputRowFilter combinedFilter = nullFilter.and(valueFilter);
@@ -449,7 +446,7 @@ public void testRowFilterAnd()
449446

450447
// All rejected rows should have FILTERED reason (from second filter)
451448
Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason();
452-
Assert.assertEquals(Long.valueOf(2), byReason.get(InputRowFilterResult.FILTERED.getReason()));
449+
Assert.assertEquals(Long.valueOf(2), byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason()));
453450
}
454451

455452
private static InputRow newRow(DateTime timestamp, Object dim1Val, Object dim2Val)

indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@
110110
import org.apache.druid.timeline.partition.NumberedShardSpec;
111111
import org.apache.druid.timeline.partition.PartitionIds;
112112
import org.apache.druid.timeline.partition.ShardSpec;
113-
import org.apache.druid.utils.CollectionUtils;
114113
import org.easymock.EasyMock;
115114
import org.hamcrest.CoreMatchers;
116115
import org.hamcrest.MatcherAssert;
@@ -1511,11 +1510,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception
15111510
IngestionStatsAndErrors reportData = getTaskReportData();
15121511

15131512
// Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
1514-
Map<String, Integer> expectedThrownAwayByReason = CollectionUtils.mapValues(
1515-
InputRowFilterResult.buildRejectedCounterMap(),
1516-
Long::intValue
1517-
);
1518-
expectedThrownAwayByReason.put(InputRowFilterResult.FILTERED.getReason(), 1);
1513+
Map<String, Integer> expectedThrownAwayByReason = Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1);
15191514
Map<String, Object> expectedMetrics = ImmutableMap.of(
15201515
RowIngestionMeters.DETERMINE_PARTITIONS,
15211516
ImmutableMap.of(
@@ -1691,15 +1686,8 @@ public void testMultipleParseExceptionsFailure() throws Exception
16911686
IngestionStatsAndErrors reportData = getTaskReportData();
16921687

16931688
// Jackson will serde numerics ≤ 32bits as Integers, rather than Longs
1694-
Map<String, Integer> expectedDeterminePartitionsThrownAwayByReason = CollectionUtils.mapValues(
1695-
InputRowFilterResult.buildRejectedCounterMap(),
1696-
Long::intValue
1697-
);
1698-
Map<String, Integer> expectedBuildSegmentsThrownAwayByReason = CollectionUtils.mapValues(
1699-
InputRowFilterResult.buildRejectedCounterMap(),
1700-
Long::intValue
1701-
);
1702-
expectedBuildSegmentsThrownAwayByReason.put(InputRowFilterResult.FILTERED.getReason(), 1);
1689+
Map<String, Integer> expectedDeterminePartitionsThrownAwayByReason = Map.of();
1690+
Map<String, Integer> expectedBuildSegmentsThrownAwayByReason = Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1);
17031691
Map<String, Object> expectedMetrics = ImmutableMap.of(
17041692
RowIngestionMeters.DETERMINE_PARTITIONS,
17051693
ImmutableMap.of(
@@ -1812,15 +1800,8 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc
18121800

18131801
IngestionStatsAndErrors reportData = getTaskReportData();
18141802

1815-
Map<String, Integer> expectedDeterminePartitionsThrownAwayByReason = CollectionUtils.mapValues(
1816-
InputRowFilterResult.buildRejectedCounterMap(),
1817-
Long::intValue
1818-
);
1819-
expectedDeterminePartitionsThrownAwayByReason.put(InputRowFilterResult.FILTERED.getReason(), 1);
1820-
Map<String, Integer> expectedBuildSegmentsThrownAwayByReason = CollectionUtils.mapValues(
1821-
InputRowFilterResult.buildRejectedCounterMap(),
1822-
Long::intValue
1823-
);
1803+
Map<String, Integer> expectedDeterminePartitionsThrownAwayByReason = Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1);
1804+
Map<String, Integer> expectedBuildSegmentsThrownAwayByReason = Map.of();
18241805
Map<String, Object> expectedMetrics = ImmutableMap.of(
18251806
RowIngestionMeters.DETERMINE_PARTITIONS,
18261807
ImmutableMap.of(

0 commit comments

Comments
 (0)