Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 58 additions & 34 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ def _requiredLogstashJar(pathPrefix, jarSpec, flavorSpec = null) {
}
}

static OutputStreamFunneler outputStreamFunneler(File logFile) {
logFile.parentFile.mkdirs()
logFile.delete()
logFile.createNewFile()

return new OutputStreamFunneler(new LazyFileOutputStream(logFile))
}

// https://docs.github.com/en/repositories/working-with-files/using-files/downloading-source-code-archives#source-code-archive-urls
String githubArchivePath(repo, treeish="main", archiveFormat="zip") {
def pathFragment = {
Expand Down Expand Up @@ -203,8 +211,10 @@ task downloadElasticsearchSourceZip(type: Download) {
task unzipDownloadedElasticsearchSourceZip(dependsOn: downloadElasticsearchSourceZip, type: Copy) {
description "extracts Elasticsearch source from a downloaded zip file"

ext.location = "${buildDir}/elasticsearch-source/"

from zipTree(downloadElasticsearchSourceZip.dest)
into "${buildDir}/elasticsearch-source/"
into ext.location
eachFile {
// strip top-level directory
path = path.replaceFirst(/^.+?\//, "")
Expand All @@ -216,15 +226,14 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource

def logFile = project.file("${buildDir}/elasticsearch-build.log")
doFirst {
def funneler = new OutputStreamFunneler(new LazyFileOutputStream(logFile))
def funneler = outputStreamFunneler(logFile)
standardOutput = funneler.funnelInstance
errorOutput = funneler.funnelInstance
}

def esSource = "${buildDir}/elasticsearch-source/"
def esSource = "${unzipDownloadedElasticsearchSourceZip.outputs.files.singleFile}"
def esBuildDir = "${esSource}/build"

inputs.dir esSource
outputs.dir esBuildDir

ext.buildRoot = esBuildDir
Expand All @@ -238,7 +247,7 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
ext.module = { moduleName -> localDistroResult.map { "${it}/modules/${moduleName}"} }

workingDir esSource
commandLine "./gradlew", "localDistro"
commandLine "./gradlew", "--stacktrace", "localDistro"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️


ignoreExitValue true // handled in doLast
doLast {
Expand All @@ -260,20 +269,22 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
task buildElasticsearchLogstashBridge(type: Exec) {
description "builds logstash-bridge lib module"

dependsOn buildElasticsearchLocalDistro
dependsOn unzipDownloadedElasticsearchSourceZip
dependsOn buildElasticsearchLocalDistro // mustRunAfter?

def logFile = project.file("${buildDir}/logstash-bridge-build.log")
doFirst {
def funneler = new OutputStreamFunneler(new LazyFileOutputStream(logFile))
def funneler = outputStreamFunneler(logFile)
standardOutput = funneler.funnelInstance
errorOutput = funneler.funnelInstance
}

def esSource = "${buildDir}/elasticsearch-source/"
def esSource = "${unzipDownloadedElasticsearchSourceZip.outputs.files.singleFile}"
def esBuildDir = "${esSource}/build"

inputs.dir esSource
outputs.dir "${esBuildDir}/libs/logstash-bridge"
inputs.dir "${esSource}/libs/logstash-bridge"

outputs.dir("${esSource}/libs/logstash-bridge/build/distributions")

ext.buildRoot = esBuildDir
workingDir esSource
Expand All @@ -295,6 +306,28 @@ task buildElasticsearchLogstashBridge(type: Exec) {
}
}

def ingestGeoipPluginShadeNamespace = "org.elasticsearch.ingest.geoip.shaded"

/**
* The StableBridge exposes GeoIP plugin internals, so it needs to relocate references to
* its bundled dependencies to match the shaded locations in our import of that plugin.
*/
task shadeElasticsearchStableBridge(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades Maxmind dependencies"

dependsOn buildElasticsearchLogstashBridge

from(buildElasticsearchLogstashBridge)

archiveFileName = "logstash-stable-bridge-shaded.jar"
destinationDirectory = file("${buildDir}/shaded")

relocate('com.fasterxml.jackson', "${ingestGeoipPluginShadeNamespace}.com.fasterxml.jackson")
relocate('com.maxmind', "${ingestGeoipPluginShadeNamespace}.com.maxmind")

mergeServiceFiles()
}

task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades embedded dependencies of the Elasticsearch Ingest GeoIP module"

Expand All @@ -305,11 +338,16 @@ task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugi
archiveFileName = 'ingest-geoip-shaded.jar'
destinationDirectory = file("${buildDir}/shaded")

relocate('com.fasterxml.jackson', "${ingestGeoipPluginShadeNamespace}.com.fasterxml.jackson")
relocate('com.maxmind', "${ingestGeoipPluginShadeNamespace}.com.maxmind")

mergeServiceFiles()

exclude '**/module-info.class'
}

def ingestGrokPluginShadeNamespace = "org.elasticsearch.grok.shaded"

task shadeElasticsearchGrokImplementation(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades embedded dependencies of the Elasticsearch Grok implementation"

Expand All @@ -325,13 +363,16 @@ task shadeElasticsearchGrokImplementation(type: com.github.jengelman.gradle.plug
destinationDirectory = file("${buildDir}/shaded")

mergeServiceFiles()
String shadeNamespace = "org.elasticsearch.grok.shaded"
relocate('org.joni', "${shadeNamespace}.org.joni")
relocate('org.jcodings', "${shadeNamespace}.org.jcodings")
relocate('org.joni', "${ingestGrokPluginShadeNamespace}.org.joni")
relocate('org.jcodings', "${ingestGrokPluginShadeNamespace}.org.jcodings")

exclude '**/module-info.class'
}

/**
* The x-pack redact plugin reaches into the grok plugin's implementation, so
* they both need to point to the same relocated shaded components.
*/
task shadeElasticsearchRedactPlugin(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades Elasticsearch Redact plugin to reference Grok's shaded dependencies"
dependsOn buildElasticsearchLocalDistro
Expand All @@ -343,24 +384,8 @@ task shadeElasticsearchRedactPlugin(type: com.github.jengelman.gradle.plugins.sh
destinationDirectory = file("${buildDir}/shaded")

// relocate elasticsearch-grok's dependencies to match
String shadeNamespace = "org.elasticsearch.grok.shaded"
relocate('org.joni', "${shadeNamespace}.org.joni")
relocate('org.jcodings', "${shadeNamespace}.org.jcodings")

exclude '**/module-info.class'
}

task shadeElasticsearchLogstashBridge(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades the Elasticsearch logstash-bridge jar"

dependsOn buildElasticsearchLogstashBridge

from("${buildDir}/elasticsearch-source/libs/logstash-bridge/build/distributions") {
include "elasticsearch-logstash-bridge-*.jar"
}

archiveFileName = "elasticsearch-logstash-bridge-shaded.jar"
destinationDirectory = file("${buildDir}/shaded")
relocate('org.joni', "${ingestGrokPluginShadeNamespace}.org.joni")
relocate('org.jcodings', "${ingestGrokPluginShadeNamespace}.org.jcodings")

exclude '**/module-info.class'
}
Expand All @@ -369,11 +394,10 @@ task importMinimalElasticsearch() {
description "Imports minimal portions of Elasticsearch localDistro"

dependsOn buildElasticsearchLocalDistro
dependsOn buildElasticsearchLogstashBridge
dependsOn shadeElasticsearchStableBridge
dependsOn shadeElasticsearchIngestGeoIpModule
dependsOn shadeElasticsearchGrokImplementation
dependsOn shadeElasticsearchRedactPlugin
dependsOn shadeElasticsearchLogstashBridge

ext.jars = "${buildDir}/elasticsearch-minimal-jars"

Expand All @@ -392,7 +416,7 @@ task importMinimalElasticsearch() {
include jarPackageNamed("lucene-core")
include jarPackageNamed("lucene-analysis-common")
}
from(shadeElasticsearchLogstashBridge)
from(shadeElasticsearchStableBridge.outputs.files.singleFile)
from(shadeElasticsearchGrokImplementation)
from(buildElasticsearchLocalDistro.module("x-pack-core"))

Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
LOGSTASH_PATH=../../logstash
ELASTICSEARCH_REPO=mashhurs/elasticsearch
ELASTICSEARCH_TREEISH=logstash-bridge-geoip-interfaces
ELASTICSEARCH_REPO=yaauie/elasticsearch
ELASTICSEARCH_TREEISH=rye-bridge-refinement-moar
4 changes: 2 additions & 2 deletions lib/logstash/filters/elastic_integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,11 @@ def _elasticsearch_rest_client(config, &builder_interceptor)

def initialize_event_processor!
java_import('co.elastic.logstash.filters.elasticintegration.EventProcessorBuilder')
java_import('co.elastic.logstash.filters.elasticintegration.geoip.GeoIpProcessorFactory')
java_import('org.elasticsearch.logstashbridge.geoip.GeoIpProcessorFactoryBridge')

@event_processor = EventProcessorBuilder.fromElasticsearch(@elasticsearch_rest_client, extract_immutable_config)
.setFilterMatchListener(method(:filter_matched_java).to_proc)
.addProcessor("geoip") { GeoIpProcessorFactory.new(@geoip_database_provider) }
.addProcessor("geoip") { GeoIpProcessorFactoryBridge::create(@geoip_database_provider) }
.build(@plugin_context)
rescue => exception
raise_config_error!("configuration did not produce an EventProcessor: #{exception}")
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
@@ -1 +1 @@
rootProject.name = 'logstash-filter-elastic_integration'
rootProject.name = 'logstash-filter-elastic_integration'
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.logstashbridge.core.FailProcessorExceptionBridge;
import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
import org.elasticsearch.logstashbridge.core.RefCountingRunnableBridge;
import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;
Expand Down Expand Up @@ -93,7 +92,7 @@ public Collection<Event> processEvents(final Collection<Event> incomingEvents) t
final CountDownLatch latch = new CountDownLatch(1);
final IntegrationBatch batch = new IntegrationBatch(incomingEvents);

RefCountingRunnableBridge ref = new RefCountingRunnableBridge(latch::countDown);
RefCountingRunnableBridge ref = RefCountingRunnableBridge.create(latch::countDown);
try {
batch.eachRequest(ref::acquire, this::processRequest);
} finally {
Expand Down Expand Up @@ -179,12 +178,11 @@ private void executePipeline(final IngestDocumentBridge ingestDocument, final In
// If no exception, then the original event is to be _replaced_ by the result
if (Objects.nonNull(ingestPipelineException)) {
// If we had an exception in the IngestPipeline, tag and emit the original Event
final Throwable unwrappedException = unwrapException(ingestPipelineException);
LOGGER.warn(() -> String.format("ingest pipeline `%s` failed", pipelineName), unwrappedException);
LOGGER.warn(() -> String.format("ingest pipeline `%s` failed", pipelineName), ingestPipelineException);
request.complete(incomingEvent -> {
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of(
"message", unwrappedException.getMessage(),
"exception", unwrappedException.getClass().getName()
"message", ingestPipelineException.getMessage(),
"exception", ingestPipelineException.getClass().getName()
));
});
} else if (Objects.isNull(resultIngestDocument)) {
Expand Down Expand Up @@ -256,13 +254,6 @@ static private void annotateIngestPipelineFailure(final Event event, final Strin
});
}

static private Throwable unwrapException(final Exception exception) {
if (FailProcessorExceptionBridge.isInstanceOf(exception.getCause())) {
return exception.getCause();
}
return exception;
}

static private String diff(final Event original, final Event changed) {
if (LOGGER.isTraceEnabled()) {
// dot notation less than ideal for LS-internal, but better than re-writing it ourselves.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import org.elasticsearch.logstashbridge.common.SettingsBridge;
import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
import org.elasticsearch.logstashbridge.env.EnvironmentBridge;
import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;
import org.elasticsearch.logstashbridge.ingest.ProcessorFactoryBridge;
import org.elasticsearch.logstashbridge.ingest.ProcessorParametersBridge;
import org.elasticsearch.logstashbridge.plugins.IngestCommonPluginBridge;
import org.elasticsearch.logstashbridge.plugins.IngestPluginBridge;
import org.elasticsearch.logstashbridge.plugins.IngestUserAgentPluginBridge;
Expand Down Expand Up @@ -196,7 +197,7 @@ private synchronized EventProcessorBuilder setFilterMatchListener(final FilterMa
return this;
}

public EventProcessorBuilder addProcessor(final String type, final Supplier<ProcessorBridge.Factory> processorFactorySupplier) {
public EventProcessorBuilder addProcessor(final String type, final Supplier<ProcessorFactoryBridge> processorFactorySupplier) {
return this.addProcessorsFromPlugin(SingleProcessorIngestPlugin.of(type, processorFactorySupplier));
}

Expand Down Expand Up @@ -226,22 +227,22 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
try {
final ArrayList<Service> services = new ArrayList<>();

final ThreadPoolBridge threadPool = new ThreadPoolBridge(settings);
resourcesToClose.add(() -> ThreadPoolBridge.terminate(threadPool, 10, TimeUnit.SECONDS));
final ThreadPoolBridge threadPool = ThreadPoolBridge.create(settings);
resourcesToClose.add(() -> threadPool.terminate(10, TimeUnit.SECONDS));

final ScriptServiceBridge scriptService = new ScriptServiceBridge(settings, threadPool::absoluteTimeInMillis);
final ScriptServiceBridge scriptService = ScriptServiceBridge.create(settings, threadPool::absoluteTimeInMillis);
resourcesToClose.add(scriptService);

final EnvironmentBridge env = new EnvironmentBridge(settings, null);
final ProcessorBridge.Parameters processorParameters = new ProcessorBridge.Parameters(env, scriptService, threadPool);
final EnvironmentBridge env = EnvironmentBridge.create(settings, null);
final ProcessorParametersBridge processorParameters = ProcessorParametersBridge.create(env, scriptService, threadPool);

IngestPipelineFactory ingestPipelineFactory = new IngestPipelineFactory(scriptService);
for (Supplier<IngestPluginBridge> ingestPluginSupplier : ingestPlugins) {
final IngestPluginBridge ingestPlugin = ingestPluginSupplier.get();
if (ingestPlugin instanceof Closeable closeableIngestPlugin) {
resourcesToClose.add(closeableIngestPlugin);
}
final Map<String, ProcessorBridge.Factory> processorFactories = ingestPlugin.getProcessors(processorParameters);
final Map<String, ProcessorFactoryBridge> processorFactories = ingestPlugin.getProcessors(processorParameters);
ingestPipelineFactory = ingestPipelineFactory.withProcessors(processorFactories);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public IngestDocumentBridge toIngestDocument(final Event event) {
final Timestamp eventTimestamp = safeTimestampFrom(event.getField(org.logstash.Event.TIMESTAMP));
Map<String, Object> ingestMetadata = Map.of(INGEST_METADATA_TIMESTAMP_FIELD, Objects.requireNonNullElseGet(eventTimestamp, Timestamp::now).toString());

return new IngestDocumentBridge(sourceAndMetadata, ingestMetadata);
return IngestDocumentBridge.create(sourceAndMetadata, ingestMetadata);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.logstashbridge.ingest.PipelineBridge;
import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge;
import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;
import org.elasticsearch.logstashbridge.ingest.ProcessorFactoryBridge;
import org.elasticsearch.logstashbridge.script.ScriptServiceBridge;

import java.util.HashMap;
Expand All @@ -24,7 +25,7 @@
*/
public class IngestPipelineFactory {
private final ScriptServiceBridge scriptService;
private final Map<String, ProcessorBridge.Factory> processorFactories;
private final Map<String, ProcessorFactoryBridge> processorFactories;

private static final Logger LOGGER = LogManager.getLogger(IngestPipelineFactory.class);

Expand All @@ -33,13 +34,13 @@ public IngestPipelineFactory(final ScriptServiceBridge scriptService) {
}

private IngestPipelineFactory(final ScriptServiceBridge scriptService,
final Map<String, ProcessorBridge.Factory> processorFactories) {
final Map<String, ProcessorFactoryBridge> processorFactories) {
this.scriptService = scriptService;
this.processorFactories = Map.copyOf(processorFactories);
}

public IngestPipelineFactory withProcessors(final Map<String, ProcessorBridge.Factory> processorFactories) {
final Map<String, ProcessorBridge.Factory> intermediate = new HashMap<>(this.processorFactories);
public IngestPipelineFactory withProcessors(final Map<String, ProcessorFactoryBridge> processorFactories) {
final Map<String, ProcessorFactoryBridge> intermediate = new HashMap<>(this.processorFactories);
intermediate.putAll(processorFactories);
return new IngestPipelineFactory(scriptService, intermediate);
}
Expand All @@ -63,7 +64,7 @@ public Optional<IngestPipeline> create(final PipelineConfigurationBridge pipelin
* resolve pipelines through the provided {@link IngestPipelineResolver}.
*/
public IngestPipelineFactory withIngestPipelineResolver(final IngestPipelineResolver ingestPipelineResolver) {
final Map<String, ProcessorBridge.Factory> modifiedProcessorFactories = new HashMap<>(this.processorFactories);
final Map<String, ProcessorFactoryBridge> modifiedProcessorFactories = new HashMap<>(this.processorFactories);
modifiedProcessorFactories.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(ingestPipelineResolver, this.scriptService));
return new IngestPipelineFactory(scriptService, modifiedProcessorFactories);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public PipelineConfigurationBridge parseNamedObject(final String json) throws Ex
}

public PipelineConfigurationBridge parseConfigOnly(final String pipelineId, final String jsonEncodedConfig) {
return new PipelineConfigurationBridge(pipelineId, jsonEncodedConfig);
return PipelineConfigurationBridge.create(pipelineId, jsonEncodedConfig);
}


Expand All @@ -66,7 +66,7 @@ public List<PipelineConfigurationBridge> get(){
}

private static PipelineConfigurationBridge init(final String id, final String json) {
return new PipelineConfigurationBridge(id, json);
return PipelineConfigurationBridge.create(id, json);
}
}
}
Loading