diff --git a/build.gradle b/build.gradle index 1fddcbd1..8f9bf91f 100644 --- a/build.gradle +++ b/build.gradle @@ -165,6 +165,14 @@ def _requiredLogstashJar(pathPrefix, jarSpec, flavorSpec = null) { } } +static OutputStreamFunneler outputStreamFunneler(File logFile) { + logFile.parentFile.mkdirs() + logFile.delete() + logFile.createNewFile() + + return new OutputStreamFunneler(new LazyFileOutputStream(logFile)) +} + // https://docs.github.com/en/repositories/working-with-files/using-files/downloading-source-code-archives#source-code-archive-urls String githubArchivePath(repo, treeish="main", archiveFormat="zip") { def pathFragment = { @@ -203,8 +211,10 @@ task downloadElasticsearchSourceZip(type: Download) { task unzipDownloadedElasticsearchSourceZip(dependsOn: downloadElasticsearchSourceZip, type: Copy) { description "extracts Elasticsearch source from a downloaded zip file" + ext.location = "${buildDir}/elasticsearch-source/" + from zipTree(downloadElasticsearchSourceZip.dest) - into "${buildDir}/elasticsearch-source/" + into ext.location eachFile { // strip top-level directory path = path.replaceFirst(/^.+?\//, "") @@ -216,15 +226,14 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource def logFile = project.file("${buildDir}/elasticsearch-build.log") doFirst { - def funneler = new OutputStreamFunneler(new LazyFileOutputStream(logFile)) + def funneler = outputStreamFunneler(logFile) standardOutput = funneler.funnelInstance errorOutput = funneler.funnelInstance } - def esSource = "${buildDir}/elasticsearch-source/" + def esSource = "${unzipDownloadedElasticsearchSourceZip.outputs.files.singleFile}" def esBuildDir = "${esSource}/build" - inputs.dir esSource outputs.dir esBuildDir ext.buildRoot = esBuildDir @@ -238,7 +247,7 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource ext.module = { moduleName -> localDistroResult.map { "${it}/modules/${moduleName}"} } workingDir esSource - commandLine "./gradlew", "localDistro" + commandLine "./gradlew", "--stacktrace", "localDistro" ignoreExitValue true // handled in doLast doLast { @@ -260,20 +269,22 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource task buildElasticsearchLogstashBridge(type: Exec) { description "builds logstash-bridge lib module" - dependsOn buildElasticsearchLocalDistro + dependsOn unzipDownloadedElasticsearchSourceZip + dependsOn buildElasticsearchLocalDistro // mustRunAfter? def logFile = project.file("${buildDir}/logstash-bridge-build.log") doFirst { - def funneler = new OutputStreamFunneler(new LazyFileOutputStream(logFile)) + def funneler = outputStreamFunneler(logFile) standardOutput = funneler.funnelInstance errorOutput = funneler.funnelInstance } - def esSource = "${buildDir}/elasticsearch-source/" + def esSource = "${unzipDownloadedElasticsearchSourceZip.outputs.files.singleFile}" def esBuildDir = "${esSource}/build" - inputs.dir esSource - outputs.dir "${esBuildDir}/libs/logstash-bridge" + inputs.dir "${esSource}/libs/logstash-bridge" + + outputs.dir("${esSource}/libs/logstash-bridge/build/distributions") ext.buildRoot = esBuildDir workingDir esSource @@ -295,6 +306,28 @@ task buildElasticsearchLogstashBridge(type: Exec) { } } +def ingestGeoipPluginShadeNamespace = "org.elasticsearch.ingest.geoip.shaded" + +/** + * The StableBridge exposes GeoIP plugin internals, so it needs to relocate references to + * its bundled dependencies to match the shaded locations in our import of that plugin. + */ +task shadeElasticsearchStableBridge(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) { + description "Shades Maxmind dependencies" + + dependsOn buildElasticsearchLogstashBridge + + from(buildElasticsearchLogstashBridge) + + archiveFileName = "logstash-stable-bridge-shaded.jar" + destinationDirectory = file("${buildDir}/shaded") + + relocate('com.fasterxml.jackson', "${ingestGeoipPluginShadeNamespace}.com.fasterxml.jackson") + relocate('com.maxmind', "${ingestGeoipPluginShadeNamespace}.com.maxmind") + + mergeServiceFiles() +} + task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) { description "Shades embedded dependencies of the Elasticsearch Ingest GeoIP module" @@ -305,11 +338,16 @@ task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugi archiveFileName = 'ingest-geoip-shaded.jar' destinationDirectory = file("${buildDir}/shaded") + relocate('com.fasterxml.jackson', "${ingestGeoipPluginShadeNamespace}.com.fasterxml.jackson") + relocate('com.maxmind', "${ingestGeoipPluginShadeNamespace}.com.maxmind") + mergeServiceFiles() exclude '**/module-info.class' } +def ingestGrokPluginShadeNamespace = "org.elasticsearch.grok.shaded" + task shadeElasticsearchGrokImplementation(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) { description "Shades embedded dependencies of the Elasticsearch Grok implementation" @@ -325,13 +363,16 @@ task shadeElasticsearchGrokImplementation(type: com.github.jengelman.gradle.plug destinationDirectory = file("${buildDir}/shaded") mergeServiceFiles() - String shadeNamespace = "org.elasticsearch.grok.shaded" - relocate('org.joni', "${shadeNamespace}.org.joni") - relocate('org.jcodings', "${shadeNamespace}.org.jcodings") + relocate('org.joni', "${ingestGrokPluginShadeNamespace}.org.joni") + relocate('org.jcodings', "${ingestGrokPluginShadeNamespace}.org.jcodings") exclude '**/module-info.class' } +/** + * The x-pack redact plugin reaches into the grok plugin's implementation, so + * they both need to point to the same relocated shaded components. + */ task shadeElasticsearchRedactPlugin(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) { description "Shades Elasticsearch Redact plugin to reference Grok's shaded dependencies" dependsOn buildElasticsearchLocalDistro @@ -343,24 +384,8 @@ task shadeElasticsearchRedactPlugin(type: com.github.jengelman.gradle.plugins.sh destinationDirectory = file("${buildDir}/shaded") // relocate elasticsearch-grok's dependencies to match - String shadeNamespace = "org.elasticsearch.grok.shaded" - relocate('org.joni', "${shadeNamespace}.org.joni") - relocate('org.jcodings', "${shadeNamespace}.org.jcodings") - - exclude '**/module-info.class' -} - -task shadeElasticsearchLogstashBridge(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) { - description "Shades the Elasticsearch logstash-bridge jar" - - dependsOn buildElasticsearchLogstashBridge - - from("${buildDir}/elasticsearch-source/libs/logstash-bridge/build/distributions") { - include "elasticsearch-logstash-bridge-*.jar" - } - - archiveFileName = "elasticsearch-logstash-bridge-shaded.jar" - destinationDirectory = file("${buildDir}/shaded") + relocate('org.joni', "${ingestGrokPluginShadeNamespace}.org.joni") + relocate('org.jcodings', "${ingestGrokPluginShadeNamespace}.org.jcodings") exclude '**/module-info.class' } @@ -369,11 +394,10 @@ task importMinimalElasticsearch() { description "Imports minimal portions of Elasticsearch localDistro" dependsOn buildElasticsearchLocalDistro - dependsOn buildElasticsearchLogstashBridge + dependsOn shadeElasticsearchStableBridge dependsOn shadeElasticsearchIngestGeoIpModule dependsOn shadeElasticsearchGrokImplementation dependsOn shadeElasticsearchRedactPlugin - dependsOn shadeElasticsearchLogstashBridge ext.jars = "${buildDir}/elasticsearch-minimal-jars" @@ -392,7 +416,7 @@ task importMinimalElasticsearch() { include jarPackageNamed("lucene-core") include jarPackageNamed("lucene-analysis-common") } - from(shadeElasticsearchLogstashBridge) + from(shadeElasticsearchStableBridge.outputs.files.singleFile) from(shadeElasticsearchGrokImplementation) from(buildElasticsearchLocalDistro.module("x-pack-core")) diff --git a/gradle.properties b/gradle.properties index 81721777..1a044d6f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ LOGSTASH_PATH=../../logstash -ELASTICSEARCH_REPO=mashhurs/elasticsearch -ELASTICSEARCH_TREEISH=logstash-bridge-geoip-interfaces +ELASTICSEARCH_REPO=yaauie/elasticsearch +ELASTICSEARCH_TREEISH=rye-bridge-refinement-moar diff --git a/lib/logstash/filters/elastic_integration.rb b/lib/logstash/filters/elastic_integration.rb index 33e97209..bcb786b7 100644 --- a/lib/logstash/filters/elastic_integration.rb +++ b/lib/logstash/filters/elastic_integration.rb @@ -368,11 +368,11 @@ def _elasticsearch_rest_client(config, &builder_interceptor) def initialize_event_processor! java_import('co.elastic.logstash.filters.elasticintegration.EventProcessorBuilder') - java_import('co.elastic.logstash.filters.elasticintegration.geoip.GeoIpProcessorFactory') + java_import('org.elasticsearch.logstashbridge.geoip.GeoIpProcessorFactoryBridge') @event_processor = EventProcessorBuilder.fromElasticsearch(@elasticsearch_rest_client, extract_immutable_config) .setFilterMatchListener(method(:filter_matched_java).to_proc) - .addProcessor("geoip") { GeoIpProcessorFactory.new(@geoip_database_provider) } + .addProcessor("geoip") { GeoIpProcessorFactoryBridge::create(@geoip_database_provider) } .build(@plugin_context) rescue => exception raise_config_error!("configuration did not produce an EventProcessor: #{exception}") diff --git a/settings.gradle b/settings.gradle index 2a2d5695..ddd2c71b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1 +1 @@ -rootProject.name = 'logstash-filter-elastic_integration' \ No newline at end of file +rootProject.name = 'logstash-filter-elastic_integration' diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessor.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessor.java index b835ef88..570fad11 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessor.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessor.java @@ -13,7 +13,6 @@ import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.logstashbridge.core.FailProcessorExceptionBridge; import org.elasticsearch.logstashbridge.core.IOUtilsBridge; import org.elasticsearch.logstashbridge.core.RefCountingRunnableBridge; import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge; @@ -93,7 +92,7 @@ public Collection processEvents(final Collection incomingEvents) t final CountDownLatch latch = new CountDownLatch(1); final IntegrationBatch batch = new IntegrationBatch(incomingEvents); - RefCountingRunnableBridge ref = new RefCountingRunnableBridge(latch::countDown); + RefCountingRunnableBridge ref = RefCountingRunnableBridge.create(latch::countDown); try { batch.eachRequest(ref::acquire, this::processRequest); } finally { @@ -179,12 +178,11 @@ private void executePipeline(final IngestDocumentBridge ingestDocument, final In // If no exception, then the original event is to be _replaced_ by the result if (Objects.nonNull(ingestPipelineException)) { // If we had an exception in the IngestPipeline, tag and emit the original Event - final Throwable unwrappedException = unwrapException(ingestPipelineException); - LOGGER.warn(() -> String.format("ingest pipeline `%s` failed", pipelineName), unwrappedException); + LOGGER.warn(() -> String.format("ingest pipeline `%s` failed", pipelineName), ingestPipelineException); request.complete(incomingEvent -> { annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of( - "message", unwrappedException.getMessage(), - "exception", unwrappedException.getClass().getName() + "message", ingestPipelineException.getMessage(), + "exception", ingestPipelineException.getClass().getName() )); }); } else if (Objects.isNull(resultIngestDocument)) { @@ -256,13 +254,6 @@ static private void annotateIngestPipelineFailure(final Event event, final Strin }); } - static private Throwable unwrapException(final Exception exception) { - if (FailProcessorExceptionBridge.isInstanceOf(exception.getCause())) { - return exception.getCause(); - } - return exception; - } - static private String diff(final Event original, final Event changed) { if (LOGGER.isTraceEnabled()) { // dot notation less than ideal for LS-internal, but better than re-writing it ourselves. diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java index a0e42823..057b58a9 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java @@ -22,7 +22,8 @@ import org.elasticsearch.logstashbridge.common.SettingsBridge; import org.elasticsearch.logstashbridge.core.IOUtilsBridge; import org.elasticsearch.logstashbridge.env.EnvironmentBridge; -import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorFactoryBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorParametersBridge; import org.elasticsearch.logstashbridge.plugins.IngestCommonPluginBridge; import org.elasticsearch.logstashbridge.plugins.IngestPluginBridge; import org.elasticsearch.logstashbridge.plugins.IngestUserAgentPluginBridge; @@ -196,7 +197,7 @@ private synchronized EventProcessorBuilder setFilterMatchListener(final FilterMa return this; } - public EventProcessorBuilder addProcessor(final String type, final Supplier processorFactorySupplier) { + public EventProcessorBuilder addProcessor(final String type, final Supplier processorFactorySupplier) { return this.addProcessorsFromPlugin(SingleProcessorIngestPlugin.of(type, processorFactorySupplier)); } @@ -226,14 +227,14 @@ public synchronized EventProcessor build(final PluginContext pluginContext) { try { final ArrayList services = new ArrayList<>(); - final ThreadPoolBridge threadPool = new ThreadPoolBridge(settings); - resourcesToClose.add(() -> ThreadPoolBridge.terminate(threadPool, 10, TimeUnit.SECONDS)); + final ThreadPoolBridge threadPool = ThreadPoolBridge.create(settings); + resourcesToClose.add(() -> threadPool.terminate(10, TimeUnit.SECONDS)); - final ScriptServiceBridge scriptService = new ScriptServiceBridge(settings, threadPool::absoluteTimeInMillis); + final ScriptServiceBridge scriptService = ScriptServiceBridge.create(settings, threadPool::absoluteTimeInMillis); resourcesToClose.add(scriptService); - final EnvironmentBridge env = new EnvironmentBridge(settings, null); - final ProcessorBridge.Parameters processorParameters = new ProcessorBridge.Parameters(env, scriptService, threadPool); + final EnvironmentBridge env = EnvironmentBridge.create(settings, null); + final ProcessorParametersBridge processorParameters = ProcessorParametersBridge.create(env, scriptService, threadPool); IngestPipelineFactory ingestPipelineFactory = new IngestPipelineFactory(scriptService); for (Supplier ingestPluginSupplier : ingestPlugins) { @@ -241,7 +242,7 @@ public synchronized EventProcessor build(final PluginContext pluginContext) { if (ingestPlugin instanceof Closeable closeableIngestPlugin) { resourcesToClose.add(closeableIngestPlugin); } - final Map processorFactories = ingestPlugin.getProcessors(processorParameters); + final Map processorFactories = ingestPlugin.getProcessors(processorParameters); ingestPipelineFactory = ingestPipelineFactory.withProcessors(processorFactories); } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestDuplexMarshaller.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestDuplexMarshaller.java index 74980f98..98406765 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestDuplexMarshaller.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestDuplexMarshaller.java @@ -103,7 +103,7 @@ public IngestDocumentBridge toIngestDocument(final Event event) { final Timestamp eventTimestamp = safeTimestampFrom(event.getField(org.logstash.Event.TIMESTAMP)); Map ingestMetadata = Map.of(INGEST_METADATA_TIMESTAMP_FIELD, Objects.requireNonNullElseGet(eventTimestamp, Timestamp::now).toString()); - return new IngestDocumentBridge(sourceAndMetadata, ingestMetadata); + return IngestDocumentBridge.create(sourceAndMetadata, ingestMetadata); } /** diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestPipelineFactory.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestPipelineFactory.java index 937a7913..b5a2a74f 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestPipelineFactory.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/IngestPipelineFactory.java @@ -12,6 +12,7 @@ import org.elasticsearch.logstashbridge.ingest.PipelineBridge; import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge; import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorFactoryBridge; import org.elasticsearch.logstashbridge.script.ScriptServiceBridge; import java.util.HashMap; @@ -24,7 +25,7 @@ */ public class IngestPipelineFactory { private final ScriptServiceBridge scriptService; - private final Map processorFactories; + private final Map processorFactories; private static final Logger LOGGER = LogManager.getLogger(IngestPipelineFactory.class); @@ -33,13 +34,13 @@ public IngestPipelineFactory(final ScriptServiceBridge scriptService) { } private IngestPipelineFactory(final ScriptServiceBridge scriptService, - final Map processorFactories) { + final Map processorFactories) { this.scriptService = scriptService; this.processorFactories = Map.copyOf(processorFactories); } - public IngestPipelineFactory withProcessors(final Map processorFactories) { - final Map intermediate = new HashMap<>(this.processorFactories); + public IngestPipelineFactory withProcessors(final Map processorFactories) { + final Map intermediate = new HashMap<>(this.processorFactories); intermediate.putAll(processorFactories); return new IngestPipelineFactory(scriptService, intermediate); } @@ -63,7 +64,7 @@ public Optional create(final PipelineConfigurationBridge pipelin * resolve pipelines through the provided {@link IngestPipelineResolver}. */ public IngestPipelineFactory withIngestPipelineResolver(final IngestPipelineResolver ingestPipelineResolver) { - final Map modifiedProcessorFactories = new HashMap<>(this.processorFactories); + final Map modifiedProcessorFactories = new HashMap<>(this.processorFactories); modifiedProcessorFactories.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(ingestPipelineResolver, this.scriptService)); return new IngestPipelineFactory(scriptService, modifiedProcessorFactories); } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/PipelineConfigurationFactory.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/PipelineConfigurationFactory.java index 8b13b816..0f712b04 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/PipelineConfigurationFactory.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/PipelineConfigurationFactory.java @@ -46,7 +46,7 @@ public PipelineConfigurationBridge parseNamedObject(final String json) throws Ex } public PipelineConfigurationBridge parseConfigOnly(final String pipelineId, final String jsonEncodedConfig) { - return new PipelineConfigurationBridge(pipelineId, jsonEncodedConfig); + return PipelineConfigurationBridge.create(pipelineId, jsonEncodedConfig); } @@ -66,7 +66,7 @@ public List get(){ } private static PipelineConfigurationBridge init(final String id, final String json) { - return new PipelineConfigurationBridge(id, json); + return PipelineConfigurationBridge.create(id, json); } } } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/GeoIpProcessorFactory.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/GeoIpProcessorFactory.java deleted file mode 100644 index c815f966..00000000 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/GeoIpProcessorFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. - * under one or more contributor license agreements. Licensed under the - * Elastic License 2.0; you may not use this file except in compliance - * with the Elastic License 2.0. - */ -package co.elastic.logstash.filters.elasticintegration.geoip; - -import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; -import org.elasticsearch.logstashbridge.geoip.GeoIpProcessorBridge; - -import java.util.Map; -import java.util.stream.Collectors; - -public class GeoIpProcessorFactory implements ProcessorBridge.Factory { - private final IpDatabaseProvider ipDatabaseProvider; - - public GeoIpProcessorFactory(final IpDatabaseProvider ipDatabaseProvider) { - this.ipDatabaseProvider = ipDatabaseProvider; - } - - @Override - public ProcessorBridge create(Map processorFactories, - String tag, - String description, - Map config) throws Exception { - return ProcessorBridge.fromInternal( - new GeoIpProcessorBridge.Factory("geoip", this.ipDatabaseProvider.toInternal()).toInternal() - .create(processorFactories.entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey,e -> e.getValue().toInternal())), - tag, - description, - config, - null)); - } -} diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseAdapter.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseAdapter.java index bf2836fe..f96ef401 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseAdapter.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseAdapter.java @@ -8,25 +8,30 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.logstashbridge.core.CheckedBiFunctionBridge; +import org.elasticsearch.logstashbridge.geoip.AbstractExternalIpDatabaseBridge; import org.elasticsearch.logstashbridge.geoip.IpDatabaseBridge; -import org.elasticsearch.logstashbridge.geoip.MaxMindDbBridge; +import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.CHMCache; +import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.NoCache; +import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.NodeCache; +import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.Reader; import java.io.File; import java.io.IOException; import java.nio.file.Path; import java.util.Optional; -public class IpDatabaseAdapter extends IpDatabaseBridge.AbstractExternal { +public class IpDatabaseAdapter extends AbstractExternalIpDatabaseBridge { private static final Logger LOGGER = LogManager.getLogger(IpDatabaseAdapter.class); - private final MaxMindDbBridge.Reader databaseReader; + private final Reader databaseReader; private final String databaseType; private volatile boolean isReaderClosed = false; - public IpDatabaseAdapter(final MaxMindDbBridge.Reader databaseReader) { + public IpDatabaseAdapter(final Reader databaseReader) { this.databaseReader = databaseReader; - this.databaseType = databaseReader.getDatabaseType(); + this.databaseType = databaseReader.getMetadata().getDatabaseType(); } @Override @@ -35,8 +40,19 @@ public String getDatabaseType() { } @Override - public MaxMindDbBridge.Reader getDatabaseReader() throws IOException { - return this.databaseReader; + public RESPONSE getResponse(String ipAddress, CheckedBiFunctionBridge responseProvider) { + try { + return responseProvider.apply(this.databaseReader, ipAddress); + } catch (Exception e) { + throw convertToRuntime(e); + } + } + + private static RuntimeException convertToRuntime(final Exception e) { + if (e instanceof RuntimeException re) { + return re; + } + return new RuntimeException(e); } @Override @@ -58,25 +74,25 @@ boolean isReaderClosed() { } public static IpDatabaseAdapter defaultForPath(final Path database) throws IOException { - return new Builder(database.toFile()).setCache(MaxMindDbBridge.NodeCache.get(10_000)).build(); + return new Builder(database.toFile()).setCache(new CHMCache(10_000)).build(); } public static class Builder { private File databasePath; - private MaxMindDbBridge.NodeCache nodeCache; + private NodeCache nodeCache; public Builder(final File databasePath) { this.databasePath = databasePath; } - public Builder setCache(final MaxMindDbBridge.NodeCache nodeCache) { + public Builder setCache(final NodeCache nodeCache) { this.nodeCache = nodeCache; return this; } public IpDatabaseAdapter build() throws IOException { - final MaxMindDbBridge.NodeCache nodeCache = Optional.ofNullable(this.nodeCache).orElseGet(MaxMindDbBridge.NodeCache::getInstance); - final MaxMindDbBridge.Reader databaseReader = new MaxMindDbBridge.Reader(this.databasePath, nodeCache); + final NodeCache nodeCache = Optional.ofNullable(this.nodeCache).orElseGet(NoCache::getInstance); + final Reader databaseReader = new Reader(this.databasePath, nodeCache); return new IpDatabaseAdapter(databaseReader); } } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseHolder.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseHolder.java index 49670eeb..6046d11f 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseHolder.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseHolder.java @@ -1,6 +1,6 @@ package co.elastic.logstash.filters.elasticintegration.geoip; -interface IpDatabaseHolder { +public interface IpDatabaseHolder { boolean isValid(); IpDatabaseAdapter getDatabase(); diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProvider.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProvider.java index a8f99d41..6aec6d71 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProvider.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProvider.java @@ -9,8 +9,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.logstashbridge.core.IOUtilsBridge; +import org.elasticsearch.logstashbridge.geoip.AbstractExternalIpDatabaseProviderBridge; import org.elasticsearch.logstashbridge.geoip.IpDatabaseBridge; -import org.elasticsearch.logstashbridge.geoip.IpDatabaseProviderBridge; import java.io.Closeable; import java.io.File; @@ -23,7 +23,7 @@ import java.util.Map; import java.util.Objects; -public class IpDatabaseProvider extends IpDatabaseProviderBridge.AbstractExternal implements Closeable { +public class IpDatabaseProvider extends AbstractExternalIpDatabaseProviderBridge implements Closeable { private static final Logger LOGGER = LogManager.getLogger(IpDatabaseProvider.class); diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/PipelineProcessor.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/PipelineProcessor.java index cc506bfc..0322418a 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/PipelineProcessor.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/PipelineProcessor.java @@ -8,29 +8,28 @@ import co.elastic.logstash.filters.elasticintegration.IngestPipeline; import co.elastic.logstash.filters.elasticintegration.IngestPipelineResolver; -import org.elasticsearch.logstashbridge.ingest.ConfigurationUtilsBridge; -import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge; -import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.logstashbridge.common.ProjectIdBridge; +import org.elasticsearch.logstashbridge.ingest.*; import org.elasticsearch.logstashbridge.script.ScriptServiceBridge; -import org.elasticsearch.logstashbridge.script.TemplateScriptBridge; +import org.elasticsearch.logstashbridge.script.TemplateScriptFactoryBridge; import java.util.Map; import java.util.function.BiConsumer; -public class PipelineProcessor extends ProcessorBridge.AbstractExternal { +public class PipelineProcessor extends AbstractExternalProcessorBridge { public static final String TYPE = "pipeline"; private final String tag; private final String description; private final String pipelineName; - private final TemplateScriptBridge.Factory pipelineTemplate; + private final TemplateScriptFactoryBridge pipelineTemplate; private final IngestPipelineResolver pipelineProvider; private final boolean ignoreMissingPipeline; private PipelineProcessor(String tag, String description, - TemplateScriptBridge.Factory pipelineTemplate, + TemplateScriptFactoryBridge pipelineTemplate, String pipelineName, boolean ignoreMissingPipeline, IngestPipelineResolver pipelineProvider) { @@ -86,7 +85,7 @@ public void execute(IngestDocumentBridge ingestDocument, BiConsumer registry, - String processorTag, - String description, - Map config) throws Exception { + public ProcessorBridge create(Map registry, + String processorTag, + String description, + Map config, + ProjectIdBridge projectIdBridge) throws Exception { String pipeline = ConfigurationUtilsBridge.readStringProperty(TYPE, processorTag, config, "name"); - TemplateScriptBridge.Factory pipelineTemplate = ConfigurationUtilsBridge.compileTemplate(TYPE, processorTag, "name", pipeline, scriptService); + TemplateScriptFactoryBridge pipelineTemplate = ConfigurationUtilsBridge.compileTemplate(TYPE, processorTag, "name", pipeline, scriptService); boolean ignoreMissingPipeline = ConfigurationUtilsBridge.readBooleanProperty(TYPE, processorTag, config, "ignore_missing_pipeline", false); return new PipelineProcessor(processorTag, description, pipelineTemplate, pipeline, ignoreMissingPipeline, pipelineProvider); } + } } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SafeSubsetIngestPlugin.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SafeSubsetIngestPlugin.java index 79e2fa67..7971eb03 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SafeSubsetIngestPlugin.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SafeSubsetIngestPlugin.java @@ -1,7 +1,8 @@ package co.elastic.logstash.filters.elasticintegration.ingest; import org.elasticsearch.logstashbridge.core.IOUtilsBridge; -import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorFactoryBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorParametersBridge; import org.elasticsearch.logstashbridge.plugins.IngestPluginBridge; import javax.annotation.Nonnull; @@ -35,14 +36,14 @@ private SafeSubsetIngestPlugin(final @Nonnull Supplier inges } @Override - public Map getProcessors(ProcessorBridge.Parameters parameters) { - final Map providedProcessors = this.ingestPlugin.getProcessors(parameters); + public Map getProcessors(ProcessorParametersBridge parameters) { + final Map providedProcessors = this.ingestPlugin.getProcessors(parameters); - final Map acceptedProcessors = new HashMap<>(); + final Map acceptedProcessors = new HashMap<>(); final Set missingProcessors = new HashSet<>(); for (String requiredProcessor : this.requiredProcessors) { - final ProcessorBridge.Factory processor = providedProcessors.get(requiredProcessor); + final ProcessorFactoryBridge processor = providedProcessors.get(requiredProcessor); if (!Objects.nonNull(processor)) { missingProcessors.add(requiredProcessor); } else { diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SetSecurityUserProcessor.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SetSecurityUserProcessor.java index b8928ce1..6e7e1abf 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SetSecurityUserProcessor.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SetSecurityUserProcessor.java @@ -6,13 +6,13 @@ */ package co.elastic.logstash.filters.elasticintegration.ingest; -import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge; -import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.logstashbridge.common.ProjectIdBridge; +import org.elasticsearch.logstashbridge.ingest.*; import java.util.Map; import java.util.function.BiConsumer; -public class SetSecurityUserProcessor extends ProcessorBridge.AbstractExternal { +public class SetSecurityUserProcessor extends AbstractExternalProcessorBridge { public static final String TYPE = "set_security_user"; private final String tag; @@ -49,13 +49,14 @@ public boolean isAsync() { return false; } - public static final class Factory implements ProcessorBridge.Factory { + public static class Factory extends AbstractExternalProcessorFactoryBridge { @Override - public ProcessorBridge create(Map registry, + public ProcessorBridge create(Map registry, String processorTag, String description, - Map config) { + Map config, + ProjectIdBridge projectId) { String[] supportedConfigs = {"field", "properties"}; for (String cfg : supportedConfigs) { config.remove(cfg); diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SingleProcessorIngestPlugin.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SingleProcessorIngestPlugin.java index c6249893..d59afbe7 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SingleProcessorIngestPlugin.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/ingest/SingleProcessorIngestPlugin.java @@ -8,6 +8,8 @@ import org.elasticsearch.logstashbridge.core.IOUtilsBridge; import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorFactoryBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorParametersBridge; import org.elasticsearch.logstashbridge.plugins.IngestPluginBridge; import java.io.Closeable; @@ -17,19 +19,19 @@ public class SingleProcessorIngestPlugin implements IngestPluginBridge, Closeable { private final String type; - private final ProcessorBridge.Factory processorFactory; + private final ProcessorFactoryBridge processorFactory; - public static Supplier of(final String type, final Supplier factorySupplier) { + public static Supplier of(final String type, final Supplier factorySupplier) { return () -> new SingleProcessorIngestPlugin(type, factorySupplier.get()); } - public SingleProcessorIngestPlugin(String type, ProcessorBridge.Factory processorFactory) { + public SingleProcessorIngestPlugin(String type, ProcessorFactoryBridge processorFactory) { this.type = type; this.processorFactory = processorFactory; } @Override - public Map getProcessors(ProcessorBridge.Parameters parameters) { + public Map getProcessors(ProcessorParametersBridge parameters) { return Map.of(this.type, this.processorFactory); } diff --git a/src/test/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProviderTest.java b/src/test/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProviderTest.java index 71ddfe14..27a8fd03 100644 --- a/src/test/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProviderTest.java +++ b/src/test/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseProviderTest.java @@ -8,9 +8,10 @@ import co.elastic.logstash.filters.elasticintegration.util.IngestDocumentUtil; import co.elastic.logstash.filters.elasticintegration.util.ResourcesUtil; +import org.elasticsearch.logstashbridge.common.ProjectIdBridge; import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge; import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; -import org.elasticsearch.logstashbridge.geoip.GeoIpProcessorBridge; +import org.elasticsearch.logstashbridge.geoip.GeoIpProcessorFactoryBridge; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -131,18 +132,14 @@ static void withVendoredGeoIpDatabaseProvider(final ExceptionalConsumer config, ExceptionalConsumer geoIpProcessorConsumer) throws Exception { - final ProcessorBridge bridgeProcessor = ProcessorBridge.fromInternal( - new GeoIpProcessorBridge.Factory("geoip", geoIpDatabaseProvider.toInternal()).toInternal() - .create(Map.of(), null, null, config, null)); + final GeoIpProcessorFactoryBridge factory = GeoIpProcessorFactoryBridge.create(geoIpDatabaseProvider); + final ProcessorBridge bridgeProcessor = factory.create(Map.of(), null, null, config, ProjectIdBridge.getDefault()); geoIpProcessorConsumer.accept(bridgeProcessor); } static IngestDocumentBridge processIngestDocumentSynchronously(final IngestDocumentBridge input, final ProcessorBridge processor) throws Exception { if (!processor.isAsync()) { - return new IngestDocumentBridge( - processor.toInternal().execute(input.toInternal()).getSourceAndMetadata(), - processor.toInternal().execute(input.toInternal()).getIngestMetadata() - ); + return processor.execute(input); } else { final CompletableFuture future = new CompletableFuture<>(); processor.execute(input, (id, ex) -> { diff --git a/src/test/java/co/elastic/logstash/filters/elasticintegration/util/IngestDocumentUtil.java b/src/test/java/co/elastic/logstash/filters/elasticintegration/util/IngestDocumentUtil.java index acfd231f..a076a1aa 100644 --- a/src/test/java/co/elastic/logstash/filters/elasticintegration/util/IngestDocumentUtil.java +++ b/src/test/java/co/elastic/logstash/filters/elasticintegration/util/IngestDocumentUtil.java @@ -16,6 +16,6 @@ public static IngestDocumentBridge createIngestDocument(Map data final Map merged_source_and_metadata = new HashMap<>(BASE_SOURCE_AND_METADATA); merged_source_and_metadata.putAll(data); - return new IngestDocumentBridge(merged_source_and_metadata, Map.of("timestamp", Instant.now().toString())); + return IngestDocumentBridge.create(merged_source_and_metadata, Map.of("timestamp", Instant.now().toString())); } } \ No newline at end of file