1717import co .elastic .logstash .filters .elasticintegration .resolver .ResolverCache ;
1818import co .elastic .logstash .filters .elasticintegration .util .Exceptions ;
1919import co .elastic .logstash .filters .elasticintegration .util .PluginContext ;
20- import co .elastic .logstash .filters .elasticintegration .util .PluginProjectResolver ;
2120import com .google .common .util .concurrent .Service ;
2221import com .google .common .util .concurrent .ServiceManager ;
2322import org .elasticsearch .client .RestClient ;
24- import org .elasticsearch .common .settings .Settings ;
25- import org .elasticsearch .core .IOUtils ;
26- import org .elasticsearch .core .TimeValue ;
27- import org .elasticsearch .env .Environment ;
28- import org .elasticsearch .ingest .IngestService ;
29- import org .elasticsearch .ingest .LogstashInternalBridge ;
30- import org .elasticsearch .ingest .Processor ;
31- import org .elasticsearch .ingest .common .IngestCommonPlugin ;
32- import org .elasticsearch .ingest .common .ProcessorsWhitelistExtension ;
33- import org .elasticsearch .ingest .useragent .IngestUserAgentPlugin ;
34- import org .elasticsearch .painless .PainlessPlugin ;
35- import org .elasticsearch .painless .PainlessScriptEngine ;
36- import org .elasticsearch .painless .spi .PainlessExtension ;
37- import org .elasticsearch .plugins .ExtensiblePlugin ;
38- import org .elasticsearch .plugins .IngestPlugin ;
39- import org .elasticsearch .script .IngestConditionalScript ;
40- import org .elasticsearch .script .IngestScript ;
41- import org .elasticsearch .script .ScriptEngine ;
42- import org .elasticsearch .script .ScriptModule ;
43- import org .elasticsearch .script .ScriptService ;
44- import org .elasticsearch .script .mustache .MustacheScriptEngine ;
45- import org .elasticsearch .threadpool .ThreadPool ;
46- import org .elasticsearch .xpack .constantkeyword .ConstantKeywordPainlessExtension ;
47- import org .elasticsearch .xpack .spatial .SpatialPainlessExtension ;
48- import org .elasticsearch .xpack .wildcard .WildcardPainlessExtension ;
23+ import org .elasticsearch .logstashbridge .common .SettingsBridge ;
24+ import org .elasticsearch .logstashbridge .core .IOUtilsBridge ;
25+ import org .elasticsearch .logstashbridge .env .EnvironmentBridge ;
26+ import org .elasticsearch .logstashbridge .ingest .ProcessorBridge ;
27+ import org .elasticsearch .logstashbridge .plugins .IngestPluginBridge ;
28+ import org .elasticsearch .logstashbridge .script .ScriptServiceBridge ;
29+ import org .elasticsearch .logstashbridge .threadpool .ThreadPoolBridge ;
4930
5031import java .io .Closeable ;
51- import java .io .IOException ;
5232import java .time .Duration ;
5333import java .util .ArrayList ;
54- import java .util .HashMap ;
5534import java .util .List ;
5635import java .util .Map ;
5736import java .util .Objects ;
@@ -91,7 +70,7 @@ public static EventProcessorBuilder fromElasticsearch(final RestClient elasticse
9170 }
9271
9372 public EventProcessorBuilder () {
94- this .addProcessorsFromPlugin (IngestCommonPlugin :: new , Set .of (
73+ this .addProcessorsFromPlugin (() -> IngestPluginBridge . wrap ( new org . elasticsearch . ingest . common . IngestCommonPlugin ()) , Set .of (
9574 org .elasticsearch .ingest .common .AppendProcessor .TYPE ,
9675 org .elasticsearch .ingest .common .BytesProcessor .TYPE ,
9776 org .elasticsearch .ingest .common .CommunityIdProcessor .TYPE ,
@@ -127,7 +106,7 @@ public EventProcessorBuilder() {
127106 org .elasticsearch .ingest .common .URLDecodeProcessor .TYPE ,
128107 org .elasticsearch .ingest .common .UppercaseProcessor .TYPE ,
129108 org .elasticsearch .ingest .common .UriPartsProcessor .TYPE ));
130- this .addProcessorsFromPlugin (IngestUserAgentPlugin :: new );
109+ this .addProcessorsFromPlugin (() -> IngestPluginBridge . wrap ( new org . elasticsearch . ingest . useragent . IngestUserAgentPlugin ()) );
131110 this .addProcessorsFromPlugin (RedactPlugin ::new );
132111 this .addProcessor (SetSecurityUserProcessor .TYPE , SetSecurityUserProcessor .Factory ::new );
133112 }
@@ -150,7 +129,7 @@ public EventProcessorBuilder() {
150129 // filer match listener
151130 private FilterMatchListener filterMatchListener ;
152131
153- private final List <Supplier <IngestPlugin >> ingestPlugins = new ArrayList <>();
132+ private final List <Supplier <IngestPluginBridge >> ingestPlugins = new ArrayList <>();
154133
155134 public synchronized EventProcessorBuilder setPipelineConfigurationResolver (final PipelineConfigurationResolver pipelineConfigurationResolver ) {
156135 if (Objects .nonNull (this .pipelineConfigurationResolver )) {
@@ -215,15 +194,15 @@ private synchronized EventProcessorBuilder setFilterMatchListener(final FilterMa
215194 return this ;
216195 }
217196
218- public EventProcessorBuilder addProcessor (final String type , final Supplier <Processor .Factory > processorFactorySupplier ) {
197+ public EventProcessorBuilder addProcessor (final String type , final Supplier <ProcessorBridge .Factory > processorFactorySupplier ) {
219198 return this .addProcessorsFromPlugin (SingleProcessorIngestPlugin .of (type , processorFactorySupplier ));
220199 }
221200
222- public EventProcessorBuilder addProcessorsFromPlugin (Supplier <IngestPlugin > pluginSupplier , Set <String > requiredProcessors ) {
201+ public EventProcessorBuilder addProcessorsFromPlugin (Supplier <IngestPluginBridge > pluginSupplier , Set <String > requiredProcessors ) {
223202 return this .addProcessorsFromPlugin (safeSubset (pluginSupplier , requiredProcessors ));
224203 }
225204
226- public synchronized EventProcessorBuilder addProcessorsFromPlugin (Supplier <IngestPlugin > pluginSupplier ) {
205+ public synchronized EventProcessorBuilder addProcessorsFromPlugin (Supplier <IngestPluginBridge > pluginSupplier ) {
227206 this .ingestPlugins .add (pluginSupplier );
228207 return this ;
229208 }
@@ -233,7 +212,7 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
233212 Objects .requireNonNull (this .eventToIndexNameResolver , "event index name resolver is REQUIRED" );
234213 Objects .requireNonNull (this .indexNameToPipelineNameResolver , "pipeline name resolver is REQUIRED" );
235214
236- final Settings settings = Settings .builder ()
215+ final SettingsBridge settings = SettingsBridge .builder ()
237216 .put ("path.home" , "/" )
238217 .put ("node.name" , "logstash.filter.elastic_integration." + pluginContext .pluginId ())
239218 .put ("ingest.grok.watchdog.interval" , "1s" )
@@ -245,33 +224,22 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
245224 try {
246225 final ArrayList <Service > services = new ArrayList <>();
247226
248- final ThreadPool threadPool = LogstashInternalBridge . createThreadPool (settings );
249- resourcesToClose .add (() -> ThreadPool .terminate (threadPool , 10 , TimeUnit .SECONDS ));
227+ final ThreadPoolBridge threadPool = new ThreadPoolBridge (settings );
228+ resourcesToClose .add (() -> ThreadPoolBridge .terminate (threadPool , 10 , TimeUnit .SECONDS ));
250229
251- final ScriptService scriptService = initScriptService (settings , threadPool );
230+ final ScriptServiceBridge scriptService = new ScriptServiceBridge (settings , threadPool :: absoluteTimeInMillis );
252231 resourcesToClose .add (scriptService );
253232
254- final Environment env = new Environment (settings , null );
255- final Processor .Parameters processorParameters = new Processor .Parameters (
256- env ,
257- scriptService ,
258- null ,
259- threadPool .getThreadContext (),
260- threadPool ::relativeTimeInMillis ,
261- (delay , command ) -> threadPool .schedule (command , TimeValue .timeValueMillis (delay ), threadPool .generic ()),
262- null ,
263- null ,
264- threadPool .generic ()::execute ,
265- IngestService .createGrokThreadWatchdog (env , threadPool )
266- );
233+ final EnvironmentBridge env = new EnvironmentBridge (settings , null );
234+ final ProcessorBridge .Parameters processorParameters = new ProcessorBridge .Parameters (env , scriptService , threadPool );
267235
268236 IngestPipelineFactory ingestPipelineFactory = new IngestPipelineFactory (scriptService );
269- for (Supplier <IngestPlugin > ingestPluginSupplier : ingestPlugins ) {
270- final IngestPlugin ingestPlugin = ingestPluginSupplier .get ();
237+ for (Supplier <IngestPluginBridge > ingestPluginSupplier : ingestPlugins ) {
238+ final IngestPluginBridge ingestPlugin = ingestPluginSupplier .get ();
271239 if (ingestPlugin instanceof Closeable closeableIngestPlugin ) {
272240 resourcesToClose .add (closeableIngestPlugin );
273241 }
274- final Map <String , Processor .Factory > processorFactories = ingestPlugin .getProcessors (processorParameters );
242+ final Map <String , ProcessorBridge .Factory > processorFactories = ingestPlugin .getProcessors (processorParameters );
275243 ingestPipelineFactory = ingestPipelineFactory .withProcessors (processorFactories );
276244 }
277245
@@ -309,53 +277,8 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
309277 indexNameToPipelineNameResolver ,
310278 resourcesToClose );
311279 } catch (Exception e ) {
312- IOUtils .closeWhileHandlingException (resourcesToClose );
280+ IOUtilsBridge .closeWhileHandlingException (resourcesToClose );
313281 throw Exceptions .wrap (e , "Failed to build EventProcessor" );
314282 }
315283 }
316-
317- private static ScriptService initScriptService (final Settings settings , final ThreadPool threadPool ) throws IOException {
318- Map <String , ScriptEngine > engines = new HashMap <>();
319- engines .put (PainlessScriptEngine .NAME , getPainlessScriptEngine (settings ));
320- engines .put (MustacheScriptEngine .NAME , new MustacheScriptEngine (settings ));
321-
322- return new ScriptService (
323- settings ,
324- engines ,
325- ScriptModule .CORE_CONTEXTS ,
326- threadPool ::absoluteTimeInMillis ,
327- new PluginProjectResolver ());
328- }
329-
330- /**
331- * @param settings the Elasticsearch settings object
332- * @return a {@link ScriptEngine} for painless scripts for use in {@link IngestScript} and
333- * {@link IngestConditionalScript} contexts, including all available {@link PainlessExtension}s.
334- * @throws IOException when the underlying script engine cannot be created
335- */
336- private static ScriptEngine getPainlessScriptEngine (final Settings settings ) throws IOException {
337- try (final PainlessPlugin painlessPlugin = new PainlessPlugin ()) {
338-
339- painlessPlugin .loadExtensions (new ExtensiblePlugin .ExtensionLoader () {
340- @ Override
341- @ SuppressWarnings ("unchecked" )
342- public <T > List <T > loadExtensions (Class <T > extensionPointType ) {
343- if (extensionPointType .isAssignableFrom (PainlessExtension .class )) {
344- final List <PainlessExtension > extensions = new ArrayList <>();
345-
346- extensions .add (new ConstantKeywordPainlessExtension ()); // module: constant-keyword
347- extensions .add (new ProcessorsWhitelistExtension ()); // module: ingest-common
348- extensions .add (new SpatialPainlessExtension ()); // module: spatial
349- extensions .add (new WildcardPainlessExtension ()); // module: wildcard
350-
351- return (List <T >) extensions ;
352- } else {
353- return List .of ();
354- }
355- }
356- });
357-
358- return painlessPlugin .getScriptEngine (settings , Set .of (IngestScript .CONTEXT , IngestConditionalScript .CONTEXT ));
359- }
360- }
361284}
0 commit comments