From a3d23236a8738801d812c1c9121249ae2f3e8a91 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Tue, 5 Aug 2025 22:25:24 -0700 Subject: [PATCH 1/7] Use default project resolver instead null which caused NPE when running elastic_integration integration tests. --- libs/logstash-bridge/README.md | 5 +++++ .../logstashbridge/script/ScriptServiceBridge.java | 5 +++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/libs/logstash-bridge/README.md b/libs/logstash-bridge/README.md index dd629724878b5..4cb2204902b1b 100644 --- a/libs/logstash-bridge/README.md +++ b/libs/logstash-bridge/README.md @@ -6,3 +6,8 @@ other Elasticsearch internals. If a change is introduced in a separate Elasticsearch project that causes this project to fail, please consult with members of @elastic/logstash to chart a path forward. + +## How to build the module? +```shell +./gradlew :lib:logstash-bridge:build +``` \ No newline at end of file diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java index e7d40a79baf47..8a2f245314804 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java @@ -8,6 +8,7 @@ */ package org.elasticsearch.logstashbridge.script; +import org.elasticsearch.cluster.project.DefaultProjectResolver; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.FixForMultiProject; @@ -68,8 +69,8 @@ private static ScriptService getScriptService(final Settings settings, final Lon MustacheScriptEngine.NAME, new MustacheScriptEngine(settings) ); - @FixForMultiProject // Should this be non-null? - final ProjectResolver projectResolver = null; + @FixForMultiProject // Should this be non-DefaultProjectResolver? + final ProjectResolver projectResolver = DefaultProjectResolver.INSTANCE; return new ScriptService(settings, scriptEngines, ScriptModule.CORE_CONTEXTS, timeProvider, projectResolver); } From 2c03eb8a8174bb71c078a1ff100e429ed362267d Mon Sep 17 00:00:00 2001 From: Mashhur Date: Tue, 5 Aug 2025 22:54:17 -0700 Subject: [PATCH 2/7] Introduce BridgeProjectIdResolver to applying null for project ID. --- .../script/ScriptServiceBridge.java | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java index 8a2f245314804..dbebcaee91673 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java @@ -8,9 +8,10 @@ */ package org.elasticsearch.logstashbridge.script; -import org.elasticsearch.cluster.project.DefaultProjectResolver; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.ingest.common.ProcessorsWhitelistExtension; import org.elasticsearch.logstashbridge.StableBridgeAPI; @@ -69,8 +70,8 @@ private static ScriptService getScriptService(final Settings settings, final Lon MustacheScriptEngine.NAME, new MustacheScriptEngine(settings) ); - @FixForMultiProject // Should this be non-DefaultProjectResolver? - final ProjectResolver projectResolver = DefaultProjectResolver.INSTANCE; + @FixForMultiProject // Should this be non-BridgeProjectIdResolver? + final ProjectResolver projectResolver = new BridgeProjectIdResolver(); return new ScriptService(settings, scriptEngines, ScriptModule.CORE_CONTEXTS, timeProvider, projectResolver); } @@ -113,4 +114,21 @@ public List loadExtensions(Class extensionPointType) { public void close() throws IOException { this.internalDelegate.close(); } + + static class BridgeProjectIdResolver implements ProjectResolver { + + @Override + public ProjectId getProjectId() { + return ProjectId.DEFAULT; + } + + @Override + public void executeOnProject(ProjectId projectId, CheckedRunnable body) throws E { + if (projectId.equals(ProjectId.DEFAULT)) { + body.run(); + } else { + throw new IllegalArgumentException("Cannot execute on a project other than [" + ProjectId.DEFAULT + "]"); + } + } + } } From d047addb97c429a096aac312168b8db1ade9c9af Mon Sep 17 00:00:00 2001 From: Mashhur Date: Wed, 6 Aug 2025 09:58:46 -0700 Subject: [PATCH 3/7] Put a note to the ProjectIdResolverBridge about resolving project ID for multi-project. --- .../logstashbridge/script/ScriptServiceBridge.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java index dbebcaee91673..9b57b50e3b199 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java @@ -70,9 +70,8 @@ private static ScriptService getScriptService(final Settings settings, final Lon MustacheScriptEngine.NAME, new MustacheScriptEngine(settings) ); - @FixForMultiProject // Should this be non-BridgeProjectIdResolver? - final ProjectResolver projectResolver = new BridgeProjectIdResolver(); - return new ScriptService(settings, scriptEngines, ScriptModule.CORE_CONTEXTS, timeProvider, projectResolver); + + return new ScriptService(settings, scriptEngines, ScriptModule.CORE_CONTEXTS, timeProvider, ProjectIdResolverBridge.INSTANCE); } private static List getPainlessBaseWhiteList() { @@ -115,7 +114,13 @@ public void close() throws IOException { this.internalDelegate.close(); } - static class BridgeProjectIdResolver implements ProjectResolver { + @FixForMultiProject + // Logstash resolves and runs ingest pipelines based on the datastream. + // How should ProjectIdResolverBridge behave in this case? + // In other words, it looks we need to find a way to figure out which ingest pipeline belongs to which project. + static class ProjectIdResolverBridge implements ProjectResolver { + + public static final ProjectIdResolverBridge INSTANCE = new ProjectIdResolverBridge(); @Override public ProjectId getProjectId() { From 74a65a0090a3aaaa28d018bcca0501d36cc6f6bc Mon Sep 17 00:00:00 2001 From: Mashhur Date: Fri, 8 Aug 2025 11:13:31 -0700 Subject: [PATCH 4/7] GeoIP interfaces for the Logstash bridge where elastic_integration plugin utilizes. --- libs/logstash-bridge/build.gradle | 2 + .../src/main/java/module-info.java | 2 + .../geoip/GeoIpProcessorBridge.java | 26 ++++ .../geoip/IpDatabaseBridge.java | 120 ++++++++++++++++++ .../geoip/IpDatabaseProviderBridge.java | 89 +++++++++++++ .../logstashbridge/geoip/MaxMindDbBridge.java | 59 +++++++++ .../script/ScriptServiceBridge.java | 2 +- .../src/main/java/module-info.java | 2 +- 8 files changed, 300 insertions(+), 2 deletions(-) create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/MaxMindDbBridge.java diff --git a/libs/logstash-bridge/build.gradle b/libs/logstash-bridge/build.gradle index 608d2e3a398e9..dd7cb47a5caa2 100644 --- a/libs/logstash-bridge/build.gradle +++ b/libs/logstash-bridge/build.gradle @@ -24,6 +24,8 @@ dependencies { compileOnly project(':x-pack:plugin:redact') compileOnly project(':x-pack:plugin:spatial') compileOnly project(':x-pack:plugin:wildcard') + + compileOnly('com.maxmind.db:maxmind-db:3.1.1') } tasks.named('forbiddenApisMain').configure { diff --git a/libs/logstash-bridge/src/main/java/module-info.java b/libs/logstash-bridge/src/main/java/module-info.java index 0d16376d2fbb0..8960c331a33ce 100644 --- a/libs/logstash-bridge/src/main/java/module-info.java +++ b/libs/logstash-bridge/src/main/java/module-info.java @@ -23,6 +23,8 @@ requires org.elasticsearch.redact; requires org.elasticsearch.spatial; requires org.elasticsearch.wildcard; + requires org.elasticsearch.ingest.geoip; + requires com.maxmind.db; exports org.elasticsearch.logstashbridge; exports org.elasticsearch.logstashbridge.common; diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorBridge.java new file mode 100644 index 0000000000000..2fc197ebe6ca9 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorBridge.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.logstashbridge.geoip; + +import org.elasticsearch.ingest.geoip.GeoIpProcessor; +import org.elasticsearch.ingest.geoip.IpDatabaseProvider; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +/** + * An external bridge for {@link GeoIpProcessor} + */ +public interface GeoIpProcessorBridge { + + class Factory extends StableBridgeAPI.ProxyInternal { + + public Factory(String type, IpDatabaseProvider ipDatabaseProvider) { + super(new GeoIpProcessor.Factory(type, ipDatabaseProvider)); + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java new file mode 100644 index 0000000000000..5d058fabec39b --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java @@ -0,0 +1,120 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.logstashbridge.geoip; + +import com.maxmind.db.Reader; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.CheckedBiFunction; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.ingest.geoip.IpDatabase; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +import java.io.IOException; + +/** + * An external bridge for {@link IpDatabase} + */ +public interface IpDatabaseBridge extends StableBridgeAPI { + + String getDatabaseType() throws IOException; + + MaxMindDbBridge.Reader getDatabaseReader() throws IOException; + + @Nullable + default RESPONSE getResponse(String ipAddress, CheckedBiFunction responseProvider) { + try { + return responseProvider.apply(this.getDatabaseReader().toInternal(), ipAddress); + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + } + + void close() throws IOException; + + static IpDatabaseBridge fromInternal(final IpDatabase internalDatabase) { + if (internalDatabase instanceof AbstractExternal.ProxyExternal externalProxy) { + return externalProxy.getIpDatabaseBridge(); + } + return new ProxyInternal(internalDatabase); + } + + /** + * The {@code IpDatabaseBridge.AbstractExternal} is an abstract base class for implementing + * the {@link IpDatabaseBridge} externally to the Elasticsearch code-base. It takes care of + * the details of maintaining a singular internal-form implementation of {@link IpDatabase} + * that proxies calls through the external implementation. + */ + abstract class AbstractExternal implements IpDatabaseBridge { + private ProxyExternal internalDatabase; + + @Override + public IpDatabase toInternal() { + if (internalDatabase == null) { + internalDatabase = new ProxyExternal(); + } + return internalDatabase; + } + + private class ProxyExternal implements IpDatabase { + + private AbstractExternal getIpDatabaseBridge() { + return AbstractExternal.this; + } + + @Override + public String getDatabaseType() throws IOException { + return AbstractExternal.this.getDatabaseType(); + } + + @Override + public RESPONSE getResponse( + String ipAddress, + CheckedBiFunction responseProvider + ) { + return AbstractExternal.this.getResponse(ipAddress, responseProvider); + } + + @Override + public void close() throws IOException { + AbstractExternal.this.close(); + } + } + } + + /** + * An implementation of {@link IpDatabaseBridge} that proxies to an internal {@link IpDatabase} + */ + class ProxyInternal extends StableBridgeAPI.ProxyInternal implements IpDatabaseBridge { + + public ProxyInternal(final IpDatabase delegate) { + super(delegate); + } + + @Override + public String getDatabaseType() throws IOException { + return toInternal().getDatabaseType(); + } + + @Override + public MaxMindDbBridge.Reader getDatabaseReader() throws IOException { + return null; + } + + @Override + public RESPONSE getResponse(String ipAddress, CheckedBiFunction responseProvider) { + return toInternal().getResponse(ipAddress, responseProvider); + } + + @Override + public void close() throws IOException { + toInternal().close(); + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java new file mode 100644 index 0000000000000..7172436ba1a16 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.logstashbridge.geoip; + +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.geoip.IpDatabase; +import org.elasticsearch.ingest.geoip.IpDatabaseProvider; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +import java.util.Objects; + +/** + * An external bridge for {@link Processor} + */ +public interface IpDatabaseProviderBridge extends StableBridgeAPI { + + Boolean isValid(String name); + + IpDatabaseBridge getDatabase(String name); + + static IpDatabaseProviderBridge fromInternal(final IpDatabaseProvider internalProvider) { + if (internalProvider instanceof IpDatabaseProviderBridge.AbstractExternal.ProxyExternal externalProxy) { + return externalProxy.getIpDatabaseProviderBridge(); + } + return new IpDatabaseProviderBridge.ProxyInternal(internalProvider); + } + + /** + * The {@code IpDatabaseProviderBridge.AbstractExternal} is an abstract base class for implementing + * the {@link IpDatabaseProviderBridge} externally to the Elasticsearch code-base. It takes care of + * the details of maintaining a singular internal-form implementation of {@link IpDatabaseProvider} + * that proxies calls through the external implementation. + */ + abstract class AbstractExternal implements IpDatabaseProviderBridge { + private AbstractExternal.ProxyExternal internalProcessor; + + public IpDatabaseProvider toInternal() { + if (internalProcessor == null) { + internalProcessor = new AbstractExternal.ProxyExternal(); + } + return internalProcessor; + } + + private class ProxyExternal implements IpDatabaseProvider { + + private AbstractExternal getIpDatabaseProviderBridge() { + return AbstractExternal.this; + } + + @Override + public Boolean isValid(ProjectId projectId, String name) { + return IpDatabaseProviderBridge.AbstractExternal.this.isValid(name); + } + + @Override + public IpDatabase getDatabase(ProjectId projectId, String name) { + IpDatabaseBridge bridge = IpDatabaseProviderBridge.AbstractExternal.this.getDatabase(name); + return Objects.isNull(bridge) ? null : bridge.toInternal(); + } + } + } + + /** + * An implementation of {@link IpDatabaseProviderBridge} that proxies to an internal {@link IpDatabaseProvider} + */ + class ProxyInternal extends StableBridgeAPI.ProxyInternal implements IpDatabaseProviderBridge { + public ProxyInternal(final IpDatabaseProvider delegate) { + super(delegate); + } + + @Override + public Boolean isValid(String name) { + return toInternal().isValid(ProjectId.DEFAULT, name); + } + + @Override + public IpDatabaseBridge getDatabase(String name) { + IpDatabase ipDatabase = toInternal().getDatabase(ProjectId.DEFAULT, name); + return new IpDatabaseBridge.ProxyInternal(ipDatabase); + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/MaxMindDbBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/MaxMindDbBridge.java new file mode 100644 index 0000000000000..f287a44738bee --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/MaxMindDbBridge.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.logstashbridge.geoip; + +import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +import java.io.File; +import java.io.IOException; + +public interface MaxMindDbBridge { + + class Reader extends StableBridgeAPI.ProxyInternal { + + @SuppressForbidden(reason = "Maxmind Reader constructor requires java.io.File") + public Reader(final File databasePath, final NodeCache nodeCache) throws IOException { + super(new com.maxmind.db.Reader(databasePath, nodeCache.toInternal())); + } + + protected Reader(final com.maxmind.db.Reader internalDelegate) { + super(internalDelegate); + } + + @Override + public com.maxmind.db.Reader toInternal() { + return internalDelegate; + } + + public String getDatabaseType() { + return toInternal().getMetadata().getDatabaseType(); + } + + public void close() throws IOException { + toInternal().close(); + } + } + + class NodeCache extends StableBridgeAPI.ProxyInternal { + + protected NodeCache(final com.maxmind.db.NodeCache internalDelegate) { + super(internalDelegate); + } + + public static NodeCache get(final int capacity) { + return new NodeCache(new com.maxmind.db.CHMCache(capacity)); + } + + public static NodeCache getInstance() { + return new NodeCache(com.maxmind.db.NoCache.getInstance()); + } + } + +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java index 9b57b50e3b199..7a706f1a8cae7 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java @@ -117,7 +117,7 @@ public void close() throws IOException { @FixForMultiProject // Logstash resolves and runs ingest pipelines based on the datastream. // How should ProjectIdResolverBridge behave in this case? - // In other words, it looks we need to find a way to figure out which ingest pipeline belongs to which project. + // In other words, it looks we need to find a way to figure out which ingest pipeline belongs to which project. static class ProjectIdResolverBridge implements ProjectResolver { public static final ProjectIdResolverBridge INSTANCE = new ProjectIdResolverBridge(); diff --git a/modules/ingest-geoip/src/main/java/module-info.java b/modules/ingest-geoip/src/main/java/module-info.java index 0703d9fb449aa..1e903444fc9e9 100644 --- a/modules/ingest-geoip/src/main/java/module-info.java +++ b/modules/ingest-geoip/src/main/java/module-info.java @@ -19,5 +19,5 @@ exports org.elasticsearch.ingest.geoip.direct to org.elasticsearch.server; exports org.elasticsearch.ingest.geoip.stats to org.elasticsearch.server; - exports org.elasticsearch.ingest.geoip to com.maxmind.db; + exports org.elasticsearch.ingest.geoip to com.maxmind.db, org.elasticsearch.logstashbridge; } From 3a48edf7755f48564477723a6b8ada19d1a6391f Mon Sep 17 00:00:00 2001 From: Mashhur Date: Fri, 8 Aug 2025 12:17:55 -0700 Subject: [PATCH 5/7] Add a note to maxmind depenedency of the ingest-geoip module that what upgrading the dependency, apply same to the logstash-bridge. --- modules/ingest-geoip/build.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/ingest-geoip/build.gradle b/modules/ingest-geoip/build.gradle index 34efcb290885b..f494d1ea65500 100644 --- a/modules/ingest-geoip/build.gradle +++ b/modules/ingest-geoip/build.gradle @@ -34,6 +34,8 @@ dependencies { runtimeOnly("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}") runtimeOnly("com.fasterxml.jackson.core:jackson-databind:${versions.jackson}") runtimeOnly("com.fasterxml.jackson.core:jackson-core:${versions.jackson}") + // when you upgrade maxmind dependency, please also do so in libs/logstash-bridge/build.gradle + // elastic_integration plugin embeds this module but references through logstash-bridge implementation('com.maxmind.db:maxmind-db:3.1.1') testImplementation 'org.elasticsearch:geolite2-databases:20191119' From 67a81ed3ab75c356835089d1fd91bbb5607002de Mon Sep 17 00:00:00 2001 From: Mashhur Date: Fri, 8 Aug 2025 16:14:22 -0700 Subject: [PATCH 6/7] A safeguard in case if bridge becomes null, might happen while cyclic pipeline execution. --- .../logstashbridge/ingest/ProcessorBridge.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java index 2d7d15b90908f..ce7473f4db460 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java @@ -20,6 +20,7 @@ import org.elasticsearch.logstashbridge.threadpool.ThreadPoolBridge; import java.util.Map; +import java.util.Objects; import java.util.function.BiConsumer; /** @@ -81,7 +82,10 @@ public String getDescription() { public void execute(IngestDocument ingestDocument, BiConsumer handler) { AbstractExternal.this.execute( IngestDocumentBridge.fromInternalNullable(ingestDocument), - (idb, e) -> handler.accept(idb.toInternal(), e) + (ingestDocumentBridge, e) -> handler.accept( + Objects.isNull(ingestDocumentBridge) ? null : ingestDocumentBridge.toInternal(), + e + ) ); } From 7eb593cc9cf4a0a4701c803b8188f070d31aa4b7 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Mon, 11 Aug 2025 12:34:39 -0700 Subject: [PATCH 7/7] Introduce bridges for FailProcessorException, Releasable and RefCountingRunnable ES core components. --- .../core/FailProcessorExceptionBridge.java | 23 ++++++++++++ .../core/RefCountingRunnableBridge.java | 37 +++++++++++++++++++ .../logstashbridge/core/ReleasableBridge.java | 30 +++++++++++++++ 3 files changed, 90 insertions(+) create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/FailProcessorExceptionBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/RefCountingRunnableBridge.java create mode 100644 libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/ReleasableBridge.java diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/FailProcessorExceptionBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/FailProcessorExceptionBridge.java new file mode 100644 index 0000000000000..ba4e73a9c2250 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/FailProcessorExceptionBridge.java @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.core; + +import org.elasticsearch.ingest.common.FailProcessorException; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +public class FailProcessorExceptionBridge extends StableBridgeAPI.ProxyInternal { + protected FailProcessorExceptionBridge(FailProcessorException internalDelegate) { + super(internalDelegate); + } + + public static boolean isInstanceOf(Throwable exception) { + return exception instanceof FailProcessorException; + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/RefCountingRunnableBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/RefCountingRunnableBridge.java new file mode 100644 index 0000000000000..305407e18c435 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/RefCountingRunnableBridge.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.core; + +import org.elasticsearch.action.support.RefCountingRunnable; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +public class RefCountingRunnableBridge extends StableBridgeAPI.ProxyInternal { + + private RefCountingRunnableBridge(final RefCountingRunnable delegate) { + super(delegate); + } + + public RefCountingRunnableBridge(final Runnable delegate) { + super(new RefCountingRunnable(delegate)); + } + + public void close() { + toInternal().close(); + } + + public ReleasableBridge acquire() { + return new ReleasableBridge.ProxyInternal(toInternal().acquire()); + } + + @Override + public RefCountingRunnable toInternal() { + return this.internalDelegate; + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/ReleasableBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/ReleasableBridge.java new file mode 100644 index 0000000000000..eefe8c4dd08ae --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/ReleasableBridge.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.core; + +import org.elasticsearch.core.Releasable; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +public interface ReleasableBridge extends StableBridgeAPI { + + void close(); + + class ProxyInternal extends StableBridgeAPI.ProxyInternal implements ReleasableBridge { + + public ProxyInternal(final Releasable delegate) { + super(delegate); + } + + @Override + public void close() { + toInternal().close(); + } + } +}