20
20
import com .google .common .util .concurrent .Service ;
21
21
import com .google .common .util .concurrent .ServiceManager ;
22
22
import org .elasticsearch .client .RestClient ;
23
- import org .elasticsearch .common .settings .Settings ;
24
- import org .elasticsearch .core .IOUtils ;
25
- import org .elasticsearch .core .TimeValue ;
26
- import org .elasticsearch .env .Environment ;
27
- import org .elasticsearch .ingest .IngestService ;
28
- import org .elasticsearch .ingest .LogstashInternalBridge ;
29
- import org .elasticsearch .ingest .Processor ;
30
- import org .elasticsearch .ingest .common .IngestCommonPlugin ;
31
- import org .elasticsearch .ingest .common .ProcessorsWhitelistExtension ;
32
- import org .elasticsearch .ingest .useragent .IngestUserAgentPlugin ;
33
- import org .elasticsearch .painless .PainlessPlugin ;
34
- import org .elasticsearch .painless .PainlessScriptEngine ;
35
- import org .elasticsearch .painless .spi .PainlessExtension ;
36
- import org .elasticsearch .painless .spi .Whitelist ;
37
- import org .elasticsearch .plugins .ExtensiblePlugin ;
38
- import org .elasticsearch .plugins .IngestPlugin ;
39
- import org .elasticsearch .script .IngestConditionalScript ;
40
- import org .elasticsearch .script .IngestScript ;
41
- import org .elasticsearch .script .ScriptEngine ;
42
- import org .elasticsearch .script .ScriptModule ;
43
- import org .elasticsearch .script .ScriptService ;
44
- import org .elasticsearch .script .mustache .MustacheScriptEngine ;
45
- import org .elasticsearch .threadpool .ThreadPool ;
46
- import org .elasticsearch .xpack .constantkeyword .ConstantKeywordPainlessExtension ;
47
- import org .elasticsearch .xpack .spatial .SpatialPainlessExtension ;
48
- import org .elasticsearch .xpack .wildcard .WildcardPainlessExtension ;
23
+ import org .elasticsearch .logstashbridge .common .SettingsBridge ;
24
+ import org .elasticsearch .logstashbridge .core .IOUtilsBridge ;
25
+ import org .elasticsearch .logstashbridge .env .EnvironmentBridge ;
26
+ import org .elasticsearch .logstashbridge .ingest .ProcessorBridge ;
27
+ import org .elasticsearch .logstashbridge .plugins .IngestPluginBridge ;
28
+ import org .elasticsearch .logstashbridge .script .ScriptServiceBridge ;
29
+ import org .elasticsearch .logstashbridge .threadpool .ThreadPoolBridge ;
49
30
50
31
import java .io .Closeable ;
51
- import java .io .IOException ;
52
32
import java .time .Duration ;
53
33
import java .util .ArrayList ;
54
- import java .util .HashMap ;
55
34
import java .util .List ;
56
35
import java .util .Map ;
57
36
import java .util .Objects ;
@@ -91,7 +70,7 @@ public static EventProcessorBuilder fromElasticsearch(final RestClient elasticse
91
70
}
92
71
93
72
public EventProcessorBuilder () {
94
- this .addProcessorsFromPlugin (IngestCommonPlugin :: new , Set .of (
73
+ this .addProcessorsFromPlugin (() -> IngestPluginBridge . wrap ( new org . elasticsearch . ingest . common . IngestCommonPlugin ()) , Set .of (
95
74
org .elasticsearch .ingest .common .AppendProcessor .TYPE ,
96
75
org .elasticsearch .ingest .common .BytesProcessor .TYPE ,
97
76
org .elasticsearch .ingest .common .CommunityIdProcessor .TYPE ,
@@ -127,7 +106,7 @@ public EventProcessorBuilder() {
127
106
org .elasticsearch .ingest .common .URLDecodeProcessor .TYPE ,
128
107
org .elasticsearch .ingest .common .UppercaseProcessor .TYPE ,
129
108
org .elasticsearch .ingest .common .UriPartsProcessor .TYPE ));
130
- this .addProcessorsFromPlugin (IngestUserAgentPlugin :: new );
109
+ this .addProcessorsFromPlugin (() -> IngestPluginBridge . wrap ( new org . elasticsearch . ingest . useragent . IngestUserAgentPlugin ()) );
131
110
this .addProcessorsFromPlugin (RedactPlugin ::new );
132
111
this .addProcessor (SetSecurityUserProcessor .TYPE , SetSecurityUserProcessor .Factory ::new );
133
112
}
@@ -150,7 +129,7 @@ public EventProcessorBuilder() {
150
129
// filer match listener
151
130
private FilterMatchListener filterMatchListener ;
152
131
153
- private final List <Supplier <IngestPlugin >> ingestPlugins = new ArrayList <>();
132
+ private final List <Supplier <IngestPluginBridge >> ingestPlugins = new ArrayList <>();
154
133
155
134
public synchronized EventProcessorBuilder setPipelineConfigurationResolver (final PipelineConfigurationResolver pipelineConfigurationResolver ) {
156
135
if (Objects .nonNull (this .pipelineConfigurationResolver )) {
@@ -215,15 +194,15 @@ private synchronized EventProcessorBuilder setFilterMatchListener(final FilterMa
215
194
return this ;
216
195
}
217
196
218
- public EventProcessorBuilder addProcessor (final String type , final Supplier <Processor .Factory > processorFactorySupplier ) {
197
+ public EventProcessorBuilder addProcessor (final String type , final Supplier <ProcessorBridge .Factory > processorFactorySupplier ) {
219
198
return this .addProcessorsFromPlugin (SingleProcessorIngestPlugin .of (type , processorFactorySupplier ));
220
199
}
221
200
222
- public EventProcessorBuilder addProcessorsFromPlugin (Supplier <IngestPlugin > pluginSupplier , Set <String > requiredProcessors ) {
201
+ public EventProcessorBuilder addProcessorsFromPlugin (Supplier <IngestPluginBridge > pluginSupplier , Set <String > requiredProcessors ) {
223
202
return this .addProcessorsFromPlugin (safeSubset (pluginSupplier , requiredProcessors ));
224
203
}
225
204
226
- public synchronized EventProcessorBuilder addProcessorsFromPlugin (Supplier <IngestPlugin > pluginSupplier ) {
205
+ public synchronized EventProcessorBuilder addProcessorsFromPlugin (Supplier <IngestPluginBridge > pluginSupplier ) {
227
206
this .ingestPlugins .add (pluginSupplier );
228
207
return this ;
229
208
}
@@ -233,7 +212,7 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
233
212
Objects .requireNonNull (this .eventToIndexNameResolver , "event index name resolver is REQUIRED" );
234
213
Objects .requireNonNull (this .indexNameToPipelineNameResolver , "pipeline name resolver is REQUIRED" );
235
214
236
- final Settings settings = Settings .builder ()
215
+ final SettingsBridge settings = SettingsBridge .builder ()
237
216
.put ("path.home" , "/" )
238
217
.put ("node.name" , "logstash.filter.elastic_integration." + pluginContext .pluginId ())
239
218
.put ("ingest.grok.watchdog.interval" , "1s" )
@@ -245,33 +224,22 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
245
224
try {
246
225
final ArrayList <Service > services = new ArrayList <>();
247
226
248
- final ThreadPool threadPool = LogstashInternalBridge . createThreadPool (settings );
249
- resourcesToClose .add (() -> ThreadPool .terminate (threadPool , 10 , TimeUnit .SECONDS ));
227
+ final ThreadPoolBridge threadPool = new ThreadPoolBridge (settings );
228
+ resourcesToClose .add (() -> ThreadPoolBridge .terminate (threadPool , 10 , TimeUnit .SECONDS ));
250
229
251
- final ScriptService scriptService = initScriptService (settings , threadPool );
230
+ final ScriptServiceBridge scriptService = new ScriptServiceBridge (settings , threadPool :: absoluteTimeInMillis );
252
231
resourcesToClose .add (scriptService );
253
232
254
- final Environment env = new Environment (settings , null );
255
- final Processor .Parameters processorParameters = new Processor .Parameters (
256
- env ,
257
- scriptService ,
258
- null ,
259
- threadPool .getThreadContext (),
260
- threadPool ::relativeTimeInMillis ,
261
- (delay , command ) -> threadPool .schedule (command , TimeValue .timeValueMillis (delay ), threadPool .generic ()),
262
- null ,
263
- null ,
264
- threadPool .generic ()::execute ,
265
- IngestService .createGrokThreadWatchdog (env , threadPool )
266
- );
233
+ final EnvironmentBridge env = new EnvironmentBridge (settings , null );
234
+ final ProcessorBridge .Parameters processorParameters = new ProcessorBridge .Parameters (env , scriptService , threadPool );
267
235
268
236
IngestPipelineFactory ingestPipelineFactory = new IngestPipelineFactory (scriptService );
269
- for (Supplier <IngestPlugin > ingestPluginSupplier : ingestPlugins ) {
270
- final IngestPlugin ingestPlugin = ingestPluginSupplier .get ();
237
+ for (Supplier <IngestPluginBridge > ingestPluginSupplier : ingestPlugins ) {
238
+ final IngestPluginBridge ingestPlugin = ingestPluginSupplier .get ();
271
239
if (ingestPlugin instanceof Closeable closeableIngestPlugin ) {
272
240
resourcesToClose .add (closeableIngestPlugin );
273
241
}
274
- final Map <String , Processor .Factory > processorFactories = ingestPlugin .getProcessors (processorParameters );
242
+ final Map <String , ProcessorBridge .Factory > processorFactories = ingestPlugin .getProcessors (processorParameters );
275
243
ingestPipelineFactory = ingestPipelineFactory .withProcessors (processorFactories );
276
244
}
277
245
@@ -309,47 +277,8 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
309
277
indexNameToPipelineNameResolver ,
310
278
resourcesToClose );
311
279
} catch (Exception e ) {
312
- IOUtils .closeWhileHandlingException (resourcesToClose );
280
+ IOUtilsBridge .closeWhileHandlingException (resourcesToClose );
313
281
throw Exceptions .wrap (e , "Failed to build EventProcessor" );
314
282
}
315
283
}
316
-
317
- private static ScriptService initScriptService (final Settings settings , final ThreadPool threadPool ) throws IOException {
318
- Map <String , ScriptEngine > engines = new HashMap <>();
319
- engines .put (PainlessScriptEngine .NAME , getPainlessScriptEngine (settings ));
320
- engines .put (MustacheScriptEngine .NAME , new MustacheScriptEngine (settings ));
321
- return new ScriptService (settings , engines , ScriptModule .CORE_CONTEXTS , threadPool ::absoluteTimeInMillis );
322
- }
323
-
324
- /**
325
- * @param settings the Elasticsearch settings object
326
- * @return a {@link ScriptEngine} for painless scripts for use in {@link IngestScript} and
327
- * {@link IngestConditionalScript} contexts, including all available {@link PainlessExtension}s.
328
- * @throws IOException when the underlying script engine cannot be created
329
- */
330
- private static ScriptEngine getPainlessScriptEngine (final Settings settings ) throws IOException {
331
- try (final PainlessPlugin painlessPlugin = new PainlessPlugin ()) {
332
-
333
- painlessPlugin .loadExtensions (new ExtensiblePlugin .ExtensionLoader () {
334
- @ Override
335
- @ SuppressWarnings ("unchecked" )
336
- public <T > List <T > loadExtensions (Class <T > extensionPointType ) {
337
- if (extensionPointType .isAssignableFrom (PainlessExtension .class )) {
338
- final List <PainlessExtension > extensions = new ArrayList <>();
339
-
340
- extensions .add (new ConstantKeywordPainlessExtension ()); // module: constant-keyword
341
- extensions .add (new ProcessorsWhitelistExtension ()); // module: ingest-common
342
- extensions .add (new SpatialPainlessExtension ()); // module: spatial
343
- extensions .add (new WildcardPainlessExtension ()); // module: wildcard
344
-
345
- return (List <T >) extensions ;
346
- } else {
347
- return List .of ();
348
- }
349
- }
350
- });
351
-
352
- return painlessPlugin .getScriptEngine (settings , Set .of (IngestScript .CONTEXT , IngestConditionalScript .CONTEXT ));
353
- }
354
- }
355
284
}
0 commit comments