Skip to content

Commit 42f05b9

Browse files
authored
Preventing serialization errors in the nodes stats API (#90319) (#90430)
Preventing serialization errors in the nodes stats API, and adding logging to the ingest counter code so that we can find the root cause of the problem in the future.
1 parent 1b5f1fa commit 42f05b9

File tree

7 files changed

+168
-73
lines changed

7 files changed

+168
-73
lines changed

docs/changelog/90319.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 90319
2+
summary: Preventing serialization errors in the nodes stats API
3+
area: Ingest Node
4+
type: bug
5+
issues:
6+
- 77973

server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.ElasticsearchException;
1214
import org.elasticsearch.core.Tuple;
1315

@@ -16,6 +18,7 @@
1618
import java.util.Collections;
1719
import java.util.List;
1820
import java.util.Map;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1922
import java.util.function.BiConsumer;
2023
import java.util.function.LongSupplier;
2124
import java.util.stream.Collectors;
@@ -30,6 +33,8 @@ public class CompoundProcessor implements Processor {
3033
public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
3134
public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";
3235

36+
private static final Logger logger = LogManager.getLogger(CompoundProcessor.class);
37+
3338
private final boolean ignoreFailure;
3439
private final List<Processor> processors;
3540
private final List<Processor> onFailureProcessors;
@@ -191,25 +196,43 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
191196
final IngestMetric finalMetric = processorsWithMetrics.get(currentProcessor).v2();
192197
final Processor finalProcessor = processorsWithMetrics.get(currentProcessor).v1();
193198
final IngestDocument finalIngestDocument = ingestDocument;
199+
/*
200+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
201+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
202+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
203+
* is only executed once.
204+
*/
205+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
194206
finalMetric.preIngest();
207+
final AtomicBoolean postIngestHasBeenCalled = new AtomicBoolean(false);
195208
try {
196209
finalProcessor.execute(ingestDocument, (result, e) -> {
197-
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - finalStartTimeInNanos;
198-
finalMetric.postIngest(ingestTimeInNanos);
199-
200-
if (e != null) {
201-
executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
210+
if (listenerHasBeenCalled.getAndSet(true)) {
211+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
212+
assert false : "A listener was unexpectedly called more than once";
202213
} else {
203-
if (result != null) {
204-
innerExecute(nextProcessor, result, handler);
214+
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - finalStartTimeInNanos;
215+
finalMetric.postIngest(ingestTimeInNanos);
216+
postIngestHasBeenCalled.set(true);
217+
if (e != null) {
218+
executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
205219
} else {
206-
handler.accept(null, null);
220+
if (result != null) {
221+
innerExecute(nextProcessor, result, handler);
222+
} else {
223+
handler.accept(null, null);
224+
}
207225
}
208226
}
209227
});
210228
} catch (Exception e) {
211229
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
212-
finalMetric.postIngest(ingestTimeInNanos);
230+
if (postIngestHasBeenCalled.get()) {
231+
logger.warn("Preventing postIngest from being called more than once", new RuntimeException());
232+
assert false : "Attempt to call postIngest more than once";
233+
} else {
234+
finalMetric.postIngest(ingestTimeInNanos);
235+
}
213236
executeOnFailureOuter(currentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
214237
}
215238
}

server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.common.logging.DeprecationCategory;
1214
import org.elasticsearch.common.logging.DeprecationLogger;
1315
import org.elasticsearch.script.DynamicMap;
@@ -26,6 +28,7 @@
2628
import java.util.ListIterator;
2729
import java.util.Map;
2830
import java.util.Set;
31+
import java.util.concurrent.atomic.AtomicBoolean;
2932
import java.util.function.BiConsumer;
3033
import java.util.function.Function;
3134
import java.util.function.LongSupplier;
@@ -45,6 +48,8 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
4548
return value;
4649
});
4750

51+
private static final Logger logger = LogManager.getLogger(ConditionalProcessor.class);
52+
4853
static final String TYPE = "conditional";
4954

5055
private final Script condition;
@@ -120,15 +125,27 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
120125

121126
if (matches) {
122127
final long startTimeInNanos = relativeTimeProvider.getAsLong();
128+
/*
129+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
130+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
131+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
132+
* is only executed once.
133+
*/
134+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
123135
metric.preIngest();
124136
processor.execute(ingestDocument, (result, e) -> {
125-
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
126-
metric.postIngest(ingestTimeInNanos);
127-
if (e != null) {
128-
metric.ingestFailed();
129-
handler.accept(null, e);
137+
if (listenerHasBeenCalled.getAndSet(true)) {
138+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
139+
assert false : "A listener was unexpectedly called more than once";
130140
} else {
131-
handler.accept(result, null);
141+
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
142+
metric.postIngest(ingestTimeInNanos);
143+
if (e != null) {
144+
metric.ingestFailed();
145+
handler.accept(null, e);
146+
} else {
147+
handler.accept(result, null);
148+
}
132149
}
133150
});
134151
} else {

server/src/main/java/org/elasticsearch/ingest/IngestMetric.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.common.metrics.CounterMetric;
1214

1315
import java.util.concurrent.TimeUnit;
@@ -22,6 +24,8 @@
2224
*/
2325
class IngestMetric {
2426

27+
private static final Logger logger = LogManager.getLogger(IngestMetric.class);
28+
2529
/**
2630
* The time it takes to complete the measured item.
2731
*/
@@ -53,7 +57,19 @@ void preIngest() {
5357
*/
5458
void postIngest(long ingestTimeInNanos) {
5559
long current = ingestCurrent.decrementAndGet();
56-
assert current >= 0 : "ingest metric current count double-decremented";
60+
if (current < 0) {
61+
/*
62+
* This ought to never happen. However if it does, it's incredibly bad because ingestCurrent being negative causes a
63+
* serialization error that prevents the nodes stats API from working. So we're doing 3 things here:
64+
* (1) Log a stack trace at warn level so that the Elasticsearch engineering team can track down and fix the source of the
65+
* bug if it still exists
66+
* (2) Throw an AssertionError if assertions are enabled so that we are aware of the bug
67+
* (3) Increment the counter back up so that we don't hit serialization failures
68+
*/
69+
logger.warn("Current ingest counter decremented below 0", new RuntimeException());
70+
assert false : "ingest metric current count double-decremented";
71+
ingestCurrent.incrementAndGet();
72+
}
5773
this.ingestTimeInNanos.inc(ingestTimeInNanos);
5874
ingestCount.inc();
5975
}
@@ -84,6 +100,8 @@ void add(IngestMetric metrics) {
84100
IngestStats.Stats createStats() {
85101
// we track ingestTime at nanosecond resolution, but IngestStats uses millisecond resolution for reporting
86102
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(ingestTimeInNanos.count());
87-
return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis, ingestCurrent.get(), ingestFailed.count());
103+
// It is possible for the current count to briefly drop below 0, causing serialization problems. See #90319
104+
long currentCount = Math.max(0, ingestCurrent.get());
105+
return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis, currentCount, ingestFailed.count());
88106
}
89107
}

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 63 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.Objects;
7373
import java.util.Set;
7474
import java.util.concurrent.CopyOnWriteArrayList;
75+
import java.util.concurrent.atomic.AtomicBoolean;
7576
import java.util.concurrent.atomic.AtomicInteger;
7677
import java.util.function.BiConsumer;
7778
import java.util.function.Consumer;
@@ -887,60 +888,72 @@ private void innerExecute(
887888
VersionType versionType = indexRequest.versionType();
888889
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
889890
IngestDocument ingestDocument = new IngestDocument(index, id, version, routing, versionType, sourceAsMap);
891+
/*
892+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
893+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
894+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
895+
* is only executed once.
896+
*/
897+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
890898
ingestDocument.executePipeline(pipeline, (result, e) -> {
891-
long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
892-
totalMetrics.postIngest(ingestTimeInNanos);
893-
if (e != null) {
894-
totalMetrics.ingestFailed();
895-
handler.accept(e);
896-
} else if (result == null) {
897-
itemDroppedHandler.accept(slot);
898-
handler.accept(null);
899+
if (listenerHasBeenCalled.getAndSet(true)) {
900+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
901+
assert false : "A listener was unexpectedly called more than once";
899902
} else {
900-
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();
901-
902-
// it's fine to set all metadata fields all the time, as ingest document holds their starting values
903-
// before ingestion, which might also get modified during ingestion.
904-
indexRequest.index(metadata.getIndex());
905-
indexRequest.id(metadata.getId());
906-
indexRequest.routing(metadata.getRouting());
907-
indexRequest.version(metadata.getVersion());
908-
if (metadata.getVersionType() != null) {
909-
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
910-
}
911-
Number number;
912-
if ((number = metadata.getIfSeqNo()) != null) {
913-
indexRequest.setIfSeqNo(number.longValue());
914-
}
915-
if ((number = metadata.getIfPrimaryTerm()) != null) {
916-
indexRequest.setIfPrimaryTerm(number.longValue());
917-
}
918-
try {
919-
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
920-
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
921-
} catch (IllegalArgumentException ex) {
922-
// An IllegalArgumentException can be thrown when an ingest
923-
// processor creates a source map that is self-referencing.
924-
// In that case, we catch and wrap the exception so we can
925-
// include which pipeline failed.
903+
long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
904+
totalMetrics.postIngest(ingestTimeInNanos);
905+
if (e != null) {
926906
totalMetrics.ingestFailed();
927-
handler.accept(
928-
new IllegalArgumentException(
929-
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
930-
ex
931-
)
932-
);
933-
return;
934-
}
935-
Map<String, String> map;
936-
if ((map = metadata.getDynamicTemplates()) != null) {
937-
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
938-
mergedDynamicTemplates.putAll(map);
939-
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
940-
}
941-
postIngest(ingestDocument, indexRequest);
907+
handler.accept(e);
908+
} else if (result == null) {
909+
itemDroppedHandler.accept(slot);
910+
handler.accept(null);
911+
} else {
912+
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();
913+
914+
// it's fine to set all metadata fields all the time, as ingest document holds their starting values
915+
// before ingestion, which might also get modified during ingestion.
916+
indexRequest.index(metadata.getIndex());
917+
indexRequest.id(metadata.getId());
918+
indexRequest.routing(metadata.getRouting());
919+
indexRequest.version(metadata.getVersion());
920+
if (metadata.getVersionType() != null) {
921+
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
922+
}
923+
Number number;
924+
if ((number = metadata.getIfSeqNo()) != null) {
925+
indexRequest.setIfSeqNo(number.longValue());
926+
}
927+
if ((number = metadata.getIfPrimaryTerm()) != null) {
928+
indexRequest.setIfPrimaryTerm(number.longValue());
929+
}
930+
try {
931+
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
932+
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
933+
} catch (IllegalArgumentException ex) {
934+
// An IllegalArgumentException can be thrown when an ingest
935+
// processor creates a source map that is self-referencing.
936+
// In that case, we catch and wrap the exception so we can
937+
// include which pipeline failed.
938+
totalMetrics.ingestFailed();
939+
handler.accept(
940+
new IllegalArgumentException(
941+
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
942+
ex
943+
)
944+
);
945+
return;
946+
}
947+
Map<String, String> map;
948+
if ((map = metadata.getDynamicTemplates()) != null) {
949+
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
950+
mergedDynamicTemplates.putAll(map);
951+
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
952+
}
953+
postIngest(ingestDocument, indexRequest);
942954

943-
handler.accept(null);
955+
handler.accept(null);
956+
}
944957
}
945958
});
946959
}

server/src/main/java/org/elasticsearch/ingest/Pipeline.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.ElasticsearchParseException;
1214
import org.elasticsearch.core.Nullable;
1315
import org.elasticsearch.script.ScriptService;
@@ -16,6 +18,7 @@
1618
import java.util.Collections;
1719
import java.util.List;
1820
import java.util.Map;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1922
import java.util.function.BiConsumer;
2023
import java.util.function.LongSupplier;
2124

@@ -30,6 +33,8 @@ public final class Pipeline {
3033
public static final String ON_FAILURE_KEY = "on_failure";
3134
public static final String META_KEY = "_meta";
3235

36+
private static final Logger logger = LogManager.getLogger(Pipeline.class);
37+
3338
private final String id;
3439
@Nullable
3540
private final String description;
@@ -113,14 +118,26 @@ public static Pipeline create(
113118
*/
114119
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
115120
final long startTimeInNanos = relativeTimeProvider.getAsLong();
121+
/*
122+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
123+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
124+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
125+
* is only executed once.
126+
*/
127+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
116128
metrics.preIngest();
117129
compoundProcessor.execute(ingestDocument, (result, e) -> {
118-
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
119-
metrics.postIngest(ingestTimeInNanos);
120-
if (e != null) {
121-
metrics.ingestFailed();
130+
if (listenerHasBeenCalled.getAndSet(true)) {
131+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
132+
assert false : "A listener was unexpectedly called more than once";
133+
} else {
134+
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
135+
metrics.postIngest(ingestTimeInNanos);
136+
if (e != null) {
137+
metrics.ingestFailed();
138+
}
139+
handler.accept(result, e);
122140
}
123-
handler.accept(result, e);
124141
});
125142
}
126143

server/src/test/java/org/elasticsearch/ingest/IngestMetricTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ public void testPostIngestDoubleDecrement() {
4242

4343
// the second postIngest triggers an assertion error
4444
expectThrows(AssertionError.class, () -> metric.postIngest(500000L));
45-
assertThat(-1L, equalTo(metric.createStats().getIngestCurrent()));
45+
// We never allow the reported ingestCurrent to be negative:
46+
assertThat(metric.createStats().getIngestCurrent(), equalTo(0L));
4647
}
4748

4849
}

0 commit comments

Comments
 (0)