Skip to content

Commit 6c55a10

Browse files
committed
Replace ES classes with their Bridge Stable API pairs.
1 parent febb207 commit 6c55a10

17 files changed

+239
-270
lines changed

build.gradle

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -358,14 +358,14 @@ task shadeElasticsearchLogstashBridge(type: com.github.jengelman.gradle.plugins.
358358
description "Shades the Elasticsearch logstash-bridge jar"
359359

360360
dependsOn buildElasticsearchLogstashBridge
361-
361+
362362
from("${buildDir}/elasticsearch-source/libs/logstash-bridge/build/distributions") {
363363
include "elasticsearch-logstash-bridge-*.jar"
364364
}
365-
365+
366366
archiveFileName = "elasticsearch-logstash-bridge-shaded.jar"
367367
destinationDirectory = file("${buildDir}/shaded")
368-
368+
369369
exclude '**/module-info.class'
370370
}
371371

src/main/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchPipelineConfigurationResolver.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import org.elasticsearch.client.Response;
1616
import org.elasticsearch.client.ResponseException;
1717
import org.elasticsearch.client.RestClient;
18-
import org.elasticsearch.ingest.PipelineConfiguration;
18+
import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge;
1919

2020
import java.util.Optional;
2121

@@ -24,7 +24,7 @@
2424
* that retrieves pipelines from Elasticsearch.
2525
*/
2626
public class ElasticsearchPipelineConfigurationResolver
27-
extends AbstractSimpleResolver<String,PipelineConfiguration>
27+
extends AbstractSimpleResolver<String, PipelineConfigurationBridge>
2828
implements PipelineConfigurationResolver {
2929
private final RestClient elasticsearchRestClient;
3030
private final PipelineConfigurationFactory pipelineConfigurationFactory;
@@ -37,13 +37,13 @@ public ElasticsearchPipelineConfigurationResolver(final RestClient elasticsearch
3737
}
3838

3939
@Override
40-
public Optional<PipelineConfiguration> resolveSafely(String pipelineName) throws Exception {
40+
public Optional<PipelineConfigurationBridge> resolveSafely(String pipelineName) throws Exception {
4141
final Response response;
4242
try {
4343
final Request request = new Request("GET", URLEncodedUtils.formatSegments("_ingest", "pipeline", pipelineName));
4444
response = elasticsearchRestClient.performRequest(request);
4545
final String jsonEncodedPayload = EntityUtils.toString(response.getEntity());
46-
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationFactory.parseNamedObject(jsonEncodedPayload);
46+
final PipelineConfigurationBridge pipelineConfiguration = pipelineConfigurationFactory.parseNamedObject(jsonEncodedPayload);
4747
return Optional.of(pipelineConfiguration);
4848
} catch (ResponseException re) {
4949
if (re.getResponse().getStatusLine().getStatusCode() == 404) {

src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessor.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
import org.apache.logging.log4j.LogManager;
1515
import org.apache.logging.log4j.Logger;
1616
import org.elasticsearch.action.support.RefCountingRunnable;
17-
import org.elasticsearch.core.IOUtils;
18-
import org.elasticsearch.ingest.IngestDocument;
19-
import org.elasticsearch.ingest.LogstashInternalBridge;
2017
import org.elasticsearch.ingest.common.FailProcessorException;
18+
import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
19+
import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;
2120

2221
import java.io.Closeable;
2322
import java.io.IOException;
2423
import java.util.Collection;
2524
import java.util.List;
25+
import java.util.Locale;
2626
import java.util.Map;
2727
import java.util.Objects;
2828
import java.util.Optional;
@@ -32,7 +32,6 @@
3232

3333
import static co.elastic.logstash.filters.elasticintegration.util.EventUtil.eventAsMap;
3434
import static co.elastic.logstash.filters.elasticintegration.util.EventUtil.serializeEventForLog;
35-
import static org.elasticsearch.core.Strings.format;
3635

3736
/**
3837
* An {@link EventProcessor} processes {@link Event}s by:
@@ -151,7 +150,7 @@ void processRequest(final IntegrationRequest request) {
151150

152151
final IngestPipeline ingestPipeline = loadedPipeline.get();
153152
LOGGER.trace(() -> String.format("Using loaded pipeline `%s` (%s)", pipelineName, System.identityHashCode(ingestPipeline)));
154-
final IngestDocument ingestDocument = eventMarshaller.toIngestDocument(request.event());
153+
final IngestDocumentBridge ingestDocument = eventMarshaller.toIngestDocument(request.event());
155154

156155
resolvedIndexName.ifPresent(indexName -> {
157156
ingestDocument.getMetadata().setIndex(indexName);
@@ -170,7 +169,7 @@ void processRequest(final IntegrationRequest request) {
170169
}
171170
}
172171

173-
private void executePipeline(final IngestDocument ingestDocument, final IngestPipeline ingestPipeline, final IntegrationRequest request) {
172+
private void executePipeline(final IngestDocumentBridge ingestDocument, final IngestPipeline ingestPipeline, final IntegrationRequest request) {
174173
final String pipelineName = ingestPipeline.getId();
175174
final String originalIndex = ingestDocument.getMetadata().getIndex();
176175
ingestPipeline.execute(ingestDocument, (resultIngestDocument, ingestPipelineException) -> {
@@ -193,17 +192,17 @@ private void executePipeline(final IngestDocument ingestDocument, final IngestPi
193192
} else {
194193

195194
final String newIndex = resultIngestDocument.getMetadata().getIndex();
196-
if (!Objects.equals(originalIndex, newIndex) && LogstashInternalBridge.isReroute(resultIngestDocument)) {
197-
LogstashInternalBridge.resetReroute(resultIngestDocument);
195+
if (!Objects.equals(originalIndex, newIndex) && ingestDocument.isReroute()) {
196+
ingestDocument.resetReroute();
198197
boolean cycle = !resultIngestDocument.updateIndexHistory(newIndex);
199198
if (cycle) {
200199
request.complete(incomingEvent -> {
201-
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message", format(
202-
"index cycle detected while processing pipeline [%s]: %s + %s",
203-
pipelineName,
204-
resultIngestDocument.getIndexHistory(),
205-
newIndex
206-
)));
200+
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message",
201+
String.format(Locale.ROOT, "index cycle detected while processing pipeline [%s]: %s + %s",
202+
pipelineName,
203+
resultIngestDocument.getIndexHistory(),
204+
newIndex)
205+
));
207206
});
208207
return;
209208
}
@@ -214,12 +213,14 @@ private void executePipeline(final IngestDocument ingestDocument, final IngestPi
214213
final Optional<IngestPipeline> reroutePipeline = resolve(reroutePipelineName.get(), internalPipelineProvider);
215214
if (reroutePipeline.isEmpty()) {
216215
request.complete(incomingEvent -> {
217-
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message", format(
218-
"reroute failed to load next pipeline [%s]: %s -> %s",
216+
annotateIngestPipelineFailure(
217+
incomingEvent,
219218
pipelineName,
220-
resultIngestDocument.getIndexHistory(),
221-
reroutePipelineName.get()
222-
)));
219+
Map.of("message",
220+
String.format(Locale.ROOT, "reroute failed to load next pipeline [%s]: %s -> %s",
221+
pipelineName,
222+
resultIngestDocument.getIndexHistory(),
223+
reroutePipelineName.get())));
223224
});
224225
} else {
225226
executePipeline(resultIngestDocument, reroutePipeline.get(), request);
@@ -277,6 +278,6 @@ static private <T,R> Optional<R> resolve(T resolvable, Resolver<T,R> resolver) {
277278

278279
@Override
279280
public void close() throws IOException {
280-
IOUtils.closeWhileHandlingException(this.resourcesToClose);
281+
IOUtilsBridge.closeWhileHandlingException(this.resourcesToClose);
281282
}
282283
}

src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java

Lines changed: 23 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -17,41 +17,20 @@
1717
import co.elastic.logstash.filters.elasticintegration.resolver.ResolverCache;
1818
import co.elastic.logstash.filters.elasticintegration.util.Exceptions;
1919
import co.elastic.logstash.filters.elasticintegration.util.PluginContext;
20-
import co.elastic.logstash.filters.elasticintegration.util.PluginProjectResolver;
2120
import com.google.common.util.concurrent.Service;
2221
import com.google.common.util.concurrent.ServiceManager;
2322
import org.elasticsearch.client.RestClient;
24-
import org.elasticsearch.common.settings.Settings;
25-
import org.elasticsearch.core.IOUtils;
26-
import org.elasticsearch.core.TimeValue;
27-
import org.elasticsearch.env.Environment;
28-
import org.elasticsearch.ingest.IngestService;
29-
import org.elasticsearch.ingest.LogstashInternalBridge;
30-
import org.elasticsearch.ingest.Processor;
31-
import org.elasticsearch.ingest.common.IngestCommonPlugin;
32-
import org.elasticsearch.ingest.common.ProcessorsWhitelistExtension;
33-
import org.elasticsearch.ingest.useragent.IngestUserAgentPlugin;
34-
import org.elasticsearch.painless.PainlessPlugin;
35-
import org.elasticsearch.painless.PainlessScriptEngine;
36-
import org.elasticsearch.painless.spi.PainlessExtension;
37-
import org.elasticsearch.plugins.ExtensiblePlugin;
38-
import org.elasticsearch.plugins.IngestPlugin;
39-
import org.elasticsearch.script.IngestConditionalScript;
40-
import org.elasticsearch.script.IngestScript;
41-
import org.elasticsearch.script.ScriptEngine;
42-
import org.elasticsearch.script.ScriptModule;
43-
import org.elasticsearch.script.ScriptService;
44-
import org.elasticsearch.script.mustache.MustacheScriptEngine;
45-
import org.elasticsearch.threadpool.ThreadPool;
46-
import org.elasticsearch.xpack.constantkeyword.ConstantKeywordPainlessExtension;
47-
import org.elasticsearch.xpack.spatial.SpatialPainlessExtension;
48-
import org.elasticsearch.xpack.wildcard.WildcardPainlessExtension;
23+
import org.elasticsearch.logstashbridge.common.SettingsBridge;
24+
import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
25+
import org.elasticsearch.logstashbridge.env.EnvironmentBridge;
26+
import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;
27+
import org.elasticsearch.logstashbridge.plugins.IngestPluginBridge;
28+
import org.elasticsearch.logstashbridge.script.ScriptServiceBridge;
29+
import org.elasticsearch.logstashbridge.threadpool.ThreadPoolBridge;
4930

5031
import java.io.Closeable;
51-
import java.io.IOException;
5232
import java.time.Duration;
5333
import java.util.ArrayList;
54-
import java.util.HashMap;
5534
import java.util.List;
5635
import java.util.Map;
5736
import java.util.Objects;
@@ -91,7 +70,7 @@ public static EventProcessorBuilder fromElasticsearch(final RestClient elasticse
9170
}
9271

9372
public EventProcessorBuilder() {
94-
this.addProcessorsFromPlugin(IngestCommonPlugin::new, Set.of(
73+
this.addProcessorsFromPlugin(() -> IngestPluginBridge.wrap(new org.elasticsearch.ingest.common.IngestCommonPlugin()), Set.of(
9574
org.elasticsearch.ingest.common.AppendProcessor.TYPE,
9675
org.elasticsearch.ingest.common.BytesProcessor.TYPE,
9776
org.elasticsearch.ingest.common.CommunityIdProcessor.TYPE,
@@ -127,7 +106,7 @@ public EventProcessorBuilder() {
127106
org.elasticsearch.ingest.common.URLDecodeProcessor.TYPE,
128107
org.elasticsearch.ingest.common.UppercaseProcessor.TYPE,
129108
org.elasticsearch.ingest.common.UriPartsProcessor.TYPE));
130-
this.addProcessorsFromPlugin(IngestUserAgentPlugin::new);
109+
this.addProcessorsFromPlugin(() -> IngestPluginBridge.wrap(new org.elasticsearch.ingest.useragent.IngestUserAgentPlugin()));
131110
this.addProcessorsFromPlugin(RedactPlugin::new);
132111
this.addProcessor(SetSecurityUserProcessor.TYPE, SetSecurityUserProcessor.Factory::new);
133112
}
@@ -150,7 +129,7 @@ public EventProcessorBuilder() {
150129
// filer match listener
151130
private FilterMatchListener filterMatchListener;
152131

153-
private final List<Supplier<IngestPlugin>> ingestPlugins = new ArrayList<>();
132+
private final List<Supplier<IngestPluginBridge>> ingestPlugins = new ArrayList<>();
154133

155134
public synchronized EventProcessorBuilder setPipelineConfigurationResolver(final PipelineConfigurationResolver pipelineConfigurationResolver) {
156135
if (Objects.nonNull(this.pipelineConfigurationResolver)) {
@@ -215,15 +194,15 @@ private synchronized EventProcessorBuilder setFilterMatchListener(final FilterMa
215194
return this;
216195
}
217196

218-
public EventProcessorBuilder addProcessor(final String type, final Supplier<Processor.Factory> processorFactorySupplier) {
197+
public EventProcessorBuilder addProcessor(final String type, final Supplier<ProcessorBridge.Factory> processorFactorySupplier) {
219198
return this.addProcessorsFromPlugin(SingleProcessorIngestPlugin.of(type, processorFactorySupplier));
220199
}
221200

222-
public EventProcessorBuilder addProcessorsFromPlugin(Supplier<IngestPlugin> pluginSupplier, Set<String> requiredProcessors) {
201+
public EventProcessorBuilder addProcessorsFromPlugin(Supplier<IngestPluginBridge> pluginSupplier, Set<String> requiredProcessors) {
223202
return this.addProcessorsFromPlugin(safeSubset(pluginSupplier, requiredProcessors));
224203
}
225204

226-
public synchronized EventProcessorBuilder addProcessorsFromPlugin(Supplier<IngestPlugin> pluginSupplier) {
205+
public synchronized EventProcessorBuilder addProcessorsFromPlugin(Supplier<IngestPluginBridge> pluginSupplier) {
227206
this.ingestPlugins.add(pluginSupplier);
228207
return this;
229208
}
@@ -233,7 +212,7 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
233212
Objects.requireNonNull(this.eventToIndexNameResolver, "event index name resolver is REQUIRED");
234213
Objects.requireNonNull(this.indexNameToPipelineNameResolver, "pipeline name resolver is REQUIRED");
235214

236-
final Settings settings = Settings.builder()
215+
final SettingsBridge settings = SettingsBridge.builder()
237216
.put("path.home", "/")
238217
.put("node.name", "logstash.filter.elastic_integration." + pluginContext.pluginId())
239218
.put("ingest.grok.watchdog.interval", "1s")
@@ -245,33 +224,22 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
245224
try {
246225
final ArrayList<Service> services = new ArrayList<>();
247226

248-
final ThreadPool threadPool = LogstashInternalBridge.createThreadPool(settings);
249-
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
227+
final ThreadPoolBridge threadPool = new ThreadPoolBridge(settings);
228+
resourcesToClose.add(() -> ThreadPoolBridge.terminate(threadPool, 10, TimeUnit.SECONDS));
250229

251-
final ScriptService scriptService = initScriptService(settings, threadPool);
230+
final ScriptServiceBridge scriptService = new ScriptServiceBridge(settings, threadPool::absoluteTimeInMillis);
252231
resourcesToClose.add(scriptService);
253232

254-
final Environment env = new Environment(settings, null);
255-
final Processor.Parameters processorParameters = new Processor.Parameters(
256-
env,
257-
scriptService,
258-
null,
259-
threadPool.getThreadContext(),
260-
threadPool::relativeTimeInMillis,
261-
(delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic()),
262-
null,
263-
null,
264-
threadPool.generic()::execute,
265-
IngestService.createGrokThreadWatchdog(env, threadPool)
266-
);
233+
final EnvironmentBridge env = new EnvironmentBridge(settings, null);
234+
final ProcessorBridge.Parameters processorParameters = new ProcessorBridge.Parameters(env, scriptService, threadPool);
267235

268236
IngestPipelineFactory ingestPipelineFactory = new IngestPipelineFactory(scriptService);
269-
for (Supplier<IngestPlugin> ingestPluginSupplier : ingestPlugins) {
270-
final IngestPlugin ingestPlugin = ingestPluginSupplier.get();
237+
for (Supplier<IngestPluginBridge> ingestPluginSupplier : ingestPlugins) {
238+
final IngestPluginBridge ingestPlugin = ingestPluginSupplier.get();
271239
if (ingestPlugin instanceof Closeable closeableIngestPlugin) {
272240
resourcesToClose.add(closeableIngestPlugin);
273241
}
274-
final Map<String, Processor.Factory> processorFactories = ingestPlugin.getProcessors(processorParameters);
242+
final Map<String, ProcessorBridge.Factory> processorFactories = ingestPlugin.getProcessors(processorParameters);
275243
ingestPipelineFactory = ingestPipelineFactory.withProcessors(processorFactories);
276244
}
277245

@@ -309,53 +277,8 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
309277
indexNameToPipelineNameResolver,
310278
resourcesToClose);
311279
} catch (Exception e) {
312-
IOUtils.closeWhileHandlingException(resourcesToClose);
280+
IOUtilsBridge.closeWhileHandlingException(resourcesToClose);
313281
throw Exceptions.wrap(e, "Failed to build EventProcessor");
314282
}
315283
}
316-
317-
private static ScriptService initScriptService(final Settings settings, final ThreadPool threadPool) throws IOException {
318-
Map<String, ScriptEngine> engines = new HashMap<>();
319-
engines.put(PainlessScriptEngine.NAME, getPainlessScriptEngine(settings));
320-
engines.put(MustacheScriptEngine.NAME, new MustacheScriptEngine(settings));
321-
322-
return new ScriptService(
323-
settings,
324-
engines,
325-
ScriptModule.CORE_CONTEXTS,
326-
threadPool::absoluteTimeInMillis,
327-
new PluginProjectResolver());
328-
}
329-
330-
/**
331-
* @param settings the Elasticsearch settings object
332-
* @return a {@link ScriptEngine} for painless scripts for use in {@link IngestScript} and
333-
* {@link IngestConditionalScript} contexts, including all available {@link PainlessExtension}s.
334-
* @throws IOException when the underlying script engine cannot be created
335-
*/
336-
private static ScriptEngine getPainlessScriptEngine(final Settings settings) throws IOException {
337-
try (final PainlessPlugin painlessPlugin = new PainlessPlugin()) {
338-
339-
painlessPlugin.loadExtensions(new ExtensiblePlugin.ExtensionLoader() {
340-
@Override
341-
@SuppressWarnings("unchecked")
342-
public <T> List<T> loadExtensions(Class<T> extensionPointType) {
343-
if (extensionPointType.isAssignableFrom(PainlessExtension.class)) {
344-
final List<PainlessExtension> extensions = new ArrayList<>();
345-
346-
extensions.add(new ConstantKeywordPainlessExtension()); // module: constant-keyword
347-
extensions.add(new ProcessorsWhitelistExtension()); // module: ingest-common
348-
extensions.add(new SpatialPainlessExtension()); // module: spatial
349-
extensions.add(new WildcardPainlessExtension()); // module: wildcard
350-
351-
return (List<T>) extensions;
352-
} else {
353-
return List.of();
354-
}
355-
}
356-
});
357-
358-
return painlessPlugin.getScriptEngine(settings, Set.of(IngestScript.CONTEXT, IngestConditionalScript.CONTEXT));
359-
}
360-
}
361284
}

0 commit comments

Comments
 (0)