Skip to content

Commit 20e186a

Browse files
authored
Make enrich project-aware (#124099)
Makes the execution and use of enrich policies project-aware. Note: this does not make the enrich cache project-aware. That is to be handled in a follow-up PR.
1 parent ff6465b commit 20e186a

File tree

119 files changed

+863
-541
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

119 files changed

+863
-541
lines changed

libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineBridge.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
*/
99
package org.elasticsearch.logstashbridge.ingest;
1010

11+
import org.elasticsearch.core.FixForMultiProject;
1112
import org.elasticsearch.ingest.Pipeline;
1213
import org.elasticsearch.logstashbridge.StableBridgeAPI;
1314
import org.elasticsearch.logstashbridge.script.ScriptServiceBridge;
@@ -20,14 +21,21 @@ public static PipelineBridge wrap(final Pipeline pipeline) {
2021
return new PipelineBridge(pipeline);
2122
}
2223

24+
@FixForMultiProject(description = "should we pass a non-null project ID here?")
2325
public static PipelineBridge create(
2426
String id,
2527
Map<String, Object> config,
2628
Map<String, ProcessorBridge.Factory> processorFactories,
2729
ScriptServiceBridge scriptServiceBridge
2830
) throws Exception {
2931
return wrap(
30-
Pipeline.create(id, config, StableBridgeAPI.unwrap(processorFactories), StableBridgeAPI.unwrapNullable(scriptServiceBridge))
32+
Pipeline.create(
33+
id,
34+
config,
35+
StableBridgeAPI.unwrap(processorFactories),
36+
StableBridgeAPI.unwrapNullable(scriptServiceBridge),
37+
null
38+
)
3139
);
3240
}
3341

libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
*/
99
package org.elasticsearch.logstashbridge.ingest;
1010

11+
import org.elasticsearch.core.FixForMultiProject;
1112
import org.elasticsearch.core.TimeValue;
1213
import org.elasticsearch.ingest.IngestService;
1314
import org.elasticsearch.ingest.Processor;
@@ -118,7 +119,7 @@ static Factory wrap(final Processor.Factory delegate) {
118119
@Override
119120
default Processor.Factory unwrap() {
120121
final Factory stableAPIFactory = this;
121-
return (registry, tag, description, config) -> stableAPIFactory.create(
122+
return (registry, tag, description, config, projectId) -> stableAPIFactory.create(
122123
StableBridgeAPI.wrap(registry, Factory::wrap),
123124
tag,
124125
description,
@@ -131,14 +132,17 @@ private Wrapped(final Processor.Factory delegate) {
131132
super(delegate);
132133
}
133134

135+
@FixForMultiProject(description = "should we pass a non-null project ID here?")
134136
@Override
135137
public ProcessorBridge create(
136138
final Map<String, Factory> registry,
137139
final String processorTag,
138140
final String description,
139141
final Map<String, Object> config
140142
) throws Exception {
141-
return ProcessorBridge.wrap(this.delegate.create(StableBridgeAPI.unwrap(registry), processorTag, description, config));
143+
return ProcessorBridge.wrap(
144+
this.delegate.create(StableBridgeAPI.unwrap(registry), processorTag, description, config, null)
145+
);
142146
}
143147

144148
@Override

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/IngestFailureStoreMetricsIT.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -403,9 +403,9 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
403403
Map<String, Processor.Factory> processors = new HashMap<>();
404404
processors.put(
405405
"drop",
406-
(factories, tag, description, config) -> new TestProcessor(tag, "drop", description, ingestDocument -> null)
406+
(factories, tag, description, config, projectId) -> new TestProcessor(tag, "drop", description, ingestDocument -> null)
407407
);
408-
processors.put("reroute", (factories, tag, description, config) -> {
408+
processors.put("reroute", (factories, tag, description, config, projectId) -> {
409409
String destination = (String) config.remove("destination");
410410
return new TestProcessor(
411411
tag,
@@ -416,7 +416,12 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
416416
});
417417
processors.put(
418418
"fail",
419-
(processorFactories, tag, description, config) -> new TestProcessor(tag, "fail", description, new RuntimeException())
419+
(processorFactories, tag, description, config, projectId) -> new TestProcessor(
420+
tag,
421+
"fail",
422+
description,
423+
new RuntimeException()
424+
)
420425
);
421426
return processors;
422427
}

modules/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/AttachmentProcessor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.tika.metadata.Office;
1616
import org.apache.tika.metadata.TikaCoreProperties;
1717
import org.elasticsearch.ElasticsearchParseException;
18+
import org.elasticsearch.cluster.metadata.ProjectId;
1819
import org.elasticsearch.common.Strings;
1920
import org.elasticsearch.common.logging.DeprecationCategory;
2021
import org.elasticsearch.common.logging.DeprecationLogger;
@@ -232,7 +233,8 @@ public AttachmentProcessor create(
232233
Map<String, Processor.Factory> registry,
233234
String processorTag,
234235
String description,
235-
Map<String, Object> config
236+
Map<String, Object> config,
237+
ProjectId projectId
236238
) {
237239
String field = readStringProperty(TYPE, processorTag, config, "field");
238240
String resourceName = readOptionalStringProperty(TYPE, processorTag, config, "resource_name");

modules/ingest-attachment/src/test/java/org/elasticsearch/ingest/attachment/AttachmentProcessorFactoryTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void testBuildDefaults() throws Exception {
3636

3737
String processorTag = randomAlphaOfLength(10);
3838

39-
AttachmentProcessor processor = factory.create(null, processorTag, null, config);
39+
AttachmentProcessor processor = factory.create(null, processorTag, null, config, null);
4040
assertThat(processor.getTag(), equalTo(processorTag));
4141
assertThat(processor.getField(), equalTo("_field"));
4242
assertThat(processor.getTargetField(), equalTo("attachment"));
@@ -57,7 +57,7 @@ public void testConfigureIndexedChars() throws Exception {
5757
config.put("indexed_chars", indexedChars);
5858

5959
String processorTag = randomAlphaOfLength(10);
60-
AttachmentProcessor processor = factory.create(null, processorTag, null, config);
60+
AttachmentProcessor processor = factory.create(null, processorTag, null, config, null);
6161
assertThat(processor.getTag(), equalTo(processorTag));
6262
assertThat(processor.getIndexedChars(), is(indexedChars));
6363
assertFalse(processor.isIgnoreMissing());
@@ -73,7 +73,7 @@ public void testBuildTargetField() throws Exception {
7373
Map<String, Object> config = new HashMap<>();
7474
config.put("field", "_field");
7575
config.put("target_field", "_field");
76-
AttachmentProcessor processor = factory.create(null, null, null, config);
76+
AttachmentProcessor processor = factory.create(null, null, null, config, null);
7777
assertThat(processor.getField(), equalTo("_field"));
7878
assertThat(processor.getTargetField(), equalTo("_field"));
7979
assertFalse(processor.isIgnoreMissing());
@@ -97,7 +97,7 @@ public void testBuildFields() throws Exception {
9797
Map<String, Object> config = new HashMap<>();
9898
config.put("field", "_field");
9999
config.put("properties", fieldNames);
100-
AttachmentProcessor processor = factory.create(null, null, null, config);
100+
AttachmentProcessor processor = factory.create(null, null, null, config, null);
101101
assertThat(processor.getField(), equalTo("_field"));
102102
assertThat(processor.getProperties(), equalTo(properties));
103103
assertFalse(processor.isIgnoreMissing());
@@ -114,7 +114,7 @@ public void testBuildIllegalFieldOption() throws Exception {
114114
config.put("field", "_field");
115115
config.put("properties", Collections.singletonList("invalid"));
116116
try {
117-
factory.create(null, null, null, config);
117+
factory.create(null, null, null, config, null);
118118
fail("exception expected");
119119
} catch (ElasticsearchParseException e) {
120120
assertThat(e.getMessage(), containsString("[properties] illegal field option [invalid]"));
@@ -128,7 +128,7 @@ public void testBuildIllegalFieldOption() throws Exception {
128128
config.put("field", "_field");
129129
config.put("properties", "invalid");
130130
try {
131-
factory.create(null, null, null, config);
131+
factory.create(null, null, null, config, null);
132132
fail("exception expected");
133133
} catch (ElasticsearchParseException e) {
134134
assertThat(e.getMessage(), equalTo("[properties] property isn't a list, but of type [java.lang.String]"));
@@ -148,7 +148,7 @@ public void testIgnoreMissing() throws Exception {
148148

149149
String processorTag = randomAlphaOfLength(10);
150150

151-
AttachmentProcessor processor = factory.create(null, processorTag, null, config);
151+
AttachmentProcessor processor = factory.create(null, processorTag, null, config, null);
152152
assertThat(processor.getTag(), equalTo(processorTag));
153153
assertThat(processor.getField(), equalTo("_field"));
154154
assertThat(processor.getTargetField(), equalTo("attachment"));
@@ -169,7 +169,7 @@ public void testRemoveBinary() throws Exception {
169169

170170
String processorTag = randomAlphaOfLength(10);
171171

172-
AttachmentProcessor processor = factory.create(null, processorTag, null, config);
172+
AttachmentProcessor processor = factory.create(null, processorTag, null, config, null);
173173
assertThat(processor.getTag(), equalTo(processorTag));
174174
assertThat(processor.getField(), equalTo("_field"));
175175
assertThat(processor.getTargetField(), equalTo("attachment"));

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.ingest.common;
1111

12+
import org.elasticsearch.cluster.metadata.ProjectId;
1213
import org.elasticsearch.ingest.AbstractProcessor;
1314
import org.elasticsearch.ingest.ConfigurationUtils;
1415
import org.elasticsearch.ingest.IngestDocument;
@@ -108,7 +109,8 @@ public AbstractStringProcessor<?> create(
108109
Map<String, Processor.Factory> registry,
109110
String tag,
110111
String description,
111-
Map<String, Object> config
112+
Map<String, Object> config,
113+
ProjectId projectId
112114
) throws Exception {
113115
String field = ConfigurationUtils.readStringProperty(processorType, tag, config, "field");
114116
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(processorType, tag, config, "ignore_missing", false);

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.ingest.common;
1111

12+
import org.elasticsearch.cluster.metadata.ProjectId;
1213
import org.elasticsearch.ingest.AbstractProcessor;
1314
import org.elasticsearch.ingest.ConfigurationUtils;
1415
import org.elasticsearch.ingest.IngestDocument;
@@ -73,7 +74,8 @@ public AppendProcessor create(
7374
Map<String, Processor.Factory> registry,
7475
String processorTag,
7576
String description,
76-
Map<String, Object> config
77+
Map<String, Object> config,
78+
ProjectId projectId
7779
) throws Exception {
7880
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
7981
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CommunityIdProcessor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.ingest.common;
1111

12+
import org.elasticsearch.cluster.metadata.ProjectId;
1213
import org.elasticsearch.common.network.InetAddresses;
1314
import org.elasticsearch.ingest.AbstractProcessor;
1415
import org.elasticsearch.ingest.ConfigurationUtils;
@@ -297,7 +298,8 @@ public CommunityIdProcessor create(
297298
Map<String, Processor.Factory> registry,
298299
String processorTag,
299300
String description,
300-
Map<String, Object> config
301+
Map<String, Object> config,
302+
ProjectId projectId
301303
) throws Exception {
302304
String sourceIpField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_ip", DEFAULT_SOURCE_IP);
303305
String sourcePortField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_port", DEFAULT_SOURCE_PORT);

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.ingest.common;
1111

12+
import org.elasticsearch.cluster.metadata.ProjectId;
1213
import org.elasticsearch.common.network.InetAddresses;
1314
import org.elasticsearch.ingest.AbstractProcessor;
1415
import org.elasticsearch.ingest.ConfigurationUtils;
@@ -216,7 +217,8 @@ public ConvertProcessor create(
216217
Map<String, Processor.Factory> registry,
217218
String processorTag,
218219
String description,
219-
Map<String, Object> config
220+
Map<String, Object> config,
221+
ProjectId projectId
220222
) throws Exception {
221223
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
222224
String typeProperty = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "type");

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CsvProcessor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.ingest.common;
1111

12+
import org.elasticsearch.cluster.metadata.ProjectId;
1213
import org.elasticsearch.ingest.AbstractProcessor;
1314
import org.elasticsearch.ingest.ConfigurationUtils;
1415
import org.elasticsearch.ingest.IngestDocument;
@@ -93,7 +94,8 @@ public CsvProcessor create(
9394
Map<String, Processor.Factory> registry,
9495
String processorTag,
9596
String description,
96-
Map<String, Object> config
97+
Map<String, Object> config,
98+
ProjectId projectId
9799
) {
98100
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
99101
String quote = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "quote", "\"");

0 commit comments

Comments
 (0)