Skip to content

Commit 2e24da1

Browse files
authored
[7.17] Preventing serialization errors in the nodes stats API (#90319) (#90466)
1 parent 817e168 commit 2e24da1

File tree

6 files changed

+145
-56
lines changed

6 files changed

+145
-56
lines changed

server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.ElasticsearchException;
1214
import org.elasticsearch.core.Tuple;
1315

@@ -16,6 +18,7 @@
1618
import java.util.Collections;
1719
import java.util.List;
1820
import java.util.Map;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1922
import java.util.function.BiConsumer;
2023
import java.util.function.LongSupplier;
2124
import java.util.stream.Collectors;
@@ -30,6 +33,8 @@ public class CompoundProcessor implements Processor {
3033
public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
3134
public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";
3235

36+
private static final Logger logger = LogManager.getLogger(CompoundProcessor.class);
37+
3338
private final boolean ignoreFailure;
3439
private final List<Processor> processors;
3540
private final List<Processor> onFailureProcessors;
@@ -132,25 +137,43 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
132137
final Processor processor = processorWithMetric.v1();
133138
final IngestMetric metric = processorWithMetric.v2();
134139
final long startTimeInNanos = relativeTimeProvider.getAsLong();
140+
/*
141+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
142+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
143+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
144+
* is only executed once.
145+
*/
146+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
135147
metric.preIngest();
148+
final AtomicBoolean postIngestHasBeenCalled = new AtomicBoolean(false);
136149
try {
137150
processor.execute(ingestDocument, (result, e) -> {
138-
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
139-
metric.postIngest(ingestTimeInNanos);
140-
141-
if (e != null) {
142-
executeOnFailure(currentProcessor, ingestDocument, handler, processor, metric, e);
151+
if (listenerHasBeenCalled.getAndSet(true)) {
152+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
153+
assert false : "A listener was unexpectedly called more than once";
143154
} else {
144-
if (result != null) {
145-
innerExecute(currentProcessor + 1, result, handler);
155+
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
156+
metric.postIngest(ingestTimeInNanos);
157+
postIngestHasBeenCalled.set(true);
158+
if (e != null) {
159+
executeOnFailure(currentProcessor, ingestDocument, handler, processor, metric, e);
146160
} else {
147-
handler.accept(null, null);
161+
if (result != null) {
162+
innerExecute(currentProcessor + 1, result, handler);
163+
} else {
164+
handler.accept(null, null);
165+
}
148166
}
149167
}
150168
});
151169
} catch (Exception e) {
152170
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
153-
metric.postIngest(ingestTimeInNanos);
171+
if (postIngestHasBeenCalled.get()) {
172+
logger.warn("Preventing postIngest from being called more than once", new RuntimeException());
173+
assert false : "Attempt to call postIngest more than once";
174+
} else {
175+
metric.postIngest(ingestTimeInNanos);
176+
}
154177
executeOnFailure(currentProcessor, ingestDocument, handler, processor, metric, e);
155178
}
156179
}

server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.common.logging.DeprecationCategory;
1214
import org.elasticsearch.common.logging.DeprecationLogger;
1315
import org.elasticsearch.script.DynamicMap;
@@ -26,6 +28,7 @@
2628
import java.util.ListIterator;
2729
import java.util.Map;
2830
import java.util.Set;
31+
import java.util.concurrent.atomic.AtomicBoolean;
2932
import java.util.function.BiConsumer;
3033
import java.util.function.Function;
3134
import java.util.function.LongSupplier;
@@ -45,6 +48,8 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
4548
return value;
4649
});
4750

51+
private static final Logger logger = LogManager.getLogger(ConditionalProcessor.class);
52+
4853
static final String TYPE = "conditional";
4954

5055
private final Script condition;
@@ -98,15 +103,27 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
98103

99104
if (matches) {
100105
final long startTimeInNanos = relativeTimeProvider.getAsLong();
106+
/*
107+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
108+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
109+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
110+
* is only executed once.
111+
*/
112+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
101113
metric.preIngest();
102114
processor.execute(ingestDocument, (result, e) -> {
103-
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
104-
metric.postIngest(ingestTimeInNanos);
105-
if (e != null) {
106-
metric.ingestFailed();
107-
handler.accept(null, e);
115+
if (listenerHasBeenCalled.getAndSet(true)) {
116+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
117+
assert false : "A listener was unexpectedly called more than once";
108118
} else {
109-
handler.accept(result, null);
119+
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
120+
metric.postIngest(ingestTimeInNanos);
121+
if (e != null) {
122+
metric.ingestFailed();
123+
handler.accept(null, e);
124+
} else {
125+
handler.accept(result, null);
126+
}
110127
}
111128
});
112129
} else {

server/src/main/java/org/elasticsearch/ingest/IngestMetric.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.common.metrics.CounterMetric;
1214

1315
import java.util.concurrent.TimeUnit;
@@ -22,6 +24,8 @@
2224
*/
2325
class IngestMetric {
2426

27+
private static final Logger logger = LogManager.getLogger(IngestMetric.class);
28+
2529
/**
2630
* The time it takes to complete the measured item.
2731
*/
@@ -53,7 +57,19 @@ void preIngest() {
5357
*/
5458
void postIngest(long ingestTimeInNanos) {
5559
long current = ingestCurrent.decrementAndGet();
56-
assert current >= 0 : "ingest metric current count double-decremented";
60+
if (current < 0) {
61+
/*
62+
* This ought to never happen. However if it does, it's incredibly bad because ingestCurrent being negative causes a
63+
* serialization error that prevents the nodes stats API from working. So we're doing 3 things here:
64+
* (1) Log a stack trace at warn level so that the Elasticsearch engineering team can track down and fix the source of the
65+
* bug if it still exists
66+
* (2) Throw an AssertionError if assertions are enabled so that we are aware of the bug
67+
* (3) Increment the counter back up so that we don't hit serialization failures
68+
*/
69+
logger.warn("Current ingest counter decremented below 0", new RuntimeException());
70+
assert false : "ingest metric current count double-decremented";
71+
ingestCurrent.incrementAndGet();
72+
}
5773
this.ingestTimeInNanos.inc(ingestTimeInNanos);
5874
ingestCount.inc();
5975
}
@@ -84,6 +100,8 @@ void add(IngestMetric metrics) {
84100
IngestStats.Stats createStats() {
85101
// we track ingestTime at nanosecond resolution, but IngestStats uses millisecond resolution for reporting
86102
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(ingestTimeInNanos.count());
87-
return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis, ingestCurrent.get(), ingestFailed.count());
103+
// It is possible for the current count to briefly drop below 0, causing serialization problems. See #90319
104+
long currentCount = Math.max(0, ingestCurrent.get());
105+
return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis, currentCount, ingestFailed.count());
88106
}
89107
}

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import java.util.Objects;
7171
import java.util.Set;
7272
import java.util.concurrent.CopyOnWriteArrayList;
73+
import java.util.concurrent.atomic.AtomicBoolean;
7374
import java.util.concurrent.atomic.AtomicInteger;
7475
import java.util.function.BiConsumer;
7576
import java.util.function.Consumer;
@@ -846,42 +847,54 @@ private void innerExecute(
846847
VersionType versionType = indexRequest.versionType();
847848
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
848849
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap);
850+
/*
851+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
852+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
853+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
854+
* is only executed once.
855+
*/
856+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
849857
ingestDocument.executePipeline(pipeline, (result, e) -> {
850-
long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
851-
totalMetrics.postIngest(ingestTimeInNanos);
852-
if (e != null) {
853-
totalMetrics.ingestFailed();
854-
handler.accept(e);
855-
} else if (result == null) {
856-
itemDroppedHandler.accept(slot);
857-
handler.accept(null);
858+
if (listenerHasBeenCalled.getAndSet(true)) {
859+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
860+
assert false : "A listener was unexpectedly called more than once";
858861
} else {
859-
Map<IngestDocument.Metadata, Object> metadataMap = ingestDocument.extractMetadata();
860-
// it's fine to set all metadata fields all the time, as ingest document holds their starting values
861-
// before ingestion, which might also get modified during ingestion.
862-
indexRequest.index((String) metadataMap.get(IngestDocument.Metadata.INDEX));
863-
indexRequest.type((String) metadataMap.get(IngestDocument.Metadata.TYPE));
864-
indexRequest.id((String) metadataMap.get(IngestDocument.Metadata.ID));
865-
indexRequest.routing((String) metadataMap.get(IngestDocument.Metadata.ROUTING));
866-
indexRequest.version(((Number) metadataMap.get(IngestDocument.Metadata.VERSION)).longValue());
867-
if (metadataMap.get(IngestDocument.Metadata.VERSION_TYPE) != null) {
868-
indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.Metadata.VERSION_TYPE)));
869-
}
870-
if (metadataMap.get(IngestDocument.Metadata.IF_SEQ_NO) != null) {
871-
indexRequest.setIfSeqNo(((Number) metadataMap.get(IngestDocument.Metadata.IF_SEQ_NO)).longValue());
872-
}
873-
if (metadataMap.get(IngestDocument.Metadata.IF_PRIMARY_TERM) != null) {
874-
indexRequest.setIfPrimaryTerm(((Number) metadataMap.get(IngestDocument.Metadata.IF_PRIMARY_TERM)).longValue());
875-
}
876-
indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType());
877-
if (metadataMap.get(IngestDocument.Metadata.DYNAMIC_TEMPLATES) != null) {
878-
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
879-
@SuppressWarnings("unchecked")
880-
Map<String, String> map = (Map<String, String>) metadataMap.get(IngestDocument.Metadata.DYNAMIC_TEMPLATES);
881-
mergedDynamicTemplates.putAll(map);
882-
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
862+
long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
863+
totalMetrics.postIngest(ingestTimeInNanos);
864+
if (e != null) {
865+
totalMetrics.ingestFailed();
866+
handler.accept(e);
867+
} else if (result == null) {
868+
itemDroppedHandler.accept(slot);
869+
handler.accept(null);
870+
} else {
871+
Map<IngestDocument.Metadata, Object> metadataMap = ingestDocument.extractMetadata();
872+
// it's fine to set all metadata fields all the time, as ingest document holds their starting values
873+
// before ingestion, which might also get modified during ingestion.
874+
indexRequest.index((String) metadataMap.get(IngestDocument.Metadata.INDEX));
875+
indexRequest.type((String) metadataMap.get(IngestDocument.Metadata.TYPE));
876+
indexRequest.id((String) metadataMap.get(IngestDocument.Metadata.ID));
877+
indexRequest.routing((String) metadataMap.get(IngestDocument.Metadata.ROUTING));
878+
indexRequest.version(((Number) metadataMap.get(IngestDocument.Metadata.VERSION)).longValue());
879+
if (metadataMap.get(IngestDocument.Metadata.VERSION_TYPE) != null) {
880+
indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.Metadata.VERSION_TYPE)));
881+
}
882+
if (metadataMap.get(IngestDocument.Metadata.IF_SEQ_NO) != null) {
883+
indexRequest.setIfSeqNo(((Number) metadataMap.get(IngestDocument.Metadata.IF_SEQ_NO)).longValue());
884+
}
885+
if (metadataMap.get(IngestDocument.Metadata.IF_PRIMARY_TERM) != null) {
886+
indexRequest.setIfPrimaryTerm(((Number) metadataMap.get(IngestDocument.Metadata.IF_PRIMARY_TERM)).longValue());
887+
}
888+
indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType());
889+
if (metadataMap.get(IngestDocument.Metadata.DYNAMIC_TEMPLATES) != null) {
890+
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
891+
@SuppressWarnings("unchecked")
892+
Map<String, String> map = (Map<String, String>) metadataMap.get(IngestDocument.Metadata.DYNAMIC_TEMPLATES);
893+
mergedDynamicTemplates.putAll(map);
894+
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
895+
}
896+
handler.accept(null);
883897
}
884-
handler.accept(null);
885898
}
886899
});
887900
}

server/src/main/java/org/elasticsearch/ingest/Pipeline.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.ElasticsearchParseException;
1214
import org.elasticsearch.core.Nullable;
1315
import org.elasticsearch.script.ScriptService;
@@ -16,6 +18,7 @@
1618
import java.util.Collections;
1719
import java.util.List;
1820
import java.util.Map;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1922
import java.util.function.BiConsumer;
2023
import java.util.function.LongSupplier;
2124

@@ -30,6 +33,8 @@ public final class Pipeline {
3033
public static final String ON_FAILURE_KEY = "on_failure";
3134
public static final String META_KEY = "_meta";
3235

36+
private static final Logger logger = LogManager.getLogger(Pipeline.class);
37+
3338
private final String id;
3439
@Nullable
3540
private final String description;
@@ -113,14 +118,26 @@ public static Pipeline create(
113118
*/
114119
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
115120
final long startTimeInNanos = relativeTimeProvider.getAsLong();
121+
/*
122+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
123+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
124+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
125+
* is only executed once.
126+
*/
127+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
116128
metrics.preIngest();
117129
compoundProcessor.execute(ingestDocument, (result, e) -> {
118-
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
119-
metrics.postIngest(ingestTimeInNanos);
120-
if (e != null) {
121-
metrics.ingestFailed();
130+
if (listenerHasBeenCalled.getAndSet(true)) {
131+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
132+
assert false : "A listener was unexpectedly called more than once";
133+
} else {
134+
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
135+
metrics.postIngest(ingestTimeInNanos);
136+
if (e != null) {
137+
metrics.ingestFailed();
138+
}
139+
handler.accept(result, e);
122140
}
123-
handler.accept(result, e);
124141
});
125142
}
126143

server/src/test/java/org/elasticsearch/ingest/IngestMetricTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ public void testPostIngestDoubleDecrement() {
4242

4343
// the second postIngest triggers an assertion error
4444
expectThrows(AssertionError.class, () -> metric.postIngest(500000L));
45-
assertThat(-1L, equalTo(metric.createStats().getIngestCurrent()));
45+
// We never allow the reported ingestCurrent to be negative:
46+
assertThat(metric.createStats().getIngestCurrent(), equalTo(0L));
4647
}
4748

4849
}

0 commit comments

Comments
 (0)