Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a3d2323
Use default project resolver instead null which caused NPE when runni…
mashhurs Aug 6, 2025
2c03eb8
Introduce BridgeProjectIdResolver to applying null for project ID.
mashhurs Aug 6, 2025
d047add
Put a note to the ProjectIdResolverBridge about resolving project ID …
mashhurs Aug 6, 2025
74a65a0
GeoIP interfaces for the Logstash bridge where elastic_integration pl…
mashhurs Aug 8, 2025
3a48edf
Add a note to maxmind depenedency of the ingest-geoip module that wha…
mashhurs Aug 8, 2025
67a81ed
A safeguard in case if bridge becomes null, might happen while cyclic…
mashhurs Aug 8, 2025
7eb593c
Introduce bridges for FailProcessorException, Releasable and RefCount…
mashhurs Aug 11, 2025
caf01a5
hamfisted cleanup
yaauie Aug 26, 2025
99948ba
add static GeoIpProcessorBridge#newFactory(IpDatabaseProviderBridge)
yaauie Aug 28, 2025
e8e349f
provide ProcessorBridge.Factory.AbstractExternal
yaauie Aug 29, 2025
100f1fc
add ProjectIdBridge
yaauie Aug 29, 2025
94d58d4
add synchronous ProcessorBridge#execute
yaauie Aug 29, 2025
ac0c926
migrate RefCountingRunnableBridge to interface pattern
yaauie Aug 29, 2025
ec1afea
migrate SettingsBridge to interface pattern
yaauie Aug 29, 2025
e2ea87c
migrate PipelineConfigurationBridge to interface pattern
yaauie Sep 2, 2025
624ee14
migrate EnvironmentBridge to interface pattern
yaauie Sep 2, 2025
f92b6d6
migrate IngestDocumentBridge to interface pattern
yaauie Sep 2, 2025
0124c5d
migrate PipelineBridge to interface pattern
yaauie Sep 2, 2025
771e8b5
reduce unnecessary proxying in ProcessorBridge
yaauie Sep 2, 2025
2846dc9
migrate MetadataBridge to interface pattern
yaauie Sep 2, 2025
eb43cde
migrate ScriptServiceBridge to interface pattern
yaauie Sep 2, 2025
bb6a907
migrate ThreadPoolBridge to interface pattern
yaauie Sep 2, 2025
e3ba1cb
visibility cleanup, move AbstractExternal-s
yaauie Sep 2, 2025
abe7548
Merge pull request #3 from yaauie/rye-bridge-refinement-moar
mashhurs Sep 3, 2025
c5af625
Merge branch 'main' into logstash-bridge-geoip-interfaces
mashhurs Sep 3, 2025
ce08630
Remove accidental change where ingest-geoip module was opened to maxm…
mashhurs Sep 5, 2025
b1032aa
Merge branch 'main' into logstash-bridge-geoip-interfaces
mashhurs Sep 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions libs/logstash-bridge/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,8 @@ other Elasticsearch internals.

If a change is introduced in a separate Elasticsearch project that causes this project to fail,
please consult with members of @elastic/logstash to chart a path forward.

## How to build the module?
```shell
./gradlew :lib:logstash-bridge:build
```
2 changes: 2 additions & 0 deletions libs/logstash-bridge/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ dependencies {
compileOnly project(':x-pack:plugin:redact')
compileOnly project(':x-pack:plugin:spatial')
compileOnly project(':x-pack:plugin:wildcard')

compileOnly('com.maxmind.db:maxmind-db:3.1.1')
}

tasks.named('forbiddenApisMain').configure {
Expand Down
2 changes: 2 additions & 0 deletions libs/logstash-bridge/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
requires org.elasticsearch.redact;
requires org.elasticsearch.spatial;
requires org.elasticsearch.wildcard;
requires org.elasticsearch.ingest.geoip;
requires com.maxmind.db;

exports org.elasticsearch.logstashbridge;
exports org.elasticsearch.logstashbridge.common;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ static <T, B extends StableBridgeAPI<T>> B fromInternal(final T delegate, final
* An {@code ProxyInternal<INTERNAL>} is an implementation of {@code StableBridgeAPI<INTERNAL>} that
* proxies calls to a delegate that is an actual {@code INTERNAL}.
*
* <p>
* implementations are intended to be <em>opaque</em> to consumers of this library,
* and should <em>NOT</em> have public constructors.
* </p>
*
* @param <INTERNAL>
*/
abstract class ProxyInternal<INTERNAL> implements StableBridgeAPI<INTERNAL> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.logstashbridge.common;

import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.logstashbridge.StableBridgeAPI;

/**
* A {@link StableBridgeAPI} for {@link ProjectId}
*/
public interface ProjectIdBridge extends StableBridgeAPI<ProjectId> {
String id();

static ProjectIdBridge fromInternal(final ProjectId projectId) {
return new ProxyInternal(projectId);
}

static ProjectIdBridge fromId(final String id) {
final ProjectId internal = ProjectId.fromId(id);
return new ProxyInternal(internal);
}

static ProjectIdBridge getDefault() {
return ProxyInternal.DEFAULT;
}

/**
* An implementation of {@link ProjectIdBridge} that proxies calls to
* an internal {@link ProjectId} instance.
*
* @see StableBridgeAPI.ProxyInternal
*/
final class ProxyInternal extends StableBridgeAPI.ProxyInternal<ProjectId> implements ProjectIdBridge {
private static final ProjectIdBridge.ProxyInternal DEFAULT = new ProjectIdBridge.ProxyInternal(ProjectId.DEFAULT);

ProxyInternal(ProjectId internalDelegate) {
super(internalDelegate);
}

@Override
public String id() {
return this.internalDelegate.id();
}

@Override
public ProjectId toInternal() {
return this.internalDelegate;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,46 +12,27 @@
import org.elasticsearch.logstashbridge.StableBridgeAPI;

/**
* An external bridge for {@link Settings}
* A {@link StableBridgeAPI} for {@link Settings}
*/
public class SettingsBridge extends StableBridgeAPI.ProxyInternal<Settings> {
public interface SettingsBridge extends StableBridgeAPI<Settings> {

public static SettingsBridge fromInternal(final Settings delegate) {
return new SettingsBridge(delegate);
static SettingsBridge fromInternal(final Settings delegate) {
return new ProxyInternal(delegate);
}

public static Builder builder() {
return Builder.fromInternal(Settings.builder());
}

public SettingsBridge(final Settings delegate) {
super(delegate);
}

@Override
public Settings toInternal() {
return this.internalDelegate;
static SettingsBuilderBridge builder() {
return SettingsBuilderBridge.fromInternal(Settings.builder());
}

/**
* An external bridge for {@link Settings.Builder} that proxies calls to a real {@link Settings.Builder}
* An implementation of {@link SettingsBridge} that proxies calls to
* an internal {@link Settings} instance.
*
* @see StableBridgeAPI.ProxyInternal
*/
public static class Builder extends StableBridgeAPI.ProxyInternal<Settings.Builder> {
static Builder fromInternal(final Settings.Builder delegate) {
return new Builder(delegate);
}

private Builder(final Settings.Builder delegate) {
super(delegate);
}

public Builder put(final String key, final String value) {
this.internalDelegate.put(key, value);
return this;
}

public SettingsBridge build() {
return new SettingsBridge(this.internalDelegate.build());
final class ProxyInternal extends StableBridgeAPI.ProxyInternal<Settings> implements SettingsBridge {
ProxyInternal(Settings internalDelegate) {
super(internalDelegate);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.logstashbridge.common;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.logstashbridge.StableBridgeAPI;

/**
* A {@link StableBridgeAPI} for {@link Settings.Builder}.
*/
public interface SettingsBuilderBridge extends StableBridgeAPI<Settings.Builder> {

SettingsBuilderBridge put(String key, String value);

SettingsBridge build();

static SettingsBuilderBridge fromInternal(final Settings.Builder builder) {
return new ProxyInternal(builder);
}

/**
* An implementation of {@link SettingsBuilderBridge} that proxies calls to
* an internal {@link Settings.Builder} instance.
*
* @see StableBridgeAPI.ProxyInternal
*/
final class ProxyInternal extends StableBridgeAPI.ProxyInternal<Settings.Builder> implements SettingsBuilderBridge {
ProxyInternal(Settings.Builder internalDelegate) {
super(internalDelegate);
}

@Override
public SettingsBuilderBridge put(String key, String value) {
internalDelegate.put(key, value);
return this;
}

@Override
public SettingsBridge build() {
final Settings delegate = internalDelegate.build();
return SettingsBridge.fromInternal(delegate);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.logstashbridge.core;

import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.logstashbridge.StableBridgeAPI;

/**
* A stable interface on top of {@link CheckedBiFunction}.
* @param <T> type of lhs parameter
* @param <U> type of rhs parameter
* @param <R> type of return value
* @param <E> type of anticipated exception
*/
@FunctionalInterface
public interface CheckedBiFunctionBridge<T, U, R, E extends Exception> extends StableBridgeAPI<CheckedBiFunction<T, U, R, E>> {
R apply(T t, U u) throws E;

@Override
default CheckedBiFunction<T, U, R, E> toInternal() {
return this::apply;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* An external bridge for {@link IOUtils}
*/
public class IOUtilsBridge {
private IOUtilsBridge() {}

public static void closeWhileHandlingException(final Iterable<? extends Closeable> objects) {
IOUtils.closeWhileHandlingException(objects);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.logstashbridge.core;

import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.logstashbridge.StableBridgeAPI;

import java.io.Closeable;

/**
* A {@link StableBridgeAPI} for {@link RefCountingRunnable}
*/
public interface RefCountingRunnableBridge extends StableBridgeAPI<RefCountingRunnable>, Closeable {

@Override // only RuntimeException
void close();

ReleasableBridge acquire();

/**
* An API-stable factory method for {@link RefCountingRunnableBridge}
* @param delegate the {@link Runnable} to execute when all refs are closed
* @return a {@link RefCountingRunnableBridge} that will execute the provided
* block when all refs are closed
*/
static RefCountingRunnableBridge create(final Runnable delegate) {
final RefCountingRunnable refCountingRunnable = new RefCountingRunnable(delegate);
return new ProxyInternal(refCountingRunnable);
}

/**
* An implementation of {@link RefCountingRunnableBridge} that proxies calls through
* to an internal {@link RefCountingRunnable}.
* @see StableBridgeAPI.ProxyInternal
*/
final class ProxyInternal extends StableBridgeAPI.ProxyInternal<RefCountingRunnable> implements RefCountingRunnableBridge {
private ProxyInternal(final RefCountingRunnable delegate) {
super(delegate);
}

@Override
public void close() {
toInternal().close();
}

@Override
public ReleasableBridge acquire() {
@SuppressWarnings("resource")
final Releasable releasable = toInternal().acquire();
return ReleasableBridge.fromInternal(releasable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.logstashbridge.core;

import org.elasticsearch.core.Releasable;
import org.elasticsearch.logstashbridge.StableBridgeAPI;

import java.io.Closeable;

/**
* A {@link StableBridgeAPI} for {@link Releasable} for use with {@link RefCountingRunnableBridge}
*/
public interface ReleasableBridge extends StableBridgeAPI<Releasable>, Closeable {

@Override // only RuntimeException
void close();

static ReleasableBridge fromInternal(Releasable releasable) {
return new ProxyInternal(releasable);
}

/**
* An implementation of {@link ReleasableBridge} that proxies calls through
* to an internal {@link Releasable}.
* @see StableBridgeAPI.ProxyInternal
*/
final class ProxyInternal extends StableBridgeAPI.ProxyInternal<Releasable> implements ReleasableBridge {

private ProxyInternal(final Releasable delegate) {
super(delegate);
}

@Override
public void close() {
toInternal().close();
}
}
}
Loading