99
1010package org .elasticsearch .ingest ;
1111
12+ import org .elasticsearch .TransportVersions ;
1213import org .elasticsearch .cluster .Diff ;
1314import org .elasticsearch .cluster .SimpleDiffable ;
1415import org .elasticsearch .common .Strings ;
1516import org .elasticsearch .common .bytes .BytesReference ;
1617import org .elasticsearch .common .io .stream .StreamInput ;
1718import org .elasticsearch .common .io .stream .StreamOutput ;
19+ import org .elasticsearch .common .util .Maps ;
1820import org .elasticsearch .common .xcontent .XContentHelper ;
1921import org .elasticsearch .xcontent .ContextParser ;
2022import org .elasticsearch .xcontent .ObjectParser ;
2123import org .elasticsearch .xcontent .ParseField ;
2224import org .elasticsearch .xcontent .ToXContentObject ;
2325import org .elasticsearch .xcontent .XContentBuilder ;
2426import org .elasticsearch .xcontent .XContentType ;
27+ import org .elasticsearch .xcontent .json .JsonXContent ;
2528
2629import java .io .IOException ;
30+ import java .util .ArrayList ;
31+ import java .util .Collections ;
32+ import java .util .List ;
2733import java .util .Map ;
2834import java .util .Objects ;
2935
3036/**
31- * Encapsulates a pipeline's id and configuration as a blob
37+ * Encapsulates a pipeline's id and configuration as a loosely typed map -- see {@link Pipeline} for the
38+ * parsed and processed object(s) that a pipeline configuration will become. This class is used for things
39+ * like keeping track of pipelines in the cluster state (where a pipeline is 'just some json') whereas the
40+ * {@link Pipeline} class is used in the actual processing of ingest documents through pipelines in the
41+ * {@link IngestService}.
3242 */
3343public final class PipelineConfiguration implements SimpleDiffable <PipelineConfiguration >, ToXContentObject {
3444
3545 private static final ObjectParser <Builder , Void > PARSER = new ObjectParser <>("pipeline_config" , true , Builder ::new );
3646 static {
3747 PARSER .declareString (Builder ::setId , new ParseField ("id" ));
38- PARSER .declareField ((parser , builder , aVoid ) -> {
39- XContentBuilder contentBuilder = XContentBuilder .builder (parser .contentType ().xContent ());
40- contentBuilder .generator ().copyCurrentStructure (parser );
41- builder .setConfig (BytesReference .bytes (contentBuilder ), contentBuilder .contentType ());
42- }, new ParseField ("config" ), ObjectParser .ValueType .OBJECT );
43-
48+ PARSER .declareField (
49+ (parser , builder , aVoid ) -> builder .setConfig (parser .map ()),
50+ new ParseField ("config" ),
51+ ObjectParser .ValueType .OBJECT
52+ );
4453 }
4554
4655 public static ContextParser <Void , PipelineConfiguration > getParser () {
@@ -50,56 +59,94 @@ public static ContextParser<Void, PipelineConfiguration> getParser() {
5059 private static class Builder {
5160
5261 private String id ;
53- private BytesReference config ;
54- private XContentType xContentType ;
62+ private Map <String , Object > config ;
5563
5664 void setId (String id ) {
5765 this .id = id ;
5866 }
5967
60- void setConfig (BytesReference config , XContentType xContentType ) {
68+ void setConfig (Map < String , Object > config ) {
6169 this .config = config ;
62- this .xContentType = xContentType ;
6370 }
6471
6572 PipelineConfiguration build () {
66- return new PipelineConfiguration (id , config , xContentType );
73+ return new PipelineConfiguration (id , config );
6774 }
6875 }
6976
7077 private final String id ;
71- // Store config as bytes reference, because the config is only used when the pipeline store reads the cluster state
72- // and the way the map of maps config is read requires a deep copy (it removes instead of gets entries to check for unused options)
73- // also the get pipeline api just directly returns this to the caller
74- private final BytesReference config ;
75- private final XContentType xContentType ;
78+ private final Map <String , Object > config ;
7679
77- public PipelineConfiguration (String id , BytesReference config , XContentType xContentType ) {
80+ public PipelineConfiguration (String id , Map < String , Object > config ) {
7881 this .id = Objects .requireNonNull (id );
79- this .config = Objects .requireNonNull (config );
80- this .xContentType = Objects .requireNonNull (xContentType );
82+ this .config = deepCopy (config , true ); // defensive deep copy
83+ }
84+
85+ /**
86+ * A convenience constructor that parses some bytes as a map representing a pipeline's config and then delegates to the
87+ * conventional {@link #PipelineConfiguration(String, Map)} constructor.
88+ *
89+ * @param id the id of the pipeline
90+ * @param config a parse-able bytes reference that will return a pipeline configuration
91+ * @param xContentType the content-type to use while parsing the pipeline configuration
92+ */
93+ public PipelineConfiguration (String id , BytesReference config , XContentType xContentType ) {
94+ this (id , XContentHelper .convertToMap (config , true , xContentType ).v2 ());
8195 }
8296
8397 public String getId () {
8498 return id ;
8599 }
86100
87- public Map <String , Object > getConfigAsMap () {
88- return XContentHelper .convertToMap (config , true , xContentType ).v2 ();
101+ /**
102+ * @return a reference to the unmodifiable configuration map for this pipeline
103+ */
104+ public Map <String , Object > getConfig () {
105+ return getConfig (true );
89106 }
90107
91- // pkg-private for tests
92- XContentType getXContentType () {
93- return xContentType ;
108+ /**
109+ * @param unmodifiable whether the returned map should be unmodifiable or not
110+ * @return a reference to the unmodifiable config map (if unmodifiable is true) or
111+ * a reference to a freshly-created mutable deep copy of the config map (if unmodifiable is false)
112+ */
113+ public Map <String , Object > getConfig (boolean unmodifiable ) {
114+ if (unmodifiable ) {
115+ return config ; // already unmodifiable
116+ } else {
117+ return deepCopy (config , false );
118+ }
94119 }
95120
96- // pkg-private for tests
97- BytesReference getConfig () {
98- return config ;
121+ @ SuppressWarnings ("unchecked" )
122+ private static <T > T deepCopy (final T value , final boolean unmodifiable ) {
123+ return (T ) innerDeepCopy (value , unmodifiable );
124+ }
125+
126+ private static Object innerDeepCopy (final Object value , final boolean unmodifiable ) {
127+ if (value instanceof Map <?, ?> mapValue ) {
128+ final Map <Object , Object > copy = Maps .newLinkedHashMapWithExpectedSize (mapValue .size ()); // n.b. maintain ordering
129+ for (Map .Entry <?, ?> entry : mapValue .entrySet ()) {
130+ copy .put (innerDeepCopy (entry .getKey (), unmodifiable ), innerDeepCopy (entry .getValue (), unmodifiable ));
131+ }
132+ return unmodifiable ? Collections .unmodifiableMap (copy ) : copy ;
133+ } else if (value instanceof List <?> listValue ) {
134+ final List <Object > copy = new ArrayList <>(listValue .size ());
135+ for (Object itemValue : listValue ) {
136+ copy .add (innerDeepCopy (itemValue , unmodifiable ));
137+ }
138+ return unmodifiable ? Collections .unmodifiableList (copy ) : copy ;
139+ } else {
140+ // if this list of expected value types ends up not being exhaustive, then we want to learn about that
141+ // at development time, but it's probably better to err on the side of passing through the value at runtime
142+ assert (value == null || value instanceof String || value instanceof Number || value instanceof Boolean )
143+ : "unexpected value type [" + value .getClass () + "]" ;
144+ return value ;
145+ }
99146 }
100147
101148 public Integer getVersion () {
102- Object o = getConfigAsMap () .get ("version" );
149+ Object o = config .get ("version" );
103150 if (o == null ) {
104151 return null ;
105152 } else if (o instanceof Number number ) {
@@ -113,13 +160,22 @@ public Integer getVersion() {
113160 public XContentBuilder toXContent (XContentBuilder builder , Params params ) throws IOException {
114161 builder .startObject ();
115162 builder .field ("id" , id );
116- builder .field ("config" , getConfigAsMap () );
163+ builder .field ("config" , config );
117164 builder .endObject ();
118165 return builder ;
119166 }
120167
121168 public static PipelineConfiguration readFrom (StreamInput in ) throws IOException {
122- return new PipelineConfiguration (in .readString (), in .readBytesReference (), in .readEnum (XContentType .class ));
169+ final String id = in .readString ();
170+ final Map <String , Object > config ;
171+ if (in .getTransportVersion ().onOrAfter (TransportVersions .INGEST_PIPELINE_CONFIGURATION_AS_MAP )) {
172+ config = in .readGenericMap ();
173+ } else {
174+ final BytesReference bytes = in .readSlicedBytesReference ();
175+ final XContentType type = in .readEnum (XContentType .class );
176+ config = XContentHelper .convertToMap (bytes , true , type ).v2 ();
177+ }
178+ return new PipelineConfiguration (id , config );
123179 }
124180
125181 public static Diff <PipelineConfiguration > readDiffFrom (StreamInput in ) throws IOException {
@@ -134,8 +190,14 @@ public String toString() {
134190 @ Override
135191 public void writeTo (StreamOutput out ) throws IOException {
136192 out .writeString (id );
137- out .writeBytesReference (config );
138- XContentHelper .writeTo (out , xContentType );
193+ if (out .getTransportVersion ().onOrAfter (TransportVersions .INGEST_PIPELINE_CONFIGURATION_AS_MAP )) {
194+ out .writeGenericMap (config );
195+ } else {
196+ XContentBuilder builder = XContentBuilder .builder (JsonXContent .jsonXContent ).prettyPrint ();
197+ builder .map (config );
198+ out .writeBytesReference (BytesReference .bytes (builder ));
199+ XContentHelper .writeTo (out , XContentType .JSON );
200+ }
139201 }
140202
141203 @ Override
@@ -146,14 +208,14 @@ public boolean equals(Object o) {
146208 PipelineConfiguration that = (PipelineConfiguration ) o ;
147209
148210 if (id .equals (that .id ) == false ) return false ;
149- return getConfigAsMap () .equals (that .getConfigAsMap () );
211+ return config .equals (that .config );
150212
151213 }
152214
153215 @ Override
154216 public int hashCode () {
155217 int result = id .hashCode ();
156- result = 31 * result + getConfigAsMap () .hashCode ();
218+ result = 31 * result + config .hashCode ();
157219 return result ;
158220 }
159221}
0 commit comments