Skip to content

Commit bb33952

Browse files
authored
Optimize PipelineConfiguration-checking ClusterStateListeners (#117038) (#117098)
1 parent 5e6303c commit bb33952

File tree

16 files changed

+163
-65
lines changed

16 files changed

+163
-65
lines changed

libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@ public String getId() {
2828
return delegate.getId();
2929
}
3030

31-
public Map<String, Object> getConfigAsMap() {
32-
return delegate.getConfigAsMap();
31+
public Map<String, Object> getConfig() {
32+
return delegate.getConfig();
33+
}
34+
35+
public Map<String, Object> getConfig(final boolean unmodifiable) {
36+
return delegate.getConfig(unmodifiable);
3337
}
3438

3539
@Override

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState
268268
Set<String> ids = new HashSet<>();
269269
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
270270
for (PipelineConfiguration configuration : configurations) {
271-
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
271+
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY);
272272
if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) {
273273
ids.add(configuration.getId());
274274
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ static TransportVersion def(int id) {
203203
public static final TransportVersion INDEX_STATS_ADDITIONAL_FIELDS_REVERT = def(8_794_00_0);
204204
public static final TransportVersion FAST_REFRESH_RCO_2 = def(8_795_00_0);
205205
public static final TransportVersion ESQL_ENRICH_RUNTIME_WARNINGS = def(8_796_00_0);
206+
public static final TransportVersion INGEST_PIPELINE_CONFIGURATION_AS_MAP = def(8_797_00_0);
206207

207208
/*
208209
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public RestStatus status() {
8080
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
8181
builder.startObject();
8282
for (PipelineConfiguration pipeline : pipelines) {
83-
builder.field(pipeline.getId(), summary ? Map.of() : pipeline.getConfigAsMap());
83+
builder.field(pipeline.getId(), summary ? Map.of() : pipeline.getConfig());
8484
}
8585
builder.endObject();
8686
return builder;

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -782,7 +782,7 @@ private void validateUseOfDeprecatedIngestPipelines(String name, IngestMetadata
782782
private void emitWarningIfPipelineIsDeprecated(String name, Map<String, PipelineConfiguration> pipelines, String pipelineName) {
783783
Optional.ofNullable(pipelineName)
784784
.map(pipelines::get)
785-
.filter(p -> Boolean.TRUE.equals(p.getConfigAsMap().get("deprecated")))
785+
.filter(p -> Boolean.TRUE.equals(p.getConfig().get("deprecated")))
786786
.ifPresent(
787787
p -> deprecationLogger.warn(
788788
DeprecationCategory.TEMPLATES,

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ public static boolean isNoOpPipelineUpdate(ClusterState state, PutPipelineReques
523523
&& currentIngestMetadata.getPipelines().containsKey(request.getId())) {
524524
var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
525525
var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
526-
if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) {
526+
if (currentPipeline.getConfig().equals(pipelineConfig)) {
527527
return true;
528528
}
529529
}
@@ -1287,7 +1287,7 @@ synchronized void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
12871287
try {
12881288
Pipeline newPipeline = Pipeline.create(
12891289
newConfiguration.getId(),
1290-
newConfiguration.getConfigAsMap(),
1290+
newConfiguration.getConfig(false),
12911291
processorFactories,
12921292
scriptService
12931293
);
@@ -1411,7 +1411,7 @@ public <P extends Processor> Collection<String> getPipelineWithProcessorType(Cla
14111411

14121412
public synchronized void reloadPipeline(String id) throws Exception {
14131413
PipelineHolder holder = pipelines.get(id);
1414-
Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.getConfigAsMap(), processorFactories, scriptService);
1414+
Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.getConfig(false), processorFactories, scriptService);
14151415
Map<String, PipelineHolder> updatedPipelines = new HashMap<>(this.pipelines);
14161416
updatedPipelines.put(id, new PipelineHolder(holder.configuration, updatedPipeline));
14171417
this.pipelines = Map.copyOf(updatedPipelines);

server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java

Lines changed: 97 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,38 +9,47 @@
99

1010
package org.elasticsearch.ingest;
1111

12+
import org.elasticsearch.TransportVersions;
1213
import org.elasticsearch.cluster.Diff;
1314
import org.elasticsearch.cluster.SimpleDiffable;
1415
import org.elasticsearch.common.Strings;
1516
import org.elasticsearch.common.bytes.BytesReference;
1617
import org.elasticsearch.common.io.stream.StreamInput;
1718
import org.elasticsearch.common.io.stream.StreamOutput;
19+
import org.elasticsearch.common.util.Maps;
1820
import org.elasticsearch.common.xcontent.XContentHelper;
1921
import org.elasticsearch.xcontent.ContextParser;
2022
import org.elasticsearch.xcontent.ObjectParser;
2123
import org.elasticsearch.xcontent.ParseField;
2224
import org.elasticsearch.xcontent.ToXContentObject;
2325
import org.elasticsearch.xcontent.XContentBuilder;
2426
import org.elasticsearch.xcontent.XContentType;
27+
import org.elasticsearch.xcontent.json.JsonXContent;
2528

2629
import java.io.IOException;
30+
import java.util.ArrayList;
31+
import java.util.Collections;
32+
import java.util.List;
2733
import java.util.Map;
2834
import java.util.Objects;
2935

3036
/**
31-
* Encapsulates a pipeline's id and configuration as a blob
37+
* Encapsulates a pipeline's id and configuration as a loosely typed map -- see {@link Pipeline} for the
38+
* parsed and processed object(s) that a pipeline configuration will become. This class is used for things
39+
* like keeping track of pipelines in the cluster state (where a pipeline is 'just some json') whereas the
40+
* {@link Pipeline} class is used in the actual processing of ingest documents through pipelines in the
41+
* {@link IngestService}.
3242
*/
3343
public final class PipelineConfiguration implements SimpleDiffable<PipelineConfiguration>, ToXContentObject {
3444

3545
private static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("pipeline_config", true, Builder::new);
3646
static {
3747
PARSER.declareString(Builder::setId, new ParseField("id"));
38-
PARSER.declareField((parser, builder, aVoid) -> {
39-
XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent());
40-
contentBuilder.generator().copyCurrentStructure(parser);
41-
builder.setConfig(BytesReference.bytes(contentBuilder), contentBuilder.contentType());
42-
}, new ParseField("config"), ObjectParser.ValueType.OBJECT);
43-
48+
PARSER.declareField(
49+
(parser, builder, aVoid) -> builder.setConfig(parser.map()),
50+
new ParseField("config"),
51+
ObjectParser.ValueType.OBJECT
52+
);
4453
}
4554

4655
public static ContextParser<Void, PipelineConfiguration> getParser() {
@@ -50,56 +59,94 @@ public static ContextParser<Void, PipelineConfiguration> getParser() {
5059
private static class Builder {
5160

5261
private String id;
53-
private BytesReference config;
54-
private XContentType xContentType;
62+
private Map<String, Object> config;
5563

5664
void setId(String id) {
5765
this.id = id;
5866
}
5967

60-
void setConfig(BytesReference config, XContentType xContentType) {
68+
void setConfig(Map<String, Object> config) {
6169
this.config = config;
62-
this.xContentType = xContentType;
6370
}
6471

6572
PipelineConfiguration build() {
66-
return new PipelineConfiguration(id, config, xContentType);
73+
return new PipelineConfiguration(id, config);
6774
}
6875
}
6976

7077
private final String id;
71-
// Store config as bytes reference, because the config is only used when the pipeline store reads the cluster state
72-
// and the way the map of maps config is read requires a deep copy (it removes instead of gets entries to check for unused options)
73-
// also the get pipeline api just directly returns this to the caller
74-
private final BytesReference config;
75-
private final XContentType xContentType;
78+
private final Map<String, Object> config;
7679

77-
public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) {
80+
public PipelineConfiguration(String id, Map<String, Object> config) {
7881
this.id = Objects.requireNonNull(id);
79-
this.config = Objects.requireNonNull(config);
80-
this.xContentType = Objects.requireNonNull(xContentType);
82+
this.config = deepCopy(config, true); // defensive deep copy
83+
}
84+
85+
/**
86+
* A convenience constructor that parses some bytes as a map representing a pipeline's config and then delegates to the
87+
* conventional {@link #PipelineConfiguration(String, Map)} constructor.
88+
*
89+
* @param id the id of the pipeline
90+
* @param config a parse-able bytes reference that will return a pipeline configuration
91+
* @param xContentType the content-type to use while parsing the pipeline configuration
92+
*/
93+
public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) {
94+
this(id, XContentHelper.convertToMap(config, true, xContentType).v2());
8195
}
8296

8397
public String getId() {
8498
return id;
8599
}
86100

87-
public Map<String, Object> getConfigAsMap() {
88-
return XContentHelper.convertToMap(config, true, xContentType).v2();
101+
/**
102+
* @return a reference to the unmodifiable configuration map for this pipeline
103+
*/
104+
public Map<String, Object> getConfig() {
105+
return getConfig(true);
89106
}
90107

91-
// pkg-private for tests
92-
XContentType getXContentType() {
93-
return xContentType;
108+
/**
109+
* @param unmodifiable whether the returned map should be unmodifiable or not
110+
* @return a reference to the unmodifiable config map (if unmodifiable is true) or
111+
* a reference to a freshly-created mutable deep copy of the config map (if unmodifiable is false)
112+
*/
113+
public Map<String, Object> getConfig(boolean unmodifiable) {
114+
if (unmodifiable) {
115+
return config; // already unmodifiable
116+
} else {
117+
return deepCopy(config, false);
118+
}
94119
}
95120

96-
// pkg-private for tests
97-
BytesReference getConfig() {
98-
return config;
121+
@SuppressWarnings("unchecked")
122+
private static <T> T deepCopy(final T value, final boolean unmodifiable) {
123+
return (T) innerDeepCopy(value, unmodifiable);
124+
}
125+
126+
private static Object innerDeepCopy(final Object value, final boolean unmodifiable) {
127+
if (value instanceof Map<?, ?> mapValue) {
128+
final Map<Object, Object> copy = Maps.newLinkedHashMapWithExpectedSize(mapValue.size()); // n.b. maintain ordering
129+
for (Map.Entry<?, ?> entry : mapValue.entrySet()) {
130+
copy.put(innerDeepCopy(entry.getKey(), unmodifiable), innerDeepCopy(entry.getValue(), unmodifiable));
131+
}
132+
return unmodifiable ? Collections.unmodifiableMap(copy) : copy;
133+
} else if (value instanceof List<?> listValue) {
134+
final List<Object> copy = new ArrayList<>(listValue.size());
135+
for (Object itemValue : listValue) {
136+
copy.add(innerDeepCopy(itemValue, unmodifiable));
137+
}
138+
return unmodifiable ? Collections.unmodifiableList(copy) : copy;
139+
} else {
140+
// if this list of expected value types ends up not being exhaustive, then we want to learn about that
141+
// at development time, but it's probably better to err on the side of passing through the value at runtime
142+
assert (value == null || value instanceof String || value instanceof Number || value instanceof Boolean)
143+
: "unexpected value type [" + value.getClass() + "]";
144+
return value;
145+
}
99146
}
100147

101148
public Integer getVersion() {
102-
Object o = getConfigAsMap().get("version");
149+
Object o = config.get("version");
103150
if (o == null) {
104151
return null;
105152
} else if (o instanceof Number number) {
@@ -113,13 +160,22 @@ public Integer getVersion() {
113160
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
114161
builder.startObject();
115162
builder.field("id", id);
116-
builder.field("config", getConfigAsMap());
163+
builder.field("config", config);
117164
builder.endObject();
118165
return builder;
119166
}
120167

121168
public static PipelineConfiguration readFrom(StreamInput in) throws IOException {
122-
return new PipelineConfiguration(in.readString(), in.readBytesReference(), in.readEnum(XContentType.class));
169+
final String id = in.readString();
170+
final Map<String, Object> config;
171+
if (in.getTransportVersion().onOrAfter(TransportVersions.INGEST_PIPELINE_CONFIGURATION_AS_MAP)) {
172+
config = in.readGenericMap();
173+
} else {
174+
final BytesReference bytes = in.readSlicedBytesReference();
175+
final XContentType type = in.readEnum(XContentType.class);
176+
config = XContentHelper.convertToMap(bytes, true, type).v2();
177+
}
178+
return new PipelineConfiguration(id, config);
123179
}
124180

125181
public static Diff<PipelineConfiguration> readDiffFrom(StreamInput in) throws IOException {
@@ -134,8 +190,14 @@ public String toString() {
134190
@Override
135191
public void writeTo(StreamOutput out) throws IOException {
136192
out.writeString(id);
137-
out.writeBytesReference(config);
138-
XContentHelper.writeTo(out, xContentType);
193+
if (out.getTransportVersion().onOrAfter(TransportVersions.INGEST_PIPELINE_CONFIGURATION_AS_MAP)) {
194+
out.writeGenericMap(config);
195+
} else {
196+
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).prettyPrint();
197+
builder.map(config);
198+
out.writeBytesReference(BytesReference.bytes(builder));
199+
XContentHelper.writeTo(out, XContentType.JSON);
200+
}
139201
}
140202

141203
@Override
@@ -146,14 +208,14 @@ public boolean equals(Object o) {
146208
PipelineConfiguration that = (PipelineConfiguration) o;
147209

148210
if (id.equals(that.id) == false) return false;
149-
return getConfigAsMap().equals(that.getConfigAsMap());
211+
return config.equals(that.config);
150212

151213
}
152214

153215
@Override
154216
public int hashCode() {
155217
int result = id.hashCode();
156-
result = 31 * result + getConfigAsMap().hashCode();
218+
result = 31 * result + config.hashCode();
157219
return result;
158220
}
159221
}

server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public void testXContentDeserialization() throws IOException {
7979
assertEquals(actualPipelines.size(), parsedPipelines.size());
8080
for (PipelineConfiguration pipeline : parsedPipelines) {
8181
assertTrue(pipelinesMap.containsKey(pipeline.getId()));
82-
assertEquals(pipelinesMap.get(pipeline.getId()).getConfigAsMap(), pipeline.getConfigAsMap());
82+
assertEquals(pipelinesMap.get(pipeline.getId()).getConfig(), pipeline.getConfig());
8383
}
8484
}
8585

server/src/test/java/org/elasticsearch/ingest/IngestMetadataTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ public void testFromXContent() throws IOException {
5959
assertEquals(2, m.getPipelines().size());
6060
assertEquals("1", m.getPipelines().get("1").getId());
6161
assertEquals("2", m.getPipelines().get("2").getId());
62-
assertEquals(pipeline.getConfigAsMap(), m.getPipelines().get("1").getConfigAsMap());
63-
assertEquals(pipeline2.getConfigAsMap(), m.getPipelines().get("2").getConfigAsMap());
62+
assertEquals(pipeline.getConfig(), m.getPipelines().get("1").getConfig());
63+
assertEquals(pipeline2.getConfig(), m.getPipelines().get("2").getConfig());
6464
}
6565
}
6666

0 commit comments

Comments
 (0)