diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/ProjectIdBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/ProjectIdBridge.java new file mode 100644 index 0000000000000..551924b71c343 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/ProjectIdBridge.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.common; + +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +public interface ProjectIdBridge extends StableBridgeAPI { + String id(); + + static ProjectIdBridge fromInternal(final ProjectId projectId) { + return new ProxyInternal(projectId); + } + + static ProjectIdBridge fromId(final String id) { + final ProjectId internal = ProjectId.fromId(id); + return new ProxyInternal(internal); + } + + static ProjectIdBridge getDefault() { + return ProxyInternal.DEFAULT; + } + + class ProxyInternal implements ProjectIdBridge { + private final ProjectId internalDelegate; + + static final ProjectIdBridge.ProxyInternal DEFAULT = new ProjectIdBridge.ProxyInternal(ProjectId.DEFAULT); + + public ProxyInternal(ProjectId internalDelegate) { + this.internalDelegate = internalDelegate; + } + + @Override + public String id() { + return toInternal().id(); + } + + @Override + public ProjectId toInternal() { + return this.internalDelegate; + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/FailProcessorExceptionBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/CheckedBiFunctionBridge.java similarity index 58% rename from libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/FailProcessorExceptionBridge.java rename to libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/CheckedBiFunctionBridge.java index ba4e73a9c2250..5b0c344a3ffbc 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/FailProcessorExceptionBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/CheckedBiFunctionBridge.java @@ -9,15 +9,14 @@ package org.elasticsearch.logstashbridge.core; -import org.elasticsearch.ingest.common.FailProcessorException; +import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.logstashbridge.StableBridgeAPI; -public class FailProcessorExceptionBridge extends StableBridgeAPI.ProxyInternal { - protected FailProcessorExceptionBridge(FailProcessorException internalDelegate) { - super(internalDelegate); - } +public interface CheckedBiFunctionBridge extends StableBridgeAPI> { + R apply(T t, U u) throws E; - public static boolean isInstanceOf(Throwable exception) { - return exception instanceof FailProcessorException; + @Override + default CheckedBiFunction toInternal() { + return this::apply; } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorBridge.java index 2fc197ebe6ca9..37a65da1953d0 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorBridge.java @@ -9,18 +9,25 @@ package org.elasticsearch.logstashbridge.geoip; import org.elasticsearch.ingest.geoip.GeoIpProcessor; -import org.elasticsearch.ingest.geoip.IpDatabaseProvider; import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; /** * An external bridge for {@link GeoIpProcessor} */ -public interface GeoIpProcessorBridge { +public interface GeoIpProcessorBridge extends StableBridgeAPI { - class Factory extends StableBridgeAPI.ProxyInternal { + static Factory newFactory(final IpDatabaseProviderBridge ipDatabaseProviderBridge) { + return new Factory(ipDatabaseProviderBridge); + } + + /** + * A {@link ProcessorBridge.Factory} implementation for the {@link GeoIpProcessor} + */ + class Factory extends ProcessorBridge.Factory.ProxyInternal { - public Factory(String type, IpDatabaseProvider ipDatabaseProvider) { - super(new GeoIpProcessor.Factory(type, ipDatabaseProvider)); + Factory(final IpDatabaseProviderBridge ipDatabaseProviderBridge) { + super(new GeoIpProcessor.Factory("geoip", ipDatabaseProviderBridge.toInternal())); } } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java index 5d058fabec39b..075a3408f63c4 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java @@ -10,11 +10,11 @@ import com.maxmind.db.Reader; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.core.Nullable; import org.elasticsearch.ingest.geoip.IpDatabase; import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.core.CheckedBiFunctionBridge; import java.io.IOException; @@ -25,31 +25,16 @@ public interface IpDatabaseBridge extends StableBridgeAPI { String getDatabaseType() throws IOException; - MaxMindDbBridge.Reader getDatabaseReader() throws IOException; - @Nullable - default RESPONSE getResponse(String ipAddress, CheckedBiFunction responseProvider) { - try { - return responseProvider.apply(this.getDatabaseReader().toInternal(), ipAddress); - } catch (Exception e) { - throw ExceptionsHelper.convertToRuntime(e); - } - } + RESPONSE getResponse(String ipAddress, CheckedBiFunctionBridge responseProvider); void close() throws IOException; - static IpDatabaseBridge fromInternal(final IpDatabase internalDatabase) { - if (internalDatabase instanceof AbstractExternal.ProxyExternal externalProxy) { - return externalProxy.getIpDatabaseBridge(); - } - return new ProxyInternal(internalDatabase); - } - /** * The {@code IpDatabaseBridge.AbstractExternal} is an abstract base class for implementing * the {@link IpDatabaseBridge} externally to the Elasticsearch code-base. It takes care of * the details of maintaining a singular internal-form implementation of {@link IpDatabase} - * that proxies calls through the external implementation. + * that proxies calls to the external implementation. */ abstract class AbstractExternal implements IpDatabaseBridge { private ProxyExternal internalDatabase; @@ -64,10 +49,6 @@ public IpDatabase toInternal() { private class ProxyExternal implements IpDatabase { - private AbstractExternal getIpDatabaseBridge() { - return AbstractExternal.this; - } - @Override public String getDatabaseType() throws IOException { return AbstractExternal.this.getDatabaseType(); @@ -78,7 +59,7 @@ public RESPONSE getResponse( String ipAddress, CheckedBiFunction responseProvider ) { - return AbstractExternal.this.getResponse(ipAddress, responseProvider); + return AbstractExternal.this.getResponse(ipAddress, responseProvider::apply); } @Override @@ -87,34 +68,4 @@ public void close() throws IOException { } } } - - /** - * An implementation of {@link IpDatabaseBridge} that proxies to an internal {@link IpDatabase} - */ - class ProxyInternal extends StableBridgeAPI.ProxyInternal implements IpDatabaseBridge { - - public ProxyInternal(final IpDatabase delegate) { - super(delegate); - } - - @Override - public String getDatabaseType() throws IOException { - return toInternal().getDatabaseType(); - } - - @Override - public MaxMindDbBridge.Reader getDatabaseReader() throws IOException { - return null; - } - - @Override - public RESPONSE getResponse(String ipAddress, CheckedBiFunction responseProvider) { - return toInternal().getResponse(ipAddress, responseProvider); - } - - @Override - public void close() throws IOException { - toInternal().close(); - } - } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java index 7172436ba1a16..9d5ccc5d514b0 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java @@ -14,7 +14,7 @@ import org.elasticsearch.ingest.geoip.IpDatabaseProvider; import org.elasticsearch.logstashbridge.StableBridgeAPI; -import java.util.Objects; +import static org.elasticsearch.logstashbridge.StableBridgeAPI.toInternalNullable; /** * An external bridge for {@link Processor} @@ -25,18 +25,11 @@ public interface IpDatabaseProviderBridge extends StableBridgeAPI implements IpDatabaseProviderBridge { - public ProxyInternal(final IpDatabaseProvider delegate) { - super(delegate); - } - - @Override - public Boolean isValid(String name) { - return toInternal().isValid(ProjectId.DEFAULT, name); - } - - @Override - public IpDatabaseBridge getDatabase(String name) { - IpDatabase ipDatabase = toInternal().getDatabase(ProjectId.DEFAULT, name); - return new IpDatabaseBridge.ProxyInternal(ipDatabase); - } - } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/MaxMindDbBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/MaxMindDbBridge.java deleted file mode 100644 index f287a44738bee..0000000000000 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/MaxMindDbBridge.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ -package org.elasticsearch.logstashbridge.geoip; - -import org.elasticsearch.core.SuppressForbidden; -import org.elasticsearch.logstashbridge.StableBridgeAPI; - -import java.io.File; -import java.io.IOException; - -public interface MaxMindDbBridge { - - class Reader extends StableBridgeAPI.ProxyInternal { - - @SuppressForbidden(reason = "Maxmind Reader constructor requires java.io.File") - public Reader(final File databasePath, final NodeCache nodeCache) throws IOException { - super(new com.maxmind.db.Reader(databasePath, nodeCache.toInternal())); - } - - protected Reader(final com.maxmind.db.Reader internalDelegate) { - super(internalDelegate); - } - - @Override - public com.maxmind.db.Reader toInternal() { - return internalDelegate; - } - - public String getDatabaseType() { - return toInternal().getMetadata().getDatabaseType(); - } - - public void close() throws IOException { - toInternal().close(); - } - } - - class NodeCache extends StableBridgeAPI.ProxyInternal { - - protected NodeCache(final com.maxmind.db.NodeCache internalDelegate) { - super(internalDelegate); - } - - public static NodeCache get(final int capacity) { - return new NodeCache(new com.maxmind.db.CHMCache(capacity)); - } - - public static NodeCache getInstance() { - return new NodeCache(com.maxmind.db.NoCache.getInstance()); - } - } - -} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineBridge.java index 19ee577ef0874..d1ebb7a097e3e 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineBridge.java @@ -8,7 +8,7 @@ */ package org.elasticsearch.logstashbridge.ingest; -import org.elasticsearch.core.FixForMultiProject; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.logstashbridge.StableBridgeAPI; import org.elasticsearch.logstashbridge.script.ScriptServiceBridge; @@ -24,7 +24,6 @@ public static PipelineBridge fromInternal(final Pipeline pipeline) { return new PipelineBridge(pipeline); } - @FixForMultiProject(description = "should we pass a non-null project ID here?") public static PipelineBridge create( String id, Map config, @@ -37,7 +36,7 @@ public static PipelineBridge create( config, StableBridgeAPI.toInternal(processorFactories), StableBridgeAPI.toInternalNullable(scriptServiceBridge), - null + ProjectId.DEFAULT ) ); } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java index ce7473f4db460..8498501f20b9e 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java @@ -15,12 +15,12 @@ import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Processor; import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.common.ProjectIdBridge; import org.elasticsearch.logstashbridge.env.EnvironmentBridge; import org.elasticsearch.logstashbridge.script.ScriptServiceBridge; import org.elasticsearch.logstashbridge.threadpool.ThreadPoolBridge; import java.util.Map; -import java.util.Objects; import java.util.function.BiConsumer; /** @@ -36,7 +36,18 @@ public interface ProcessorBridge extends StableBridgeAPI { boolean isAsync(); - void execute(IngestDocumentBridge ingestDocumentBridge, BiConsumer handler); + default void execute(IngestDocumentBridge ingestDocumentBridge, BiConsumer handler) { + toInternal().execute( + StableBridgeAPI.toInternalNullable(ingestDocumentBridge), + (id, exception) -> handler.accept(IngestDocumentBridge.fromInternalNullable(id), exception) + ); + } + + default IngestDocumentBridge execute(IngestDocumentBridge ingestDocumentBridge) throws Exception { + IngestDocument internalSourceIngestDocument = ingestDocumentBridge.toInternal(); + IngestDocument internalResultIngestDocument = toInternal().execute(internalSourceIngestDocument); + return IngestDocumentBridge.fromInternalNullable(internalResultIngestDocument); + } static ProcessorBridge fromInternal(final Processor internalProcessor) { if (internalProcessor instanceof AbstractExternal.ProxyExternal externalProxy) { @@ -82,10 +93,7 @@ public String getDescription() { public void execute(IngestDocument ingestDocument, BiConsumer handler) { AbstractExternal.this.execute( IngestDocumentBridge.fromInternalNullable(ingestDocument), - (ingestDocumentBridge, e) -> handler.accept( - Objects.isNull(ingestDocumentBridge) ? null : ingestDocumentBridge.toInternal(), - e - ) + (ingestDocumentBridge, exception) -> handler.accept(StableBridgeAPI.toInternalNullable(ingestDocumentBridge), exception) ); } @@ -127,14 +135,6 @@ public String getDescription() { public boolean isAsync() { return toInternal().isAsync(); } - - @Override - public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer handler) { - internalDelegate.execute( - StableBridgeAPI.toInternalNullable(ingestDocumentBridge), - (id, e) -> handler.accept(IngestDocumentBridge.fromInternalNullable(id), e) - ); - } } /** @@ -178,33 +178,37 @@ public Processor.Parameters toInternal() { * An external bridge for {@link Processor.Factory} */ interface Factory extends StableBridgeAPI { - ProcessorBridge create( + + @Deprecated // supply ProjectIdBridge + default ProcessorBridge create( Map registry, String processorTag, String description, Map config + ) throws Exception { + return this.create(registry, processorTag, description, config, ProjectIdBridge.getDefault()); + } + + ProcessorBridge create( + Map registry, + String processorTag, + String description, + Map config, + ProjectIdBridge projectId ) throws Exception; static Factory fromInternal(final Processor.Factory delegate) { + if (delegate instanceof AbstractExternal.InternalProxy internalProxy) { + return internalProxy.toExternal(); + } return new ProxyInternal(delegate); } - @Override - default Processor.Factory toInternal() { - final Factory stableAPIFactory = this; - return (registry, tag, description, config, projectId) -> stableAPIFactory.create( - StableBridgeAPI.fromInternal(registry, Factory::fromInternal), - tag, - description, - config - ).toInternal(); - } - /** * An implementation of {@link ProcessorBridge.Factory} that proxies to an internal {@link Processor.Factory} */ class ProxyInternal extends StableBridgeAPI.ProxyInternal implements Factory { - private ProxyInternal(final Processor.Factory delegate) { + protected ProxyInternal(final Processor.Factory delegate) { super(delegate); } @@ -214,11 +218,14 @@ public ProcessorBridge create( final Map registry, final String processorTag, final String description, - final Map config + final Map config, + final ProjectIdBridge bridgedProjectId ) throws Exception { - return ProcessorBridge.fromInternal( - this.internalDelegate.create(StableBridgeAPI.toInternal(registry), processorTag, description, config, ProjectId.DEFAULT) - ); + final Map internalRegistry = StableBridgeAPI.toInternal(registry); + final Processor.Factory internalFactory = toInternal(); + final ProjectId projectId = bridgedProjectId.toInternal(); + final Processor internalProcessor = internalFactory.create(internalRegistry, processorTag, description, config, projectId); + return ProcessorBridge.fromInternal(internalProcessor); } @Override @@ -226,6 +233,53 @@ public Processor.Factory toInternal() { return this.internalDelegate; } } + + /** + * The {@code ProcessorBridge.Factory.AbstractExternal} is an abstract base class for implementing + * the {@link ProcessorBridge.Factory} externally to the Elasticsearch code-base. It takes care of + * the details of maintaining a singular internal-form implementation of {@link Processor.Factory} + * that proxies calls to the external implementation. + */ + abstract class AbstractExternal implements Factory { + InternalProxy internalDelegate; + + @Override + public Processor.Factory toInternal() { + if (this.internalDelegate == null) { + internalDelegate = new InternalProxy(); + } + return this.internalDelegate; + } + + private class InternalProxy implements Processor.Factory { + @Override + public Processor create( + final Map processorFactories, + final String tag, + final String description, + final Map config, + final ProjectId projectId + ) throws Exception { + final Map bridgedProcessorFactories = StableBridgeAPI.fromInternal( + processorFactories, + ProcessorBridge.Factory::fromInternal + ); + final ProjectIdBridge bridgedProjectId = ProjectIdBridge.fromInternal(projectId); + final ProcessorBridge bridgedProcessor = AbstractExternal.this.create( + bridgedProcessorFactories, + tag, + description, + config, + bridgedProjectId + ); + return bridgedProcessor.toInternal(); + } + + ProcessorBridge.Factory toExternal() { + return AbstractExternal.this; + } + } + } } }