Skip to content

Commit 0707b1b

Browse files
committed
Replace ES classes with their Bridge Stable API pairs.
1 parent c8bfd21 commit 0707b1b

16 files changed

+236
-261
lines changed

src/main/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchPipelineConfigurationResolver.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import org.elasticsearch.client.Response;
1616
import org.elasticsearch.client.ResponseException;
1717
import org.elasticsearch.client.RestClient;
18-
import org.elasticsearch.ingest.PipelineConfiguration;
18+
import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge;
1919

2020
import java.util.Optional;
2121

@@ -24,7 +24,7 @@
2424
* that retrieves pipelines from Elasticsearch.
2525
*/
2626
public class ElasticsearchPipelineConfigurationResolver
27-
extends AbstractSimpleResolver<String,PipelineConfiguration>
27+
extends AbstractSimpleResolver<String, PipelineConfigurationBridge>
2828
implements PipelineConfigurationResolver {
2929
private final RestClient elasticsearchRestClient;
3030
private final PipelineConfigurationFactory pipelineConfigurationFactory;
@@ -37,13 +37,13 @@ public ElasticsearchPipelineConfigurationResolver(final RestClient elasticsearch
3737
}
3838

3939
@Override
40-
public Optional<PipelineConfiguration> resolveSafely(String pipelineName) throws Exception {
40+
public Optional<PipelineConfigurationBridge> resolveSafely(String pipelineName) throws Exception {
4141
final Response response;
4242
try {
4343
final Request request = new Request("GET", URLEncodedUtils.formatSegments("_ingest", "pipeline", pipelineName));
4444
response = elasticsearchRestClient.performRequest(request);
4545
final String jsonEncodedPayload = EntityUtils.toString(response.getEntity());
46-
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationFactory.parseNamedObject(jsonEncodedPayload);
46+
final PipelineConfigurationBridge pipelineConfiguration = pipelineConfigurationFactory.parseNamedObject(jsonEncodedPayload);
4747
return Optional.of(pipelineConfiguration);
4848
} catch (ResponseException re) {
4949
if (re.getResponse().getStatusLine().getStatusCode() == 404) {

src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessor.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
import org.apache.logging.log4j.LogManager;
1515
import org.apache.logging.log4j.Logger;
1616
import org.elasticsearch.action.support.RefCountingRunnable;
17-
import org.elasticsearch.core.IOUtils;
18-
import org.elasticsearch.ingest.IngestDocument;
19-
import org.elasticsearch.ingest.LogstashInternalBridge;
2017
import org.elasticsearch.ingest.common.FailProcessorException;
18+
import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
19+
import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;
2120

2221
import java.io.Closeable;
2322
import java.io.IOException;
2423
import java.util.Collection;
2524
import java.util.List;
25+
import java.util.Locale;
2626
import java.util.Map;
2727
import java.util.Objects;
2828
import java.util.Optional;
@@ -32,7 +32,6 @@
3232

3333
import static co.elastic.logstash.filters.elasticintegration.util.EventUtil.eventAsMap;
3434
import static co.elastic.logstash.filters.elasticintegration.util.EventUtil.serializeEventForLog;
35-
import static org.elasticsearch.core.Strings.format;
3635

3736
/**
3837
* An {@link EventProcessor} processes {@link Event}s by:
@@ -151,7 +150,7 @@ void processRequest(final IntegrationRequest request) {
151150

152151
final IngestPipeline ingestPipeline = loadedPipeline.get();
153152
LOGGER.trace(() -> String.format("Using loaded pipeline `%s` (%s)", pipelineName, System.identityHashCode(ingestPipeline)));
154-
final IngestDocument ingestDocument = eventMarshaller.toIngestDocument(request.event());
153+
final IngestDocumentBridge ingestDocument = eventMarshaller.toIngestDocument(request.event());
155154

156155
resolvedIndexName.ifPresent(indexName -> {
157156
ingestDocument.getMetadata().setIndex(indexName);
@@ -170,7 +169,7 @@ void processRequest(final IntegrationRequest request) {
170169
}
171170
}
172171

173-
private void executePipeline(final IngestDocument ingestDocument, final IngestPipeline ingestPipeline, final IntegrationRequest request) {
172+
private void executePipeline(final IngestDocumentBridge ingestDocument, final IngestPipeline ingestPipeline, final IntegrationRequest request) {
174173
final String pipelineName = ingestPipeline.getId();
175174
final String originalIndex = ingestDocument.getMetadata().getIndex();
176175
ingestPipeline.execute(ingestDocument, (resultIngestDocument, ingestPipelineException) -> {
@@ -193,17 +192,17 @@ private void executePipeline(final IngestDocument ingestDocument, final IngestPi
193192
} else {
194193

195194
final String newIndex = resultIngestDocument.getMetadata().getIndex();
196-
if (!Objects.equals(originalIndex, newIndex) && LogstashInternalBridge.isReroute(resultIngestDocument)) {
197-
LogstashInternalBridge.resetReroute(resultIngestDocument);
195+
if (!Objects.equals(originalIndex, newIndex) && ingestDocument.isReroute()) {
196+
ingestDocument.resetReroute();
198197
boolean cycle = !resultIngestDocument.updateIndexHistory(newIndex);
199198
if (cycle) {
200199
request.complete(incomingEvent -> {
201-
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message", format(
202-
"index cycle detected while processing pipeline [%s]: %s + %s",
203-
pipelineName,
204-
resultIngestDocument.getIndexHistory(),
205-
newIndex
206-
)));
200+
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message",
201+
String.format(Locale.ROOT, "index cycle detected while processing pipeline [%s]: %s + %s",
202+
pipelineName,
203+
resultIngestDocument.getIndexHistory(),
204+
newIndex)
205+
));
207206
});
208207
return;
209208
}
@@ -214,12 +213,14 @@ private void executePipeline(final IngestDocument ingestDocument, final IngestPi
214213
final Optional<IngestPipeline> reroutePipeline = resolve(reroutePipelineName.get(), internalPipelineProvider);
215214
if (reroutePipeline.isEmpty()) {
216215
request.complete(incomingEvent -> {
217-
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message", format(
218-
"reroute failed to load next pipeline [%s]: %s -> %s",
216+
annotateIngestPipelineFailure(
217+
incomingEvent,
219218
pipelineName,
220-
resultIngestDocument.getIndexHistory(),
221-
reroutePipelineName.get()
222-
)));
219+
Map.of("message",
220+
String.format(Locale.ROOT, "reroute failed to load next pipeline [%s]: %s -> %s",
221+
pipelineName,
222+
resultIngestDocument.getIndexHistory(),
223+
reroutePipelineName.get())));
223224
});
224225
} else {
225226
executePipeline(resultIngestDocument, reroutePipeline.get(), request);
@@ -277,6 +278,6 @@ static private <T,R> Optional<R> resolve(T resolvable, Resolver<T,R> resolver) {
277278

278279
@Override
279280
public void close() throws IOException {
280-
IOUtils.closeWhileHandlingException(this.resourcesToClose);
281+
IOUtilsBridge.closeWhileHandlingException(this.resourcesToClose);
281282
}
282283
}

src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java

Lines changed: 23 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -20,38 +20,17 @@
2020
import com.google.common.util.concurrent.Service;
2121
import com.google.common.util.concurrent.ServiceManager;
2222
import org.elasticsearch.client.RestClient;
23-
import org.elasticsearch.common.settings.Settings;
24-
import org.elasticsearch.core.IOUtils;
25-
import org.elasticsearch.core.TimeValue;
26-
import org.elasticsearch.env.Environment;
27-
import org.elasticsearch.ingest.IngestService;
28-
import org.elasticsearch.ingest.LogstashInternalBridge;
29-
import org.elasticsearch.ingest.Processor;
30-
import org.elasticsearch.ingest.common.IngestCommonPlugin;
31-
import org.elasticsearch.ingest.common.ProcessorsWhitelistExtension;
32-
import org.elasticsearch.ingest.useragent.IngestUserAgentPlugin;
33-
import org.elasticsearch.painless.PainlessPlugin;
34-
import org.elasticsearch.painless.PainlessScriptEngine;
35-
import org.elasticsearch.painless.spi.PainlessExtension;
36-
import org.elasticsearch.painless.spi.Whitelist;
37-
import org.elasticsearch.plugins.ExtensiblePlugin;
38-
import org.elasticsearch.plugins.IngestPlugin;
39-
import org.elasticsearch.script.IngestConditionalScript;
40-
import org.elasticsearch.script.IngestScript;
41-
import org.elasticsearch.script.ScriptEngine;
42-
import org.elasticsearch.script.ScriptModule;
43-
import org.elasticsearch.script.ScriptService;
44-
import org.elasticsearch.script.mustache.MustacheScriptEngine;
45-
import org.elasticsearch.threadpool.ThreadPool;
46-
import org.elasticsearch.xpack.constantkeyword.ConstantKeywordPainlessExtension;
47-
import org.elasticsearch.xpack.spatial.SpatialPainlessExtension;
48-
import org.elasticsearch.xpack.wildcard.WildcardPainlessExtension;
23+
import org.elasticsearch.logstashbridge.common.SettingsBridge;
24+
import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
25+
import org.elasticsearch.logstashbridge.env.EnvironmentBridge;
26+
import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;
27+
import org.elasticsearch.logstashbridge.plugins.IngestPluginBridge;
28+
import org.elasticsearch.logstashbridge.script.ScriptServiceBridge;
29+
import org.elasticsearch.logstashbridge.threadpool.ThreadPoolBridge;
4930

5031
import java.io.Closeable;
51-
import java.io.IOException;
5232
import java.time.Duration;
5333
import java.util.ArrayList;
54-
import java.util.HashMap;
5534
import java.util.List;
5635
import java.util.Map;
5736
import java.util.Objects;
@@ -91,7 +70,7 @@ public static EventProcessorBuilder fromElasticsearch(final RestClient elasticse
9170
}
9271

9372
public EventProcessorBuilder() {
94-
this.addProcessorsFromPlugin(IngestCommonPlugin::new, Set.of(
73+
this.addProcessorsFromPlugin(() -> IngestPluginBridge.wrap(new org.elasticsearch.ingest.common.IngestCommonPlugin()), Set.of(
9574
org.elasticsearch.ingest.common.AppendProcessor.TYPE,
9675
org.elasticsearch.ingest.common.BytesProcessor.TYPE,
9776
org.elasticsearch.ingest.common.CommunityIdProcessor.TYPE,
@@ -126,7 +105,7 @@ public EventProcessorBuilder() {
126105
org.elasticsearch.ingest.common.URLDecodeProcessor.TYPE,
127106
org.elasticsearch.ingest.common.UppercaseProcessor.TYPE,
128107
org.elasticsearch.ingest.common.UriPartsProcessor.TYPE));
129-
this.addProcessorsFromPlugin(IngestUserAgentPlugin::new);
108+
this.addProcessorsFromPlugin(() -> IngestPluginBridge.wrap(new org.elasticsearch.ingest.useragent.IngestUserAgentPlugin()));
130109
this.addProcessorsFromPlugin(RedactPlugin::new);
131110
this.addProcessor(SetSecurityUserProcessor.TYPE, SetSecurityUserProcessor.Factory::new);
132111
}
@@ -149,7 +128,7 @@ public EventProcessorBuilder() {
149128
// filer match listener
150129
private FilterMatchListener filterMatchListener;
151130

152-
private final List<Supplier<IngestPlugin>> ingestPlugins = new ArrayList<>();
131+
private final List<Supplier<IngestPluginBridge>> ingestPlugins = new ArrayList<>();
153132

154133
public synchronized EventProcessorBuilder setPipelineConfigurationResolver(final PipelineConfigurationResolver pipelineConfigurationResolver) {
155134
if (Objects.nonNull(this.pipelineConfigurationResolver)) {
@@ -214,15 +193,15 @@ private synchronized EventProcessorBuilder setFilterMatchListener(final FilterMa
214193
return this;
215194
}
216195

217-
public EventProcessorBuilder addProcessor(final String type, final Supplier<Processor.Factory> processorFactorySupplier) {
196+
public EventProcessorBuilder addProcessor(final String type, final Supplier<ProcessorBridge.Factory> processorFactorySupplier) {
218197
return this.addProcessorsFromPlugin(SingleProcessorIngestPlugin.of(type, processorFactorySupplier));
219198
}
220199

221-
public EventProcessorBuilder addProcessorsFromPlugin(Supplier<IngestPlugin> pluginSupplier, Set<String> requiredProcessors) {
200+
public EventProcessorBuilder addProcessorsFromPlugin(Supplier<IngestPluginBridge> pluginSupplier, Set<String> requiredProcessors) {
222201
return this.addProcessorsFromPlugin(safeSubset(pluginSupplier, requiredProcessors));
223202
}
224203

225-
public synchronized EventProcessorBuilder addProcessorsFromPlugin(Supplier<IngestPlugin> pluginSupplier) {
204+
public synchronized EventProcessorBuilder addProcessorsFromPlugin(Supplier<IngestPluginBridge> pluginSupplier) {
226205
this.ingestPlugins.add(pluginSupplier);
227206
return this;
228207
}
@@ -232,7 +211,7 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
232211
Objects.requireNonNull(this.eventToIndexNameResolver, "event index name resolver is REQUIRED");
233212
Objects.requireNonNull(this.indexNameToPipelineNameResolver, "pipeline name resolver is REQUIRED");
234213

235-
final Settings settings = Settings.builder()
214+
final SettingsBridge settings = SettingsBridge.builder()
236215
.put("path.home", "/")
237216
.put("node.name", "logstash.filter.elastic_integration." + pluginContext.pluginId())
238217
.put("ingest.grok.watchdog.interval", "1s")
@@ -244,33 +223,22 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
244223
try {
245224
final ArrayList<Service> services = new ArrayList<>();
246225

247-
final ThreadPool threadPool = LogstashInternalBridge.createThreadPool(settings);
248-
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
226+
final ThreadPoolBridge threadPool = new ThreadPoolBridge(settings);
227+
resourcesToClose.add(() -> ThreadPoolBridge.terminate(threadPool, 10, TimeUnit.SECONDS));
249228

250-
final ScriptService scriptService = initScriptService(settings, threadPool);
229+
final ScriptServiceBridge scriptService = new ScriptServiceBridge(settings, threadPool::absoluteTimeInMillis);
251230
resourcesToClose.add(scriptService);
252231

253-
final Environment env = new Environment(settings, null);
254-
final Processor.Parameters processorParameters = new Processor.Parameters(
255-
env,
256-
scriptService,
257-
null,
258-
threadPool.getThreadContext(),
259-
threadPool::relativeTimeInMillis,
260-
(delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic()),
261-
null,
262-
null,
263-
threadPool.generic()::execute,
264-
IngestService.createGrokThreadWatchdog(env, threadPool)
265-
);
232+
final EnvironmentBridge env = new EnvironmentBridge(settings, null);
233+
final ProcessorBridge.Parameters processorParameters = new ProcessorBridge.Parameters(env, scriptService, threadPool);
266234

267235
IngestPipelineFactory ingestPipelineFactory = new IngestPipelineFactory(scriptService);
268-
for (Supplier<IngestPlugin> ingestPluginSupplier : ingestPlugins) {
269-
final IngestPlugin ingestPlugin = ingestPluginSupplier.get();
236+
for (Supplier<IngestPluginBridge> ingestPluginSupplier : ingestPlugins) {
237+
final IngestPluginBridge ingestPlugin = ingestPluginSupplier.get();
270238
if (ingestPlugin instanceof Closeable closeableIngestPlugin) {
271239
resourcesToClose.add(closeableIngestPlugin);
272240
}
273-
final Map<String, Processor.Factory> processorFactories = ingestPlugin.getProcessors(processorParameters);
241+
final Map<String, ProcessorBridge.Factory> processorFactories = ingestPlugin.getProcessors(processorParameters);
274242
ingestPipelineFactory = ingestPipelineFactory.withProcessors(processorFactories);
275243
}
276244

@@ -308,47 +276,8 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
308276
indexNameToPipelineNameResolver,
309277
resourcesToClose);
310278
} catch (Exception e) {
311-
IOUtils.closeWhileHandlingException(resourcesToClose);
279+
IOUtilsBridge.closeWhileHandlingException(resourcesToClose);
312280
throw Exceptions.wrap(e, "Failed to build EventProcessor");
313281
}
314282
}
315-
316-
private static ScriptService initScriptService(final Settings settings, final ThreadPool threadPool) throws IOException {
317-
Map<String, ScriptEngine> engines = new HashMap<>();
318-
engines.put(PainlessScriptEngine.NAME, getPainlessScriptEngine(settings));
319-
engines.put(MustacheScriptEngine.NAME, new MustacheScriptEngine(settings));
320-
return new ScriptService(settings, engines, ScriptModule.CORE_CONTEXTS, threadPool::absoluteTimeInMillis);
321-
}
322-
323-
/**
324-
* @param settings the Elasticsearch settings object
325-
* @return a {@link ScriptEngine} for painless scripts for use in {@link IngestScript} and
326-
* {@link IngestConditionalScript} contexts, including all available {@link PainlessExtension}s.
327-
* @throws IOException when the underlying script engine cannot be created
328-
*/
329-
private static ScriptEngine getPainlessScriptEngine(final Settings settings) throws IOException {
330-
try (final PainlessPlugin painlessPlugin = new PainlessPlugin()) {
331-
332-
painlessPlugin.loadExtensions(new ExtensiblePlugin.ExtensionLoader() {
333-
@Override
334-
@SuppressWarnings("unchecked")
335-
public <T> List<T> loadExtensions(Class<T> extensionPointType) {
336-
if (extensionPointType.isAssignableFrom(PainlessExtension.class)) {
337-
final List<PainlessExtension> extensions = new ArrayList<>();
338-
339-
extensions.add(new ConstantKeywordPainlessExtension()); // module: constant-keyword
340-
extensions.add(new ProcessorsWhitelistExtension()); // module: ingest-common
341-
extensions.add(new SpatialPainlessExtension()); // module: spatial
342-
extensions.add(new WildcardPainlessExtension()); // module: wildcard
343-
344-
return (List<T>) extensions;
345-
} else {
346-
return List.of();
347-
}
348-
}
349-
});
350-
351-
return painlessPlugin.getScriptEngine(settings, Set.of(IngestScript.CONTEXT, IngestConditionalScript.CONTEXT));
352-
}
353-
}
354283
}

0 commit comments

Comments
 (0)