diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/StableBridgeAPI.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/StableBridgeAPI.java index 261b4fec043b9..51c483a81e538 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/StableBridgeAPI.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/StableBridgeAPI.java @@ -50,6 +50,11 @@ static > B fromInternal(final T delegate, final * An {@code ProxyInternal} is an implementation of {@code StableBridgeAPI} that * proxies calls to a delegate that is an actual {@code INTERNAL}. * + *

+ * implementations are intended to be opaque to consumers of this library, + * and should NOT have public constructors. + *

+ * * @param */ abstract class ProxyInternal implements StableBridgeAPI { diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/ProjectIdBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/ProjectIdBridge.java new file mode 100644 index 0000000000000..a585733304cdd --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/ProjectIdBridge.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.common; + +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +/** + * A {@link StableBridgeAPI} for {@link ProjectId} + */ +public interface ProjectIdBridge extends StableBridgeAPI { + String id(); + + static ProjectIdBridge fromInternal(final ProjectId projectId) { + return new ProxyInternal(projectId); + } + + static ProjectIdBridge fromId(final String id) { + final ProjectId internal = ProjectId.fromId(id); + return new ProxyInternal(internal); + } + + static ProjectIdBridge getDefault() { + return ProxyInternal.DEFAULT; + } + + /** + * An implementation of {@link ProjectIdBridge} that proxies calls to + * an internal {@link ProjectId} instance. + * + * @see StableBridgeAPI.ProxyInternal + */ + final class ProxyInternal extends StableBridgeAPI.ProxyInternal implements ProjectIdBridge { + private static final ProjectIdBridge.ProxyInternal DEFAULT = new ProjectIdBridge.ProxyInternal(ProjectId.DEFAULT); + + ProxyInternal(ProjectId internalDelegate) { + super(internalDelegate); + } + + @Override + public String id() { + return this.internalDelegate.id(); + } + + @Override + public ProjectId toInternal() { + return this.internalDelegate; + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/SettingsBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/SettingsBridge.java index 4139191995dc8..6ed71765b868e 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/SettingsBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/SettingsBridge.java @@ -12,46 +12,27 @@ import org.elasticsearch.logstashbridge.StableBridgeAPI; /** - * An external bridge for {@link Settings} + * A {@link StableBridgeAPI} for {@link Settings} */ -public class SettingsBridge extends StableBridgeAPI.ProxyInternal { +public interface SettingsBridge extends StableBridgeAPI { - public static SettingsBridge fromInternal(final Settings delegate) { - return new SettingsBridge(delegate); + static SettingsBridge fromInternal(final Settings delegate) { + return new ProxyInternal(delegate); } - public static Builder builder() { - return Builder.fromInternal(Settings.builder()); - } - - public SettingsBridge(final Settings delegate) { - super(delegate); - } - - @Override - public Settings toInternal() { - return this.internalDelegate; + static SettingsBuilderBridge builder() { + return SettingsBuilderBridge.fromInternal(Settings.builder()); } /** - * An external bridge for {@link Settings.Builder} that proxies calls to a real {@link Settings.Builder} + * An implementation of {@link SettingsBridge} that proxies calls to + * an internal {@link Settings} instance. + * + * @see StableBridgeAPI.ProxyInternal */ - public static class Builder extends StableBridgeAPI.ProxyInternal { - static Builder fromInternal(final Settings.Builder delegate) { - return new Builder(delegate); - } - - private Builder(final Settings.Builder delegate) { - super(delegate); - } - - public Builder put(final String key, final String value) { - this.internalDelegate.put(key, value); - return this; - } - - public SettingsBridge build() { - return new SettingsBridge(this.internalDelegate.build()); + final class ProxyInternal extends StableBridgeAPI.ProxyInternal implements SettingsBridge { + ProxyInternal(Settings internalDelegate) { + super(internalDelegate); } } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/SettingsBuilderBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/SettingsBuilderBridge.java new file mode 100644 index 0000000000000..677a19bcdd88d --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/common/SettingsBuilderBridge.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.common; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +/** + * A {@link StableBridgeAPI} for {@link Settings.Builder}. + */ +public interface SettingsBuilderBridge extends StableBridgeAPI { + + SettingsBuilderBridge put(String key, String value); + + SettingsBridge build(); + + static SettingsBuilderBridge fromInternal(final Settings.Builder builder) { + return new ProxyInternal(builder); + } + + /** + * An implementation of {@link SettingsBuilderBridge} that proxies calls to + * an internal {@link Settings.Builder} instance. + * + * @see StableBridgeAPI.ProxyInternal + */ + final class ProxyInternal extends StableBridgeAPI.ProxyInternal implements SettingsBuilderBridge { + ProxyInternal(Settings.Builder internalDelegate) { + super(internalDelegate); + } + + @Override + public SettingsBuilderBridge put(String key, String value) { + internalDelegate.put(key, value); + return this; + } + + @Override + public SettingsBridge build() { + final Settings delegate = internalDelegate.build(); + return SettingsBridge.fromInternal(delegate); + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/FailProcessorExceptionBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/CheckedBiFunctionBridge.java similarity index 51% rename from libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/FailProcessorExceptionBridge.java rename to libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/CheckedBiFunctionBridge.java index ba4e73a9c2250..7feaa06f57b5d 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/FailProcessorExceptionBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/CheckedBiFunctionBridge.java @@ -9,15 +9,22 @@ package org.elasticsearch.logstashbridge.core; -import org.elasticsearch.ingest.common.FailProcessorException; +import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.logstashbridge.StableBridgeAPI; -public class FailProcessorExceptionBridge extends StableBridgeAPI.ProxyInternal { - protected FailProcessorExceptionBridge(FailProcessorException internalDelegate) { - super(internalDelegate); - } +/** + * A stable interface on top of {@link CheckedBiFunction}. + * @param type of lhs parameter + * @param type of rhs parameter + * @param type of return value + * @param type of anticipated exception + */ +@FunctionalInterface +public interface CheckedBiFunctionBridge extends StableBridgeAPI> { + R apply(T t, U u) throws E; - public static boolean isInstanceOf(Throwable exception) { - return exception instanceof FailProcessorException; + @Override + default CheckedBiFunction toInternal() { + return this::apply; } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/IOUtilsBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/IOUtilsBridge.java index e73331c62c71a..f37170346a306 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/IOUtilsBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/IOUtilsBridge.java @@ -16,6 +16,8 @@ * An external bridge for {@link IOUtils} */ public class IOUtilsBridge { + private IOUtilsBridge() {} + public static void closeWhileHandlingException(final Iterable objects) { IOUtils.closeWhileHandlingException(objects); } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/RefCountingRunnableBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/RefCountingRunnableBridge.java index 305407e18c435..c94a893c48e8d 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/RefCountingRunnableBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/RefCountingRunnableBridge.java @@ -10,28 +10,52 @@ package org.elasticsearch.logstashbridge.core; import org.elasticsearch.action.support.RefCountingRunnable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.logstashbridge.StableBridgeAPI; -public class RefCountingRunnableBridge extends StableBridgeAPI.ProxyInternal { +import java.io.Closeable; - private RefCountingRunnableBridge(final RefCountingRunnable delegate) { - super(delegate); - } - - public RefCountingRunnableBridge(final Runnable delegate) { - super(new RefCountingRunnable(delegate)); - } - - public void close() { - toInternal().close(); - } - - public ReleasableBridge acquire() { - return new ReleasableBridge.ProxyInternal(toInternal().acquire()); +/** + * A {@link StableBridgeAPI} for {@link RefCountingRunnable} + */ +public interface RefCountingRunnableBridge extends StableBridgeAPI, Closeable { + + @Override // only RuntimeException + void close(); + + ReleasableBridge acquire(); + + /** + * An API-stable factory method for {@link RefCountingRunnableBridge} + * @param delegate the {@link Runnable} to execute when all refs are closed + * @return a {@link RefCountingRunnableBridge} that will execute the provided + * block when all refs are closed + */ + static RefCountingRunnableBridge create(final Runnable delegate) { + final RefCountingRunnable refCountingRunnable = new RefCountingRunnable(delegate); + return new ProxyInternal(refCountingRunnable); } - @Override - public RefCountingRunnable toInternal() { - return this.internalDelegate; + /** + * An implementation of {@link RefCountingRunnableBridge} that proxies calls through + * to an internal {@link RefCountingRunnable}. + * @see StableBridgeAPI.ProxyInternal + */ + final class ProxyInternal extends StableBridgeAPI.ProxyInternal implements RefCountingRunnableBridge { + private ProxyInternal(final RefCountingRunnable delegate) { + super(delegate); + } + + @Override + public void close() { + toInternal().close(); + } + + @Override + public ReleasableBridge acquire() { + @SuppressWarnings("resource") + final Releasable releasable = toInternal().acquire(); + return ReleasableBridge.fromInternal(releasable); + } } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/ReleasableBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/ReleasableBridge.java index eefe8c4dd08ae..34161ffbf501f 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/ReleasableBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/ReleasableBridge.java @@ -12,13 +12,28 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.logstashbridge.StableBridgeAPI; -public interface ReleasableBridge extends StableBridgeAPI { +import java.io.Closeable; +/** + * A {@link StableBridgeAPI} for {@link Releasable} for use with {@link RefCountingRunnableBridge} + */ +public interface ReleasableBridge extends StableBridgeAPI, Closeable { + + @Override // only RuntimeException void close(); - class ProxyInternal extends StableBridgeAPI.ProxyInternal implements ReleasableBridge { + static ReleasableBridge fromInternal(Releasable releasable) { + return new ProxyInternal(releasable); + } + + /** + * An implementation of {@link ReleasableBridge} that proxies calls through + * to an internal {@link Releasable}. + * @see StableBridgeAPI.ProxyInternal + */ + final class ProxyInternal extends StableBridgeAPI.ProxyInternal implements ReleasableBridge { - public ProxyInternal(final Releasable delegate) { + private ProxyInternal(final Releasable delegate) { super(delegate); } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/env/EnvironmentBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/env/EnvironmentBridge.java index 086d2ae5a3dc4..48e6bbfc4b6b9 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/env/EnvironmentBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/env/EnvironmentBridge.java @@ -15,23 +15,30 @@ import java.nio.file.Path; /** - * An external bridge for {@link Environment} + * A {@link StableBridgeAPI} for {@link Environment} */ -public class EnvironmentBridge extends StableBridgeAPI.ProxyInternal { - public static EnvironmentBridge fromInternal(final Environment delegate) { - return new EnvironmentBridge(delegate); +public interface EnvironmentBridge extends StableBridgeAPI { + static EnvironmentBridge fromInternal(final Environment delegate) { + return new EnvironmentBridge.ProxyInternal(delegate); } - public EnvironmentBridge(final SettingsBridge settingsBridge, final Path configPath) { - this(new Environment(settingsBridge.toInternal(), configPath)); + static EnvironmentBridge create(final SettingsBridge bridgedSettings, final Path configPath) { + return fromInternal(new Environment(bridgedSettings.toInternal(), configPath)); } - private EnvironmentBridge(final Environment delegate) { - super(delegate); - } + /** + * An implementation of {@link EnvironmentBridge} that proxies calls through + * to an internal {@link Environment}. + * @see StableBridgeAPI.ProxyInternal + */ + final class ProxyInternal extends StableBridgeAPI.ProxyInternal implements EnvironmentBridge { + private ProxyInternal(final Environment delegate) { + super(delegate); + } - @Override - public Environment toInternal() { - return this.internalDelegate; + @Override + public Environment toInternal() { + return this.internalDelegate; + } } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/AbstractExternalIpDatabaseBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/AbstractExternalIpDatabaseBridge.java new file mode 100644 index 0000000000000..a202c56b64772 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/AbstractExternalIpDatabaseBridge.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.geoip; + +import com.maxmind.db.Reader; + +import org.elasticsearch.common.CheckedBiFunction; +import org.elasticsearch.ingest.geoip.IpDatabase; + +import java.io.IOException; + +/** + * The {@code IpDatabaseBridge.AbstractExternal} is an abstract base class for implementing + * the {@link IpDatabaseBridge} externally to the Elasticsearch code-base. It takes care of + * the details of maintaining a singular internal-form implementation of {@link IpDatabase} + * that proxies calls to the external implementation. + */ +public abstract class AbstractExternalIpDatabaseBridge implements IpDatabaseBridge { + private ProxyExternal internalDatabase; + + @Override + public IpDatabase toInternal() { + if (internalDatabase == null) { + internalDatabase = new ProxyExternal(); + } + return internalDatabase; + } + + /** + * An implementation of {@link IpDatabase} that proxies calls to + * a bridged {@link AbstractExternalIpDatabaseBridge} instance. + */ + private final class ProxyExternal implements IpDatabase { + + @Override + public String getDatabaseType() throws IOException { + return AbstractExternalIpDatabaseBridge.this.getDatabaseType(); + } + + @Override + public RESPONSE getResponse(String ipAddress, CheckedBiFunction responseProvider) { + return AbstractExternalIpDatabaseBridge.this.getResponse(ipAddress, responseProvider::apply); + } + + @Override + public void close() throws IOException { + AbstractExternalIpDatabaseBridge.this.close(); + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/AbstractExternalIpDatabaseProviderBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/AbstractExternalIpDatabaseProviderBridge.java new file mode 100644 index 0000000000000..e215d6a755586 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/AbstractExternalIpDatabaseProviderBridge.java @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.geoip; + +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.ingest.geoip.IpDatabase; +import org.elasticsearch.ingest.geoip.IpDatabaseProvider; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +/** + * The {@link AbstractExternalIpDatabaseProviderBridge} is an abstract base class for implementing + * the {@link IpDatabaseProviderBridge} externally to the Elasticsearch code-base. It takes care of + * the details of maintaining a singular internal-form implementation of {@link IpDatabaseProvider} + * that proxies calls to the external implementation. + */ +public abstract class AbstractExternalIpDatabaseProviderBridge implements IpDatabaseProviderBridge { + private ProxyExternal internalProcessor; + + @Override + public IpDatabaseProvider toInternal() { + if (internalProcessor == null) { + internalProcessor = new ProxyExternal(); + } + return internalProcessor; + } + + /** + * An implementation of {@link IpDatabaseProvider} that proxies calls to + * a bridged {@link AbstractExternalIpDatabaseProviderBridge} instance. + */ + private final class ProxyExternal implements IpDatabaseProvider { + + private AbstractExternalIpDatabaseProviderBridge getIpDatabaseProviderBridge() { + return AbstractExternalIpDatabaseProviderBridge.this; + } + + @Override + public Boolean isValid(ProjectId projectId, String name) { + return AbstractExternalIpDatabaseProviderBridge.this.isValid(name); + } + + @Override + public IpDatabase getDatabase(ProjectId projectId, String name) { + return StableBridgeAPI.toInternalNullable(AbstractExternalIpDatabaseProviderBridge.this.getDatabase(name)); + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorBridge.java deleted file mode 100644 index 2fc197ebe6ca9..0000000000000 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorBridge.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ -package org.elasticsearch.logstashbridge.geoip; - -import org.elasticsearch.ingest.geoip.GeoIpProcessor; -import org.elasticsearch.ingest.geoip.IpDatabaseProvider; -import org.elasticsearch.logstashbridge.StableBridgeAPI; - -/** - * An external bridge for {@link GeoIpProcessor} - */ -public interface GeoIpProcessorBridge { - - class Factory extends StableBridgeAPI.ProxyInternal { - - public Factory(String type, IpDatabaseProvider ipDatabaseProvider) { - super(new GeoIpProcessor.Factory(type, ipDatabaseProvider)); - } - } -} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorFactoryBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorFactoryBridge.java new file mode 100644 index 0000000000000..a026eddf1599a --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorFactoryBridge.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.geoip; + +import org.elasticsearch.ingest.geoip.GeoIpProcessor; +import org.elasticsearch.logstashbridge.ingest.ProcessorFactoryBridge; + +/** + * A {@link ProcessorFactoryBridge} implementation for the {@link GeoIpProcessor} + */ +public final class GeoIpProcessorFactoryBridge extends ProcessorFactoryBridge.ProxyInternal { + + public static GeoIpProcessorFactoryBridge create(final IpDatabaseProviderBridge bridgedIpDatabaseProvider) { + return new GeoIpProcessorFactoryBridge(bridgedIpDatabaseProvider); + } + + GeoIpProcessorFactoryBridge(final IpDatabaseProviderBridge ipDatabaseProviderBridge) { + super(new GeoIpProcessor.Factory("geoip", ipDatabaseProviderBridge.toInternal())); + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java index 5d058fabec39b..bfd13d060cd1a 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java @@ -10,111 +10,23 @@ import com.maxmind.db.Reader; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.core.Nullable; import org.elasticsearch.ingest.geoip.IpDatabase; import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.core.CheckedBiFunctionBridge; import java.io.IOException; /** - * An external bridge for {@link IpDatabase} + * An {@link StableBridgeAPI} for {@link IpDatabase} */ public interface IpDatabaseBridge extends StableBridgeAPI { String getDatabaseType() throws IOException; - MaxMindDbBridge.Reader getDatabaseReader() throws IOException; - @Nullable - default RESPONSE getResponse(String ipAddress, CheckedBiFunction responseProvider) { - try { - return responseProvider.apply(this.getDatabaseReader().toInternal(), ipAddress); - } catch (Exception e) { - throw ExceptionsHelper.convertToRuntime(e); - } - } + RESPONSE getResponse(String ipAddress, CheckedBiFunctionBridge responseProvider); void close() throws IOException; - static IpDatabaseBridge fromInternal(final IpDatabase internalDatabase) { - if (internalDatabase instanceof AbstractExternal.ProxyExternal externalProxy) { - return externalProxy.getIpDatabaseBridge(); - } - return new ProxyInternal(internalDatabase); - } - - /** - * The {@code IpDatabaseBridge.AbstractExternal} is an abstract base class for implementing - * the {@link IpDatabaseBridge} externally to the Elasticsearch code-base. It takes care of - * the details of maintaining a singular internal-form implementation of {@link IpDatabase} - * that proxies calls through the external implementation. - */ - abstract class AbstractExternal implements IpDatabaseBridge { - private ProxyExternal internalDatabase; - - @Override - public IpDatabase toInternal() { - if (internalDatabase == null) { - internalDatabase = new ProxyExternal(); - } - return internalDatabase; - } - - private class ProxyExternal implements IpDatabase { - - private AbstractExternal getIpDatabaseBridge() { - return AbstractExternal.this; - } - - @Override - public String getDatabaseType() throws IOException { - return AbstractExternal.this.getDatabaseType(); - } - - @Override - public RESPONSE getResponse( - String ipAddress, - CheckedBiFunction responseProvider - ) { - return AbstractExternal.this.getResponse(ipAddress, responseProvider); - } - - @Override - public void close() throws IOException { - AbstractExternal.this.close(); - } - } - } - - /** - * An implementation of {@link IpDatabaseBridge} that proxies to an internal {@link IpDatabase} - */ - class ProxyInternal extends StableBridgeAPI.ProxyInternal implements IpDatabaseBridge { - - public ProxyInternal(final IpDatabase delegate) { - super(delegate); - } - - @Override - public String getDatabaseType() throws IOException { - return toInternal().getDatabaseType(); - } - - @Override - public MaxMindDbBridge.Reader getDatabaseReader() throws IOException { - return null; - } - - @Override - public RESPONSE getResponse(String ipAddress, CheckedBiFunction responseProvider) { - return toInternal().getResponse(ipAddress, responseProvider); - } - - @Override - public void close() throws IOException { - toInternal().close(); - } - } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java index 7172436ba1a16..e1ccdb5b07040 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java @@ -8,16 +8,11 @@ */ package org.elasticsearch.logstashbridge.geoip; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.ingest.Processor; -import org.elasticsearch.ingest.geoip.IpDatabase; import org.elasticsearch.ingest.geoip.IpDatabaseProvider; import org.elasticsearch.logstashbridge.StableBridgeAPI; -import java.util.Objects; - /** - * An external bridge for {@link Processor} + * An {@link StableBridgeAPI} for {@link IpDatabaseProvider} */ public interface IpDatabaseProviderBridge extends StableBridgeAPI { @@ -25,65 +20,4 @@ public interface IpDatabaseProviderBridge extends StableBridgeAPI implements IpDatabaseProviderBridge { - public ProxyInternal(final IpDatabaseProvider delegate) { - super(delegate); - } - - @Override - public Boolean isValid(String name) { - return toInternal().isValid(ProjectId.DEFAULT, name); - } - - @Override - public IpDatabaseBridge getDatabase(String name) { - IpDatabase ipDatabase = toInternal().getDatabase(ProjectId.DEFAULT, name); - return new IpDatabaseBridge.ProxyInternal(ipDatabase); - } - } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/MaxMindDbBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/MaxMindDbBridge.java deleted file mode 100644 index f287a44738bee..0000000000000 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/MaxMindDbBridge.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ -package org.elasticsearch.logstashbridge.geoip; - -import org.elasticsearch.core.SuppressForbidden; -import org.elasticsearch.logstashbridge.StableBridgeAPI; - -import java.io.File; -import java.io.IOException; - -public interface MaxMindDbBridge { - - class Reader extends StableBridgeAPI.ProxyInternal { - - @SuppressForbidden(reason = "Maxmind Reader constructor requires java.io.File") - public Reader(final File databasePath, final NodeCache nodeCache) throws IOException { - super(new com.maxmind.db.Reader(databasePath, nodeCache.toInternal())); - } - - protected Reader(final com.maxmind.db.Reader internalDelegate) { - super(internalDelegate); - } - - @Override - public com.maxmind.db.Reader toInternal() { - return internalDelegate; - } - - public String getDatabaseType() { - return toInternal().getMetadata().getDatabaseType(); - } - - public void close() throws IOException { - toInternal().close(); - } - } - - class NodeCache extends StableBridgeAPI.ProxyInternal { - - protected NodeCache(final com.maxmind.db.NodeCache internalDelegate) { - super(internalDelegate); - } - - public static NodeCache get(final int capacity) { - return new NodeCache(new com.maxmind.db.CHMCache(capacity)); - } - - public static NodeCache getInstance() { - return new NodeCache(com.maxmind.db.NoCache.getInstance()); - } - } - -} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/AbstractExternalProcessorBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/AbstractExternalProcessorBridge.java new file mode 100644 index 0000000000000..de3b7776ae74c --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/AbstractExternalProcessorBridge.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.ingest; + +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +import java.util.function.BiConsumer; + +/** + * The {@link AbstractExternalProcessorBridge} is an abstract base class for implementing + * the {@link ProcessorBridge} externally to the Elasticsearch code-base. It takes care of + * the details of maintaining a singular internal-form implementation of {@link Processor} + * that proxies calls through the external implementation. + */ +public abstract class AbstractExternalProcessorBridge implements ProcessorBridge { + private ProxyExternal internalProcessor; + + public Processor toInternal() { + if (internalProcessor == null) { + internalProcessor = new ProxyExternal(); + } + return internalProcessor; + } + + /** + * The {@link AbstractExternalProcessorBridge.ProxyExternal} is shaped like an internal-{@link Processor}, + * but proxies calls through to an {@link AbstractExternalProcessorBridge}. + */ + final class ProxyExternal implements Processor { + + @Override + public String getType() { + return AbstractExternalProcessorBridge.this.getType(); + } + + @Override + public String getTag() { + return AbstractExternalProcessorBridge.this.getTag(); + } + + @Override + public String getDescription() { + return AbstractExternalProcessorBridge.this.getDescription(); + } + + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + AbstractExternalProcessorBridge.this.execute( + IngestDocumentBridge.fromInternalNullable(ingestDocument), + (ingestDocumentBridge, exception) -> handler.accept(StableBridgeAPI.toInternalNullable(ingestDocumentBridge), exception) + ); + } + + @Override + public boolean isAsync() { + return AbstractExternalProcessorBridge.this.isAsync(); + } + + AbstractExternalProcessorBridge getProcessorBridge() { + return AbstractExternalProcessorBridge.this; + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/AbstractExternalProcessorFactoryBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/AbstractExternalProcessorFactoryBridge.java new file mode 100644 index 0000000000000..69a2d0c4c2e57 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/AbstractExternalProcessorFactoryBridge.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.ingest; + +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.common.ProjectIdBridge; + +import java.util.Map; + +/** + * The {@code ProcessorBridge.Factory.AbstractExternal} is an abstract base class for implementing + * the {@link ProcessorFactoryBridge} externally to the Elasticsearch code-base. It takes care of + * the details of maintaining a singular internal-form implementation of {@link Processor.Factory} + * that proxies calls to the external implementation. + */ +public abstract class AbstractExternalProcessorFactoryBridge implements ProcessorFactoryBridge { + InternalProxy internalDelegate; + + @Override + public Processor.Factory toInternal() { + if (this.internalDelegate == null) { + internalDelegate = new InternalProxy(); + } + return this.internalDelegate; + } + + final class InternalProxy implements Processor.Factory { + @Override + public Processor create( + final Map processorFactories, + final String tag, + final String description, + final Map config, + final ProjectId projectId + ) throws Exception { + final Map bridgedProcessorFactories = StableBridgeAPI.fromInternal( + processorFactories, + ProcessorFactoryBridge::fromInternal + ); + final ProjectIdBridge bridgedProjectId = ProjectIdBridge.fromInternal(projectId); + final ProcessorBridge bridgedProcessor = AbstractExternalProcessorFactoryBridge.this.create( + bridgedProcessorFactories, + tag, + description, + config, + bridgedProjectId + ); + return bridgedProcessor.toInternal(); + } + + ProcessorFactoryBridge toExternal() { + return AbstractExternalProcessorFactoryBridge.this; + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ConfigurationUtilsBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ConfigurationUtilsBridge.java index 07178ca65a460..f1616d47d7be6 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ConfigurationUtilsBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ConfigurationUtilsBridge.java @@ -10,7 +10,9 @@ import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.logstashbridge.script.ScriptServiceBridge; -import org.elasticsearch.logstashbridge.script.TemplateScriptBridge; +import org.elasticsearch.logstashbridge.script.TemplateScriptFactoryBridge; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.TemplateScript; import java.util.Map; @@ -18,16 +20,24 @@ * An external bridge for {@link ConfigurationUtils} */ public class ConfigurationUtilsBridge { - public static TemplateScriptBridge.Factory compileTemplate( + private ConfigurationUtilsBridge() {} + + public static TemplateScriptFactoryBridge compileTemplate( final String processorType, final String processorTag, final String propertyName, final String propertyValue, - final ScriptServiceBridge scriptServiceBridge + final ScriptServiceBridge bridgedScriptService ) { - return new TemplateScriptBridge.Factory( - ConfigurationUtils.compileTemplate(processorType, processorTag, propertyName, propertyValue, scriptServiceBridge.toInternal()) + ScriptService scriptService = bridgedScriptService.toInternal(); + final TemplateScript.Factory templateScriptFactory = ConfigurationUtils.compileTemplate( + processorType, + processorTag, + propertyName, + propertyValue, + scriptService ); + return TemplateScriptFactoryBridge.fromInternal(templateScriptFactory); } public static String readStringProperty( diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/IngestDocumentBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/IngestDocumentBridge.java index 1c4f8d6338ad8..3472e813831ed 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/IngestDocumentBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/IngestDocumentBridge.java @@ -9,92 +9,61 @@ package org.elasticsearch.logstashbridge.ingest; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.LogstashInternalBridge; import org.elasticsearch.logstashbridge.StableBridgeAPI; import org.elasticsearch.logstashbridge.script.MetadataBridge; -import org.elasticsearch.logstashbridge.script.TemplateScriptBridge; +import org.elasticsearch.logstashbridge.script.TemplateScriptFactoryBridge; import java.util.Map; import java.util.Set; import java.util.function.BiConsumer; /** - * An external bridge for {@link IngestDocument} that proxies calls through a real {@link IngestDocument} + * A {@link StableBridgeAPI} for {@link IngestDocument}. */ -public class IngestDocumentBridge extends StableBridgeAPI.ProxyInternal { +public interface IngestDocumentBridge extends StableBridgeAPI { - public static final class Constants { - public static final String METADATA_VERSION_FIELD_NAME = IngestDocument.Metadata.VERSION.getFieldName(); + MetadataBridge getMetadata(); - private Constants() {} - } + Map getSource(); - public static IngestDocumentBridge fromInternalNullable(final IngestDocument ingestDocument) { - if (ingestDocument == null) { - return null; - } - return new IngestDocumentBridge(ingestDocument); - } + boolean updateIndexHistory(String index); - public IngestDocumentBridge(final Map sourceAndMetadata, final Map ingestMetadata) { - this(new IngestDocument(sourceAndMetadata, ingestMetadata)); - } + Set getIndexHistory(); - private IngestDocumentBridge(IngestDocument inner) { - super(inner); - } + boolean isReroute(); - public MetadataBridge getMetadata() { - return new MetadataBridge(internalDelegate.getMetadata()); - } + void resetReroute(); - public Map getSource() { - return internalDelegate.getSource(); - } + Map getIngestMetadata(); - public boolean updateIndexHistory(final String index) { - return internalDelegate.updateIndexHistory(index); - } + T getFieldValue(String name, Class type); - public Set getIndexHistory() { - return Set.copyOf(internalDelegate.getIndexHistory()); - } + T getFieldValue(String name, Class type, boolean ignoreMissing); - public boolean isReroute() { - return LogstashInternalBridge.isReroute(internalDelegate); - } + String renderTemplate(TemplateScriptFactoryBridge bridgedTemplateScriptFactory); - public void resetReroute() { - LogstashInternalBridge.resetReroute(internalDelegate); - } + void setFieldValue(String path, Object value); - public Map getIngestMetadata() { - return internalDelegate.getIngestMetadata(); - } + void removeField(String path); - public T getFieldValue(final String fieldName, final Class type) { - return internalDelegate.getFieldValue(fieldName, type); - } + void executePipeline(PipelineBridge bridgedPipeline, BiConsumer bridgedHandler); - public T getFieldValue(final String fieldName, final Class type, final boolean ignoreMissing) { - return internalDelegate.getFieldValue(fieldName, type, ignoreMissing); - } + final class Constants { + public static final String METADATA_VERSION_FIELD_NAME = IngestDocument.Metadata.VERSION.getFieldName(); - public String renderTemplate(final TemplateScriptBridge.Factory templateScriptFactory) { - return internalDelegate.renderTemplate(templateScriptFactory.toInternal()); + private Constants() {} } - public void setFieldValue(final String path, final Object value) { - internalDelegate.setFieldValue(path, value); + static IngestDocumentBridge fromInternalNullable(final IngestDocument ingestDocument) { + if (ingestDocument == null) { + return null; + } + return new ProxyInternalIngestDocumentBridge(ingestDocument); } - public void removeField(final String path) { - internalDelegate.removeField(path); + static IngestDocumentBridge create(final Map sourceAndMetadata, final Map ingestMetadata) { + final IngestDocument internal = new IngestDocument(sourceAndMetadata, ingestMetadata); + return fromInternalNullable(internal); } - public void executePipeline(final PipelineBridge pipelineBridge, final BiConsumer handler) { - this.internalDelegate.executePipeline(pipelineBridge.toInternal(), (ingestDocument, e) -> { - handler.accept(IngestDocumentBridge.fromInternalNullable(ingestDocument), e); - }); - } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineBridge.java index 19ee577ef0874..1b2fb97971f3c 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineBridge.java @@ -8,7 +8,7 @@ */ package org.elasticsearch.logstashbridge.ingest; -import org.elasticsearch.core.FixForMultiProject; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.logstashbridge.StableBridgeAPI; import org.elasticsearch.logstashbridge.script.ScriptServiceBridge; @@ -17,18 +17,22 @@ import java.util.function.BiConsumer; /** - * An external bridge for {@link Pipeline} + * A {@link StableBridgeAPI} for {@link Pipeline} */ -public class PipelineBridge extends StableBridgeAPI.ProxyInternal { - public static PipelineBridge fromInternal(final Pipeline pipeline) { - return new PipelineBridge(pipeline); +public interface PipelineBridge extends StableBridgeAPI { + + String getId(); + + void execute(IngestDocumentBridge bridgedIngestDocument, BiConsumer bridgedHandler); + + static PipelineBridge fromInternal(final Pipeline pipeline) { + return new ProxyInternal(pipeline); } - @FixForMultiProject(description = "should we pass a non-null project ID here?") - public static PipelineBridge create( + static PipelineBridge create( String id, Map config, - Map processorFactories, + Map processorFactories, ScriptServiceBridge scriptServiceBridge ) throws Exception { return fromInternal( @@ -37,23 +41,33 @@ public static PipelineBridge create( config, StableBridgeAPI.toInternal(processorFactories), StableBridgeAPI.toInternalNullable(scriptServiceBridge), - null + ProjectId.DEFAULT ) ); } - public PipelineBridge(final Pipeline delegate) { - super(delegate); - } + /** + * An implementation of {@link PipelineBridge} that proxies calls through to + * an internal {@link Pipeline}. + * @see StableBridgeAPI.ProxyInternal + */ + class ProxyInternal extends StableBridgeAPI.ProxyInternal implements PipelineBridge { - public String getId() { - return internalDelegate.getId(); - } + ProxyInternal(final Pipeline delegate) { + super(delegate); + } - public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer handler) { - this.internalDelegate.execute( - StableBridgeAPI.toInternalNullable(ingestDocumentBridge), - (ingestDocument, e) -> handler.accept(IngestDocumentBridge.fromInternalNullable(ingestDocument), e) - ); + @Override + public String getId() { + return internalDelegate.getId(); + } + + @Override + public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer handler) { + this.internalDelegate.execute( + StableBridgeAPI.toInternalNullable(ingestDocumentBridge), + (ingestDocument, e) -> handler.accept(IngestDocumentBridge.fromInternalNullable(ingestDocument), e) + ); + } } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java index 22a039702f82c..f1b54758b7d7e 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineConfigurationBridge.java @@ -16,47 +16,29 @@ import java.util.Map; /** - * An external bridge for {@link PipelineConfiguration} + * A {@link StableBridgeAPI} for {@link PipelineConfiguration} */ -public class PipelineConfigurationBridge extends StableBridgeAPI.ProxyInternal { - public PipelineConfigurationBridge(final PipelineConfiguration delegate) { - super(delegate); - } +public interface PipelineConfigurationBridge extends StableBridgeAPI { - public PipelineConfigurationBridge(final String pipelineId, final String jsonEncodedConfig) { - this(new PipelineConfiguration(pipelineId, new BytesArray(jsonEncodedConfig), XContentType.JSON)); + static PipelineConfigurationBridge create(final String pipelineId, final String jsonEncodedConfig) { + final PipelineConfiguration internal = new PipelineConfiguration(pipelineId, new BytesArray(jsonEncodedConfig), XContentType.JSON); + return fromInternal(internal); } - public String getId() { - return internalDelegate.getId(); + static PipelineConfigurationBridge fromInternal(final PipelineConfiguration internal) { + return new ProxyInternalPipelineConfigurationBridge(internal); } - public Map getConfig() { - return internalDelegate.getConfig(); - } + String getId(); - public Map getConfig(final boolean unmodifiable) { - return internalDelegate.getConfig(unmodifiable); - } + Map getConfig(); - @Override - public int hashCode() { - return internalDelegate.hashCode(); - } + Map getConfig(boolean unmodifiable); - @Override - public String toString() { - return internalDelegate.toString(); - } + int hashCode(); + + String toString(); + + boolean equals(Object o); - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } else if (obj instanceof PipelineConfigurationBridge other) { - return internalDelegate.equals(other.internalDelegate); - } else { - return false; - } - } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java index ce7473f4db460..045514243cdd5 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java @@ -8,23 +8,20 @@ */ package org.elasticsearch.logstashbridge.ingest; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.core.FixForMultiProject; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Processor; import org.elasticsearch.logstashbridge.StableBridgeAPI; -import org.elasticsearch.logstashbridge.env.EnvironmentBridge; -import org.elasticsearch.logstashbridge.script.ScriptServiceBridge; -import org.elasticsearch.logstashbridge.threadpool.ThreadPoolBridge; -import java.util.Map; -import java.util.Objects; import java.util.function.BiConsumer; /** - * An external bridge for {@link Processor} + * An {@link StableBridgeAPI} for {@link Processor}. + * As a side-effect of upstream, {@link ProcessorBridge#isAsync()} is expected + * to have a constant value for any given instance, AND: + *
    + *
  • {@code true}: must implement {@link ProcessorBridge#execute(IngestDocumentBridge, BiConsumer)}
  • + *
  • {@code false}: must implement {@link ProcessorBridge#execute(IngestDocumentBridge)}
  • + *
*/ public interface ProcessorBridge extends StableBridgeAPI { @@ -36,74 +33,35 @@ public interface ProcessorBridge extends StableBridgeAPI { boolean isAsync(); - void execute(IngestDocumentBridge ingestDocumentBridge, BiConsumer handler); - - static ProcessorBridge fromInternal(final Processor internalProcessor) { - if (internalProcessor instanceof AbstractExternal.ProxyExternal externalProxy) { - return externalProxy.getProcessorBridge(); - } - return new ProxyInternal(internalProcessor); + default void execute(IngestDocumentBridge ingestDocumentBridge, BiConsumer handler) { + toInternal().execute( + StableBridgeAPI.toInternalNullable(ingestDocumentBridge), + (id, exception) -> handler.accept(IngestDocumentBridge.fromInternalNullable(id), exception) + ); } - /** - * The {@code ProcessorBridge.AbstractExternal} is an abstract base class for implementing - * the {@link ProcessorBridge} externally to the Elasticsearch code-base. It takes care of - * the details of maintaining a singular internal-form implementation of {@link Processor} - * that proxies calls through the external implementation. - */ - abstract class AbstractExternal implements ProcessorBridge { - private ProxyExternal internalProcessor; - - public Processor toInternal() { - if (internalProcessor == null) { - internalProcessor = new ProxyExternal(); - } - return internalProcessor; + default IngestDocumentBridge execute(IngestDocumentBridge ingestDocumentBridge) throws Exception { + IngestDocument internalSourceIngestDocument = ingestDocumentBridge.toInternal(); + IngestDocument internalResultIngestDocument = toInternal().execute(internalSourceIngestDocument); + + if (internalResultIngestDocument == internalSourceIngestDocument) { + return ingestDocumentBridge; } + return IngestDocumentBridge.fromInternalNullable(internalResultIngestDocument); + } - private class ProxyExternal implements Processor { - - @Override - public String getType() { - return AbstractExternal.this.getType(); - } - - @Override - public String getTag() { - return AbstractExternal.this.getTag(); - } - - @Override - public String getDescription() { - return AbstractExternal.this.getDescription(); - } - - @Override - public void execute(IngestDocument ingestDocument, BiConsumer handler) { - AbstractExternal.this.execute( - IngestDocumentBridge.fromInternalNullable(ingestDocument), - (ingestDocumentBridge, e) -> handler.accept( - Objects.isNull(ingestDocumentBridge) ? null : ingestDocumentBridge.toInternal(), - e - ) - ); - } - - @Override - public boolean isAsync() { - return AbstractExternal.this.isAsync(); - } - - private AbstractExternal getProcessorBridge() { - return AbstractExternal.this; - } + static ProcessorBridge fromInternal(final Processor internalProcessor) { + if (internalProcessor instanceof AbstractExternalProcessorBridge.ProxyExternal externalProxy) { + return externalProxy.getProcessorBridge(); } + return new ProxyInternal(internalProcessor); } /** * An implementation of {@link ProcessorBridge} that proxies to an internal {@link Processor} + * @see StableBridgeAPI.ProxyInternal */ - class ProxyInternal extends StableBridgeAPI.ProxyInternal implements ProcessorBridge { + final class ProxyInternal extends StableBridgeAPI.ProxyInternal implements ProcessorBridge { public ProxyInternal(final Processor delegate) { super(delegate); } @@ -127,105 +85,5 @@ public String getDescription() { public boolean isAsync() { return toInternal().isAsync(); } - - @Override - public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer handler) { - internalDelegate.execute( - StableBridgeAPI.toInternalNullable(ingestDocumentBridge), - (id, e) -> handler.accept(IngestDocumentBridge.fromInternalNullable(id), e) - ); - } } - - /** - * An external bridge for {@link Processor.Parameters} - */ - class Parameters extends StableBridgeAPI.ProxyInternal { - - public Parameters( - final EnvironmentBridge environmentBridge, - final ScriptServiceBridge scriptServiceBridge, - final ThreadPoolBridge threadPoolBridge - ) { - this( - new Processor.Parameters( - environmentBridge.toInternal(), - scriptServiceBridge.toInternal(), - null, - threadPoolBridge.toInternal().getThreadContext(), - threadPoolBridge.toInternal()::relativeTimeInMillis, - (delay, command) -> threadPoolBridge.toInternal() - .schedule(command, TimeValue.timeValueMillis(delay), threadPoolBridge.toInternal().generic()), - null, - null, - threadPoolBridge.toInternal().generic()::execute, - IngestService.createGrokThreadWatchdog(environmentBridge.toInternal(), threadPoolBridge.toInternal()) - ) - ); - } - - private Parameters(final Processor.Parameters delegate) { - super(delegate); - } - - @Override - public Processor.Parameters toInternal() { - return this.internalDelegate; - } - } - - /** - * An external bridge for {@link Processor.Factory} - */ - interface Factory extends StableBridgeAPI { - ProcessorBridge create( - Map registry, - String processorTag, - String description, - Map config - ) throws Exception; - - static Factory fromInternal(final Processor.Factory delegate) { - return new ProxyInternal(delegate); - } - - @Override - default Processor.Factory toInternal() { - final Factory stableAPIFactory = this; - return (registry, tag, description, config, projectId) -> stableAPIFactory.create( - StableBridgeAPI.fromInternal(registry, Factory::fromInternal), - tag, - description, - config - ).toInternal(); - } - - /** - * An implementation of {@link ProcessorBridge.Factory} that proxies to an internal {@link Processor.Factory} - */ - class ProxyInternal extends StableBridgeAPI.ProxyInternal implements Factory { - private ProxyInternal(final Processor.Factory delegate) { - super(delegate); - } - - @FixForMultiProject(description = "should we pass a non-null project ID here?") - @Override - public ProcessorBridge create( - final Map registry, - final String processorTag, - final String description, - final Map config - ) throws Exception { - return ProcessorBridge.fromInternal( - this.internalDelegate.create(StableBridgeAPI.toInternal(registry), processorTag, description, config, ProjectId.DEFAULT) - ); - } - - @Override - public Processor.Factory toInternal() { - return this.internalDelegate; - } - } - } - } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorFactoryBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorFactoryBridge.java new file mode 100644 index 0000000000000..ba598389c9199 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorFactoryBridge.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.ingest; + +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.core.FixForMultiProject; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.common.ProjectIdBridge; + +import java.util.Map; + +/** + * An {@link StableBridgeAPI} for {@link Processor.Factory} + */ +public interface ProcessorFactoryBridge extends StableBridgeAPI { + + @Deprecated // supply ProjectIdBridge + default ProcessorBridge create( + Map registry, + String processorTag, + String description, + Map config + ) throws Exception { + return this.create(registry, processorTag, description, config, ProjectIdBridge.getDefault()); + } + + ProcessorBridge create( + Map registry, + String processorTag, + String description, + Map config, + ProjectIdBridge projectId + ) throws Exception; + + static ProcessorFactoryBridge fromInternal(final Processor.Factory delegate) { + if (delegate instanceof AbstractExternalProcessorFactoryBridge.InternalProxy internalProxy) { + return internalProxy.toExternal(); + } + return new ProxyInternal(delegate); + } + + /** + * An implementation of {@link ProcessorFactoryBridge} that proxies to an internal {@link Processor.Factory} + * + * @see StableBridgeAPI.ProxyInternal + */ + class ProxyInternal extends StableBridgeAPI.ProxyInternal implements ProcessorFactoryBridge { + protected ProxyInternal(final Processor.Factory delegate) { + super(delegate); + } + + @FixForMultiProject(description = "should we pass a non-null project ID here?") + @Override + public ProcessorBridge create( + final Map registry, + final String processorTag, + final String description, + final Map config, + final ProjectIdBridge bridgedProjectId + ) throws Exception { + final Map internalRegistry = StableBridgeAPI.toInternal(registry); + final Processor.Factory internalFactory = toInternal(); + final ProjectId projectId = bridgedProjectId.toInternal(); + final Processor internalProcessor = internalFactory.create(internalRegistry, processorTag, description, config, projectId); + return ProcessorBridge.fromInternal(internalProcessor); + } + + @Override + public Processor.Factory toInternal() { + return this.internalDelegate; + } + } + +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorParametersBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorParametersBridge.java new file mode 100644 index 0000000000000..f462c2ecc99f4 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorParametersBridge.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.ingest; + +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.env.Environment; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.env.EnvironmentBridge; +import org.elasticsearch.logstashbridge.script.ScriptServiceBridge; +import org.elasticsearch.logstashbridge.threadpool.ThreadPoolBridge; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.ThreadPool; + +public interface ProcessorParametersBridge extends StableBridgeAPI { + + static ProcessorParametersBridge create( + final EnvironmentBridge bridgedEnvironment, + final ScriptServiceBridge bridgedScriptService, + final ThreadPoolBridge bridgedThreadPool + ) { + Environment environment = bridgedEnvironment.toInternal(); + ScriptService scriptService = bridgedScriptService.toInternal(); + ThreadPool threadPool = bridgedThreadPool.toInternal(); + + final Processor.Parameters processorParameters = new Processor.Parameters( + environment, + scriptService, + null, + threadPool.getThreadContext(), + threadPool::relativeTimeInMillis, + (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic()), + null, + null, + threadPool.generic()::execute, + IngestService.createGrokThreadWatchdog(environment, threadPool) + ); + return new ProxyInternal(processorParameters); + } + + /** + * An implementation of {@link ProcessorParametersBridge} that proxies calls through + * to an internal {@link Processor.Parameters}. + * @see StableBridgeAPI.ProxyInternal + */ + final class ProxyInternal extends StableBridgeAPI.ProxyInternal implements ProcessorParametersBridge { + + private ProxyInternal(final Processor.Parameters delegate) { + super(delegate); + } + + @Override + public Processor.Parameters toInternal() { + return this.internalDelegate; + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProxyInternalIngestDocumentBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProxyInternalIngestDocumentBridge.java new file mode 100644 index 0000000000000..19e4362e24501 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProxyInternalIngestDocumentBridge.java @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.ingest; + +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.LogstashInternalBridge; +import org.elasticsearch.logstashbridge.StableBridgeAPI; +import org.elasticsearch.logstashbridge.script.MetadataBridge; +import org.elasticsearch.logstashbridge.script.TemplateScriptFactoryBridge; + +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; + +/** + * An implementation of {@link IngestDocumentBridge} that proxies calls through + * to an internal {@link IngestDocument}. + * @see StableBridgeAPI.ProxyInternal + */ +final class ProxyInternalIngestDocumentBridge extends StableBridgeAPI.ProxyInternal implements IngestDocumentBridge { + + private MetadataBridge metadataBridge; + + ProxyInternalIngestDocumentBridge(IngestDocument inner) { + super(inner); + } + + @Override + public MetadataBridge getMetadata() { + if (metadataBridge == null) { + this.metadataBridge = MetadataBridge.fromInternal(internalDelegate.getMetadata()); + } + return this.metadataBridge; + } + + @Override + public Map getSource() { + return internalDelegate.getSource(); + } + + @Override + public boolean updateIndexHistory(final String index) { + return internalDelegate.updateIndexHistory(index); + } + + @Override + public Set getIndexHistory() { + return Set.copyOf(internalDelegate.getIndexHistory()); + } + + @Override + public boolean isReroute() { + return LogstashInternalBridge.isReroute(internalDelegate); + } + + @Override + public void resetReroute() { + LogstashInternalBridge.resetReroute(internalDelegate); + } + + @Override + public Map getIngestMetadata() { + return internalDelegate.getIngestMetadata(); + } + + @Override + public T getFieldValue(final String fieldName, final Class type) { + return internalDelegate.getFieldValue(fieldName, type); + } + + @Override + public T getFieldValue(final String fieldName, final Class type, final boolean ignoreMissing) { + return internalDelegate.getFieldValue(fieldName, type, ignoreMissing); + } + + @Override + public String renderTemplate(final TemplateScriptFactoryBridge templateScriptFactory) { + return internalDelegate.renderTemplate(templateScriptFactory.toInternal()); + } + + @Override + public void setFieldValue(final String path, final Object value) { + internalDelegate.setFieldValue(path, value); + } + + @Override + public void removeField(final String path) { + internalDelegate.removeField(path); + } + + @Override + public void executePipeline(final PipelineBridge pipelineBridge, final BiConsumer handler) { + this.internalDelegate.executePipeline(pipelineBridge.toInternal(), (ingestDocument, e) -> { + handler.accept(IngestDocumentBridge.fromInternalNullable(ingestDocument), e); + }); + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProxyInternalPipelineConfigurationBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProxyInternalPipelineConfigurationBridge.java new file mode 100644 index 0000000000000..51a8ed2b5f47a --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProxyInternalPipelineConfigurationBridge.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.ingest; + +import org.elasticsearch.ingest.PipelineConfiguration; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +import java.util.Map; + +/** + * An implementation of {@link PipelineConfigurationBridge} that proxies calls through + * to an internal {@link PipelineConfiguration}. + * @see StableBridgeAPI.ProxyInternal + */ +final class ProxyInternalPipelineConfigurationBridge extends StableBridgeAPI.ProxyInternal + implements + PipelineConfigurationBridge { + ProxyInternalPipelineConfigurationBridge(final PipelineConfiguration delegate) { + super(delegate); + } + + @Override + public String getId() { + return internalDelegate.getId(); + } + + @Override + public Map getConfig() { + return internalDelegate.getConfig(); + } + + @Override + public Map getConfig(final boolean unmodifiable) { + return internalDelegate.getConfig(unmodifiable); + } + + @Override + public int hashCode() { + return internalDelegate.hashCode(); + } + + @Override + public String toString() { + return internalDelegate.toString(); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } else if (obj instanceof ProxyInternalPipelineConfigurationBridge other) { + return internalDelegate.equals(other.internalDelegate); + } else { + return false; + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestCommonPluginBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestCommonPluginBridge.java index 664481460fbf9..ab6d89162b84e 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestCommonPluginBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestCommonPluginBridge.java @@ -10,14 +10,15 @@ import org.elasticsearch.ingest.common.IngestCommonPlugin; import org.elasticsearch.logstashbridge.StableBridgeAPI; -import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorFactoryBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorParametersBridge; import java.util.Map; /** * An external bridge for {@link IngestCommonPlugin} */ -public class IngestCommonPluginBridge implements IngestPluginBridge { +public final class IngestCommonPluginBridge implements IngestPluginBridge { public static final String APPEND_PROCESSOR_TYPE = org.elasticsearch.ingest.common.AppendProcessor.TYPE; public static final String BYTES_PROCESSOR_TYPE = org.elasticsearch.ingest.common.BytesProcessor.TYPE; @@ -59,7 +60,7 @@ public IngestCommonPluginBridge() { } @Override - public Map getProcessors(final ProcessorBridge.Parameters parameters) { - return StableBridgeAPI.fromInternal(this.delegate.getProcessors(parameters.toInternal()), ProcessorBridge.Factory::fromInternal); + public Map getProcessors(final ProcessorParametersBridge parameters) { + return StableBridgeAPI.fromInternal(this.delegate.getProcessors(parameters.toInternal()), ProcessorFactoryBridge::fromInternal); } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestPluginBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestPluginBridge.java index c4e3d56805b28..0eb04aec86e4a 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestPluginBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestPluginBridge.java @@ -9,7 +9,8 @@ package org.elasticsearch.logstashbridge.plugins; import org.elasticsearch.logstashbridge.StableBridgeAPI; -import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorFactoryBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorParametersBridge; import org.elasticsearch.plugins.IngestPlugin; import java.io.Closeable; @@ -20,7 +21,7 @@ * An external bridge for {@link IngestPlugin} */ public interface IngestPluginBridge { - Map getProcessors(ProcessorBridge.Parameters parameters); + Map getProcessors(ProcessorParametersBridge parameters); static ProxyInternal fromInternal(final IngestPlugin delegate) { return new ProxyInternal(delegate); @@ -29,16 +30,16 @@ static ProxyInternal fromInternal(final IngestPlugin delegate) { /** * An implementation of {@link IngestPluginBridge} that proxies calls to an internal {@link IngestPlugin} */ - class ProxyInternal extends StableBridgeAPI.ProxyInternal implements IngestPluginBridge, Closeable { + final class ProxyInternal extends StableBridgeAPI.ProxyInternal implements IngestPluginBridge, Closeable { private ProxyInternal(final IngestPlugin delegate) { super(delegate); } - public Map getProcessors(final ProcessorBridge.Parameters parameters) { + public Map getProcessors(final ProcessorParametersBridge parameters) { return StableBridgeAPI.fromInternal( this.internalDelegate.getProcessors(parameters.toInternal()), - ProcessorBridge.Factory::fromInternal + ProcessorFactoryBridge::fromInternal ); } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestUserAgentPluginBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestUserAgentPluginBridge.java index 4c936e7143571..cb046a421ee8d 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestUserAgentPluginBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/IngestUserAgentPluginBridge.java @@ -10,14 +10,15 @@ import org.elasticsearch.ingest.useragent.IngestUserAgentPlugin; import org.elasticsearch.logstashbridge.StableBridgeAPI; -import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorFactoryBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorParametersBridge; import java.util.Map; /** * An external bridge for {@link IngestUserAgentPlugin} */ -public class IngestUserAgentPluginBridge implements IngestPluginBridge { +public final class IngestUserAgentPluginBridge implements IngestPluginBridge { private final IngestUserAgentPlugin delegate; @@ -25,7 +26,7 @@ public IngestUserAgentPluginBridge() { delegate = new IngestUserAgentPlugin(); } - public Map getProcessors(final ProcessorBridge.Parameters parameters) { - return StableBridgeAPI.fromInternal(this.delegate.getProcessors(parameters.toInternal()), ProcessorBridge.Factory::fromInternal); + public Map getProcessors(final ProcessorParametersBridge parameters) { + return StableBridgeAPI.fromInternal(this.delegate.getProcessors(parameters.toInternal()), ProcessorFactoryBridge::fromInternal); } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/RedactPluginBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/RedactPluginBridge.java index 4e59e5531838f..c53a4d2603e1d 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/RedactPluginBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/plugins/RedactPluginBridge.java @@ -9,7 +9,8 @@ package org.elasticsearch.logstashbridge.plugins; import org.elasticsearch.license.XPackLicenseState; -import org.elasticsearch.logstashbridge.ingest.ProcessorBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorFactoryBridge; +import org.elasticsearch.logstashbridge.ingest.ProcessorParametersBridge; import org.elasticsearch.xpack.redact.RedactProcessor; import java.util.Map; @@ -17,15 +18,15 @@ /** * An external bridge for {@link org.elasticsearch.xpack.redact.RedactPlugin} */ -public class RedactPluginBridge implements IngestPluginBridge { +public final class RedactPluginBridge implements IngestPluginBridge { @Override - public Map getProcessors(ProcessorBridge.Parameters parameters) { + public Map getProcessors(ProcessorParametersBridge parameters) { // Provide a TRIAL license state to the redact processor final XPackLicenseState trialLicenseState = new XPackLicenseState(parameters.toInternal().relativeTimeSupplier); return Map.of( RedactProcessor.TYPE, - ProcessorBridge.Factory.fromInternal(new RedactProcessor.Factory(trialLicenseState, parameters.toInternal().matcherWatchdog)) + ProcessorFactoryBridge.fromInternal(new RedactProcessor.Factory(trialLicenseState, parameters.toInternal().matcherWatchdog)) ); } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/MetadataBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/MetadataBridge.java index 1a9f93d65af3c..30781854294fc 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/MetadataBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/MetadataBridge.java @@ -16,52 +16,86 @@ /** * An external bridge for {@link Metadata} */ -public class MetadataBridge extends StableBridgeAPI.ProxyInternal { - public MetadataBridge(final Metadata delegate) { - super(delegate); - } +public interface MetadataBridge extends StableBridgeAPI { - public String getIndex() { - return internalDelegate.getIndex(); - } + String getIndex(); - public void setIndex(final String index) { - internalDelegate.setIndex(index); - } + void setIndex(String index); - public String getId() { - return internalDelegate.getId(); - } + String getId(); - public void setId(final String id) { - internalDelegate.setId(id); - } + void setId(String id); - public long getVersion() { - return internalDelegate.getVersion(); - } + long getVersion(); - public void setVersion(final long version) { - internalDelegate.setVersion(version); - } + void setVersion(long version); - public String getVersionType() { - return internalDelegate.getVersionType(); - } + String getVersionType(); - public void setVersionType(final String versionType) { - internalDelegate.setVersionType(versionType); - } + void setVersionType(String versionType); - public String getRouting() { - return internalDelegate.getRouting(); - } + String getRouting(); + + void setRouting(String routing); + + ZonedDateTime getNow(); - public void setRouting(final String routing) { - internalDelegate.setRouting(routing); + static MetadataBridge fromInternal(final Metadata metadata) { + return new ProxyInternal(metadata); } - public ZonedDateTime getNow() { - return internalDelegate.getNow(); + /** + * An implementation of {@link MetadataBridge} that proxies calls through + * to an internal {@link Metadata}. + * @see StableBridgeAPI.ProxyInternal + */ + final class ProxyInternal extends StableBridgeAPI.ProxyInternal implements MetadataBridge { + ProxyInternal(final Metadata delegate) { + super(delegate); + } + + public String getIndex() { + return internalDelegate.getIndex(); + } + + public void setIndex(final String index) { + internalDelegate.setIndex(index); + } + + public String getId() { + return internalDelegate.getId(); + } + + public void setId(final String id) { + internalDelegate.setId(id); + } + + public long getVersion() { + return internalDelegate.getVersion(); + } + + public void setVersion(final long version) { + internalDelegate.setVersion(version); + } + + public String getVersionType() { + return internalDelegate.getVersionType(); + } + + public void setVersionType(final String versionType) { + internalDelegate.setVersionType(versionType); + } + + public String getRouting() { + return internalDelegate.getRouting(); + } + + public void setRouting(final String routing) { + internalDelegate.setRouting(routing); + } + + public ZonedDateTime getNow() { + return internalDelegate.getNow(); + } } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java index 7a706f1a8cae7..4a523d9a54ab4 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java @@ -43,96 +43,112 @@ /** * An external bridge for {@link ScriptService} */ -public class ScriptServiceBridge extends StableBridgeAPI.ProxyInternal implements Closeable { - public ScriptServiceBridge fromInternal(final ScriptService delegate) { - return new ScriptServiceBridge(delegate); - } - - public ScriptServiceBridge(final SettingsBridge settingsBridge, final LongSupplier timeProvider) throws IOException { - super(getScriptService(settingsBridge.toInternal(), timeProvider)); - } +public interface ScriptServiceBridge extends StableBridgeAPI, Closeable { - public ScriptServiceBridge(ScriptService delegate) { - super(delegate); + static ScriptServiceBridge fromInternal(final ScriptService delegate) { + return new ScriptServiceBridge.ProxyInternal(delegate); } - private static ScriptService getScriptService(final Settings settings, final LongSupplier timeProvider) throws IOException { - final List painlessBaseWhitelist = getPainlessBaseWhiteList(); - final Map, List> scriptContexts = Map.of( - IngestScript.CONTEXT, - painlessBaseWhitelist, - IngestConditionalScript.CONTEXT, - painlessBaseWhitelist - ); - final Map scriptEngines = Map.of( - PainlessScriptEngine.NAME, - getPainlessScriptEngine(settings), - MustacheScriptEngine.NAME, - new MustacheScriptEngine(settings) - ); - - return new ScriptService(settings, scriptEngines, ScriptModule.CORE_CONTEXTS, timeProvider, ProjectIdResolverBridge.INSTANCE); - } - - private static List getPainlessBaseWhiteList() { - return PainlessPlugin.baseWhiteList(); + static ScriptServiceBridge create(final SettingsBridge bridgedSettings, final LongSupplier timeProvider) throws IOException { + final ScriptService scriptService = ProxyInternal.getScriptService(bridgedSettings.toInternal(), timeProvider); + return fromInternal(scriptService); } /** - * @param settings the Elasticsearch settings object - * @return a {@link ScriptEngine} for painless scripts for use in {@link IngestScript} and - * {@link IngestConditionalScript} contexts, including all available {@link PainlessExtension}s. - * @throws IOException when the underlying script engine cannot be created + * An implementation of {@link ScriptServiceBridge} that proxies calls through + * to an internal {@link ScriptService}. + * @see StableBridgeAPI.ProxyInternal */ - private static ScriptEngine getPainlessScriptEngine(final Settings settings) throws IOException { - try (PainlessPlugin painlessPlugin = new PainlessPlugin()) { - painlessPlugin.loadExtensions(new ExtensiblePlugin.ExtensionLoader() { - @Override - @SuppressWarnings("unchecked") - public List loadExtensions(Class extensionPointType) { - if (extensionPointType.isAssignableFrom(PainlessExtension.class)) { - final List extensions = new ArrayList<>(); - - extensions.add(new ConstantKeywordPainlessExtension()); // module: constant-keyword - extensions.add(new ProcessorsWhitelistExtension()); // module: ingest-common - extensions.add(new SpatialPainlessExtension()); // module: spatial - extensions.add(new WildcardPainlessExtension()); // module: wildcard - - return (List) extensions; - } else { - return List.of(); - } - } - }); + final class ProxyInternal extends StableBridgeAPI.ProxyInternal implements ScriptServiceBridge { - return painlessPlugin.getScriptEngine(settings, Set.of(IngestScript.CONTEXT, IngestConditionalScript.CONTEXT)); + ProxyInternal(ScriptService delegate) { + super(delegate); } - } - @Override - public void close() throws IOException { - this.internalDelegate.close(); - } + private static ScriptService getScriptService(final Settings settings, final LongSupplier timeProvider) throws IOException { + final List painlessBaseWhitelist = getPainlessBaseWhiteList(); + final Map, List> scriptContexts = Map.of( + IngestScript.CONTEXT, + painlessBaseWhitelist, + IngestConditionalScript.CONTEXT, + painlessBaseWhitelist + ); + final Map scriptEngines = Map.of( + PainlessScriptEngine.NAME, + getPainlessScriptEngine(settings), + MustacheScriptEngine.NAME, + new MustacheScriptEngine(settings) + ); + + return new ScriptService( + settings, + scriptEngines, + ScriptModule.CORE_CONTEXTS, + timeProvider, + LockdownOnlyDefaultProjectIdResolver.INSTANCE + ); + } - @FixForMultiProject - // Logstash resolves and runs ingest pipelines based on the datastream. - // How should ProjectIdResolverBridge behave in this case? - // In other words, it looks we need to find a way to figure out which ingest pipeline belongs to which project. - static class ProjectIdResolverBridge implements ProjectResolver { + private static List getPainlessBaseWhiteList() { + return PainlessPlugin.baseWhiteList(); + } - public static final ProjectIdResolverBridge INSTANCE = new ProjectIdResolverBridge(); + /** + * @param settings the Elasticsearch settings object + * @return a {@link ScriptEngine} for painless scripts for use in {@link IngestScript} and + * {@link IngestConditionalScript} contexts, including all available {@link PainlessExtension}s. + * @throws IOException when the underlying script engine cannot be created + */ + private static ScriptEngine getPainlessScriptEngine(final Settings settings) throws IOException { + try (PainlessPlugin painlessPlugin = new PainlessPlugin()) { + painlessPlugin.loadExtensions(new ExtensiblePlugin.ExtensionLoader() { + @Override + @SuppressWarnings("unchecked") + public List loadExtensions(Class extensionPointType) { + if (extensionPointType.isAssignableFrom(PainlessExtension.class)) { + final List extensions = new ArrayList<>(); + + extensions.add(new ConstantKeywordPainlessExtension()); // module: constant-keyword + extensions.add(new ProcessorsWhitelistExtension()); // module: ingest-common + extensions.add(new SpatialPainlessExtension()); // module: spatial + extensions.add(new WildcardPainlessExtension()); // module: wildcard + + return (List) extensions; + } else { + return List.of(); + } + } + }); - @Override - public ProjectId getProjectId() { - return ProjectId.DEFAULT; + return painlessPlugin.getScriptEngine(settings, Set.of(IngestScript.CONTEXT, IngestConditionalScript.CONTEXT)); + } } @Override - public void executeOnProject(ProjectId projectId, CheckedRunnable body) throws E { - if (projectId.equals(ProjectId.DEFAULT)) { - body.run(); - } else { - throw new IllegalArgumentException("Cannot execute on a project other than [" + ProjectId.DEFAULT + "]"); + public void close() throws IOException { + this.internalDelegate.close(); + } + + @FixForMultiProject + // Logstash resolves and runs ingest pipelines based on the datastream. + // How should LockdownOnlyDefaultProjectIdResolver behave in this case? + // In other words, it looks we need to find a way to figure out which ingest pipeline belongs to which project. + static class LockdownOnlyDefaultProjectIdResolver implements ProjectResolver { + + public static final LockdownOnlyDefaultProjectIdResolver INSTANCE = new LockdownOnlyDefaultProjectIdResolver(); + + @Override + public ProjectId getProjectId() { + return ProjectId.DEFAULT; + } + + @Override + public void executeOnProject(ProjectId projectId, CheckedRunnable body) throws E { + if (projectId.equals(ProjectId.DEFAULT)) { + body.run(); + } else { + throw new IllegalArgumentException("Cannot execute on a project other than [" + ProjectId.DEFAULT + "]"); + } } } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/TemplateScriptBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/TemplateScriptFactoryBridge.java similarity index 50% rename from libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/TemplateScriptBridge.java rename to libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/TemplateScriptFactoryBridge.java index 58c38a8f224a3..ba94cf4ad1590 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/TemplateScriptBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/TemplateScriptFactoryBridge.java @@ -6,31 +6,28 @@ * your election, the "Elastic License 2.0", the "GNU Affero General Public * License v3.0 only", or the "Server Side Public License, v 1". */ + package org.elasticsearch.logstashbridge.script; import org.elasticsearch.logstashbridge.StableBridgeAPI; import org.elasticsearch.script.TemplateScript; /** - * An external bridge for {@link TemplateScript} + * An external bridge for {@link TemplateScript.Factory} */ -public class TemplateScriptBridge { +public interface TemplateScriptFactoryBridge extends StableBridgeAPI { + static TemplateScriptFactoryBridge fromInternal(final TemplateScript.Factory delegate) { + return new ProxyInternal(delegate); + } /** - * An external bridge for {@link TemplateScript.Factory} + * An implementation of {@link TemplateScriptFactoryBridge} that proxies calls through + * to an internal {@link TemplateScript.Factory}. + * @see StableBridgeAPI.ProxyInternal */ - public static class Factory extends StableBridgeAPI.ProxyInternal { - public static Factory fromInternal(final TemplateScript.Factory delegate) { - return new Factory(delegate); - } - - public Factory(final TemplateScript.Factory delegate) { + final class ProxyInternal extends StableBridgeAPI.ProxyInternal implements TemplateScriptFactoryBridge { + ProxyInternal(final TemplateScript.Factory delegate) { super(delegate); } - - @Override - public TemplateScript.Factory toInternal() { - return this.internalDelegate; - } } } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/threadpool/ThreadPoolBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/threadpool/ThreadPoolBridge.java index dee0381d30c22..0d8477e3bd118 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/threadpool/ThreadPoolBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/threadpool/ThreadPoolBridge.java @@ -19,25 +19,43 @@ /** * An external bridge for {@link ThreadPool} */ -public class ThreadPoolBridge extends StableBridgeAPI.ProxyInternal { +public interface ThreadPoolBridge extends StableBridgeAPI { - public ThreadPoolBridge(final SettingsBridge settingsBridge) { - this(new ThreadPool(settingsBridge.toInternal(), MeterRegistry.NOOP, new DefaultBuiltInExecutorBuilders())); - } + long relativeTimeInMillis(); - public ThreadPoolBridge(final ThreadPool delegate) { - super(delegate); - } + long absoluteTimeInMillis(); - public static boolean terminate(final ThreadPoolBridge pool, final long timeout, final TimeUnit timeUnit) { - return ThreadPool.terminate(pool.toInternal(), timeout, timeUnit); - } + boolean terminate(long timeout, TimeUnit timeUnit); - public long relativeTimeInMillis() { - return internalDelegate.relativeTimeInMillis(); + static ThreadPoolBridge create(final SettingsBridge bridgedSettings) { + final ThreadPool internal = new ThreadPool(bridgedSettings.toInternal(), MeterRegistry.NOOP, new DefaultBuiltInExecutorBuilders()); + return new ProxyInternal(internal); } - public long absoluteTimeInMillis() { - return internalDelegate.absoluteTimeInMillis(); + /** + * An implementation of {@link ThreadPoolBridge} that proxies calls through + * to an internal {@link ThreadPool}. + * @see StableBridgeAPI.ProxyInternal + */ + class ProxyInternal extends StableBridgeAPI.ProxyInternal implements ThreadPoolBridge { + + ProxyInternal(final ThreadPool delegate) { + super(delegate); + } + + @Override + public long relativeTimeInMillis() { + return internalDelegate.relativeTimeInMillis(); + } + + @Override + public long absoluteTimeInMillis() { + return internalDelegate.absoluteTimeInMillis(); + } + + @Override + public boolean terminate(final long timeout, final TimeUnit timeUnit) { + return ThreadPool.terminate(internalDelegate, timeout, timeUnit); + } } }