diff --git a/build.gradle b/build.gradle index ce3b89e9..1fddcbd1 100644 --- a/build.gradle +++ b/build.gradle @@ -307,10 +307,6 @@ task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugi mergeServiceFiles() - String shadeNamespace = "org.elasticsearch.ingest.geoip.shaded" - relocate('com.fasterxml.jackson', "${shadeNamespace}.com.fasterxml.jackson") - relocate('com.maxmind', "${shadeNamespace}.com.maxmind") - exclude '**/module-info.class' } @@ -358,14 +354,14 @@ task shadeElasticsearchLogstashBridge(type: com.github.jengelman.gradle.plugins. description "Shades the Elasticsearch logstash-bridge jar" dependsOn buildElasticsearchLogstashBridge - + from("${buildDir}/elasticsearch-source/libs/logstash-bridge/build/distributions") { include "elasticsearch-logstash-bridge-*.jar" } - + archiveFileName = "elasticsearch-logstash-bridge-shaded.jar" destinationDirectory = file("${buildDir}/shaded") - + exclude '**/module-info.class' } diff --git a/gradle.properties b/gradle.properties index 89483d25..81721777 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,3 @@ LOGSTASH_PATH=../../logstash -ELASTICSEARCH_TREEISH=main +ELASTICSEARCH_REPO=mashhurs/elasticsearch +ELASTICSEARCH_TREEISH=logstash-bridge-geoip-interfaces diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchPipelineConfigurationResolver.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchPipelineConfigurationResolver.java index f6b870f5..0bc672ac 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchPipelineConfigurationResolver.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchPipelineConfigurationResolver.java @@ -15,7 +15,7 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; -import org.elasticsearch.ingest.PipelineConfiguration; +import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge; import java.util.Optional; @@ -24,7 +24,7 @@ * that retrieves pipelines from Elasticsearch. 
*/ public class ElasticsearchPipelineConfigurationResolver - extends AbstractSimpleResolver<String, PipelineConfiguration> + extends AbstractSimpleResolver<String, PipelineConfigurationBridge> implements PipelineConfigurationResolver { private final RestClient elasticsearchRestClient; private final PipelineConfigurationFactory pipelineConfigurationFactory; @@ -37,13 +37,13 @@ public ElasticsearchPipelineConfigurationResolver(final RestClient elasticsearch } @Override - public Optional<PipelineConfiguration> resolveSafely(String pipelineName) throws Exception { + public Optional<PipelineConfigurationBridge> resolveSafely(String pipelineName) throws Exception { final Response response; try { final Request request = new Request("GET", URLEncodedUtils.formatSegments("_ingest", "pipeline", pipelineName)); response = elasticsearchRestClient.performRequest(request); final String jsonEncodedPayload = EntityUtils.toString(response.getEntity()); - final PipelineConfiguration pipelineConfiguration = pipelineConfigurationFactory.parseNamedObject(jsonEncodedPayload); + final PipelineConfigurationBridge pipelineConfiguration = pipelineConfigurationFactory.parseNamedObject(jsonEncodedPayload); return Optional.of(pipelineConfiguration); } catch (ResponseException re) { if (re.getResponse().getStatusLine().getStatusCode() == 404) { diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessor.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessor.java index e97f2b55..b835ef88 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessor.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessor.java @@ -13,16 +13,16 @@ import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.support.RefCountingRunnable; -import org.elasticsearch.core.IOUtils; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.LogstashInternalBridge; -import org.elasticsearch.ingest.common.FailProcessorException; +import org.elasticsearch.logstashbridge.core.FailProcessorExceptionBridge; +import org.elasticsearch.logstashbridge.core.IOUtilsBridge; +import org.elasticsearch.logstashbridge.core.RefCountingRunnableBridge; +import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge; import java.io.Closeable; import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -32,7 +32,6 @@ import static co.elastic.logstash.filters.elasticintegration.util.EventUtil.eventAsMap; import static co.elastic.logstash.filters.elasticintegration.util.EventUtil.serializeEventForLog; -import static org.elasticsearch.core.Strings.format; /** * An {@link EventProcessor} processes {@link Event}s by: @@ -94,8 +93,11 @@ public Collection<Event> processEvents(final Collection<Event> incomingEvents) t final CountDownLatch latch = new CountDownLatch(1); final IntegrationBatch batch = new IntegrationBatch(incomingEvents); - try (RefCountingRunnable ref = new RefCountingRunnable(latch::countDown)) { + RefCountingRunnableBridge ref = new RefCountingRunnableBridge(latch::countDown); + try { batch.eachRequest(ref::acquire, this::processRequest); + } finally { + ref.close(); } // await on work that has gone async @@ -151,7 +153,7 @@ void processRequest(final IntegrationRequest request) { final IngestPipeline ingestPipeline = loadedPipeline.get(); LOGGER.trace(() -> String.format("Using loaded pipeline `%s` (%s)", pipelineName,
System.identityHashCode(ingestPipeline))); - final IngestDocument ingestDocument = eventMarshaller.toIngestDocument(request.event()); + final IngestDocumentBridge ingestDocument = eventMarshaller.toIngestDocument(request.event()); resolvedIndexName.ifPresent(indexName -> { ingestDocument.getMetadata().setIndex(indexName); @@ -170,7 +172,7 @@ void processRequest(final IntegrationRequest request) { } } - private void executePipeline(final IngestDocument ingestDocument, final IngestPipeline ingestPipeline, final IntegrationRequest request) { + private void executePipeline(final IngestDocumentBridge ingestDocument, final IngestPipeline ingestPipeline, final IntegrationRequest request) { final String pipelineName = ingestPipeline.getId(); final String originalIndex = ingestDocument.getMetadata().getIndex(); ingestPipeline.execute(ingestDocument, (resultIngestDocument, ingestPipelineException) -> { @@ -193,17 +195,17 @@ private void executePipeline(final IngestDocument ingestDocument, final IngestPi } else { final String newIndex = resultIngestDocument.getMetadata().getIndex(); - if (!Objects.equals(originalIndex, newIndex) && LogstashInternalBridge.isReroute(resultIngestDocument)) { - LogstashInternalBridge.resetReroute(resultIngestDocument); + if (!Objects.equals(originalIndex, newIndex) && ingestDocument.isReroute()) { + ingestDocument.resetReroute(); boolean cycle = !resultIngestDocument.updateIndexHistory(newIndex); if (cycle) { request.complete(incomingEvent -> { - annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message", format( - "index cycle detected while processing pipeline [%s]: %s + %s", - pipelineName, - resultIngestDocument.getIndexHistory(), - newIndex - ))); + annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message", + String.format(Locale.ROOT, "index cycle detected while processing pipeline [%s]: %s + %s", + pipelineName, + resultIngestDocument.getIndexHistory(), + newIndex) + )); }); return; } @@ -214,12 +216,14 @@ private void executePipeline(final IngestDocument ingestDocument, final IngestPi final Optional reroutePipeline = resolve(reroutePipelineName.get(), internalPipelineProvider); if (reroutePipeline.isEmpty()) { request.complete(incomingEvent -> { - annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message", format( - "reroute failed to load next pipeline [%s]: %s -> %s", + annotateIngestPipelineFailure( + incomingEvent, pipelineName, - resultIngestDocument.getIndexHistory(), - reroutePipelineName.get() - ))); + Map.of("message", + String.format(Locale.ROOT, "reroute failed to load next pipeline [%s]: %s -> %s", + pipelineName, + resultIngestDocument.getIndexHistory(), + reroutePipelineName.get()))); }); } else { executePipeline(resultIngestDocument, reroutePipeline.get(), request); @@ -253,7 +257,9 @@ static private void annotateIngestPipelineFailure(final Event event, final Strin } static private Throwable unwrapException(final Exception exception) { - if (exception.getCause() instanceof FailProcessorException) { return exception.getCause(); } + if (FailProcessorExceptionBridge.isInstanceOf(exception.getCause())) { + return exception.getCause(); + } return exception; } @@ -277,6 +283,6 @@ static private Optional resolve(T resolvable, Resolver resolver) { @Override public void close() throws IOException { - IOUtils.closeWhileHandlingException(this.resourcesToClose); + IOUtilsBridge.closeWhileHandlingException(this.resourcesToClose); } } diff --git 
a/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java index 0a3f925d..a0e42823 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java @@ -8,7 +8,6 @@ import co.elastic.logstash.api.Event; import co.elastic.logstash.api.FilterMatchListener; -import co.elastic.logstash.filters.elasticintegration.ingest.RedactPlugin; import co.elastic.logstash.filters.elasticintegration.ingest.SetSecurityUserProcessor; import co.elastic.logstash.filters.elasticintegration.ingest.SingleProcessorIngestPlugin; import co.elastic.logstash.filters.elasticintegration.resolver.CacheReloadService; @@ -17,41 +16,23 @@ import co.elastic.logstash.filters.elasticintegration.resolver.ResolverCache; import co.elastic.logstash.filters.elasticintegration.util.Exceptions; import co.elastic.logstash.filters.elasticintegration.util.PluginContext; -import co.elastic.logstash.filters.elasticintegration.util.PluginProjectResolver; import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.ServiceManager; import org.elasticsearch.client.RestClient; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.IOUtils; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.env.Environment; -import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.LogstashInternalBridge; -import org.elasticsearch.ingest.Processor; -import org.elasticsearch.ingest.common.IngestCommonPlugin; -import org.elasticsearch.ingest.common.ProcessorsWhitelistExtension; -import org.elasticsearch.ingest.useragent.IngestUserAgentPlugin; -import org.elasticsearch.painless.PainlessPlugin; -import org.elasticsearch.painless.PainlessScriptEngine; -import org.elasticsearch.painless.spi.PainlessExtension; -import org.elasticsearch.plugins.ExtensiblePlugin; -import org.elasticsearch.plugins.IngestPlugin; -import org.elasticsearch.script.IngestConditionalScript; -import org.elasticsearch.script.IngestScript; -import org.elasticsearch.script.ScriptEngine; -import org.elasticsearch.script.ScriptModule; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.script.mustache.MustacheScriptEngine; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.constantkeyword.ConstantKeywordPainlessExtension; -import org.elasticsearch.xpack.spatial.SpatialPainlessExtension; -import org.elasticsearch.xpack.wildcard.WildcardPainlessExtension; +import org.elasticsearch.logstashbridge.common.SettingsBridge; +import org.elasticsearch.logstashbridge.core.IOUtilsBridge; +import org.elasticsearch.logstashbridge.env.EnvironmentBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.logstashbridge.plugins.IngestCommonPluginBridge; +import org.elasticsearch.logstashbridge.plugins.IngestPluginBridge; +import org.elasticsearch.logstashbridge.plugins.IngestUserAgentPluginBridge; +import org.elasticsearch.logstashbridge.plugins.RedactPluginBridge; +import org.elasticsearch.logstashbridge.script.ScriptServiceBridge; +import org.elasticsearch.logstashbridge.threadpool.ThreadPoolBridge; import java.io.Closeable; -import java.io.IOException; import java.time.Duration; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; 
import java.util.Objects; @@ -91,44 +72,44 @@ public static EventProcessorBuilder fromElasticsearch(final RestClient elasticse } public EventProcessorBuilder() { - this.addProcessorsFromPlugin(IngestCommonPlugin::new, Set.of( - org.elasticsearch.ingest.common.AppendProcessor.TYPE, - org.elasticsearch.ingest.common.BytesProcessor.TYPE, - org.elasticsearch.ingest.common.CommunityIdProcessor.TYPE, - org.elasticsearch.ingest.common.ConvertProcessor.TYPE, - org.elasticsearch.ingest.common.CsvProcessor.TYPE, - org.elasticsearch.ingest.common.DateIndexNameProcessor.TYPE, - org.elasticsearch.ingest.common.DateProcessor.TYPE, - org.elasticsearch.ingest.common.DissectProcessor.TYPE, + this.addProcessorsFromPlugin(IngestCommonPluginBridge::new, Set.of( + IngestCommonPluginBridge.APPEND_PROCESSOR_TYPE, + IngestCommonPluginBridge.BYTES_PROCESSOR_TYPE, + IngestCommonPluginBridge.COMMUNITY_ID_PROCESSOR_TYPE, + IngestCommonPluginBridge.CONVERT_PROCESSOR_TYPE, + IngestCommonPluginBridge.CSV_PROCESSOR_TYPE, + IngestCommonPluginBridge.DATE_INDEX_NAME_PROCESSOR_TYPE, + IngestCommonPluginBridge.DATE_PROCESSOR_TYPE, + IngestCommonPluginBridge.DISSECT_PROCESSOR_TYPE, "dot_expander", // note: upstream constant is package-private - org.elasticsearch.ingest.DropProcessor.TYPE, // note: not in ingest-common - org.elasticsearch.ingest.common.FailProcessor.TYPE, - org.elasticsearch.ingest.common.FingerprintProcessor.TYPE, - org.elasticsearch.ingest.common.ForEachProcessor.TYPE, - org.elasticsearch.ingest.common.GrokProcessor.TYPE, - org.elasticsearch.ingest.common.GsubProcessor.TYPE, - org.elasticsearch.ingest.common.HtmlStripProcessor.TYPE, - org.elasticsearch.ingest.common.JoinProcessor.TYPE, - org.elasticsearch.ingest.common.JsonProcessor.TYPE, - org.elasticsearch.ingest.common.KeyValueProcessor.TYPE, - org.elasticsearch.ingest.common.LowercaseProcessor.TYPE, - org.elasticsearch.ingest.common.NetworkDirectionProcessor.TYPE, + IngestCommonPluginBridge.DROP_PROCESSOR_TYPE, + IngestCommonPluginBridge.FAIL_PROCESSOR_TYPE, + IngestCommonPluginBridge.FINGERPRINT_PROCESSOR_TYPE, + IngestCommonPluginBridge.FOR_EACH_PROCESSOR_TYPE, + IngestCommonPluginBridge.GROK_PROCESSOR_TYPE, + IngestCommonPluginBridge.GSUB_PROCESSOR_TYPE, + IngestCommonPluginBridge.HTML_STRIP_PROCESSOR_TYPE, + IngestCommonPluginBridge.JOIN_PROCESSOR_TYPE, + IngestCommonPluginBridge.JSON_PROCESSOR_TYPE, + IngestCommonPluginBridge.KEY_VALUE_PROCESSOR_TYPE, + IngestCommonPluginBridge.LOWERCASE_PROCESSOR_TYPE, + IngestCommonPluginBridge.NETWORK_DIRECTION_PROCESSOR_TYPE, // note: no `pipeline` processor, as we provide our own - org.elasticsearch.ingest.common.RegisteredDomainProcessor.TYPE, - org.elasticsearch.ingest.common.RemoveProcessor.TYPE, - org.elasticsearch.ingest.common.RenameProcessor.TYPE, - org.elasticsearch.ingest.common.RerouteProcessor.TYPE, - org.elasticsearch.ingest.common.ScriptProcessor.TYPE, - org.elasticsearch.ingest.common.SetProcessor.TYPE, - org.elasticsearch.ingest.common.SortProcessor.TYPE, - org.elasticsearch.ingest.common.SplitProcessor.TYPE, + IngestCommonPluginBridge.REGISTERED_DOMAIN_PROCESSOR_TYPE, + IngestCommonPluginBridge.REMOVE_PROCESSOR_TYPE, + IngestCommonPluginBridge.RENAME_PROCESSOR_TYPE, + IngestCommonPluginBridge.REROUTE_PROCESSOR_TYPE, + IngestCommonPluginBridge.SCRIPT_PROCESSOR_TYPE, + IngestCommonPluginBridge.SET_PROCESSOR_TYPE, + IngestCommonPluginBridge.SORT_PROCESSOR_TYPE, + IngestCommonPluginBridge.SPLIT_PROCESSOR_TYPE, "terminate", // note: upstream constant is package-private - 
org.elasticsearch.ingest.common.TrimProcessor.TYPE, - org.elasticsearch.ingest.common.URLDecodeProcessor.TYPE, - org.elasticsearch.ingest.common.UppercaseProcessor.TYPE, - org.elasticsearch.ingest.common.UriPartsProcessor.TYPE)); - this.addProcessorsFromPlugin(IngestUserAgentPlugin::new); - this.addProcessorsFromPlugin(RedactPlugin::new); + IngestCommonPluginBridge.TRIM_PROCESSOR_TYPE, + IngestCommonPluginBridge.URL_DECODE_PROCESSOR_TYPE, + IngestCommonPluginBridge.UPPERCASE_PROCESSOR_TYPE, + IngestCommonPluginBridge.URI_PARTS_PROCESSOR_TYPE)); + this.addProcessorsFromPlugin(IngestUserAgentPluginBridge::new); + this.addProcessorsFromPlugin(RedactPluginBridge::new); this.addProcessor(SetSecurityUserProcessor.TYPE, SetSecurityUserProcessor.Factory::new); } @@ -150,7 +131,7 @@ public EventProcessorBuilder() { // filter match listener private FilterMatchListener filterMatchListener; - private final List<Supplier<IngestPlugin>> ingestPlugins = new ArrayList<>(); + private final List<Supplier<IngestPluginBridge>> ingestPlugins = new ArrayList<>(); public synchronized EventProcessorBuilder setPipelineConfigurationResolver(final PipelineConfigurationResolver pipelineConfigurationResolver) { if (Objects.nonNull(this.pipelineConfigurationResolver)) { @@ -215,15 +196,15 @@ private synchronized EventProcessorBuilder setFilterMatchListener(final FilterMa return this; } - public EventProcessorBuilder addProcessor(final String type, final Supplier<Processor.Factory> processorFactorySupplier) { + public EventProcessorBuilder addProcessor(final String type, final Supplier<ProcessorBridge.Factory> processorFactorySupplier) { return this.addProcessorsFromPlugin(SingleProcessorIngestPlugin.of(type, processorFactorySupplier)); } - public EventProcessorBuilder addProcessorsFromPlugin(Supplier<IngestPlugin> pluginSupplier, Set<String> requiredProcessors) { + public EventProcessorBuilder addProcessorsFromPlugin(Supplier<IngestPluginBridge> pluginSupplier, Set<String> requiredProcessors) { return this.addProcessorsFromPlugin(safeSubset(pluginSupplier, requiredProcessors)); } - public synchronized EventProcessorBuilder addProcessorsFromPlugin(Supplier<IngestPlugin> pluginSupplier) { + public synchronized EventProcessorBuilder addProcessorsFromPlugin(Supplier<IngestPluginBridge> pluginSupplier) { this.ingestPlugins.add(pluginSupplier); return this; } @@ -233,7 +214,7 @@ public synchronized EventProcessor build(final PluginContext pluginContext) { Objects.requireNonNull(this.eventToIndexNameResolver, "event index name resolver is REQUIRED"); Objects.requireNonNull(this.indexNameToPipelineNameResolver, "pipeline name resolver is REQUIRED"); - final Settings settings = Settings.builder() + final SettingsBridge settings = SettingsBridge.builder() .put("path.home", "/") .put("node.name", "logstash.filter.elastic_integration."
+ pluginContext.pluginId()) .put("ingest.grok.watchdog.interval", "1s") @@ -245,33 +226,22 @@ public synchronized EventProcessor build(final PluginContext pluginContext) { try { final ArrayList services = new ArrayList<>(); - final ThreadPool threadPool = LogstashInternalBridge.createThreadPool(settings); - resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS)); + final ThreadPoolBridge threadPool = new ThreadPoolBridge(settings); + resourcesToClose.add(() -> ThreadPoolBridge.terminate(threadPool, 10, TimeUnit.SECONDS)); - final ScriptService scriptService = initScriptService(settings, threadPool); + final ScriptServiceBridge scriptService = new ScriptServiceBridge(settings, threadPool::absoluteTimeInMillis); resourcesToClose.add(scriptService); - final Environment env = new Environment(settings, null); - final Processor.Parameters processorParameters = new Processor.Parameters( - env, - scriptService, - null, - threadPool.getThreadContext(), - threadPool::relativeTimeInMillis, - (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic()), - null, - null, - threadPool.generic()::execute, - IngestService.createGrokThreadWatchdog(env, threadPool) - ); + final EnvironmentBridge env = new EnvironmentBridge(settings, null); + final ProcessorBridge.Parameters processorParameters = new ProcessorBridge.Parameters(env, scriptService, threadPool); IngestPipelineFactory ingestPipelineFactory = new IngestPipelineFactory(scriptService); - for (Supplier ingestPluginSupplier : ingestPlugins) { - final IngestPlugin ingestPlugin = ingestPluginSupplier.get(); + for (Supplier ingestPluginSupplier : ingestPlugins) { + final IngestPluginBridge ingestPlugin = ingestPluginSupplier.get(); if (ingestPlugin instanceof Closeable closeableIngestPlugin) { resourcesToClose.add(closeableIngestPlugin); } - final Map processorFactories = ingestPlugin.getProcessors(processorParameters); + final Map processorFactories = ingestPlugin.getProcessors(processorParameters); ingestPipelineFactory = ingestPipelineFactory.withProcessors(processorFactories); } @@ -309,53 +279,8 @@ public synchronized EventProcessor build(final PluginContext pluginContext) { indexNameToPipelineNameResolver, resourcesToClose); } catch (Exception e) { - IOUtils.closeWhileHandlingException(resourcesToClose); + IOUtilsBridge.closeWhileHandlingException(resourcesToClose); throw Exceptions.wrap(e, "Failed to build EventProcessor"); } } - - private static ScriptService initScriptService(final Settings settings, final ThreadPool threadPool) throws IOException { - Map engines = new HashMap<>(); - engines.put(PainlessScriptEngine.NAME, getPainlessScriptEngine(settings)); - engines.put(MustacheScriptEngine.NAME, new MustacheScriptEngine(settings)); - - return new ScriptService( - settings, - engines, - ScriptModule.CORE_CONTEXTS, - threadPool::absoluteTimeInMillis, - new PluginProjectResolver()); - } - - /** - * @param settings the Elasticsearch settings object - * @return a {@link ScriptEngine} for painless scripts for use in {@link IngestScript} and - * {@link IngestConditionalScript} contexts, including all available {@link PainlessExtension}s. 
- * @throws IOException when the underlying script engine cannot be created - */ - private static ScriptEngine getPainlessScriptEngine(final Settings settings) throws IOException { - try (final PainlessPlugin painlessPlugin = new PainlessPlugin()) { - - painlessPlugin.loadExtensions(new ExtensiblePlugin.ExtensionLoader() { - @Override - @SuppressWarnings("unchecked") - public List loadExtensions(Class extensionPointType) { - if (extensionPointType.isAssignableFrom(PainlessExtension.class)) { - final List extensions = new ArrayList<>(); - - extensions.add(new ConstantKeywordPainlessExtension()); // module: constant-keyword - extensions.add(new ProcessorsWhitelistExtension()); // module: ingest-common - extensions.add(new SpatialPainlessExtension()); // module: spatial - extensions.add(new WildcardPainlessExtension()); // module: wildcard - - return (List) extensions; - } else { - return List.of(); - } - } - }); - - return painlessPlugin.getScriptEngine(settings, Set.of(IngestScript.CONTEXT, IngestConditionalScript.CONTEXT)); - } - } } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestDuplexMarshaller.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestDuplexMarshaller.java index 3b8a6943..74980f98 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestDuplexMarshaller.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestDuplexMarshaller.java @@ -10,8 +10,8 @@ import co.elastic.logstash.api.EventFactory; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.script.Metadata; +import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge; +import org.elasticsearch.logstashbridge.script.MetadataBridge; import org.logstash.Javafier; import org.logstash.Timestamp; import org.logstash.plugins.BasicEventFactory; @@ -34,7 +34,7 @@ /** * The {@code IngestDuplexMarshaller} is capable of marshalling events between the internal logstash {@link Event} - * and the external Elasticsearch {@link IngestDocument}. + * and the external Elasticsearch {@link IngestDocumentBridge}. */ public class IngestDuplexMarshaller { private final EventFactory eventFactory; @@ -74,17 +74,17 @@ public static IngestDuplexMarshaller defaultInstance() { } /** - * Converts the provided Logstash {@link Event} into an Elasticsearch {@link IngestDocument}, + * Converts the provided Logstash {@link Event} into an Elasticsearch {@link IngestDocumentBridge}, * ensuring that required values are present, reserved values are of the appropriate shape, * and field values are of types that are useful to Ingest Processors. * * @param event the event to convert - * @return an equivalent Elasticsearch {@link IngestDocument} + * @return an equivalent Elasticsearch {@link IngestDocumentBridge} */ - public IngestDocument toIngestDocument(final Event event) { + public IngestDocumentBridge toIngestDocument(final Event event) { - // The public Elasticsearch IngestDocument constructor accepts a single source-and-metadata map, - // which it splits into two maps based keys being valid IngestDocument.Metadata properties. + // The public Elasticsearch IngestDocumentBridge constructor accepts a single source-and-metadata map, + // which it splits into two maps based keys being valid IngestDocumentBridge.Metadata properties. // we copy the entirety of the event's top-level fields into this. 
Map sourceAndMetadata = new HashMap<>(externalize(event.getData())); @@ -98,12 +98,12 @@ public IngestDocument toIngestDocument(final Event event) { // event's @version if available or provide a sensible default sanitizeIngestDocumentRequiredMetadataVersion(sourceAndMetadata, event); - // When an IngestDocument is initialized, its "ingestMetadata" is only expected to contain the + // When an IngestDocumentBridge is initialized, its "ingestMetadata" is only expected to contain the // event's timestamp, which is copied into the event and can be either a String or a ZonedDateTime. final Timestamp eventTimestamp = safeTimestampFrom(event.getField(org.logstash.Event.TIMESTAMP)); Map ingestMetadata = Map.of(INGEST_METADATA_TIMESTAMP_FIELD, Objects.requireNonNullElseGet(eventTimestamp, Timestamp::now).toString()); - return new IngestDocument(sourceAndMetadata, ingestMetadata); + return new IngestDocumentBridge(sourceAndMetadata, ingestMetadata); } /** @@ -116,7 +116,7 @@ public IngestDocument toIngestDocument(final Event event) { * should be largely safe in Elasticsearch. * * @param internalObject an object that may need to be externalized - * @return an object that is safe for use as a field value in an IngestDocument + * @return an object that is safe for use as a field value in an IngestDocumentBridge */ private Object externalize(final @Nullable Object internalObject) { if (Objects.isNull(internalObject)) { return null; } @@ -198,12 +198,11 @@ private Set externalize(final @Nonnull Set internalSet) { * @param event the event to fetch fallback values from */ private void sanitizeIngestDocumentRequiredMetadataVersion(final Map sourceAndMetadata, final Event event) { - Object sourceVersion = safeLongFrom(sourceAndMetadata.remove(IngestDocument.Metadata.VERSION.getFieldName())); + Object sourceVersion = safeLongFrom(sourceAndMetadata.remove(IngestDocumentBridge.Constants.METADATA_VERSION_FIELD_NAME)); if (Objects.isNull(sourceVersion)) { sourceVersion = safeLongFrom(event.getField(org.logstash.Event.VERSION)); } - - sourceAndMetadata.put(IngestDocument.Metadata.VERSION.getFieldName(), Objects.requireNonNullElse(sourceVersion, 1L)); + sourceAndMetadata.put(IngestDocumentBridge.Constants.METADATA_VERSION_FIELD_NAME, Objects.requireNonNullElse(sourceVersion, 1L)); } /** @@ -232,16 +231,16 @@ private Long safeLongFrom(final Object object) { } /** - * Converts the provided Elasticsearch {@link IngestDocument} into a Logstash {@link Event}, + * Converts the provided Elasticsearch {@link IngestDocumentBridge} into a Logstash {@link Event}, * ensuring that required values are present, reserved values are of the appropriate shape, - * and relevant metadata from the {@code IngestDocument} are available to further processing + * and relevant metadata from the {@code IngestDocumentBridge} are available to further processing * in Logstash. * * @param ingestDocument the document to convert * @return an equivalent Logstash {@link Event} */ - public Event toLogstashEvent(final IngestDocument ingestDocument) { - // the IngestDocument we get back will have modified source directly. + public Event toLogstashEvent(final IngestDocumentBridge ingestDocument) { + // the IngestDocumentBridge we get back will have a modified source directly. 
Map eventMap = internalize(ingestDocument.getSource()); // ensure that Logstash-reserved fields are of the expected shape @@ -252,7 +251,7 @@ public Event toLogstashEvent(final IngestDocument ingestDocument) { final Event event = eventFactory.newEvent(eventMap); - // inject the relevant normalized metadata from the IngestDocument + // inject the relevant normalized metadata from the IngestDocumentBridge event.setField(LOGSTASH_METADATA_INGEST_DOCUMENT_METADATA, normalizeIngestDocumentMetadata(ingestDocument)); return event; } @@ -323,14 +322,14 @@ private List internalize(final @Nonnull Collection externalCollection } /** - * Normalizes the IngestDocument's various metadata into a map that can be added to an Event + * Normalizes the IngestDocumentBridge's various metadata into a map that can be added to an Event * * @param ingestDocument the source * @return a simple map containing non-{@code null} metadata */ - private Map normalizeIngestDocumentMetadata(final IngestDocument ingestDocument) { + private Map normalizeIngestDocumentMetadata(final IngestDocumentBridge ingestDocument) { final Map collectedMetadata = new HashMap<>(); - final Metadata metadata = ingestDocument.getMetadata(); + final MetadataBridge metadata = ingestDocument.getMetadata(); collectedMetadata.put("index", metadata.getIndex()); collectedMetadata.put("id", metadata.getId()); @@ -354,7 +353,7 @@ private Map normalizeIngestDocumentMetadata(final IngestDocument * @param eventMap the map to mutate * @param ingestDocument the document to fetch version information from */ - private void sanitizeEventRequiredVersion(final Map eventMap, final IngestDocument ingestDocument) { + private void sanitizeEventRequiredVersion(final Map eventMap, final IngestDocumentBridge ingestDocument) { final Object sourceVersion = eventMap.remove(org.logstash.Event.VERSION); String safeVersion = null; if (Objects.nonNull(sourceVersion)) { @@ -383,7 +382,7 @@ private void sanitizeEventRequiredVersion(final Map eventMap, fin *
* <ol>
*     <li>the value of ECS field with same semantic meaning `event.created`</li>
*     <li>the value of `_ingest.timestamp`</li>
- *     <li>the IngestDocument's initialization timestamp</li>
+ *     <li>the IngestDocumentBridge's initialization timestamp</li>
* </ol>
* When the source contains a {@code @timestamp} value that cannot be coerced, * it is re-routed to {@code _@timestamp}. @@ -392,7 +391,7 @@ private void sanitizeEventRequiredVersion(final Map eventMap, fin * @param ingestDocument the document to fetch timestamp information from */ // extract and set the timestamp, moving a pre-existing `@timestamp` field out of the way - private void sanitizeEventRequiredTimestamp(final Map eventMap, final IngestDocument ingestDocument) { + private void sanitizeEventRequiredTimestamp(final Map eventMap, final IngestDocumentBridge ingestDocument) { final Object sourceTimestamp = eventMap.remove(org.logstash.Event.TIMESTAMP); Timestamp safeTimestamp = safeTimestampFrom(sourceTimestamp); diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestPipeline.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestPipeline.java index c88eb7b5..0038e983 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestPipeline.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestPipeline.java @@ -6,28 +6,28 @@ */ package co.elastic.logstash.filters.elasticintegration; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.PipelineConfiguration; +import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge; +import org.elasticsearch.logstashbridge.ingest.PipelineBridge; +import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge; import java.util.Objects; import java.util.function.BiConsumer; /** - * An {@link IngestPipeline} is a Logstash-internal wrapper for an Elasticsearch Ingest {@link Pipeline}. + * An {@link IngestPipeline} is a Logstash-internal wrapper for an Elasticsearch Ingest {@link PipelineBridge}. */ public class IngestPipeline { - private final PipelineConfiguration pipelineConfiguration; - private final Pipeline innerPipeline; + private final PipelineConfigurationBridge pipelineConfiguration; + private final PipelineBridge innerPipeline; /** - * @see IngestPipelineFactory#create(PipelineConfiguration) + * @see IngestPipelineFactory#create(PipelineConfigurationBridge) * * @param pipelineConfiguration the source ingest pipeline configuration * @param innerPipeline an instantiated ingest pipeline */ - IngestPipeline(final PipelineConfiguration pipelineConfiguration, - final Pipeline innerPipeline) { + IngestPipeline(final PipelineConfigurationBridge pipelineConfiguration, + final PipelineBridge innerPipeline) { this.pipelineConfiguration = pipelineConfiguration; this.innerPipeline = innerPipeline; } @@ -37,13 +37,13 @@ public String getId() { } /** - * This method "quacks like" its counterpart in {@link Pipeline#execute(IngestDocument, BiConsumer)}. + * This method "quacks like" its counterpart in {@link PipelineBridge#execute(IngestDocumentBridge, BiConsumer)}. 
* - * @param ingestDocument the Elasticsearch {@link IngestDocument} to execute + * @param ingestDocument the Elasticsearch {@link IngestDocumentBridge} to execute * @param handler a {@link BiConsumer} that handles the result XOR an exception */ - public void execute(final IngestDocument ingestDocument, - final BiConsumer<IngestDocument, Exception> handler) { + public void execute(final IngestDocumentBridge ingestDocument, + final BiConsumer<IngestDocumentBridge, Exception> handler) { // IngestDocument#executePipeline includes cyclic reference handling ingestDocument.executePipeline(this.innerPipeline, handler); } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestPipelineFactory.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestPipelineFactory.java index 73a87c37..937a7913 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestPipelineFactory.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestPipelineFactory.java @@ -9,11 +9,10 @@ import co.elastic.logstash.filters.elasticintegration.ingest.PipelineProcessor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.PipelineConfiguration; -import org.elasticsearch.ingest.Processor; -import org.elasticsearch.script.ScriptService; +import org.elasticsearch.logstashbridge.ingest.PipelineBridge; +import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.logstashbridge.script.ScriptServiceBridge; import java.util.HashMap; import java.util.Map; @@ -21,33 +20,33 @@ /** * An {@link IngestPipelineFactory} is capable of creating {@link IngestPipeline}s - * from {@link PipelineConfiguration}s. + * from {@link PipelineConfigurationBridge}s.
*/ public class IngestPipelineFactory { - private final ScriptService scriptService; - private final Map processorFactories; + private final ScriptServiceBridge scriptService; + private final Map processorFactories; private static final Logger LOGGER = LogManager.getLogger(IngestPipelineFactory.class); - public IngestPipelineFactory(final ScriptService scriptService) { + public IngestPipelineFactory(final ScriptServiceBridge scriptService) { this(scriptService, Map.of()); } - private IngestPipelineFactory(final ScriptService scriptService, - final Map processorFactories) { + private IngestPipelineFactory(final ScriptServiceBridge scriptService, + final Map processorFactories) { this.scriptService = scriptService; this.processorFactories = Map.copyOf(processorFactories); } - public IngestPipelineFactory withProcessors(final Map processorFactories) { - final Map intermediate = new HashMap<>(this.processorFactories); + public IngestPipelineFactory withProcessors(final Map processorFactories) { + final Map intermediate = new HashMap<>(this.processorFactories); intermediate.putAll(processorFactories); return new IngestPipelineFactory(scriptService, intermediate); } - public Optional create(final PipelineConfiguration pipelineConfiguration) { + public Optional create(final PipelineConfigurationBridge pipelineConfiguration) { try { - final Pipeline pipeline = Pipeline.create(pipelineConfiguration.getId(), pipelineConfiguration.getConfig(false), processorFactories, scriptService, ProjectId.DEFAULT); + final PipelineBridge pipeline = PipelineBridge.create(pipelineConfiguration.getId(), pipelineConfiguration.getConfig(false), processorFactories, scriptService); final IngestPipeline ingestPipeline = new IngestPipeline(pipelineConfiguration, pipeline); LOGGER.debug(() -> String.format("successfully created ingest pipeline `%s` from pipeline configuration", pipelineConfiguration.getId())); return Optional.of(ingestPipeline); @@ -61,10 +60,10 @@ public Optional create(final PipelineConfiguration pipelineConfi * * @param ingestPipelineResolver the {@link IngestPipelineResolver} to resolve through. * @return a copy of this {@code IngestPipelineFactory} that has a {@link PipelineProcessor.Factory} that can - * resolve pipleines through the provided {@link IngestPipelineResolver}. + * resolve pipelines through the provided {@link IngestPipelineResolver}. 
*/ public IngestPipelineFactory withIngestPipelineResolver(final IngestPipelineResolver ingestPipelineResolver) { - final Map modifiedProcessorFactories = new HashMap<>(this.processorFactories); + final Map modifiedProcessorFactories = new HashMap<>(this.processorFactories); modifiedProcessorFactories.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(ingestPipelineResolver, this.scriptService)); return new IngestPipelineFactory(scriptService, modifiedProcessorFactories); } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/IntegrationBatch.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/IntegrationBatch.java index e5f2439e..0f85c32c 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/IntegrationBatch.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/IntegrationBatch.java @@ -7,7 +7,7 @@ package co.elastic.logstash.filters.elasticintegration; import co.elastic.logstash.api.Event; -import org.elasticsearch.core.Releasable; +import org.elasticsearch.logstashbridge.core.ReleasableBridge; import java.util.ArrayList; import java.util.Collection; @@ -22,7 +22,7 @@ public IntegrationBatch(Collection events) { this.events = new ArrayList<>(events); } - void eachRequest(Supplier releasableSupplier, Consumer consumer) { + void eachRequest(Supplier releasableSupplier, Consumer consumer) { for (int i = 0; i < this.events.size(); i++) { consumer.accept(new Request(i, releasableSupplier.get())); } @@ -30,9 +30,9 @@ void eachRequest(Supplier releasableSupplier, Consumer parseNamedObjects(final String json) throws Exception { + public List parseNamedObjects(final String json) throws Exception { return Spec.MAPPER.readValue(json, Spec.class).get(); } - public PipelineConfiguration parseNamedObject(final String json) throws Exception { - final List configs = parseNamedObjects(json); + public PipelineConfigurationBridge parseNamedObject(final String json) throws Exception { + final List configs = parseNamedObjects(json); if (configs.isEmpty()) { throw new IllegalStateException("Expected a single pipeline definition. 
Got none"); } else if (configs.size() > 1) { @@ -47,8 +45,8 @@ public PipelineConfiguration parseNamedObject(final String json) throws Exceptio return configs.get(0); } - public PipelineConfiguration parseConfigOnly(final String pipelineId, final String jsonEncodedConfig) { - return new PipelineConfiguration(pipelineId, new BytesArray(jsonEncodedConfig), XContentType.JSON); + public PipelineConfigurationBridge parseConfigOnly(final String pipelineId, final String jsonEncodedConfig) { + return new PipelineConfigurationBridge(pipelineId, jsonEncodedConfig); } @@ -61,14 +59,14 @@ public void setConfig(final String pipelineId, final JsonNode jsonNode) throws J idToConfigMap.put(pipelineId, MAPPER.writeValueAsString(jsonNode)); } - public List get(){ + public List get(){ return idToConfigMap.entrySet() .stream() .map(e -> init(e.getKey(), e.getValue())).toList(); } - private static PipelineConfiguration init(final String id, final String json) { - return new PipelineConfiguration(id, new BytesArray(json), XContentType.JSON); + private static PipelineConfigurationBridge init(final String id, final String json) { + return new PipelineConfigurationBridge(id, json); } } } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/PipelineConfigurationResolver.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/PipelineConfigurationResolver.java index 0189594c..ddbf8da8 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/PipelineConfigurationResolver.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/PipelineConfigurationResolver.java @@ -7,17 +7,17 @@ package co.elastic.logstash.filters.elasticintegration; import co.elastic.logstash.filters.elasticintegration.resolver.UncacheableResolver; -import org.elasticsearch.ingest.PipelineConfiguration; +import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge; import java.util.Optional; import java.util.function.Consumer; /** * A {@link PipelineConfigurationResolver} is capable of resolving a pipeline name into - * an Elasticsearch Ingest {@link PipelineConfiguration}. + * an Elasticsearch Ingest {@link PipelineConfigurationBridge}. 
*/ @FunctionalInterface -public interface PipelineConfigurationResolver extends UncacheableResolver { +public interface PipelineConfigurationResolver extends UncacheableResolver { @Override - Optional resolve(String pipelineName, Consumer exceptionHandler); + Optional resolve(String pipelineName, Consumer exceptionHandler); } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/ConstantIpDatabaseHolder.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/ConstantIpDatabaseHolder.java index 0365fd4a..68573093 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/ConstantIpDatabaseHolder.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/ConstantIpDatabaseHolder.java @@ -1,7 +1,5 @@ package co.elastic.logstash.filters.elasticintegration.geoip; -import org.elasticsearch.ingest.geoip.IpDatabase; - import java.io.Closeable; import java.io.IOException; import java.util.Objects; diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/GeoIpProcessorFactory.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/GeoIpProcessorFactory.java index 7aff8aa0..c815f966 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/GeoIpProcessorFactory.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/GeoIpProcessorFactory.java @@ -6,13 +6,13 @@ */ package co.elastic.logstash.filters.elasticintegration.geoip; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.ingest.Processor; -import org.elasticsearch.ingest.geoip.GeoIpProcessor; +import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.logstashbridge.geoip.GeoIpProcessorBridge; import java.util.Map; +import java.util.stream.Collectors; -public class GeoIpProcessorFactory implements Processor.Factory { +public class GeoIpProcessorFactory implements ProcessorBridge.Factory { private final IpDatabaseProvider ipDatabaseProvider; public GeoIpProcessorFactory(final IpDatabaseProvider ipDatabaseProvider) { @@ -20,12 +20,18 @@ public GeoIpProcessorFactory(final IpDatabaseProvider ipDatabaseProvider) { } @Override - public Processor create(Map processorFactories, + public ProcessorBridge create(Map processorFactories, String tag, String description, - Map config, - ProjectId projectId) throws Exception { - return new GeoIpProcessor.Factory("geoip", this.ipDatabaseProvider) - .create(processorFactories, tag, description, config, projectId); + Map config) throws Exception { + return ProcessorBridge.fromInternal( + new GeoIpProcessorBridge.Factory("geoip", this.ipDatabaseProvider.toInternal()).toInternal() + .create(processorFactories.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey,e -> e.getValue().toInternal())), + tag, + description, + config, + null)); } } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseAdapter.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseAdapter.java index bc1efdaf..bf2836fe 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseAdapter.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseAdapter.java @@ -8,30 +8,25 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.common.CheckedBiFunction; -import 
org.elasticsearch.ingest.geoip.IpDatabase; -import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.CHMCache; -import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.NoCache; -import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.NodeCache; -import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.Reader; +import org.elasticsearch.logstashbridge.geoip.IpDatabaseBridge; +import org.elasticsearch.logstashbridge.geoip.MaxMindDbBridge; import java.io.File; import java.io.IOException; import java.nio.file.Path; import java.util.Optional; -public class IpDatabaseAdapter implements IpDatabase { +public class IpDatabaseAdapter extends IpDatabaseBridge.AbstractExternal { private static final Logger LOGGER = LogManager.getLogger(IpDatabaseAdapter.class); - private final Reader databaseReader; + private final MaxMindDbBridge.Reader databaseReader; private final String databaseType; private volatile boolean isReaderClosed = false; - public IpDatabaseAdapter(final Reader databaseReader) { + public IpDatabaseAdapter(final MaxMindDbBridge.Reader databaseReader) { this.databaseReader = databaseReader; - this.databaseType = databaseReader.getMetadata().getDatabaseType(); + this.databaseType = databaseReader.getDatabaseType(); } @Override @@ -40,12 +35,8 @@ public String getDatabaseType() { } @Override - public RESPONSE getResponse(String ipAddress, CheckedBiFunction responseProvider) { - try { - return responseProvider.apply(this.databaseReader, ipAddress); - } catch (Exception e) { - throw ExceptionsHelper.convertToRuntime(e); - } + public MaxMindDbBridge.Reader getDatabaseReader() throws IOException { + return this.databaseReader; } @Override @@ -67,25 +58,25 @@ boolean isReaderClosed() { } public static IpDatabaseAdapter defaultForPath(final Path database) throws IOException { - return new Builder(database.toFile()).setCache(new CHMCache(10_000)).build(); + return new Builder(database.toFile()).setCache(MaxMindDbBridge.NodeCache.get(10_000)).build(); } public static class Builder { private File databasePath; - private NodeCache nodeCache; + private MaxMindDbBridge.NodeCache nodeCache; - public Builder(File databasePath) { + public Builder(final File databasePath) { this.databasePath = databasePath; } - public Builder setCache(final NodeCache nodeCache) { + public Builder setCache(final MaxMindDbBridge.NodeCache nodeCache) { this.nodeCache = nodeCache; return this; } public IpDatabaseAdapter build() throws IOException { - final NodeCache nodeCache = Optional.ofNullable(this.nodeCache).orElseGet(NoCache::getInstance); - final Reader databaseReader = new Reader(this.databasePath, nodeCache); + final MaxMindDbBridge.NodeCache nodeCache = Optional.ofNullable(this.nodeCache).orElseGet(MaxMindDbBridge.NodeCache::getInstance); + final MaxMindDbBridge.Reader databaseReader = new MaxMindDbBridge.Reader(this.databasePath, nodeCache); return new IpDatabaseAdapter(databaseReader); } } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseHolder.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseHolder.java index f5dd8270..49670eeb 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseHolder.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseHolder.java @@ -1,7 +1,5 @@ package co.elastic.logstash.filters.elasticintegration.geoip; -import org.elasticsearch.ingest.geoip.IpDatabase; - interface IpDatabaseHolder { boolean isValid(); diff --git 
a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProvider.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProvider.java index 95397c3a..a8f99d41 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProvider.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProvider.java @@ -8,9 +8,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.core.IOUtils; -import org.elasticsearch.ingest.geoip.IpDatabase; +import org.elasticsearch.logstashbridge.core.IOUtilsBridge; +import org.elasticsearch.logstashbridge.geoip.IpDatabaseBridge; +import org.elasticsearch.logstashbridge.geoip.IpDatabaseProviderBridge; import java.io.Closeable; import java.io.File; @@ -23,24 +23,24 @@ import java.util.Map; import java.util.Objects; -public class IpDatabaseProvider implements org.elasticsearch.ingest.geoip.IpDatabaseProvider, Closeable { +public class IpDatabaseProvider extends IpDatabaseProviderBridge.AbstractExternal implements Closeable { private static final Logger LOGGER = LogManager.getLogger(IpDatabaseProvider.class); private final Map databaseMap; - IpDatabaseProvider(Map databaseMap) { + public IpDatabaseProvider(Map databaseMap) { this.databaseMap = Map.copyOf(databaseMap); } @Override - public Boolean isValid(ProjectId projectId, String databaseIdentifierFileName) { + public Boolean isValid(String databaseIdentifierFileName) { final IpDatabaseHolder holder = getDatabaseHolder(databaseIdentifierFileName); return Objects.nonNull(holder) && holder.isValid(); } @Override - public IpDatabase getDatabase(ProjectId projectId, String databaseIdentifierFileName) { + public IpDatabaseBridge getDatabase(String databaseIdentifierFileName) { final IpDatabaseHolder holder = getDatabaseHolder(databaseIdentifierFileName); if (Objects.isNull(holder)) { return null; @@ -58,7 +58,7 @@ IpDatabaseHolder getDatabaseHolder(String databaseIdentifierFileName) { public void close() throws IOException { databaseMap.forEach((name, holder) -> { if (holder instanceof Closeable) { - IOUtils.closeWhileHandlingException((Closeable) holder); + IOUtilsBridge.closeWhileHandlingException((Closeable) holder); } }); } @@ -71,7 +71,7 @@ public synchronized Builder setDatabaseHolder(final String identifierFileName, f if (Objects.nonNull(previous)) { LOGGER.warn(String.format("de-registered previous entry for `%s`: %s", identifierFileName, previous.info())); if (previous instanceof Closeable) { - IOUtils.closeWhileHandlingException((Closeable) previous); + IOUtilsBridge.closeWhileHandlingException((Closeable) previous); } } return this; diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/ManagedIpDatabaseHolder.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/ManagedIpDatabaseHolder.java index 3d6db958..fa176d4c 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/ManagedIpDatabaseHolder.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/ManagedIpDatabaseHolder.java @@ -2,8 +2,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.core.IOUtils; -import org.elasticsearch.ingest.geoip.IpDatabase; +import org.elasticsearch.logstashbridge.core.IOUtilsBridge; import java.io.Closeable; import java.io.IOException; @@ -72,7 +71,7 @@ public 
void setDatabasePath(final String newDatabasePath) { }); if (previousDatabase != null) { - IOUtils.closeWhileHandlingException(previousDatabase::closeReader); + IOUtilsBridge.closeWhileHandlingException(previousDatabase::closeReader); } } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/ValidatableIpDatabase.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/ValidatableIpDatabase.java deleted file mode 100644 index 510b6d85..00000000 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/ValidatableIpDatabase.java +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. - * under one or more contributor license agreements. Licensed under the - * Elastic License 2.0; you may not use this file except in compliance - * with the Elastic License 2.0. - */ -package co.elastic.logstash.filters.elasticintegration.geoip; - -import org.elasticsearch.ingest.geoip.IpDatabase; - -public interface ValidatableIpDatabase extends IpDatabase { - boolean isValid(); -} diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/PipelineProcessor.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/PipelineProcessor.java index 16298d09..cc506bfc 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/PipelineProcessor.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/PipelineProcessor.java @@ -8,37 +8,34 @@ import co.elastic.logstash.filters.elasticintegration.IngestPipeline; import co.elastic.logstash.filters.elasticintegration.IngestPipelineResolver; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.ingest.AbstractProcessor; -import org.elasticsearch.ingest.ConfigurationUtils; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.Processor; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.script.TemplateScript; +import org.elasticsearch.logstashbridge.ingest.ConfigurationUtilsBridge; +import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.logstashbridge.script.ScriptServiceBridge; +import org.elasticsearch.logstashbridge.script.TemplateScriptBridge; import java.util.Map; import java.util.function.BiConsumer; -public class PipelineProcessor extends AbstractProcessor { +public class PipelineProcessor extends ProcessorBridge.AbstractExternal { public static final String TYPE = "pipeline"; + private final String tag; + private final String description; private final String pipelineName; - private final TemplateScript.Factory pipelineTemplate; + private final TemplateScriptBridge.Factory pipelineTemplate; private final IngestPipelineResolver pipelineProvider; private final boolean ignoreMissingPipeline; - private static final Logger LOGGER = LogManager.getLogger(PipelineProcessor.class); - private PipelineProcessor(String tag, String description, - TemplateScript.Factory pipelineTemplate, + TemplateScriptBridge.Factory pipelineTemplate, String pipelineName, boolean ignoreMissingPipeline, IngestPipelineResolver pipelineProvider) { - super(tag, description); + this.tag = tag; + this.description = description; this.pipelineTemplate = pipelineTemplate; this.pipelineName = pipelineName; this.pipelineProvider = pipelineProvider; @@ 
-54,6 +51,16 @@ public String getType() { return TYPE; } + @Override + public String getTag() { + return this.tag; + } + + @Override + public String getDescription() { + return this.description; + } + @Override public boolean isAsync() { // the pipeline processor always presents itself as async @@ -61,7 +68,7 @@ public boolean isAsync() { } @Override - public void execute(IngestDocument ingestDocument, BiConsumer handler) { + public void execute(IngestDocumentBridge ingestDocument, BiConsumer handler) { String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate); IngestPipeline pipeline = pipelineProvider.resolve(pipelineName).orElse(null); if (pipeline != null) { @@ -78,25 +85,25 @@ public void execute(IngestDocument ingestDocument, BiConsumer registry, + public ProcessorBridge create(Map registry, String processorTag, String description, - Map config, - ProjectId projectId) throws Exception { - String pipeline = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "name"); - TemplateScript.Factory pipelineTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "name", pipeline, scriptService); - boolean ignoreMissingPipeline = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing_pipeline", false); + Map config) throws Exception { + String pipeline = ConfigurationUtilsBridge.readStringProperty(TYPE, processorTag, config, "name"); + TemplateScriptBridge.Factory pipelineTemplate = ConfigurationUtilsBridge.compileTemplate(TYPE, processorTag, "name", pipeline, scriptService); + boolean ignoreMissingPipeline = ConfigurationUtilsBridge.readBooleanProperty(TYPE, processorTag, config, "ignore_missing_pipeline", false); return new PipelineProcessor(processorTag, description, pipelineTemplate, pipeline, ignoreMissingPipeline, pipelineProvider); } } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/RedactPlugin.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/RedactPlugin.java deleted file mode 100644 index 7b5f3304..00000000 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/RedactPlugin.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. - * under one or more contributor license agreements. Licensed under the - * Elastic License 2.0; you may not use this file except in compliance - * with the Elastic License 2.0. 
- */
-
-package co.elastic.logstash.filters.elasticintegration.ingest;
-
-import org.elasticsearch.ingest.Processor;
-import org.elasticsearch.plugins.IngestPlugin;
-import org.elasticsearch.license.XPackLicenseState;
-import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.xpack.redact.RedactProcessor;
-
-import java.util.Map;
-
-public class RedactPlugin extends Plugin implements IngestPlugin {
-    @Override
-    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
-        // Provide a TRIAL license state to the redact processor
-        final XPackLicenseState trialLicenseState = new XPackLicenseState(parameters.relativeTimeSupplier);
-
-        return Map.of(RedactProcessor.TYPE, new RedactProcessor.Factory(trialLicenseState, parameters.matcherWatchdog));
-    }
-}
diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SafeSubsetIngestPlugin.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SafeSubsetIngestPlugin.java
index 45f5a6ef..79e2fa67 100644
--- a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SafeSubsetIngestPlugin.java
+++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SafeSubsetIngestPlugin.java
@@ -1,8 +1,8 @@
 package co.elastic.logstash.filters.elasticintegration.ingest;
 
-import org.elasticsearch.core.IOUtils;
-import org.elasticsearch.ingest.Processor;
-import org.elasticsearch.plugins.IngestPlugin;
+import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
+import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;
+import org.elasticsearch.logstashbridge.plugins.IngestPluginBridge;
 
 import javax.annotation.Nonnull;
 import java.io.Closeable;
@@ -14,35 +14,35 @@ import java.util.Set;
 import java.util.function.Supplier;
 
-public class SafeSubsetIngestPlugin implements IngestPlugin, Closeable {
-    private final IngestPlugin ingestPlugin;
+public class SafeSubsetIngestPlugin implements IngestPluginBridge, Closeable {
+    private final IngestPluginBridge ingestPlugin;
     private final Set<String> requiredProcessors;
 
-    public static Supplier<IngestPlugin> safeSubset(final @Nonnull Supplier<IngestPlugin> ingestPluginSupplier,
-                                                    final @Nonnull Set<String> requiredProcessors) {
+    public static Supplier<IngestPluginBridge> safeSubset(final @Nonnull Supplier<IngestPluginBridge> ingestPluginSupplier,
+                                                          final @Nonnull Set<String> requiredProcessors) {
         return () -> new SafeSubsetIngestPlugin(ingestPluginSupplier, requiredProcessors);
     }
 
-    private SafeSubsetIngestPlugin(final @Nonnull Supplier<IngestPlugin> ingestPluginSupplier,
+    private SafeSubsetIngestPlugin(final @Nonnull Supplier<IngestPluginBridge> ingestPluginSupplier,
                                    final @Nonnull Set<String> requiredProcessors) {
         try {
             this.ingestPlugin = Objects.requireNonNull(ingestPluginSupplier.get(), "an IngestPlugin must be supplied!");
             this.requiredProcessors = Set.copyOf(requiredProcessors);
         } catch (Exception e) {
-            IOUtils.closeWhileHandlingException(this);
+            IOUtilsBridge.closeWhileHandlingException(this);
             throw e;
         }
     }
 
     @Override
-    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
-        final Map<String, Processor.Factory> providedProcessors = this.ingestPlugin.getProcessors(parameters);
+    public Map<String, ProcessorBridge.Factory> getProcessors(ProcessorBridge.Parameters parameters) {
+        final Map<String, ProcessorBridge.Factory> providedProcessors = this.ingestPlugin.getProcessors(parameters);
 
-        final Map<String, Processor.Factory> acceptedProcessors = new HashMap<>();
+        final Map<String, ProcessorBridge.Factory> acceptedProcessors = new HashMap<>();
         final Set<String> missingProcessors = new HashSet<>();
         for (String requiredProcessor : this.requiredProcessors) {
-            final Processor.Factory processor = providedProcessors.get(requiredProcessor);
+            final ProcessorBridge.Factory processor = providedProcessors.get(requiredProcessor);
             if (!Objects.nonNull(processor)) {
                 missingProcessors.add(requiredProcessor);
             } else {
diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SetSecurityUserProcessor.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SetSecurityUserProcessor.java
index 4cf31a6b..b8928ce1 100644
--- a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SetSecurityUserProcessor.java
+++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SetSecurityUserProcessor.java
@@ -6,26 +6,27 @@
  */
 package co.elastic.logstash.filters.elasticintegration.ingest;
 
-import org.elasticsearch.cluster.metadata.ProjectId;
-import org.elasticsearch.ingest.AbstractProcessor;
-import org.elasticsearch.ingest.IngestDocument;
-import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;
+import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;
 
 import java.util.Map;
+import java.util.function.BiConsumer;
 
-public class SetSecurityUserProcessor extends AbstractProcessor {
+public class SetSecurityUserProcessor extends ProcessorBridge.AbstractExternal {
     public static final String TYPE = "set_security_user";
 
+    private final String tag;
+    private final String description;
 
-    private SetSecurityUserProcessor(String tag, String description) {
-        super(tag, description);
+    private SetSecurityUserProcessor(final String tag, final String description) {
+        this.tag = tag;
+        this.description = description;
     }
 
     @Override
-    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+    public void execute(IngestDocumentBridge ingestDocumentBridge, BiConsumer<IngestDocumentBridge, Exception> biConsumer) {
         // within Logstash, the set_security_user processor is a no-op
-        return ingestDocument;
     }
 
     @Override
@@ -33,11 +34,28 @@ public String getType() {
         return TYPE;
     }
 
-    public static final class Factory implements Processor.Factory {
+    @Override
+    public String getTag() {
+        return this.tag;
+    }
+
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+
+    @Override
+    public boolean isAsync() {
+        return false;
+    }
+
+    public static final class Factory implements ProcessorBridge.Factory {
         @Override
-        public SetSecurityUserProcessor create(Map<String, Processor.Factory> registry, String processorTag,
-                                               String description, Map<String, Object> config, ProjectId projectId) {
+        public ProcessorBridge create(Map<String, ProcessorBridge.Factory> registry,
+                                      String processorTag,
+                                      String description,
+                                      Map<String, Object> config) {
             String[] supportedConfigs = {"field", "properties"};
             for (String cfg : supportedConfigs) {
                 config.remove(cfg);
diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SingleProcessorIngestPlugin.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SingleProcessorIngestPlugin.java
index efe72929..c6249893 100644
--- a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SingleProcessorIngestPlugin.java
+++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SingleProcessorIngestPlugin.java
@@ -6,37 +6,37 @@
  */
 package co.elastic.logstash.filters.elasticintegration.ingest;
 
-import org.elasticsearch.core.IOUtils;
-import org.elasticsearch.ingest.Processor;
-import org.elasticsearch.plugins.IngestPlugin;
+import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
+import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;
+import org.elasticsearch.logstashbridge.plugins.IngestPluginBridge;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
 import java.util.function.Supplier;
 
-public class SingleProcessorIngestPlugin implements IngestPlugin, Closeable {
+public class SingleProcessorIngestPlugin implements IngestPluginBridge, Closeable {
     private final String type;
-    private final Processor.Factory processorFactory;
+    private final ProcessorBridge.Factory processorFactory;
 
-    public static Supplier<IngestPlugin> of(final String type, final Supplier<Processor.Factory> factorySupplier) {
+    public static Supplier<IngestPluginBridge> of(final String type, final Supplier<ProcessorBridge.Factory> factorySupplier) {
         return () -> new SingleProcessorIngestPlugin(type, factorySupplier.get());
     }
 
-    public SingleProcessorIngestPlugin(String type, Processor.Factory processorFactory) {
+    public SingleProcessorIngestPlugin(String type, ProcessorBridge.Factory processorFactory) {
         this.type = type;
         this.processorFactory = processorFactory;
     }
 
     @Override
-    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
+    public Map<String, ProcessorBridge.Factory> getProcessors(ProcessorBridge.Parameters parameters) {
         return Map.of(this.type, this.processorFactory);
     }
 
     @Override
     public void close() throws IOException {
         if (this.processorFactory instanceof Closeable) {
-            IOUtils.closeWhileHandlingException((Closeable) this.processorFactory);
+            IOUtilsBridge.closeWhileHandlingException((Closeable) this.processorFactory);
         }
     }
 }
diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/util/PluginProjectResolver.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/util/PluginProjectResolver.java
deleted file mode 100644
index 352ba66d..00000000
--- a/src/main/java/co/elastic/logstash/filters/elasticintegration/util/PluginProjectResolver.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package co.elastic.logstash.filters.elasticintegration.util;
-
-import org.elasticsearch.cluster.metadata.ProjectId;
-import org.elasticsearch.cluster.project.ProjectResolver;
-import org.elasticsearch.core.CheckedRunnable;
-
-public class PluginProjectResolver implements ProjectResolver {
-    @Override
-    public ProjectId getProjectId() {
-        return null;
-    }
-
-    @Override
-    public <E extends Exception> void executeOnProject(ProjectId projectId, CheckedRunnable<E> checkedRunnable) throws E {
-        if (projectId.equals(ProjectId.DEFAULT)) {
-            checkedRunnable.run();
-        } else {
-            throw new IllegalArgumentException("Cannot execute on a project other than [" + ProjectId.DEFAULT + "]");
-        }
-    }
-}
diff --git a/src/test/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchPipelineConfigurationResolverTest.java b/src/test/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchPipelineConfigurationResolverTest.java
index 8434c14c..7bf3e667 100644
--- a/src/test/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchPipelineConfigurationResolverTest.java
+++ b/src/test/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchPipelineConfigurationResolverTest.java
@@ -8,7 +8,7 @@
 import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
 import org.elasticsearch.client.RestClient;
-import org.elasticsearch.ingest.PipelineConfiguration;
+import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -54,7 +54,7 @@ void testLoadConfigurationExists() throws Exception {
                 .willReturn(okJson(getMockResponseBody("get-ingest-pipeline-(my-pipeline-id).json"))));
 
         final AtomicReference<Exception> lastException = new AtomicReference<>();
-        final Optional<PipelineConfiguration> resolvedPipelineConfiguration = resolver.resolve("my-pipeline-id", lastException::set);
+        final Optional<PipelineConfigurationBridge> resolvedPipelineConfiguration = resolver.resolve("my-pipeline-id", lastException::set);
         assertThat(lastException.get(), is(nullValue()));
         assertThat(resolvedPipelineConfiguration, isPresent());
         resolvedPipelineConfiguration.ifPresent(pipelineConfiguration -> {
@@ -72,7 +72,7 @@ void testLoadConfigurationPipelineWithSpecialCharacters() throws Exception {
                 .willReturn(okJson(getMockResponseBody("get-ingest-pipeline-(special char pipeline).json"))));
 
         final AtomicReference<Exception> lastException = new AtomicReference<>();
-        final Optional<PipelineConfiguration> resolvedPipelineConfiguration = resolver.resolve("special char pipeline", lastException::set);
+        final Optional<PipelineConfigurationBridge> resolvedPipelineConfiguration = resolver.resolve("special char pipeline", lastException::set);
         assertThat(lastException.get(), is(nullValue()));
         assertThat(resolvedPipelineConfiguration, isPresent());
         resolvedPipelineConfiguration.ifPresent(pipelineConfiguration -> {
@@ -90,7 +90,7 @@ void testLoadConfigurationNotFound() throws Exception {
                 .willReturn(aResponse().withStatus(404)));
 
         final AtomicReference<Exception> lastException = new AtomicReference<>();
-        final Optional<PipelineConfiguration> resolvedPipelineConfiguration = resolver.resolve("where-are-you", lastException::set);
+        final Optional<PipelineConfigurationBridge> resolvedPipelineConfiguration = resolver.resolve("where-are-you", lastException::set);
         assertThat(lastException.get(), is(nullValue())); // not found is not an exception
         assertThat(resolvedPipelineConfiguration, isEmpty());
     });
@@ -103,7 +103,7 @@ void testLoadConfigurationNotAuthorized() throws Exception {
                 .willReturn(aResponse().withStatus(403)));
 
         final AtomicReference<Exception> lastException = new AtomicReference<>();
-        final Optional<PipelineConfiguration> resolvedPipelineConfiguration = resolver.resolve("who-am-i", lastException::set);
+        final Optional<PipelineConfigurationBridge> resolvedPipelineConfiguration = resolver.resolve("who-am-i", lastException::set);
         assertThat(lastException.get(), both(is(instanceOf(org.elasticsearch.client.ResponseException.class))).and(
                 hasToString(containsString("403 Forbidden")))
         );
@@ -128,12 +128,4 @@ private void withPipelineConfigurationResolver(final Consumer<ElasticsearchPipelineConfigurationResolver> consumer) {
-
-//    static <T> void assertThat(T actual, Function<T, ?> transform, Matcher<?> matcher) {
-//        org.hamcrest.MatcherAssert.assertThat(transform.apply(actual), matcher);
-//    }
-//
-//    static <T> void assertThat(T actual, Matcher<T> matcher) {
-//        assertThat(actual, Function.identity(), matcher);
-//    }
 }
\ No newline at end of file
diff --git a/src/test/java/co/elastic/logstash/filters/elasticintegration/IngestDuplexMarshallerTest.java b/src/test/java/co/elastic/logstash/filters/elasticintegration/IngestDuplexMarshallerTest.java
index 00a0a454..1579bb44 100644
--- a/src/test/java/co/elastic/logstash/filters/elasticintegration/IngestDuplexMarshallerTest.java
+++ b/src/test/java/co/elastic/logstash/filters/elasticintegration/IngestDuplexMarshallerTest.java
@@ -8,7 +8,7 @@
 import co.elastic.logstash.api.Event;
 import co.elastic.logstash.filters.elasticintegration.util.TestCapturingLogger;
-import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;
 import org.junit.jupiter.api.Test;
 import org.logstash.Timestamp;
 import org.logstash.plugins.BasicEventFactory;
@@ -76,7 +76,7 @@ void ingestDocToEventModifiedAtTimestampZonedDateTimeValue() {
                 "@version", "3",
                 "message", "hello, world"
         ));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         final ZonedDateTime updatedTimestamp = ZonedDateTime.parse("2023-03-12T01:17:38.135792468Z");
         intermediate.setFieldValue(org.logstash.Event.TIMESTAMP, updatedTimestamp);
@@ -93,7 +93,7 @@ void ingestDocToEventModifiedAtTimestampStringValue() {
                 "@timestamp", "2023-01-17T23:19:04.765182352Z",
                 "@version", "3",
                 "message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         final ZonedDateTime updatedTimestamp = ZonedDateTime.parse("2023-03-12T01:17:38.135792468Z");
         intermediate.setFieldValue(org.logstash.Event.TIMESTAMP, updatedTimestamp.toString());
@@ -110,7 +110,7 @@ void ingestDocToEventModifiedAtTimestampInvalidStringValue() {
                 "@timestamp", "2023-01-17T23:19:04.765182352Z",
                 "@version", "3",
                 "message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         intermediate.setFieldValue(org.logstash.Event.TIMESTAMP, "high noon");
@@ -130,7 +130,7 @@ void ingestDocToEventRemovedAtTimestamp() {
                 "@timestamp", "2023-01-17T23:19:04.765182352Z",
                 "@version", "3",
                 "message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         intermediate.removeField(org.logstash.Event.TIMESTAMP);
@@ -148,7 +148,7 @@ void ingestDocToEventRemovedAtTimestampWithEventCreatedAt() {
                 "@timestamp", "2023-01-17T23:19:04.765182352Z",
                 "@version", "3",
                 "message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         intermediate.removeField(org.logstash.Event.TIMESTAMP);
@@ -172,7 +172,7 @@ void ingestDocToEventModifiedMetadataVersion() {
                 "@timestamp", "2023-01-17T23:19:04.765182352Z",
                 "@version", "3",
                 "message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         final long updatedMetadataVersion = 17L;
         intermediate.getMetadata().setVersion(updatedMetadataVersion);
@@ -192,7 +192,7 @@ void ingestDocToEventAdditionalMetadata() {
                 "@timestamp", "2023-01-17T23:19:04.765182352Z",
                 "@version", "3",
                 "message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         intermediate.getMetadata().setVersion(8191L);
         intermediate.getMetadata().setVersionType("external_gte"); // constrained
@@ -217,7 +217,7 @@ void ingestDocToEventIncludingReservedAtTimestampField() {
                 "@version", "3",
                 "message", "hello, world"));
 
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         // intentionally String to pass-through Valuifier#convert and make validation easier
         final String atTimestampInSource = "2023-03-12T01:17:38.135792468Z";
@@ -237,7 +237,7 @@ void ingestDocToEventIncludingReservedAtVersionField() {
                 "@timestamp", "2023-01-17T23:19:04.765182352Z",
                 "@version", "3",
                 "message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         final String atVersionInSource = "bananas";
         intermediate.setFieldValue(org.logstash.Event.VERSION, atVersionInSource);
@@ -256,7 +256,7 @@ void ingestDocToEventIncludingReservedAtMetadataFieldWithAcceptableShape() {
                 "@timestamp", "2023-01-17T23:19:04.765182352Z",
                 "@version", "3",
                 "message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         final Map<String, String> atMetadataInSource = Map.of("this", "that","flip", "flop");
         intermediate.setFieldValue(org.logstash.Event.METADATA, atMetadataInSource);
@@ -276,7 +276,7 @@ void ingestDocToEventIncludingReservedAtMetadataFieldWithInvalidShape() {
                 "@timestamp", "2023-01-17T23:19:04.765182352Z",
                 "@version", "3",
                 "message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         final List<String> atMetadataInSource = List.of("wrong", "incorrect");
         intermediate.setFieldValue(org.logstash.Event.METADATA, atMetadataInSource);
@@ -295,7 +295,7 @@ void ingestDocToEventIncludingReservedAtMetadataFieldWithInvalidShape() {
 
     @Test
     void ingestDocToEventIncludingReservedTagsFieldWithInvalidShape() {
         final Event input = BasicEventFactory.INSTANCE.newEvent(Map.of("message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         final Map<String, String> atTagsInSource = Map.of("this", "that");
         intermediate.setFieldValue(org.logstash.Event.TAGS, atTagsInSource);
@@ -311,7 +311,7 @@ void ingestDocToEventIncludingReservedTagsFieldWithInvalidShape() {
 
     @Test
     void ingestDocToEventIncludingReservedTagsFieldWithInvalidCoercibleShape() {
         final Event input = BasicEventFactory.INSTANCE.newEvent(Map.of("message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         final Set<String> atTagsInSource = Set.of("this", "that");
         intermediate.setFieldValue(org.logstash.Event.TAGS, atTagsInSource);
@@ -328,7 +328,7 @@ void ingestDocToEventIncludingReservedTagsFieldWithInvalidCoercibleShape() {
 
     @Test
     void ingestDocToEventIncludingReservedTagsFieldWithStringShape() {
         final Event input = BasicEventFactory.INSTANCE.newEvent(Map.of("message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         final String atTagsInSource = "this";
         intermediate.setFieldValue(org.logstash.Event.TAGS, atTagsInSource);
@@ -344,7 +344,7 @@ void ingestDocToEventIncludingReservedTagsFieldWithStringShape() {
 
     @Test
     void ingestDocToEventIncludingReservedTagsFieldWithListOfStringShape() {
         final Event input = BasicEventFactory.INSTANCE.newEvent(Map.of("message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         final List<String> atTagsInSource = List.of("this", "that");
         intermediate.setFieldValue(org.logstash.Event.TAGS, atTagsInSource);
@@ -367,7 +367,7 @@ void ingestDocToEventIncludingReservedTagsFieldWithListOfStringShape() {
 
     @Test
     void ingestDocToEventIncludingArrayType() {
         final Event input = BasicEventFactory.INSTANCE.newEvent(Map.of("message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         final String[] arrayValueInSource = new String[]{"this", "that"};
         intermediate.setFieldValue("deeply.nested", arrayValueInSource);
@@ -381,7 +381,7 @@ void ingestDocToEventIncludingReservedTagsFieldWithListOfStringShape() {
 
     @Test
     void eventToIngestDocFieldWithNestedZonedDateTimeValue() {
         final Event input = BasicEventFactory.INSTANCE.newEvent(Map.of("message", "hello, world"));
-        final IngestDocument intermediate = idm.toIngestDocument(input);
+        final IngestDocumentBridge intermediate = idm.toIngestDocument(input);
 
         final String iso8601value = "2023-05-03T03:17:59.182736455Z";
         final ZonedDateTime zonedDateTime = ZonedDateTime.parse(iso8601value);
@@ -407,7 +407,7 @@ void eventToIngestDoc() {
                 "flip", "flop"
         )));
 
-        final IngestDocument ingestDocument = idm.toIngestDocument(input);
+        final IngestDocumentBridge ingestDocument = idm.toIngestDocument(input);
 
         final String ingestTimestamp = getIngestDocumentTimestamp(ingestDocument);
         assertThat(ingestTimestamp, is(notNullValue()));
@@ -428,7 +428,7 @@ void eventToIngestDocMissingRequiredVersion() {
         )));
         input.remove(org.logstash.Event.VERSION);
 
-        final IngestDocument ingestDocument = idm.toIngestDocument(input);
+        final IngestDocumentBridge ingestDocument = idm.toIngestDocument(input);
 
         // sensible default
         assertThat(ingestDocument.getMetadata().getVersion(), is(equalTo(1L)));
@@ -446,7 +446,7 @@ void eventToIngestDocMissingRequiredTimestamp() {
         )));
         input.remove(org.logstash.Event.TIMESTAMP);
 
-        final IngestDocument ingestDocument = idm.toIngestDocument(input);
+        final IngestDocumentBridge ingestDocument = idm.toIngestDocument(input);
 
         final String ingestTimestamp = getIngestDocumentTimestamp(ingestDocument);
         assertThat(ingestTimestamp, where(Instant::parse, is(recentCurrentTimestamp())));
@@ -470,11 +470,11 @@ Instant getEventTimestamp(final Event event) {
         return ((org.logstash.Timestamp) event.getField(org.logstash.Event.TIMESTAMP)).toInstant();
     }
 
-    String getIngestDocumentTimestamp(final IngestDocument ingestDocument) {
-        return ingestDocument.getFieldValue(IngestDocument.INGEST_KEY + "." + INGEST_METADATA_TIMESTAMP_FIELD, String.class);
+    String getIngestDocumentTimestamp(final IngestDocumentBridge ingestDocument) {
+        return (String) ingestDocument.getIngestMetadata().get(INGEST_METADATA_TIMESTAMP_FIELD);
     }
 
-    void validateIngestDocument(final IngestDocument ingestDocument, Consumer<IngestDocument> ingestDocumentConsumer) {
+    void validateIngestDocument(final IngestDocumentBridge ingestDocument, Consumer<IngestDocumentBridge> ingestDocumentConsumer) {
         ingestDocumentConsumer.accept(ingestDocument);
     }
 
diff --git a/src/test/java/co/elastic/logstash/filters/elasticintegration/LocalDirectoryPipelineConfigurationResolver.java b/src/test/java/co/elastic/logstash/filters/elasticintegration/LocalDirectoryPipelineConfigurationResolver.java
index 2a848901..6ea77652 100644
--- a/src/test/java/co/elastic/logstash/filters/elasticintegration/LocalDirectoryPipelineConfigurationResolver.java
+++ b/src/test/java/co/elastic/logstash/filters/elasticintegration/LocalDirectoryPipelineConfigurationResolver.java
@@ -9,17 +9,16 @@
 import co.elastic.logstash.filters.elasticintegration.resolver.AbstractSimpleResolver;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ingest.PipelineConfiguration;
+import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Optional;
-import java.util.function.Consumer;
 
 public class LocalDirectoryPipelineConfigurationResolver
-        extends AbstractSimpleResolver<String, PipelineConfiguration>
+        extends AbstractSimpleResolver<String, PipelineConfigurationBridge>
         implements PipelineConfigurationResolver {
     private static final Logger LOGGER = LogManager.getLogger(LocalDirectoryPipelineConfigurationResolver.class);
@@ -32,7 +31,7 @@ public LocalDirectoryPipelineConfigurationResolver(final Path localDirectory) {
     }
 
     @Override
-    public Optional<PipelineConfiguration> resolveSafely(final String pipelineName) throws Exception {
+    public Optional<PipelineConfigurationBridge> resolveSafely(final String pipelineName) throws Exception {
         final Path pipelinePath = localDirectory.resolve(sanitizePath(pipelineName) + ".json");
         LOGGER.trace(() -> String.format("RESOLVING `%s` -> `%s`", pipelineName, pipelinePath));
         final File pipelineFile = pipelinePath.toFile();
diff --git a/src/test/java/co/elastic/logstash/filters/elasticintegration/LocalDirectoryPipelineConfigurationResolverTest.java b/src/test/java/co/elastic/logstash/filters/elasticintegration/LocalDirectoryPipelineConfigurationResolverTest.java
index f3216091..92bd823f 100644
--- a/src/test/java/co/elastic/logstash/filters/elasticintegration/LocalDirectoryPipelineConfigurationResolverTest.java
+++ b/src/test/java/co/elastic/logstash/filters/elasticintegration/LocalDirectoryPipelineConfigurationResolverTest.java
@@ -6,7 +6,7 @@
  */
 package co.elastic.logstash.filters.elasticintegration;
 
-import org.elasticsearch.ingest.PipelineConfiguration;
+import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -28,7 +28,7 @@ void resolvePresent() {
         final PipelineConfigurationResolver pcr = new LocalDirectoryPipelineConfigurationResolver(pipelinesResourcePath);
 
-        final Optional<PipelineConfiguration> resolved = pcr.resolve("simple-mutate");
+        final Optional<PipelineConfigurationBridge> resolved = pcr.resolve("simple-mutate");
         assertThat(resolved.isPresent(), is(true));
         assertThat(resolved.get().getId(), is(equalTo("simple-mutate")));
     }
@@ -39,7 +39,7 @@ void resolveMissing() {
         final PipelineConfigurationResolver pcr = new LocalDirectoryPipelineConfigurationResolver(pipelinesResourcePath);
 
-        final Optional<PipelineConfiguration> resolved = pcr.resolve("not-there");
+        final Optional<PipelineConfigurationBridge> resolved = pcr.resolve("not-there");
         assertThat(resolved.isPresent(), is(false));
     }
 
@@ -50,12 +50,12 @@ void resolvePresentNonReadable() {
         final PipelineConfigurationResolver pcr = new LocalDirectoryPipelineConfigurationResolver(pipelinesResourcePath);
 
-        final Optional<PipelineConfiguration> resolved = pcr.resolve("simple-mutate");
+        final Optional<PipelineConfigurationBridge> resolved = pcr.resolve("simple-mutate");
         assertThat(resolved, is(notNullValue()));
         assertThat(resolved.isPresent(), is(false));
 
         final AtomicReference<Exception> lastExceptionObserved = new AtomicReference<>();
-        final Optional<PipelineConfiguration> resolvedObserved = pcr.resolve("simple-mutate", lastExceptionObserved::set);
+        final Optional<PipelineConfigurationBridge> resolvedObserved = pcr.resolve("simple-mutate", lastExceptionObserved::set);
         assertAll("observed resolve", ()-> {
             assertThat(resolvedObserved.isPresent(), is(false));
             assertThat(lastExceptionObserved.get(), is(notNullValue()));
@@ -67,7 +67,7 @@ void resolvePresentNonReadable() {
 
         // recovery
         ensureSetFileReadable(pipelinesResourcePath.resolve("simple-mutate.json").toFile(), true);
-        final Optional<PipelineConfiguration> resolvedRecovered = pcr.resolve("simple-mutate");
+        final Optional<PipelineConfigurationBridge> resolvedRecovered = pcr.resolve("simple-mutate");
         assertAll("recovery", () -> {
             assertThat(resolvedRecovered, is(notNullValue()));
             assertThat(resolvedRecovered.isPresent(), is(true));
@@ -82,12 +82,12 @@ void resolvePresentWritable() {
         final PipelineConfigurationResolver pcr = new LocalDirectoryPipelineConfigurationResolver(pipelinesResourcePath);
 
-        final Optional<PipelineConfiguration> resolved = pcr.resolve("not-there");
+        final Optional<PipelineConfigurationBridge> resolved = pcr.resolve("not-there");
         assertThat(resolved, is(notNullValue()));
         assertThat(resolved.isPresent(), is(false));
 
         final AtomicReference<Exception> lastExceptionObserved = new AtomicReference<>();
-        final Optional<PipelineConfiguration> resolvedObserved = pcr.resolve("simple-mutate", lastExceptionObserved::set);
+        final Optional<PipelineConfigurationBridge> resolvedObserved = pcr.resolve("simple-mutate", lastExceptionObserved::set);
         assertAll("observed resolve", ()-> {
             assertThat(resolvedObserved.isPresent(), is(false));
             assertThat(lastExceptionObserved.get(), is(notNullValue()));
@@ -99,7 +99,7 @@ void resolvePresentWritable() {
 
         // recovery
         ensureSetFileWritable(pipelinesResourcePath.resolve("simple-mutate.json").toFile(), false);
-        final Optional<PipelineConfiguration> resolvedRecovered = pcr.resolve("simple-mutate");
+        final Optional<PipelineConfigurationBridge> resolvedRecovered = pcr.resolve("simple-mutate");
         assertAll("recovery", () -> {
             assertThat(resolvedRecovered, is(notNullValue()));
             assertThat(resolvedRecovered.isPresent(), is(true));
diff --git a/src/test/java/co/elastic/logstash/filters/elasticintegration/PipelineConfigurationFactoryTest.java b/src/test/java/co/elastic/logstash/filters/elasticintegration/PipelineConfigurationFactoryTest.java
index d60ddc07..7874fd6b 100644
--- a/src/test/java/co/elastic/logstash/filters/elasticintegration/PipelineConfigurationFactoryTest.java
+++ b/src/test/java/co/elastic/logstash/filters/elasticintegration/PipelineConfigurationFactoryTest.java
@@ -8,7 +8,7 @@
 import co.elastic.logstash.filters.elasticintegration.util.LocalPipelinesUtil;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.elasticsearch.ingest.PipelineConfiguration;
+import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -41,7 +41,7 @@ class PipelineConfigurationFactoryTest {
     @Test
     public void testParseNamedObjectWithOnePipeline() throws Exception {
         final String json = elasticsearchApiFormattedJson("one-pipeline");
-        final PipelineConfiguration loaded = PipelineConfigurationFactory.getInstance().parseNamedObject(json);
+        final PipelineConfigurationBridge loaded = PipelineConfigurationFactory.getInstance().parseNamedObject(json);
         assertThat(loaded, is(notNullValue()));
         assertThat(loaded.getId(), is(equalTo("pipeline-id-one")));
         assertThat(loaded.getConfig(), is(equalTo(EXPECTED_PIPELINE_ID_ONE_CONFIG_MAP)));
@@ -68,7 +68,7 @@ public void testParseNamedObjectsWithZeroPipelines() throws Exception {
     @Test
     public void testParseNamedObjectsWithOnePipeline() throws Exception {
         final String json = elasticsearchApiFormattedJson("one-pipeline");
-        final List<PipelineConfiguration> loaded = PipelineConfigurationFactory.getInstance().parseNamedObjects(json);
+        final List<PipelineConfigurationBridge> loaded = PipelineConfigurationFactory.getInstance().parseNamedObjects(json);
         assertThat(loaded, is(notNullValue()));
         assertThat(loaded, hasSize(1));
 
@@ -80,7 +80,7 @@ public void testParseNamedObjectsWithOnePipeline() throws Exception {
     @Test
     public void testParseNamedObjectWithTwoPipelines() throws Exception {
         final String json = elasticsearchApiFormattedJson("two-pipelines");
-        final List<PipelineConfiguration> loaded = PipelineConfigurationFactory.getInstance().parseNamedObjects(json);
+        final List<PipelineConfigurationBridge> loaded = PipelineConfigurationFactory.getInstance().parseNamedObjects(json);
         assertThat(loaded, is(notNullValue()));
         assertThat(loaded, hasSize(2));
 
@@ -97,7 +97,7 @@ public void testParseNamedObjectWithTwoPipelines() throws Exception {
     @Test
     public void testParseNamedObjectWithZeroPipelines() throws Exception {
         final String json = "{}";
-        final List<PipelineConfiguration> loaded = PipelineConfigurationFactory.getInstance().parseNamedObjects(json);
+        final List<PipelineConfigurationBridge> loaded = PipelineConfigurationFactory.getInstance().parseNamedObjects(json);
         assertThat(loaded, is(notNullValue()));
         assertThat(loaded, hasSize(0));
     }
@@ -106,7 +106,7 @@ public void testParseNamedObjectWithZeroPipelines() throws Exception {
     public void testParseConfigOnly() throws Exception {
         final ObjectMapper objectMapper = new ObjectMapper();
         final String json = objectMapper.writeValueAsString(EXPECTED_PIPELINE_ID_ONE_CONFIG_MAP);
-        final PipelineConfiguration loaded = PipelineConfigurationFactory.getInstance().parseConfigOnly("bananas" , json);
+        final PipelineConfigurationBridge loaded = PipelineConfigurationFactory.getInstance().parseConfigOnly("bananas" , json);
         assertThat(loaded, is(notNullValue()));
         assertThat(loaded.getId(), is(equalTo("bananas")));
         assertThat(loaded.getConfig(), is(equalTo(EXPECTED_PIPELINE_ID_ONE_CONFIG_MAP)));
diff --git a/src/test/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProviderTest.java b/src/test/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProviderTest.java
index 0238595f..71ddfe14 100644
--- a/src/test/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProviderTest.java
+++ b/src/test/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProviderTest.java
@@ -6,13 +6,11 @@
  */
 package co.elastic.logstash.filters.elasticintegration.geoip;
 
-
 import co.elastic.logstash.filters.elasticintegration.util.IngestDocumentUtil;
 import co.elastic.logstash.filters.elasticintegration.util.ResourcesUtil;
-import org.elasticsearch.cluster.metadata.ProjectId;
-import org.elasticsearch.ingest.IngestDocument;
-import org.elasticsearch.ingest.Processor;
-import org.elasticsearch.ingest.geoip.GeoIpProcessor;
+import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;
+import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;
+import org.elasticsearch.logstashbridge.geoip.GeoIpProcessorBridge;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -39,14 +37,14 @@ void loadTestVendoredDatabases() throws Exception {
         withVendoredGeoIpDatabaseProvider(geoIpDatabaseProvider -> {
             assertAll("Loaded databases all report valid",
-                    () -> assertThat(geoIpDatabaseProvider.isValid(ProjectId.DEFAULT, "GeoLite2-ASN.mmdb"), is(true)),
-                    () -> assertThat(geoIpDatabaseProvider.isValid(ProjectId.DEFAULT, "GeoLite2-City.mmdb"), is(true)),
-                    () -> assertThat(geoIpDatabaseProvider.isValid(ProjectId.DEFAULT, "GeoLite2-Country.mmdb"), is(true)));
+                    () -> assertThat(geoIpDatabaseProvider.isValid("GeoLite2-ASN.mmdb"), is(true)),
+                    () -> assertThat(geoIpDatabaseProvider.isValid("GeoLite2-City.mmdb"), is(true)),
+                    () -> assertThat(geoIpDatabaseProvider.isValid("GeoLite2-Country.mmdb"), is(true)));
 
             assertAll("Non-loaded databases all report invalid",
-                    () -> assertThat(geoIpDatabaseProvider.isValid(ProjectId.DEFAULT, "GeoLite2-Global.mmdb"), is(false)),
-                    () -> assertThat(geoIpDatabaseProvider.isValid(ProjectId.DEFAULT, "Bananas.mmdb"), is(false)),
-                    () -> assertThat(geoIpDatabaseProvider.isValid(ProjectId.DEFAULT, "Intergalactic.mmdb"), is(false)));
+                    () -> assertThat(geoIpDatabaseProvider.isValid("GeoLite2-Global.mmdb"), is(false)),
+                    () -> assertThat(geoIpDatabaseProvider.isValid("Bananas.mmdb"), is(false)),
+                    () -> assertThat(geoIpDatabaseProvider.isValid("Intergalactic.mmdb"), is(false)));
         });
     }
 
@@ -54,9 +52,9 @@ void basicASNLookup() throws Exception {
         withVendoredGeoIpDatabaseProvider(geoIpDatabaseProvider -> {
             withGeoipProcessor(geoIpDatabaseProvider, makeConfig("database_file", "GeoLite2-ASN.mmdb"), (processor) -> {
-                final IngestDocument input = IngestDocumentUtil.createIngestDocument(Map.of("input", EXAMPLE_DOT_COM_INET_ADDRESS));
+                final IngestDocumentBridge input = IngestDocumentUtil.createIngestDocument(Map.of("input", EXAMPLE_DOT_COM_INET_ADDRESS));
 
-                final IngestDocument result = processIngestDocumentSynchronously(input, processor);
+                final IngestDocumentBridge result = processIngestDocumentSynchronously(input, processor);
 
                 assertAll(
                         () -> assertThat(result.getFieldValue("geoip.ip", String.class), is(equalTo(EXAMPLE_DOT_COM_INET_ADDRESS))),
@@ -74,9 +72,9 @@ void basicCityDBLookupCityDetails() throws Exception {
             final Map<String, Object> configOverrides = Map.of("database_file", "GeoLite2-City.mmdb",
                                                                "properties", List.of("location", "timezone"));
             withGeoipProcessor(geoIpDatabaseProvider, makeConfig(configOverrides), (processor) -> {
-                final IngestDocument input = IngestDocumentUtil.createIngestDocument(Map.of("input", EXAMPLE_DOT_COM_INET_ADDRESS));
+                final IngestDocumentBridge input = IngestDocumentUtil.createIngestDocument(Map.of("input", EXAMPLE_DOT_COM_INET_ADDRESS));
 
-                final IngestDocument result = processIngestDocumentSynchronously(input, processor);
+                final IngestDocumentBridge result = processIngestDocumentSynchronously(input, processor);
 
                 assertAll(
                         () -> assertThat(result.getFieldValue("geoip.location.lat", Double.class), is(closeTo(42.1596, 0.1))),
@@ -91,9 +89,9 @@ void basicCityDBLookupCountryDetails() throws Exception {
         withVendoredGeoIpDatabaseProvider(ipDatabaseProvider -> {
             withGeoipProcessor(ipDatabaseProvider, makeConfig("database_file", "GeoLite2-City.mmdb"), (processor) -> {
-                final IngestDocument input = IngestDocumentUtil.createIngestDocument(Map.of("input", EXAMPLE_DOT_COM_INET_ADDRESS));
+                final IngestDocumentBridge input = IngestDocumentUtil.createIngestDocument(Map.of("input", EXAMPLE_DOT_COM_INET_ADDRESS));
 
-                final IngestDocument result = processIngestDocumentSynchronously(input, processor);
+                final IngestDocumentBridge result = processIngestDocumentSynchronously(input, processor);
 
                 assertThat(result.getFieldValue("geoip.country_iso_code", String.class), is(equalTo("US")));
             });
@@ -104,9 +102,9 @@ void basicCountryDBLookupCountryDetails() throws Exception {
         withVendoredGeoIpDatabaseProvider(ipDatabaseProvider -> {
             withGeoipProcessor(ipDatabaseProvider, makeConfig("database_file", "GeoLite2-Country.mmdb"), (processor) -> {
-                final IngestDocument input = IngestDocumentUtil.createIngestDocument(Map.of("input", EXAMPLE_DOT_COM_INET_ADDRESS));
+                final IngestDocumentBridge input = IngestDocumentUtil.createIngestDocument(Map.of("input", EXAMPLE_DOT_COM_INET_ADDRESS));
 
-                final IngestDocument result = processIngestDocumentSynchronously(input, processor);
+                final IngestDocumentBridge result = processIngestDocumentSynchronously(input, processor);
 
                 assertThat(result.getFieldValue("geoip.country_iso_code", String.class), is(equalTo("US")));
             });
@@ -132,16 +130,21 @@ static void withVendoredGeoIpDatabaseProvider(final ExceptionalConsumer<IpDatabaseProvider> consumer) throws Exception {
-    static void withGeoipProcessor(final IpDatabaseProvider geoIpDatabaseProvider, Map<String, Object> config, ExceptionalConsumer<Processor> geoIpProcessorConsumer) throws Exception {
-        Processor processor = new GeoIpProcessor.Factory("geoip", geoIpDatabaseProvider).create(Map.of(), null, null, config, null);
-        geoIpProcessorConsumer.accept(processor);
+    static void withGeoipProcessor(final IpDatabaseProvider geoIpDatabaseProvider, Map<String, Object> config, ExceptionalConsumer<ProcessorBridge> geoIpProcessorConsumer) throws Exception {
+        final ProcessorBridge bridgeProcessor = ProcessorBridge.fromInternal(
+                new GeoIpProcessorBridge.Factory("geoip", geoIpDatabaseProvider.toInternal()).toInternal()
+                        .create(Map.of(), null, null, config, null));
+        geoIpProcessorConsumer.accept(bridgeProcessor);
     }
 
-    static IngestDocument processIngestDocumentSynchronously(final IngestDocument input, final Processor processor) throws Exception {
+    static IngestDocumentBridge processIngestDocumentSynchronously(final IngestDocumentBridge input, final ProcessorBridge processor) throws Exception {
         if (!processor.isAsync()) {
-            return processor.execute(input);
+            // execute exactly once, then re-wrap the result as a bridge document
+            final var executed = processor.toInternal().execute(input.toInternal());
+            return new IngestDocumentBridge(executed.getSourceAndMetadata(), executed.getIngestMetadata());
         } else {
-            final CompletableFuture<IngestDocument> future = new CompletableFuture<>();
+            final CompletableFuture<IngestDocumentBridge> future = new CompletableFuture<>();
             processor.execute(input, (id, ex) -> {
                 if (ex != null) {
                     future.completeExceptionally(ex);
diff --git a/src/test/java/co/elastic/logstash/filters/elasticintegration/util/IngestDocumentUtil.java b/src/test/java/co/elastic/logstash/filters/elasticintegration/util/IngestDocumentUtil.java
index 40332daf..acfd231f 100644
--- a/src/test/java/co/elastic/logstash/filters/elasticintegration/util/IngestDocumentUtil.java
+++ b/src/test/java/co/elastic/logstash/filters/elasticintegration/util/IngestDocumentUtil.java
@@ -1,6 +1,6 @@
 package co.elastic.logstash.filters.elasticintegration.util;
 
-import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;
 
 import java.time.Instant;
 import java.util.HashMap;
@@ -10,12 +10,12 @@ public class IngestDocumentUtil {
     private IngestDocumentUtil() {
     }
 
-    private static final Map<String, Object> BASE_SOURCE_AND_METADATA = Map.of(IngestDocument.Metadata.VERSION.getFieldName(), 1L);
+    private static final Map<String, Object> BASE_SOURCE_AND_METADATA = Map.of(IngestDocumentBridge.Constants.METADATA_VERSION_FIELD_NAME, 1L);
 
-    public static IngestDocument createIngestDocument(Map<String, Object> data) {
+    public static IngestDocumentBridge createIngestDocument(Map<String, Object> data) {
         final Map<String, Object> merged_source_and_metadata = new HashMap<>(BASE_SOURCE_AND_METADATA);
         merged_source_and_metadata.putAll(data);
-        return new IngestDocument(merged_source_and_metadata, Map.of("timestamp", Instant.now().toString()));
+        return new IngestDocumentBridge(merged_source_and_metadata, Map.of("timestamp", Instant.now().toString()));
     }
 }
\ No newline at end of file
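
A note on the recurring pattern in this diff: every processor that previously extended org.elasticsearch.ingest.AbstractProcessor now extends ProcessorBridge.AbstractExternal, which (as the PipelineProcessor and SetSecurityUserProcessor changes show) does not store the tag and description on behalf of subclasses, and reports completion through a BiConsumer callback rather than returning the IngestDocument. A minimal sketch of the resulting processor shape, assuming only the bridge API visible in this diff; the class name ExampleNoopProcessor is hypothetical and for illustration only:

    import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;
    import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;

    import java.util.function.BiConsumer;

    public class ExampleNoopProcessor extends ProcessorBridge.AbstractExternal {
        public static final String TYPE = "example_noop";

        // AbstractExternal does not hold these for us, so each subclass carries its own
        private final String tag;
        private final String description;

        public ExampleNoopProcessor(final String tag, final String description) {
            this.tag = tag;
            this.description = description;
        }

        @Override
        public String getType() { return TYPE; }

        @Override
        public String getTag() { return this.tag; }

        @Override
        public String getDescription() { return this.description; }

        @Override
        public boolean isAsync() { return false; }

        @Override
        public void execute(IngestDocumentBridge ingestDocument, BiConsumer<IngestDocumentBridge, Exception> handler) {
            // mirrors SetSecurityUserProcessor above: a synchronous no-op leaves the
            // document untouched; an async processor (isAsync() == true, as in
            // PipelineProcessor) would instead invoke the handler when work completes
        }
    }

The same shape appears in both migrated processors above; where the internal type is still needed (as in the geoip test helpers), toInternal()/fromInternal() convert between the bridge and internal representations.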