diff --git a/libs/logstash-bridge/README.md b/libs/logstash-bridge/README.md index dd629724878b5..4cb2204902b1b 100644 --- a/libs/logstash-bridge/README.md +++ b/libs/logstash-bridge/README.md @@ -6,3 +6,8 @@ other Elasticsearch internals. If a change is introduced in a separate Elasticsearch project that causes this project to fail, please consult with members of @elastic/logstash to chart a path forward. + +## How to build the module? +```shell +./gradlew :lib:logstash-bridge:build +``` \ No newline at end of file diff --git a/libs/logstash-bridge/build.gradle b/libs/logstash-bridge/build.gradle index 608d2e3a398e9..dd7cb47a5caa2 100644 --- a/libs/logstash-bridge/build.gradle +++ b/libs/logstash-bridge/build.gradle @@ -24,6 +24,8 @@ dependencies { compileOnly project(':x-pack:plugin:redact') compileOnly project(':x-pack:plugin:spatial') compileOnly project(':x-pack:plugin:wildcard') + + compileOnly('com.maxmind.db:maxmind-db:3.1.1') } tasks.named('forbiddenApisMain').configure { diff --git a/libs/logstash-bridge/src/main/java/module-info.java b/libs/logstash-bridge/src/main/java/module-info.java index 0d16376d2fbb0..8960c331a33ce 100644 --- a/libs/logstash-bridge/src/main/java/module-info.java +++ b/libs/logstash-bridge/src/main/java/module-info.java @@ -23,6 +23,8 @@ requires org.elasticsearch.redact; requires org.elasticsearch.spatial; requires org.elasticsearch.wildcard; + requires org.elasticsearch.ingest.geoip; + requires com.maxmind.db; exports org.elasticsearch.logstashbridge; exports org.elasticsearch.logstashbridge.common; diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/FailProcessorExceptionBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/FailProcessorExceptionBridge.java new file mode 100644 index 0000000000000..ba4e73a9c2250 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/FailProcessorExceptionBridge.java @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.core; + +import org.elasticsearch.ingest.common.FailProcessorException; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +public class FailProcessorExceptionBridge extends StableBridgeAPI.ProxyInternal { + protected FailProcessorExceptionBridge(FailProcessorException internalDelegate) { + super(internalDelegate); + } + + public static boolean isInstanceOf(Throwable exception) { + return exception instanceof FailProcessorException; + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/RefCountingRunnableBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/RefCountingRunnableBridge.java new file mode 100644 index 0000000000000..305407e18c435 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/RefCountingRunnableBridge.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.core; + +import org.elasticsearch.action.support.RefCountingRunnable; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +public class RefCountingRunnableBridge extends StableBridgeAPI.ProxyInternal { + + private RefCountingRunnableBridge(final RefCountingRunnable delegate) { + super(delegate); + } + + public RefCountingRunnableBridge(final Runnable delegate) { + super(new RefCountingRunnable(delegate)); + } + + public void close() { + toInternal().close(); + } + + public ReleasableBridge acquire() { + return new ReleasableBridge.ProxyInternal(toInternal().acquire()); + } + + @Override + public RefCountingRunnable toInternal() { + return this.internalDelegate; + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/ReleasableBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/ReleasableBridge.java new file mode 100644 index 0000000000000..eefe8c4dd08ae --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/core/ReleasableBridge.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.logstashbridge.core; + +import org.elasticsearch.core.Releasable; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +public interface ReleasableBridge extends StableBridgeAPI { + + void close(); + + class ProxyInternal extends StableBridgeAPI.ProxyInternal implements ReleasableBridge { + + public ProxyInternal(final Releasable delegate) { + super(delegate); + } + + @Override + public void close() { + toInternal().close(); + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorBridge.java new file mode 100644 index 0000000000000..2fc197ebe6ca9 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/GeoIpProcessorBridge.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.logstashbridge.geoip; + +import org.elasticsearch.ingest.geoip.GeoIpProcessor; +import org.elasticsearch.ingest.geoip.IpDatabaseProvider; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +/** + * An external bridge for {@link GeoIpProcessor} + */ +public interface GeoIpProcessorBridge { + + class Factory extends StableBridgeAPI.ProxyInternal { + + public Factory(String type, IpDatabaseProvider ipDatabaseProvider) { + super(new GeoIpProcessor.Factory(type, ipDatabaseProvider)); + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java new file mode 100644 index 0000000000000..5d058fabec39b --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseBridge.java @@ -0,0 +1,120 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.logstashbridge.geoip; + +import com.maxmind.db.Reader; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.CheckedBiFunction; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.ingest.geoip.IpDatabase; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +import java.io.IOException; + +/** + * An external bridge for {@link IpDatabase} + */ +public interface IpDatabaseBridge extends StableBridgeAPI { + + String getDatabaseType() throws IOException; + + MaxMindDbBridge.Reader getDatabaseReader() throws IOException; + + @Nullable + default RESPONSE getResponse(String ipAddress, CheckedBiFunction responseProvider) { + try { + return responseProvider.apply(this.getDatabaseReader().toInternal(), ipAddress); + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + } + + void close() throws IOException; + + static IpDatabaseBridge fromInternal(final IpDatabase internalDatabase) { + if (internalDatabase instanceof AbstractExternal.ProxyExternal externalProxy) { + return externalProxy.getIpDatabaseBridge(); + } + return new ProxyInternal(internalDatabase); + } + + /** + * The {@code IpDatabaseBridge.AbstractExternal} is an abstract base class for implementing + * the {@link IpDatabaseBridge} externally to the Elasticsearch code-base. It takes care of + * the details of maintaining a singular internal-form implementation of {@link IpDatabase} + * that proxies calls through the external implementation. + */ + abstract class AbstractExternal implements IpDatabaseBridge { + private ProxyExternal internalDatabase; + + @Override + public IpDatabase toInternal() { + if (internalDatabase == null) { + internalDatabase = new ProxyExternal(); + } + return internalDatabase; + } + + private class ProxyExternal implements IpDatabase { + + private AbstractExternal getIpDatabaseBridge() { + return AbstractExternal.this; + } + + @Override + public String getDatabaseType() throws IOException { + return AbstractExternal.this.getDatabaseType(); + } + + @Override + public RESPONSE getResponse( + String ipAddress, + CheckedBiFunction responseProvider + ) { + return AbstractExternal.this.getResponse(ipAddress, responseProvider); + } + + @Override + public void close() throws IOException { + AbstractExternal.this.close(); + } + } + } + + /** + * An implementation of {@link IpDatabaseBridge} that proxies to an internal {@link IpDatabase} + */ + class ProxyInternal extends StableBridgeAPI.ProxyInternal implements IpDatabaseBridge { + + public ProxyInternal(final IpDatabase delegate) { + super(delegate); + } + + @Override + public String getDatabaseType() throws IOException { + return toInternal().getDatabaseType(); + } + + @Override + public MaxMindDbBridge.Reader getDatabaseReader() throws IOException { + return null; + } + + @Override + public RESPONSE getResponse(String ipAddress, CheckedBiFunction responseProvider) { + return toInternal().getResponse(ipAddress, responseProvider); + } + + @Override + public void close() throws IOException { + toInternal().close(); + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java new file mode 100644 index 0000000000000..7172436ba1a16 --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/IpDatabaseProviderBridge.java @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.logstashbridge.geoip; + +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.geoip.IpDatabase; +import org.elasticsearch.ingest.geoip.IpDatabaseProvider; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +import java.util.Objects; + +/** + * An external bridge for {@link Processor} + */ +public interface IpDatabaseProviderBridge extends StableBridgeAPI { + + Boolean isValid(String name); + + IpDatabaseBridge getDatabase(String name); + + static IpDatabaseProviderBridge fromInternal(final IpDatabaseProvider internalProvider) { + if (internalProvider instanceof IpDatabaseProviderBridge.AbstractExternal.ProxyExternal externalProxy) { + return externalProxy.getIpDatabaseProviderBridge(); + } + return new IpDatabaseProviderBridge.ProxyInternal(internalProvider); + } + + /** + * The {@code IpDatabaseProviderBridge.AbstractExternal} is an abstract base class for implementing + * the {@link IpDatabaseProviderBridge} externally to the Elasticsearch code-base. It takes care of + * the details of maintaining a singular internal-form implementation of {@link IpDatabaseProvider} + * that proxies calls through the external implementation. + */ + abstract class AbstractExternal implements IpDatabaseProviderBridge { + private AbstractExternal.ProxyExternal internalProcessor; + + public IpDatabaseProvider toInternal() { + if (internalProcessor == null) { + internalProcessor = new AbstractExternal.ProxyExternal(); + } + return internalProcessor; + } + + private class ProxyExternal implements IpDatabaseProvider { + + private AbstractExternal getIpDatabaseProviderBridge() { + return AbstractExternal.this; + } + + @Override + public Boolean isValid(ProjectId projectId, String name) { + return IpDatabaseProviderBridge.AbstractExternal.this.isValid(name); + } + + @Override + public IpDatabase getDatabase(ProjectId projectId, String name) { + IpDatabaseBridge bridge = IpDatabaseProviderBridge.AbstractExternal.this.getDatabase(name); + return Objects.isNull(bridge) ? null : bridge.toInternal(); + } + } + } + + /** + * An implementation of {@link IpDatabaseProviderBridge} that proxies to an internal {@link IpDatabaseProvider} + */ + class ProxyInternal extends StableBridgeAPI.ProxyInternal implements IpDatabaseProviderBridge { + public ProxyInternal(final IpDatabaseProvider delegate) { + super(delegate); + } + + @Override + public Boolean isValid(String name) { + return toInternal().isValid(ProjectId.DEFAULT, name); + } + + @Override + public IpDatabaseBridge getDatabase(String name) { + IpDatabase ipDatabase = toInternal().getDatabase(ProjectId.DEFAULT, name); + return new IpDatabaseBridge.ProxyInternal(ipDatabase); + } + } +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/MaxMindDbBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/MaxMindDbBridge.java new file mode 100644 index 0000000000000..f287a44738bee --- /dev/null +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/geoip/MaxMindDbBridge.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.logstashbridge.geoip; + +import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.logstashbridge.StableBridgeAPI; + +import java.io.File; +import java.io.IOException; + +public interface MaxMindDbBridge { + + class Reader extends StableBridgeAPI.ProxyInternal { + + @SuppressForbidden(reason = "Maxmind Reader constructor requires java.io.File") + public Reader(final File databasePath, final NodeCache nodeCache) throws IOException { + super(new com.maxmind.db.Reader(databasePath, nodeCache.toInternal())); + } + + protected Reader(final com.maxmind.db.Reader internalDelegate) { + super(internalDelegate); + } + + @Override + public com.maxmind.db.Reader toInternal() { + return internalDelegate; + } + + public String getDatabaseType() { + return toInternal().getMetadata().getDatabaseType(); + } + + public void close() throws IOException { + toInternal().close(); + } + } + + class NodeCache extends StableBridgeAPI.ProxyInternal { + + protected NodeCache(final com.maxmind.db.NodeCache internalDelegate) { + super(internalDelegate); + } + + public static NodeCache get(final int capacity) { + return new NodeCache(new com.maxmind.db.CHMCache(capacity)); + } + + public static NodeCache getInstance() { + return new NodeCache(com.maxmind.db.NoCache.getInstance()); + } + } + +} diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java index 2d7d15b90908f..ce7473f4db460 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java @@ -20,6 +20,7 @@ import org.elasticsearch.logstashbridge.threadpool.ThreadPoolBridge; import java.util.Map; +import java.util.Objects; import java.util.function.BiConsumer; /** @@ -81,7 +82,10 @@ public String getDescription() { public void execute(IngestDocument ingestDocument, BiConsumer handler) { AbstractExternal.this.execute( IngestDocumentBridge.fromInternalNullable(ingestDocument), - (idb, e) -> handler.accept(idb.toInternal(), e) + (ingestDocumentBridge, e) -> handler.accept( + Objects.isNull(ingestDocumentBridge) ? null : ingestDocumentBridge.toInternal(), + e + ) ); } diff --git a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java index e7d40a79baf47..7a706f1a8cae7 100644 --- a/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java +++ b/libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/script/ScriptServiceBridge.java @@ -8,8 +8,10 @@ */ package org.elasticsearch.logstashbridge.script; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.ingest.common.ProcessorsWhitelistExtension; import org.elasticsearch.logstashbridge.StableBridgeAPI; @@ -68,9 +70,8 @@ private static ScriptService getScriptService(final Settings settings, final Lon MustacheScriptEngine.NAME, new MustacheScriptEngine(settings) ); - @FixForMultiProject // Should this be non-null? - final ProjectResolver projectResolver = null; - return new ScriptService(settings, scriptEngines, ScriptModule.CORE_CONTEXTS, timeProvider, projectResolver); + + return new ScriptService(settings, scriptEngines, ScriptModule.CORE_CONTEXTS, timeProvider, ProjectIdResolverBridge.INSTANCE); } private static List getPainlessBaseWhiteList() { @@ -112,4 +113,27 @@ public List loadExtensions(Class extensionPointType) { public void close() throws IOException { this.internalDelegate.close(); } + + @FixForMultiProject + // Logstash resolves and runs ingest pipelines based on the datastream. + // How should ProjectIdResolverBridge behave in this case? + // In other words, it looks we need to find a way to figure out which ingest pipeline belongs to which project. + static class ProjectIdResolverBridge implements ProjectResolver { + + public static final ProjectIdResolverBridge INSTANCE = new ProjectIdResolverBridge(); + + @Override + public ProjectId getProjectId() { + return ProjectId.DEFAULT; + } + + @Override + public void executeOnProject(ProjectId projectId, CheckedRunnable body) throws E { + if (projectId.equals(ProjectId.DEFAULT)) { + body.run(); + } else { + throw new IllegalArgumentException("Cannot execute on a project other than [" + ProjectId.DEFAULT + "]"); + } + } + } } diff --git a/modules/ingest-geoip/build.gradle b/modules/ingest-geoip/build.gradle index 34efcb290885b..f494d1ea65500 100644 --- a/modules/ingest-geoip/build.gradle +++ b/modules/ingest-geoip/build.gradle @@ -34,6 +34,8 @@ dependencies { runtimeOnly("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}") runtimeOnly("com.fasterxml.jackson.core:jackson-databind:${versions.jackson}") runtimeOnly("com.fasterxml.jackson.core:jackson-core:${versions.jackson}") + // when you upgrade maxmind dependency, please also do so in libs/logstash-bridge/build.gradle + // elastic_integration plugin embeds this module but references through logstash-bridge implementation('com.maxmind.db:maxmind-db:3.1.1') testImplementation 'org.elasticsearch:geolite2-databases:20191119' diff --git a/modules/ingest-geoip/src/main/java/module-info.java b/modules/ingest-geoip/src/main/java/module-info.java index 0703d9fb449aa..1e903444fc9e9 100644 --- a/modules/ingest-geoip/src/main/java/module-info.java +++ b/modules/ingest-geoip/src/main/java/module-info.java @@ -19,5 +19,5 @@ exports org.elasticsearch.ingest.geoip.direct to org.elasticsearch.server; exports org.elasticsearch.ingest.geoip.stats to org.elasticsearch.server; - exports org.elasticsearch.ingest.geoip to com.maxmind.db; + exports org.elasticsearch.ingest.geoip to com.maxmind.db, org.elasticsearch.logstashbridge; }