Skip to content

Commit 123b103

Browse files
authored
Optimize PipelineConfiguration-checking ClusterStateListeners (#117038)
1 parent d017f93 commit 123b103

File tree

16 files changed

+165
-72
lines changed

16 files changed

+165
-72
lines changed

libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@ public String getId() {
2828
return delegate.getId();
2929
}
3030

31-
public Map<String, Object> getConfigAsMap() {
32-
return delegate.getConfigAsMap();
31+
public Map<String, Object> getConfig() {
32+
return delegate.getConfig();
33+
}
34+
35+
public Map<String, Object> getConfig(final boolean unmodifiable) {
36+
return delegate.getConfig(unmodifiable);
3337
}
3438

3539
@Override

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState
268268
Set<String> ids = new HashSet<>();
269269
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
270270
for (PipelineConfiguration configuration : configurations) {
271-
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
271+
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY);
272272
if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) {
273273
ids.add(configuration.getId());
274274
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ static TransportVersion def(int id) {
203203
public static final TransportVersion INDEX_STATS_ADDITIONAL_FIELDS_REVERT = def(8_794_00_0);
204204
public static final TransportVersion FAST_REFRESH_RCO_2 = def(8_795_00_0);
205205
public static final TransportVersion ESQL_ENRICH_RUNTIME_WARNINGS = def(8_796_00_0);
206+
public static final TransportVersion INGEST_PIPELINE_CONFIGURATION_AS_MAP = def(8_797_00_0);
206207

207208
/*
208209
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public RestStatus status() {
8080
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
8181
builder.startObject();
8282
for (PipelineConfiguration pipeline : pipelines) {
83-
builder.field(pipeline.getId(), summary ? Map.of() : pipeline.getConfigAsMap());
83+
builder.field(pipeline.getId(), summary ? Map.of() : pipeline.getConfig());
8484
}
8585
builder.endObject();
8686
return builder;

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -782,7 +782,7 @@ private void validateUseOfDeprecatedIngestPipelines(String name, IngestMetadata
782782
private void emitWarningIfPipelineIsDeprecated(String name, Map<String, PipelineConfiguration> pipelines, String pipelineName) {
783783
Optional.ofNullable(pipelineName)
784784
.map(pipelines::get)
785-
.filter(p -> Boolean.TRUE.equals(p.getConfigAsMap().get("deprecated")))
785+
.filter(p -> Boolean.TRUE.equals(p.getConfig().get("deprecated")))
786786
.ifPresent(
787787
p -> deprecationLogger.warn(
788788
DeprecationCategory.TEMPLATES,

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ public static boolean isNoOpPipelineUpdate(ClusterState state, PutPipelineReques
519519
&& currentIngestMetadata.getPipelines().containsKey(request.getId())) {
520520
var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
521521
var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
522-
if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) {
522+
if (currentPipeline.getConfig().equals(pipelineConfig)) {
523523
return true;
524524
}
525525
}
@@ -1292,7 +1292,7 @@ synchronized void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
12921292
try {
12931293
Pipeline newPipeline = Pipeline.create(
12941294
newConfiguration.getId(),
1295-
newConfiguration.getConfigAsMap(),
1295+
newConfiguration.getConfig(false),
12961296
processorFactories,
12971297
scriptService
12981298
);
@@ -1416,7 +1416,7 @@ public <P extends Processor> Collection<String> getPipelineWithProcessorType(Cla
14161416

14171417
public synchronized void reloadPipeline(String id) throws Exception {
14181418
PipelineHolder holder = pipelines.get(id);
1419-
Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.getConfigAsMap(), processorFactories, scriptService);
1419+
Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.getConfig(false), processorFactories, scriptService);
14201420
Map<String, PipelineHolder> updatedPipelines = new HashMap<>(this.pipelines);
14211421
updatedPipelines.put(id, new PipelineHolder(holder.configuration, updatedPipeline));
14221422
this.pipelines = Map.copyOf(updatedPipelines);

server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java

Lines changed: 99 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -9,39 +9,47 @@
99

1010
package org.elasticsearch.ingest;
1111

12+
import org.elasticsearch.TransportVersions;
1213
import org.elasticsearch.cluster.Diff;
1314
import org.elasticsearch.cluster.SimpleDiffable;
1415
import org.elasticsearch.common.Strings;
1516
import org.elasticsearch.common.bytes.BytesReference;
1617
import org.elasticsearch.common.io.stream.StreamInput;
1718
import org.elasticsearch.common.io.stream.StreamOutput;
19+
import org.elasticsearch.common.util.Maps;
1820
import org.elasticsearch.common.xcontent.XContentHelper;
1921
import org.elasticsearch.xcontent.ContextParser;
2022
import org.elasticsearch.xcontent.ObjectParser;
2123
import org.elasticsearch.xcontent.ParseField;
2224
import org.elasticsearch.xcontent.ToXContentObject;
2325
import org.elasticsearch.xcontent.XContentBuilder;
2426
import org.elasticsearch.xcontent.XContentType;
27+
import org.elasticsearch.xcontent.json.JsonXContent;
2528

2629
import java.io.IOException;
27-
import java.io.UncheckedIOException;
30+
import java.util.ArrayList;
31+
import java.util.Collections;
32+
import java.util.List;
2833
import java.util.Map;
2934
import java.util.Objects;
3035

3136
/**
32-
* Encapsulates a pipeline's id and configuration as a blob
37+
* Encapsulates a pipeline's id and configuration as a loosely typed map -- see {@link Pipeline} for the
38+
* parsed and processed object(s) that a pipeline configuration will become. This class is used for things
39+
* like keeping track of pipelines in the cluster state (where a pipeline is 'just some json') whereas the
40+
* {@link Pipeline} class is used in the actual processing of ingest documents through pipelines in the
41+
* {@link IngestService}.
3342
*/
3443
public final class PipelineConfiguration implements SimpleDiffable<PipelineConfiguration>, ToXContentObject {
3544

3645
private static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("pipeline_config", true, Builder::new);
3746
static {
3847
PARSER.declareString(Builder::setId, new ParseField("id"));
39-
PARSER.declareField((parser, builder, aVoid) -> {
40-
XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent());
41-
contentBuilder.generator().copyCurrentStructure(parser);
42-
builder.setConfig(BytesReference.bytes(contentBuilder), contentBuilder.contentType());
43-
}, new ParseField("config"), ObjectParser.ValueType.OBJECT);
44-
48+
PARSER.declareField(
49+
(parser, builder, aVoid) -> builder.setConfig(parser.map()),
50+
new ParseField("config"),
51+
ObjectParser.ValueType.OBJECT
52+
);
4553
}
4654

4755
public static ContextParser<Void, PipelineConfiguration> getParser() {
@@ -51,56 +59,94 @@ public static ContextParser<Void, PipelineConfiguration> getParser() {
5159
private static class Builder {
5260

5361
private String id;
54-
private BytesReference config;
55-
private XContentType xContentType;
62+
private Map<String, Object> config;
5663

5764
void setId(String id) {
5865
this.id = id;
5966
}
6067

61-
void setConfig(BytesReference config, XContentType xContentType) {
68+
void setConfig(Map<String, Object> config) {
6269
this.config = config;
63-
this.xContentType = xContentType;
6470
}
6571

6672
PipelineConfiguration build() {
67-
return new PipelineConfiguration(id, config, xContentType);
73+
return new PipelineConfiguration(id, config);
6874
}
6975
}
7076

7177
private final String id;
72-
// Store config as bytes reference, because the config is only used when the pipeline store reads the cluster state
73-
// and the way the map of maps config is read requires a deep copy (it removes instead of gets entries to check for unused options)
74-
// also the get pipeline api just directly returns this to the caller
75-
private final BytesReference config;
76-
private final XContentType xContentType;
78+
private final Map<String, Object> config;
7779

78-
public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) {
80+
public PipelineConfiguration(String id, Map<String, Object> config) {
7981
this.id = Objects.requireNonNull(id);
80-
this.config = Objects.requireNonNull(config);
81-
this.xContentType = Objects.requireNonNull(xContentType);
82+
this.config = deepCopy(config, true); // defensive deep copy
83+
}
84+
85+
/**
86+
* A convenience constructor that parses some bytes as a map representing a pipeline's config and then delegates to the
87+
* conventional {@link #PipelineConfiguration(String, Map)} constructor.
88+
*
89+
* @param id the id of the pipeline
90+
* @param config a parse-able bytes reference that will return a pipeline configuration
91+
* @param xContentType the content-type to use while parsing the pipeline configuration
92+
*/
93+
public PipelineConfiguration(String id, BytesReference config, XContentType xContentType) {
94+
this(id, XContentHelper.convertToMap(config, true, xContentType).v2());
8295
}
8396

8497
public String getId() {
8598
return id;
8699
}
87100

88-
public Map<String, Object> getConfigAsMap() {
89-
return XContentHelper.convertToMap(config, true, xContentType).v2();
101+
/**
102+
* @return a reference to the unmodifiable configuration map for this pipeline
103+
*/
104+
public Map<String, Object> getConfig() {
105+
return getConfig(true);
90106
}
91107

92-
// pkg-private for tests
93-
XContentType getXContentType() {
94-
return xContentType;
108+
/**
109+
* @param unmodifiable whether the returned map should be unmodifiable or not
110+
* @return a reference to the unmodifiable config map (if unmodifiable is true) or
111+
* a reference to a freshly-created mutable deep copy of the config map (if unmodifiable is false)
112+
*/
113+
public Map<String, Object> getConfig(boolean unmodifiable) {
114+
if (unmodifiable) {
115+
return config; // already unmodifiable
116+
} else {
117+
return deepCopy(config, false);
118+
}
119+
}
120+
121+
@SuppressWarnings("unchecked")
122+
private static <T> T deepCopy(final T value, final boolean unmodifiable) {
123+
return (T) innerDeepCopy(value, unmodifiable);
95124
}
96125

97-
// pkg-private for tests
98-
BytesReference getConfig() {
99-
return config;
126+
private static Object innerDeepCopy(final Object value, final boolean unmodifiable) {
127+
if (value instanceof Map<?, ?> mapValue) {
128+
final Map<Object, Object> copy = Maps.newLinkedHashMapWithExpectedSize(mapValue.size()); // n.b. maintain ordering
129+
for (Map.Entry<?, ?> entry : mapValue.entrySet()) {
130+
copy.put(innerDeepCopy(entry.getKey(), unmodifiable), innerDeepCopy(entry.getValue(), unmodifiable));
131+
}
132+
return unmodifiable ? Collections.unmodifiableMap(copy) : copy;
133+
} else if (value instanceof List<?> listValue) {
134+
final List<Object> copy = new ArrayList<>(listValue.size());
135+
for (Object itemValue : listValue) {
136+
copy.add(innerDeepCopy(itemValue, unmodifiable));
137+
}
138+
return unmodifiable ? Collections.unmodifiableList(copy) : copy;
139+
} else {
140+
// if this list of expected value types ends up not being exhaustive, then we want to learn about that
141+
// at development time, but it's probably better to err on the side of passing through the value at runtime
142+
assert (value == null || value instanceof String || value instanceof Number || value instanceof Boolean)
143+
: "unexpected value type [" + value.getClass() + "]";
144+
return value;
145+
}
100146
}
101147

102148
public Integer getVersion() {
103-
Object o = getConfigAsMap().get("version");
149+
Object o = config.get("version");
104150
if (o == null) {
105151
return null;
106152
} else if (o instanceof Number number) {
@@ -114,13 +160,22 @@ public Integer getVersion() {
114160
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
115161
builder.startObject();
116162
builder.field("id", id);
117-
builder.field("config", getConfigAsMap());
163+
builder.field("config", config);
118164
builder.endObject();
119165
return builder;
120166
}
121167

122168
public static PipelineConfiguration readFrom(StreamInput in) throws IOException {
123-
return new PipelineConfiguration(in.readString(), in.readBytesReference(), in.readEnum(XContentType.class));
169+
final String id = in.readString();
170+
final Map<String, Object> config;
171+
if (in.getTransportVersion().onOrAfter(TransportVersions.INGEST_PIPELINE_CONFIGURATION_AS_MAP)) {
172+
config = in.readGenericMap();
173+
} else {
174+
final BytesReference bytes = in.readSlicedBytesReference();
175+
final XContentType type = in.readEnum(XContentType.class);
176+
config = XContentHelper.convertToMap(bytes, true, type).v2();
177+
}
178+
return new PipelineConfiguration(id, config);
124179
}
125180

126181
public static Diff<PipelineConfiguration> readDiffFrom(StreamInput in) throws IOException {
@@ -135,8 +190,14 @@ public String toString() {
135190
@Override
136191
public void writeTo(StreamOutput out) throws IOException {
137192
out.writeString(id);
138-
out.writeBytesReference(config);
139-
XContentHelper.writeTo(out, xContentType);
193+
if (out.getTransportVersion().onOrAfter(TransportVersions.INGEST_PIPELINE_CONFIGURATION_AS_MAP)) {
194+
out.writeGenericMap(config);
195+
} else {
196+
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).prettyPrint();
197+
builder.map(config);
198+
out.writeBytesReference(BytesReference.bytes(builder));
199+
XContentHelper.writeTo(out, XContentType.JSON);
200+
}
140201
}
141202

142203
@Override
@@ -147,14 +208,14 @@ public boolean equals(Object o) {
147208
PipelineConfiguration that = (PipelineConfiguration) o;
148209

149210
if (id.equals(that.id) == false) return false;
150-
return getConfigAsMap().equals(that.getConfigAsMap());
211+
return config.equals(that.config);
151212

152213
}
153214

154215
@Override
155216
public int hashCode() {
156217
int result = id.hashCode();
157-
result = 31 * result + getConfigAsMap().hashCode();
218+
result = 31 * result + config.hashCode();
158219
return result;
159220
}
160221

@@ -164,7 +225,7 @@ public int hashCode() {
164225
* <p>The given upgrader is applied to the config map for any processor of the given type.
165226
*/
166227
PipelineConfiguration maybeUpgradeProcessors(String type, IngestMetadata.ProcessorConfigUpgrader upgrader) {
167-
Map<String, Object> mutableConfigMap = getConfigAsMap();
228+
Map<String, Object> mutableConfigMap = getConfig(false);
168229
boolean changed = false;
169230
// This should be a List of Maps, where the keys are processor types and the values are config maps.
170231
// But we'll skip upgrading rather than fail if not.
@@ -180,11 +241,7 @@ PipelineConfiguration maybeUpgradeProcessors(String type, IngestMetadata.Process
180241
}
181242
}
182243
if (changed) {
183-
try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent())) {
184-
return new PipelineConfiguration(id, BytesReference.bytes(builder.map(mutableConfigMap)), xContentType);
185-
} catch (IOException e) {
186-
throw new UncheckedIOException(e);
187-
}
244+
return new PipelineConfiguration(id, mutableConfigMap);
188245
} else {
189246
return this;
190247
}

server/src/test/java/org/elasticsearch/action/ingest/GetPipelineResponseTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public void testXContentDeserialization() throws IOException {
7979
assertEquals(actualPipelines.size(), parsedPipelines.size());
8080
for (PipelineConfiguration pipeline : parsedPipelines) {
8181
assertTrue(pipelinesMap.containsKey(pipeline.getId()));
82-
assertEquals(pipelinesMap.get(pipeline.getId()).getConfigAsMap(), pipeline.getConfigAsMap());
82+
assertEquals(pipelinesMap.get(pipeline.getId()).getConfig(), pipeline.getConfig());
8383
}
8484
}
8585

server/src/test/java/org/elasticsearch/ingest/IngestMetadataTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ public void testFromXContent() throws IOException {
5656
assertEquals(2, custom.getPipelines().size());
5757
assertEquals("1", custom.getPipelines().get("1").getId());
5858
assertEquals("2", custom.getPipelines().get("2").getId());
59-
assertEquals(pipeline.getConfigAsMap(), custom.getPipelines().get("1").getConfigAsMap());
60-
assertEquals(pipeline2.getConfigAsMap(), custom.getPipelines().get("2").getConfigAsMap());
59+
assertEquals(pipeline.getConfig(), custom.getPipelines().get("1").getConfig());
60+
assertEquals(pipeline2.getConfig(), custom.getPipelines().get("2").getConfig());
6161
}
6262
}
6363

0 commit comments

Comments
 (0)