Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.elasticsearch.logstashbridge.ingest;

import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.logstashbridge.StableBridgeAPI;
import org.elasticsearch.logstashbridge.script.ScriptServiceBridge;
Expand All @@ -20,14 +21,21 @@ public static PipelineBridge wrap(final Pipeline pipeline) {
return new PipelineBridge(pipeline);
}

@FixForMultiProject(description = "should we pass a non-null project ID here?")
public static PipelineBridge create(
String id,
Map<String, Object> config,
Map<String, ProcessorBridge.Factory> processorFactories,
ScriptServiceBridge scriptServiceBridge
) throws Exception {
return wrap(
Pipeline.create(id, config, StableBridgeAPI.unwrap(processorFactories), StableBridgeAPI.unwrapNullable(scriptServiceBridge))
Pipeline.create(
id,
config,
StableBridgeAPI.unwrap(processorFactories),
StableBridgeAPI.unwrapNullable(scriptServiceBridge),
null
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.elasticsearch.logstashbridge.ingest;

import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Processor;
Expand Down Expand Up @@ -118,7 +119,7 @@ static Factory wrap(final Processor.Factory delegate) {
@Override
default Processor.Factory unwrap() {
final Factory stableAPIFactory = this;
return (registry, tag, description, config) -> stableAPIFactory.create(
return (registry, tag, description, config, projectId) -> stableAPIFactory.create(
StableBridgeAPI.wrap(registry, Factory::wrap),
tag,
description,
Expand All @@ -131,14 +132,17 @@ private Wrapped(final Processor.Factory delegate) {
super(delegate);
}

@FixForMultiProject(description = "should we pass a non-null project ID here?")
@Override
public ProcessorBridge create(
final Map<String, Factory> registry,
final String processorTag,
final String description,
final Map<String, Object> config
) throws Exception {
return ProcessorBridge.wrap(this.delegate.create(StableBridgeAPI.unwrap(registry), processorTag, description, config));
return ProcessorBridge.wrap(
this.delegate.create(StableBridgeAPI.unwrap(registry), processorTag, description, config, null)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,9 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
Map<String, Processor.Factory> processors = new HashMap<>();
processors.put(
"drop",
(factories, tag, description, config) -> new TestProcessor(tag, "drop", description, ingestDocument -> null)
(factories, tag, description, config, projectId) -> new TestProcessor(tag, "drop", description, ingestDocument -> null)
);
processors.put("reroute", (factories, tag, description, config) -> {
processors.put("reroute", (factories, tag, description, config, projectId) -> {
String destination = (String) config.remove("destination");
return new TestProcessor(
tag,
Expand All @@ -416,7 +416,12 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
});
processors.put(
"fail",
(processorFactories, tag, description, config) -> new TestProcessor(tag, "fail", description, new RuntimeException())
(processorFactories, tag, description, config, projectId) -> new TestProcessor(
tag,
"fail",
description,
new RuntimeException()
)
);
return processors;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.tika.metadata.Office;
import org.apache.tika.metadata.TikaCoreProperties;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
Expand Down Expand Up @@ -232,7 +233,8 @@ public AttachmentProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
Map<String, Object> config,
ProjectId projectId
) {
String field = readStringProperty(TYPE, processorTag, config, "field");
String resourceName = readOptionalStringProperty(TYPE, processorTag, config, "resource_name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void testBuildDefaults() throws Exception {

String processorTag = randomAlphaOfLength(10);

AttachmentProcessor processor = factory.create(null, processorTag, null, config);
AttachmentProcessor processor = factory.create(null, processorTag, null, config, null);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("attachment"));
Expand All @@ -57,7 +57,7 @@ public void testConfigureIndexedChars() throws Exception {
config.put("indexed_chars", indexedChars);

String processorTag = randomAlphaOfLength(10);
AttachmentProcessor processor = factory.create(null, processorTag, null, config);
AttachmentProcessor processor = factory.create(null, processorTag, null, config, null);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getIndexedChars(), is(indexedChars));
assertFalse(processor.isIgnoreMissing());
Expand All @@ -73,7 +73,7 @@ public void testBuildTargetField() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("target_field", "_field");
AttachmentProcessor processor = factory.create(null, null, null, config);
AttachmentProcessor processor = factory.create(null, null, null, config, null);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("_field"));
assertFalse(processor.isIgnoreMissing());
Expand All @@ -97,7 +97,7 @@ public void testBuildFields() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("properties", fieldNames);
AttachmentProcessor processor = factory.create(null, null, null, config);
AttachmentProcessor processor = factory.create(null, null, null, config, null);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getProperties(), equalTo(properties));
assertFalse(processor.isIgnoreMissing());
Expand All @@ -114,7 +114,7 @@ public void testBuildIllegalFieldOption() throws Exception {
config.put("field", "_field");
config.put("properties", Collections.singletonList("invalid"));
try {
factory.create(null, null, null, config);
factory.create(null, null, null, config, null);
fail("exception expected");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), containsString("[properties] illegal field option [invalid]"));
Expand All @@ -128,7 +128,7 @@ public void testBuildIllegalFieldOption() throws Exception {
config.put("field", "_field");
config.put("properties", "invalid");
try {
factory.create(null, null, null, config);
factory.create(null, null, null, config, null);
fail("exception expected");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[properties] property isn't a list, but of type [java.lang.String]"));
Expand All @@ -148,7 +148,7 @@ public void testIgnoreMissing() throws Exception {

String processorTag = randomAlphaOfLength(10);

AttachmentProcessor processor = factory.create(null, processorTag, null, config);
AttachmentProcessor processor = factory.create(null, processorTag, null, config, null);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("attachment"));
Expand All @@ -169,7 +169,7 @@ public void testRemoveBinary() throws Exception {

String processorTag = randomAlphaOfLength(10);

AttachmentProcessor processor = factory.create(null, processorTag, null, config);
AttachmentProcessor processor = factory.create(null, processorTag, null, config, null);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("attachment"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
Expand Down Expand Up @@ -108,7 +109,8 @@ public AbstractStringProcessor<?> create(
Map<String, Processor.Factory> registry,
String tag,
String description,
Map<String, Object> config
Map<String, Object> config,
ProjectId projectId
) throws Exception {
String field = ConfigurationUtils.readStringProperty(processorType, tag, config, "field");
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(processorType, tag, config, "ignore_missing", false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
Expand Down Expand Up @@ -73,7 +74,8 @@ public AppendProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
Map<String, Object> config,
ProjectId projectId
) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
Expand Down Expand Up @@ -297,7 +298,8 @@ public CommunityIdProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
Map<String, Object> config,
ProjectId projectId
) throws Exception {
String sourceIpField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_ip", DEFAULT_SOURCE_IP);
String sourcePortField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_port", DEFAULT_SOURCE_PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
Expand Down Expand Up @@ -216,7 +217,8 @@ public ConvertProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
Map<String, Object> config,
ProjectId projectId
) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String typeProperty = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "type");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
Expand Down Expand Up @@ -93,7 +94,8 @@ public CsvProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
Map<String, Object> config,
ProjectId projectId
) {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String quote = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "quote", "\"");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.ingest.common;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
Expand Down Expand Up @@ -150,7 +151,8 @@ public DateIndexNameProcessor create(
Map<String, Processor.Factory> registry,
String tag,
String description,
Map<String, Object> config
Map<String, Object> config,
ProjectId projectId
) throws Exception {
String localeString = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "locale");
String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "timezone");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.ingest.common;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.util.LocaleUtils;
Expand Down Expand Up @@ -183,7 +184,8 @@ public DateProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
Map<String, Object> config,
ProjectId projectId
) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET_FIELD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.dissect.DissectParser;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
Expand Down Expand Up @@ -60,7 +61,8 @@ public DissectProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
Map<String, Object> config,
ProjectId projectId
) {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String pattern = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "pattern");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
Expand Down Expand Up @@ -120,7 +121,8 @@ public Processor create(
Map<String, Processor.Factory> processorFactories,
String tag,
String description,
Map<String, Object> config
Map<String, Object> config,
ProjectId projectId
) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
if (field.contains(".") == false && field.equals("*") == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
Expand Down Expand Up @@ -60,7 +61,8 @@ public FailProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
Map<String, Object> config,
ProjectId projectId
) throws Exception {
String message = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "message");
TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.hash.Murmur3Hasher;
import org.elasticsearch.common.util.ByteUtils;
Expand Down Expand Up @@ -227,7 +228,8 @@ public FingerprintProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
Map<String, Object> config,
ProjectId projectId
) throws Exception {
List<String> fields = ConfigurationUtils.readList(TYPE, processorTag, config, "fields");
if (fields.size() < 1) {
Expand Down
Loading