Skip to content

Commit c2e7e10

Browse files
yaauiemashhurs
andauthored
painless: register ProcessorsWhitelistExtension to painless engine (#162)
* painless: register available PainlessExtensions to painless engine Enables the use of the `org.elasticsearch.ingest.common.Processors` in ingest pipelines, as is found in integrations like `logs-aws.cloudtrail`. Fixes: #161 * Add an integration test for the painless script which points to Ingest Processors. * Update CHANGELOG.md --------- Co-authored-by: Mashhur <[email protected]>
1 parent a87a967 commit c2e7e10

File tree

8 files changed

+139
-64
lines changed

8 files changed

+139
-64
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 0.1.14
2+
- Fix: register available PainlessExtension-s, resolving an issue where the pipelines for some integrations would fail to compile [#162](https://github.com/elastic/logstash-filter-elastic_integration/pull/162)
3+
14
## 0.1.13
25
- Update default elasticsearch tree branch to 8.15 [#156](https://github.com/elastic/logstash-filter-elastic_integration/pull/156)
36

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.1.13
1+
0.1.14

build.gradle

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,16 @@ task importMinimalElasticsearch() {
354354
include jarPackageNamed("spi/elasticsearch-scripting-painless-spi")
355355
}
356356

357+
from(buildElasticsearchLocalDistro.module("wildcard")) {
358+
include jarPackageNamed("x-pack-wildcard")
359+
}
360+
from(buildElasticsearchLocalDistro.module("constant-keyword")) {
361+
include jarPackageNamed("x-pack-constant-keyword")
362+
}
363+
from(buildElasticsearchLocalDistro.module("spatial")) {
364+
include jarPackageNamed("spatial")
365+
}
366+
357367
into ext.jars
358368

359369
includeEmptyDirs(false)

spec/integration/elastic_integration_spec.rb

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -745,33 +745,65 @@
745745

746746
end
747747

748-
describe 'with script processor' do
749-
let(:pipeline_processor) {
750-
'{
751-
"script": {
752-
"lang": "painless",
753-
"source": "ctx[\'_index\'] = ctx[\'lang\'] + \'-\' + params[\'dataset\'];",
754-
"params": {
755-
"dataset": "catalog"
748+
context "#painless script" do
749+
750+
describe 'with simple script' do
751+
let(:pipeline_processor) {
752+
'{
753+
"script": {
754+
"lang": "painless",
755+
"source": "ctx[\'_index\'] = ctx[\'lang\'] + \'-\' + params[\'dataset\'];",
756+
"params": {
757+
"dataset": "catalog"
758+
}
756759
}
757-
}
758-
}'
759-
}
760+
}'
761+
}
760762

761-
it 'runs painless script on a given field' do
762-
events = [LogStash::Event.new(
763-
"message" => "Should extract prod tag from env field",
764-
"lang" => "uz",
765-
"data_stream" => data_stream)]
763+
it 'runs painless script on a given field' do
764+
events = [LogStash::Event.new(
765+
"message" => "Should extract prod tag from env field",
766+
"lang" => "uz",
767+
"data_stream" => data_stream)]
766768

767-
subject.multi_filter(events).each do |event|
768-
expect(event.get("[@metadata][_ingest_document][index]")).to eql "uz-catalog"
769-
expect(event.get("[@metadata][target_ingest_pipeline]")).to eql '_none'
769+
subject.multi_filter(events).each do |event|
770+
expect(event.get("[@metadata][_ingest_document][index]")).to eql "uz-catalog"
771+
expect(event.get("[@metadata][target_ingest_pipeline]")).to eql '_none'
772+
end
770773
end
774+
771775
end
772776

777+
# makes sure Ingest Processors can be loaded
778+
describe 'with ingest Processors in the script' do
779+
let(:pipeline_processor) {
780+
'{
781+
"script": {
782+
"lang": "painless",
783+
"source": "long bytes = Processors.bytes(params[\'size\']); ctx.size_in_bytes = bytes;",
784+
"params": {
785+
"size": "1kb"
786+
}
787+
}
788+
}'
789+
}
790+
791+
it 'calculates the bytes with Processors.bytes()' do
792+
events = [LogStash::Event.new(
793+
"message" => "Should run painless script which has an ingest Processor#bytes in it",
794+
"data_stream" => data_stream)]
795+
796+
subject.multi_filter(events).each do |event|
797+
# make sure AT LEAST we don't get pipeline load error
798+
expect(event.get("[@metadata][_ingest_document][index]")).to eql pipeline_setting
799+
expect(event.get("[@metadata][target_ingest_pipeline]")).to eql '_none'
800+
end
801+
end
802+
end
773803
end
774804

805+
# TODO: add more painless script integration tests
806+
775807
describe 'with set processor' do
776808
let(:pipeline_processor) {
777809
'{

src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,27 @@
2828
import org.elasticsearch.ingest.LogstashInternalBridge;
2929
import org.elasticsearch.ingest.Processor;
3030
import org.elasticsearch.ingest.common.IngestCommonPlugin;
31+
import org.elasticsearch.ingest.common.ProcessorsWhitelistExtension;
3132
import org.elasticsearch.ingest.useragent.IngestUserAgentPlugin;
3233
import org.elasticsearch.painless.PainlessPlugin;
3334
import org.elasticsearch.painless.PainlessScriptEngine;
35+
import org.elasticsearch.painless.spi.PainlessExtension;
3436
import org.elasticsearch.painless.spi.Whitelist;
37+
import org.elasticsearch.plugins.ExtensiblePlugin;
3538
import org.elasticsearch.plugins.IngestPlugin;
3639
import org.elasticsearch.script.IngestConditionalScript;
3740
import org.elasticsearch.script.IngestScript;
38-
import org.elasticsearch.script.ScriptContext;
3941
import org.elasticsearch.script.ScriptEngine;
4042
import org.elasticsearch.script.ScriptModule;
4143
import org.elasticsearch.script.ScriptService;
4244
import org.elasticsearch.script.mustache.MustacheScriptEngine;
4345
import org.elasticsearch.threadpool.ThreadPool;
46+
import org.elasticsearch.xpack.constantkeyword.ConstantKeywordPainlessExtension;
47+
import org.elasticsearch.xpack.spatial.SpatialPainlessExtension;
48+
import org.elasticsearch.xpack.wildcard.WildcardPainlessExtension;
4449

4550
import java.io.Closeable;
51+
import java.io.IOException;
4652
import java.time.Duration;
4753
import java.util.ArrayList;
4854
import java.util.HashMap;
@@ -307,37 +313,42 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
307313
}
308314
}
309315

310-
private static ScriptService initScriptService(final Settings settings, final ThreadPool threadPool) {
311-
final List<Whitelist> painlessBaseWhitelist = getPainlessBaseWhiteList();
312-
final Map<ScriptContext<?>, List<Whitelist>> scriptContexts = Map.of(
313-
IngestScript.CONTEXT, painlessBaseWhitelist,
314-
IngestConditionalScript.CONTEXT, painlessBaseWhitelist);
315-
316+
private static ScriptService initScriptService(final Settings settings, final ThreadPool threadPool) throws IOException {
316317
Map<String, ScriptEngine> engines = new HashMap<>();
317-
engines.put(PainlessScriptEngine.NAME, new PainlessScriptEngine(settings, scriptContexts));
318+
engines.put(PainlessScriptEngine.NAME, getPainlessScriptEngine(settings));
318319
engines.put(MustacheScriptEngine.NAME, new MustacheScriptEngine());
319320
return new ScriptService(settings, engines, ScriptModule.CORE_CONTEXTS, threadPool::absoluteTimeInMillis);
320321
}
321322

322323
/**
323-
* @implNote handles breaking changes introduced in Elasticsearch 8.14 series; once 8.14 is
324-
* released and all builds of this plugin depend on Elasticsearch 8.14+, this can
325-
* be simplified to call {@code PainlessPlugin.baseWhiteList()} directly.
326-
* @return the PainlessPlugin's default base whitelists
324+
* @param settings the Elasticsearch settings object
325+
* @return a {@link ScriptEngine} for painless scripts for use in {@link IngestScript} and
326+
* {@link IngestConditionalScript} contexts, including all available {@link PainlessExtension}s.
327+
* @throws IOException when the underlying script engine cannot be created
327328
*/
328-
@SuppressWarnings({"JavaReflectionMemberAccess", "unchecked"})
329-
static List<Whitelist> getPainlessBaseWhiteList() {
330-
Class<PainlessPlugin> cls = PainlessPlugin.class;
331-
try {
332-
try {
333-
// In 8.14+: PainlessPlugin.baseWhiteList()
334-
return (List<Whitelist>) cls.getMethod("baseWhiteList").invoke(null);
335-
} catch (NoSuchMethodException e) {
336-
// in 8.x->8.13.x: PainlessPlugin.BASE_WHITELISTS
337-
return (List<Whitelist>) cls.getField("BASE_WHITELISTS").get(null);
338-
}
339-
} catch (java.lang.reflect.InvocationTargetException | IllegalAccessException | NoSuchFieldException e) {
340-
throw new RuntimeException("Unsupported PainlessPlugin does not provide access to its base whitelists", e);
329+
private static ScriptEngine getPainlessScriptEngine(final Settings settings) throws IOException {
330+
try (final PainlessPlugin painlessPlugin = new PainlessPlugin()) {
331+
332+
painlessPlugin.loadExtensions(new ExtensiblePlugin.ExtensionLoader() {
333+
@Override
334+
@SuppressWarnings("unchecked")
335+
public <T> List<T> loadExtensions(Class<T> extensionPointType) {
336+
if (extensionPointType.isAssignableFrom(PainlessExtension.class)) {
337+
final List<PainlessExtension> extensions = new ArrayList<>();
338+
339+
extensions.add(new ConstantKeywordPainlessExtension()); // module: constant-keyword
340+
extensions.add(new ProcessorsWhitelistExtension()); // module: ingest-common
341+
extensions.add(new SpatialPainlessExtension()); // module: spatial
342+
extensions.add(new WildcardPainlessExtension()); // module: wildcard
343+
344+
return (List<T>) extensions;
345+
} else {
346+
return List.of();
347+
}
348+
}
349+
});
350+
351+
return painlessPlugin.getScriptEngine(settings, Set.of(IngestScript.CONTEXT, IngestConditionalScript.CONTEXT));
341352
}
342353
}
343354
}

src/test/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilderTest.java

Lines changed: 0 additions & 19 deletions
This file was deleted.

src/test/java/co/elastic/logstash/filters/elasticintegration/SmokeTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,33 @@ public void testSinglePipelineMutatingEvents() {
237237
}));
238238
}
239239

240+
@Test void testPainlessAccessToIngestCommonProcessors() {
241+
final List<Event> matchedEvents = new ArrayList<>();
242+
final EventProcessorBuilder eventProcessorBuilder = EventProcessor.builder()
243+
.setEventPipelineNameResolver((event, exceptionConsumer) -> Optional.of("pipeline"))
244+
.setEventIndexNameResolver((event, handler) -> Optional.empty()) // no index name
245+
.setIndexNamePipelineNameResolver(((indexName, handler) -> Optional.empty())) // no default pipeline
246+
.setPipelineConfigurationResolver(new LocalDirectoryPipelineConfigurationResolver(getPreparedPipelinesResourcePath("script-processor-pipelines")))
247+
.setFilterMatchListener(matchedEvents::add);
248+
249+
final List<Event> inputEvents = List.of(
250+
newEvent(Map.of("id", "baseline", "lower", "lower","mixed", "MiXeD"), Map.of())
251+
);
252+
253+
withEventProcessor(eventProcessorBuilder, (eventProcessor) -> {
254+
final Collection<Event> outputEvents = eventProcessor.processEvents(inputEvents);
255+
assertThat("event count is unchanged", outputEvents, hasSize(inputEvents.size()));
256+
257+
validateEvent(outputEvents, eventWithId("baseline"), (event) -> {
258+
assertAll(String.format("EVENT(data: %s; meta: %s)", event.getData(), event.getMetadata()), () -> {
259+
assertThat(event, not(isTagged("_ingest_pipeline_failure")));
260+
assertThat(event, includesField("[lower]").withValue(equalTo("mixed")));
261+
assertThat(event, includesField("[mixed]").withValue(equalTo("MiXeD")));
262+
});
263+
});
264+
});
265+
}
266+
240267
@Test void testReroutePipelinesMutatingEvents() {
241268
final List<Event> matchedEvents = new ArrayList<>();
242269

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"description": "An pipeline with a script processor referencing ingest common utils",
3+
"processors": [
4+
{
5+
"script": {
6+
"lang": "painless",
7+
"source": "ctx.lower = Processors.lowercase(ctx.mixed)"
8+
}
9+
}
10+
]
11+
}

0 commit comments

Comments
 (0)