Skip to content

[Logstash] Move elastic_integration plugin usage to ES logstash-bridge. #131486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b9cdd5b
Testing painless processors extension.
mashhurs Jul 14, 2025
e3310cd
Test paintless spaial, wildcard and constant keyword extensions.
mashhurs Jul 14, 2025
3a6a795
Fix the bug that when null applied for Map.of() which may cause NPE.
mashhurs Jul 15, 2025
e2dd123
Debug
mashhurs Jul 15, 2025
84f38e3
Debugging: require delegate and its metadata non null to get specific…
mashhurs Jul 16, 2025
78a3394
Debugging: instead of Map.copyOf(), send ingest metadata as it is.
mashhurs Jul 16, 2025
38b25ce
Make metadata version field accessible, introduce inverse wrapper wit…
mashhurs Jul 16, 2025
0e70a6c
RedactPlugin, IngestCommonPlugin and IngestUserAgent plugins moved to…
mashhurs Jul 17, 2025
b7ac030
Ingest common plugin simplification.
mashhurs Jul 17, 2025
0d0acef
[CI] Auto commit changes from spotless
Jul 17, 2025
577e0f2
Open an access for the x-pack spatial module resources.
mashhurs Jul 17, 2025
94aa3e2
[CI] Auto commit changes from spotless
Jul 17, 2025
87774ae
Open access to resources in mapper constant keyword.
mashhurs Jul 18, 2025
bfa2d27
Add an access to resource in x-pack wildcard module.
mashhurs Jul 18, 2025
4b1ef11
Export and open access to sub-package spatial modules.
mashhurs Jul 18, 2025
fb835ba
Provide module service implementations with provides keyword in x-pac…
mashhurs Jul 18, 2025
b4346d3
Export x-pack spatial module packages to make accessible to ES server…
mashhurs Jul 18, 2025
a9981f5
Merge branch 'main' into move-to-bridge-stable-api-investigation
mashhurs Jul 18, 2025
290e82a
spike refactor of logstashbridge stable API
yaauie Jul 21, 2025
ba80d06
Merge pull request #1 from yaauie/stable-api-clarity-external-internal
mashhurs Jul 22, 2025
cd44d2f
Format the recent logstash-bridge changes.
mashhurs Jul 22, 2025
4eee343
Wildcard and mapper constant keyword modules open resource access to …
mashhurs Jul 22, 2025
ede5de5
Merge branch 'main' into move-to-bridge-stable-api-investigation
mashhurs Jul 22, 2025
166d4be
Rename constant-keyword, wildcard, redact and spatial modules in a wa…
mashhurs Jul 23, 2025
c9b3be1
Merge branch 'main' into move-to-bridge-stable-api-investigation
mashhurs Jul 23, 2025
7f84caf
Remove unnecessary constant class in the ingest common plugin bridge.
mashhurs Jul 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion libs/logstash-bridge/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ dependencies {
compileOnly project(':modules:lang-painless:spi')
compileOnly project(':modules:lang-mustache')
compileOnly project(':modules:ingest-common')
// compileOnly project(':modules:ingest-geoip')
compileOnly project(':modules:ingest-geoip')
compileOnly project(':modules:ingest-user-agent')
compileOnly project(':x-pack:plugin:core')
compileOnly project(':x-pack:plugin:mapper-constant-keyword')
compileOnly project(':x-pack:plugin:redact')
compileOnly project(':x-pack:plugin:spatial')
compileOnly project(':x-pack:plugin:wildcard')
}

tasks.named('forbiddenApisMain').configure {
Expand Down
7 changes: 7 additions & 0 deletions libs/logstash-bridge/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@
requires org.elasticsearch.server;
requires org.elasticsearch.painless;
requires org.elasticsearch.painless.spi;
requires org.elasticsearch.ingest.common;
requires org.elasticsearch.ingest.useragent;
requires org.elasticsearch.mustache;
requires org.elasticsearch.xcontent;
requires org.elasticsearch.xcore;
requires org.elasticsearch.xpack.constantkeyword;
requires org.elasticsearch.xpack.redact;
requires org.elasticsearch.xpack.spatial;
requires org.elasticsearch.xpack.wildcard;

exports org.elasticsearch.logstashbridge;
exports org.elasticsearch.logstashbridge.common;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
/**
* A {@code StableBridgeAPI} is the stable bridge to an Elasticsearch API, and can produce instances
* from the actual API that they mirror. As part of the LogstashBridge project, these classes are relied
* upon by the "Elastic Integration Filter Plugin" for Logstash and their external shapes mut not change
* upon by the "Elastic Integration Filter Plugin" for Logstash and their external shapes must not change
* without coordination with the maintainers of that project.
*
* @param <T> the actual type of the Elasticsearch API being mirrored
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@

public class IngestDocumentBridge extends StableBridgeAPI.Proxy<IngestDocument> {

public static final class Constants {
public static final String METADATA_VERSION_FIELD_NAME = IngestDocument.Metadata.VERSION.getFieldName();

private Constants() {}
}

public static IngestDocumentBridge wrap(final IngestDocument ingestDocument) {
if (ingestDocument == null) {
return null;
Expand Down Expand Up @@ -60,7 +66,7 @@ public void resetReroute() {
}

public Map<String, Object> getIngestMetadata() {
return Map.copyOf(delegate.getIngestMetadata());
return delegate.getIngestMetadata();
}

public <T> T getFieldValue(final String fieldName, final Class<T> type) {
Expand All @@ -83,7 +89,6 @@ public void removeField(final String path) {
delegate.removeField(path);
}

// public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
public void executePipeline(final PipelineBridge pipelineBridge, final BiConsumer<IngestDocumentBridge, Exception> handler) {
this.delegate.executePipeline(pipelineBridge.unwrap(), (unwrapped, e) -> handler.accept(IngestDocumentBridge.wrap(unwrapped), e));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
*/
package org.elasticsearch.logstashbridge.ingest;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.logstashbridge.StableBridgeAPI;
Expand All @@ -21,6 +23,45 @@
import java.util.function.BiConsumer;

public interface ProcessorBridge extends StableBridgeAPI<Processor> {

final class Constants {
private Constants() {}

public static final String APPEND_PROCESSOR_TYPE = org.elasticsearch.ingest.common.AppendProcessor.TYPE;
public static final String BYTES_PROCESSOR_TYPE = org.elasticsearch.ingest.common.BytesProcessor.TYPE;
public static final String COMMUNITY_ID_PROCESSOR_TYPE = org.elasticsearch.ingest.common.CommunityIdProcessor.TYPE;
public static final String CONVERT_PROCESSOR_TYPE = org.elasticsearch.ingest.common.ConvertProcessor.TYPE;
public static final String CSV_PROCESSOR_TYPE = org.elasticsearch.ingest.common.CsvProcessor.TYPE;
public static final String DATE_INDEX_NAME_PROCESSOR_TYPE = org.elasticsearch.ingest.common.DateIndexNameProcessor.TYPE;
public static final String DATE_PROCESSOR_TYPE = org.elasticsearch.ingest.common.DateProcessor.TYPE;
public static final String DISSECT_PROCESSOR_TYPE = org.elasticsearch.ingest.common.DissectProcessor.TYPE;
public static final String DROP_PROCESSOR_TYPE = org.elasticsearch.ingest.DropProcessor.TYPE;
public static final String FAIL_PROCESSOR_TYPE = org.elasticsearch.ingest.common.FailProcessor.TYPE;
public static final String FINGERPRINT_PROCESSOR_TYPE = org.elasticsearch.ingest.common.FingerprintProcessor.TYPE;
public static final String FOR_EACH_PROCESSOR_TYPE = org.elasticsearch.ingest.common.ForEachProcessor.TYPE;
public static final String GROK_PROCESSOR_TYPE = org.elasticsearch.ingest.common.GrokProcessor.TYPE;
public static final String GSUB_PROCESSOR_TYPE = org.elasticsearch.ingest.common.GsubProcessor.TYPE;
public static final String HTML_STRIP_PROCESSOR_TYPE = org.elasticsearch.ingest.common.HtmlStripProcessor.TYPE;
public static final String JOIN_PROCESSOR_TYPE = org.elasticsearch.ingest.common.JoinProcessor.TYPE;
public static final String JSON_PROCESSOR_TYPE = org.elasticsearch.ingest.common.JsonProcessor.TYPE;
public static final String KEY_VALUE_PROCESSOR_TYPE = org.elasticsearch.ingest.common.KeyValueProcessor.TYPE;
public static final String LOWERCASE_PROCESSOR_TYPE = org.elasticsearch.ingest.common.LowercaseProcessor.TYPE;
public static final String NETWORK_DIRECTION_PROCESSOR_TYPE = org.elasticsearch.ingest.common.NetworkDirectionProcessor.TYPE;
public static final String REGISTERED_DOMAIN_PROCESSOR_TYPE = org.elasticsearch.ingest.common.RegisteredDomainProcessor.TYPE;
public static final String REMOVE_PROCESSOR_TYPE = org.elasticsearch.ingest.common.RemoveProcessor.TYPE;
public static final String RENAME_PROCESSOR_TYPE = org.elasticsearch.ingest.common.RenameProcessor.TYPE;
public static final String REROUTE_PROCESSOR_TYPE = org.elasticsearch.ingest.common.RerouteProcessor.TYPE;
public static final String SCRIPT_PROCESSOR_TYPE = org.elasticsearch.ingest.common.ScriptProcessor.TYPE;
public static final String SET_PROCESSOR_TYPE = org.elasticsearch.ingest.common.SetProcessor.TYPE;
public static final String SORT_PROCESSOR_TYPE = org.elasticsearch.ingest.common.SortProcessor.TYPE;
public static final String SPLIT_PROCESSOR_TYPE = org.elasticsearch.ingest.common.SplitProcessor.TYPE;
public static final String TRIM_PROCESSOR_TYPE = org.elasticsearch.ingest.common.TrimProcessor.TYPE;
public static final String URL_DECODE_PROCESSOR_TYPE = org.elasticsearch.ingest.common.URLDecodeProcessor.TYPE;
public static final String UPPERCASE_PROCESSOR_TYPE = org.elasticsearch.ingest.common.UppercaseProcessor.TYPE;
public static final String URI_PARTS_PROCESSOR_TYPE = org.elasticsearch.ingest.common.UriPartsProcessor.TYPE;

}

String getType();

String getTag();
Expand All @@ -29,9 +70,12 @@ public interface ProcessorBridge extends StableBridgeAPI<Processor> {

boolean isAsync();

void execute(IngestDocumentBridge ingestDocumentBridge, BiConsumer<IngestDocumentBridge, Exception> handler) throws Exception;
void execute(IngestDocumentBridge ingestDocumentBridge, BiConsumer<IngestDocumentBridge, Exception> handler);

static ProcessorBridge wrap(final Processor delegate) {
if (delegate instanceof InverseWrapped inverseWrapped) {
return inverseWrapped.delegate;
}
return new Wrapped(delegate);
}

Expand Down Expand Up @@ -61,15 +105,52 @@ public boolean isAsync() {
}

@Override
public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer<IngestDocumentBridge, Exception> handler)
throws Exception {
public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer<IngestDocumentBridge, Exception> handler) {
delegate.execute(
StableBridgeAPI.unwrapNullable(ingestDocumentBridge),
(id, e) -> handler.accept(IngestDocumentBridge.wrap(id), e)
);
}
}

@Override
default Processor unwrap() {
return new InverseWrapped(this);
}

class InverseWrapped implements Processor {
private final ProcessorBridge delegate;

public InverseWrapped(final ProcessorBridge delegate) {
this.delegate = delegate;
}

@Override
public String getType() {
return delegate.getType();
}

@Override
public String getTag() {
return delegate.getTag();
}

@Override
public String getDescription() {
return delegate.getDescription();
}

@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
this.delegate.execute(IngestDocumentBridge.wrap(ingestDocument), (idb, e) -> handler.accept(idb.unwrap(), e));
}

@Override
public boolean isAsync() {
return delegate.isAsync();
}
}

class Parameters extends StableBridgeAPI.Proxy<Processor.Parameters> {

public Parameters(
Expand Down Expand Up @@ -141,7 +222,7 @@ public ProcessorBridge create(
final Map<String, Object> config
) throws Exception {
return ProcessorBridge.wrap(
this.delegate.create(StableBridgeAPI.unwrap(registry), processorTag, description, config, null)
this.delegate.create(StableBridgeAPI.unwrap(registry), processorTag, description, config, ProjectId.DEFAULT)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.logstashbridge.plugins;

import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.logstashbridge.StableBridgeAPI;
import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;

import java.util.Map;

public class IngestCommonPluginBridge implements IngestPluginBridge {

private final IngestCommonPlugin delegate;

public IngestCommonPluginBridge() {
delegate = new IngestCommonPlugin();
}

@Override
public Map<String, ProcessorBridge.Factory> getProcessors(final ProcessorBridge.Parameters parameters) {
return StableBridgeAPI.wrap(this.delegate.getProcessors(parameters.unwrap()), ProcessorBridge.Factory::wrap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.logstashbridge.plugins;

import org.elasticsearch.ingest.useragent.IngestUserAgentPlugin;
import org.elasticsearch.logstashbridge.StableBridgeAPI;
import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;

import java.util.Map;

public class IngestUserAgentPluginBridge implements IngestPluginBridge {

private final IngestUserAgentPlugin delegate;

public IngestUserAgentPluginBridge() {
delegate = new IngestUserAgentPlugin();
}

public Map<String, ProcessorBridge.Factory> getProcessors(final ProcessorBridge.Parameters parameters) {
return StableBridgeAPI.wrap(this.delegate.getProcessors(parameters.unwrap()), ProcessorBridge.Factory::wrap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.logstashbridge.plugins;

import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;
import org.elasticsearch.xpack.redact.RedactProcessor;

import java.util.Map;

public class RedactPluginBridge implements IngestPluginBridge {
@Override
public Map<String, ProcessorBridge.Factory> getProcessors(ProcessorBridge.Parameters parameters) {
// Provide a TRIAL license state to the redact processor
final XPackLicenseState trialLicenseState = new XPackLicenseState(parameters.unwrap().relativeTimeSupplier);

return Map.of(
RedactProcessor.TYPE,
ProcessorBridge.Factory.wrap(new RedactProcessor.Factory(trialLicenseState, parameters.unwrap().matcherWatchdog))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,47 @@
package org.elasticsearch.logstashbridge.script;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.common.ProcessorsWhitelistExtension;
import org.elasticsearch.logstashbridge.StableBridgeAPI;
import org.elasticsearch.logstashbridge.common.SettingsBridge;
import org.elasticsearch.painless.PainlessPlugin;
import org.elasticsearch.painless.PainlessScriptEngine;
import org.elasticsearch.painless.spi.PainlessExtension;
import org.elasticsearch.painless.spi.Whitelist;
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.script.IngestConditionalScript;
import org.elasticsearch.script.IngestScript;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.mustache.MustacheScriptEngine;
import org.elasticsearch.xpack.constantkeyword.ConstantKeywordPainlessExtension;
import org.elasticsearch.xpack.spatial.SpatialPainlessExtension;
import org.elasticsearch.xpack.wildcard.WildcardPainlessExtension;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;

public class ScriptServiceBridge extends StableBridgeAPI.Proxy<ScriptService> implements Closeable {
public ScriptServiceBridge wrap(final ScriptService delegate) {
return new ScriptServiceBridge(delegate);
}

public ScriptServiceBridge(final SettingsBridge settingsBridge, final LongSupplier timeProvider) {
public ScriptServiceBridge(final SettingsBridge settingsBridge, final LongSupplier timeProvider) throws IOException {
super(getScriptService(settingsBridge.unwrap(), timeProvider));
}

public ScriptServiceBridge(ScriptService delegate) {
super(delegate);
}

private static ScriptService getScriptService(final Settings settings, final LongSupplier timeProvider) {
private static ScriptService getScriptService(final Settings settings, final LongSupplier timeProvider) throws IOException {
final List<Whitelist> painlessBaseWhitelist = getPainlessBaseWhiteList();
final Map<ScriptContext<?>, List<Whitelist>> scriptContexts = Map.of(
IngestScript.CONTEXT,
Expand All @@ -51,7 +59,7 @@ private static ScriptService getScriptService(final Settings settings, final Lon
);
final Map<String, ScriptEngine> scriptEngines = Map.of(
PainlessScriptEngine.NAME,
new PainlessScriptEngine(settings, scriptContexts),
getPainlessScriptEngine(settings),
MustacheScriptEngine.NAME,
new MustacheScriptEngine(settings)
);
Expand All @@ -62,6 +70,37 @@ private static List<Whitelist> getPainlessBaseWhiteList() {
return PainlessPlugin.baseWhiteList();
}

/**
* @param settings the Elasticsearch settings object
* @return a {@link ScriptEngine} for painless scripts for use in {@link IngestScript} and
* {@link IngestConditionalScript} contexts, including all available {@link PainlessExtension}s.
* @throws IOException when the underlying script engine cannot be created
*/
private static ScriptEngine getPainlessScriptEngine(final Settings settings) throws IOException {
try (PainlessPlugin painlessPlugin = new PainlessPlugin()) {
painlessPlugin.loadExtensions(new ExtensiblePlugin.ExtensionLoader() {
@Override
@SuppressWarnings("unchecked")
public <T> List<T> loadExtensions(Class<T> extensionPointType) {
if (extensionPointType.isAssignableFrom(PainlessExtension.class)) {
final List<PainlessExtension> extensions = new ArrayList<>();

extensions.add(new ConstantKeywordPainlessExtension()); // module: constant-keyword
extensions.add(new ProcessorsWhitelistExtension()); // module: ingest-common
extensions.add(new SpatialPainlessExtension()); // module: spatial
extensions.add(new WildcardPainlessExtension()); // module: wildcard

return (List<T>) extensions;
} else {
return List.of();
}
}
});

return painlessPlugin.getScriptEngine(settings, Set.of(IngestScript.CONTEXT, IngestConditionalScript.CONTEXT));
}
}

@Override
public void close() throws IOException {
this.delegate.close();
Expand Down
2 changes: 2 additions & 0 deletions modules/ingest-user-agent/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@
requires org.elasticsearch.server;
requires org.elasticsearch.xcontent;
requires org.elasticsearch.base;

exports org.elasticsearch.ingest.useragent;
}
Loading