Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 58 additions & 34 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ def _requiredLogstashJar(pathPrefix, jarSpec, flavorSpec = null) {
}
}

static OutputStreamFunneler outputStreamFunneler(File logFile) {
logFile.parentFile.mkdirs()
logFile.delete()
logFile.createNewFile()

return new OutputStreamFunneler(new LazyFileOutputStream(logFile))
}

// https://docs.github.com/en/repositories/working-with-files/using-files/downloading-source-code-archives#source-code-archive-urls
String githubArchivePath(repo, treeish="main", archiveFormat="zip") {
def pathFragment = {
Expand Down Expand Up @@ -203,8 +211,10 @@ task downloadElasticsearchSourceZip(type: Download) {
task unzipDownloadedElasticsearchSourceZip(dependsOn: downloadElasticsearchSourceZip, type: Copy) {
description "extracts Elasticsearch source from a downloaded zip file"

ext.location = "${buildDir}/elasticsearch-source/"

from zipTree(downloadElasticsearchSourceZip.dest)
into "${buildDir}/elasticsearch-source/"
into ext.location
eachFile {
// strip top-level directory
path = path.replaceFirst(/^.+?\//, "")
Expand All @@ -216,15 +226,14 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource

def logFile = project.file("${buildDir}/elasticsearch-build.log")
doFirst {
def funneler = new OutputStreamFunneler(new LazyFileOutputStream(logFile))
def funneler = outputStreamFunneler(logFile)
standardOutput = funneler.funnelInstance
errorOutput = funneler.funnelInstance
}

def esSource = "${buildDir}/elasticsearch-source/"
def esSource = "${unzipDownloadedElasticsearchSourceZip.outputs.files.singleFile}"
def esBuildDir = "${esSource}/build"

inputs.dir esSource
outputs.dir esBuildDir

ext.buildRoot = esBuildDir
Expand All @@ -238,7 +247,7 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
ext.module = { moduleName -> localDistroResult.map { "${it}/modules/${moduleName}"} }

workingDir esSource
commandLine "./gradlew", "localDistro"
commandLine "./gradlew", "--stacktrace", "localDistro"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️


ignoreExitValue true // handled in doLast
doLast {
Expand All @@ -260,20 +269,22 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
task buildElasticsearchLogstashBridge(type: Exec) {
description "builds logstash-bridge lib module"

dependsOn buildElasticsearchLocalDistro
dependsOn unzipDownloadedElasticsearchSourceZip
dependsOn buildElasticsearchLocalDistro // mustRunAfter?

def logFile = project.file("${buildDir}/logstash-bridge-build.log")
doFirst {
def funneler = new OutputStreamFunneler(new LazyFileOutputStream(logFile))
def funneler = outputStreamFunneler(logFile)
standardOutput = funneler.funnelInstance
errorOutput = funneler.funnelInstance
}

def esSource = "${buildDir}/elasticsearch-source/"
def esSource = "${unzipDownloadedElasticsearchSourceZip.outputs.files.singleFile}"
def esBuildDir = "${esSource}/build"

inputs.dir esSource
outputs.dir "${esBuildDir}/libs/logstash-bridge"
inputs.dir "${esSource}/libs/logstash-bridge"

outputs.dir("${esSource}/libs/logstash-bridge/build/distributions")

ext.buildRoot = esBuildDir
workingDir esSource
Expand All @@ -295,6 +306,28 @@ task buildElasticsearchLogstashBridge(type: Exec) {
}
}

def ingestGeoipPluginShadeNamespace = "org.elasticsearch.ingest.geoip.shaded"

/**
* The StableBridge exposes GeoIP plugin internals, so it needs to relocate references to
* its bundled dependencies to match the shaded locations in our import of that plugin.
*/
task shadeElasticsearchStableBridge(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades Maxmind dependencies"

dependsOn buildElasticsearchLogstashBridge

from(buildElasticsearchLogstashBridge)

archiveFileName = "logstash-stable-bridge-shaded.jar"
destinationDirectory = file("${buildDir}/shaded")

relocate('com.fasterxml.jackson', "${ingestGeoipPluginShadeNamespace}.com.fasterxml.jackson")
relocate('com.maxmind', "${ingestGeoipPluginShadeNamespace}.com.maxmind")

mergeServiceFiles()
}

task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades embedded dependencies of the Elasticsearch Ingest GeoIP module"

Expand All @@ -305,11 +338,16 @@ task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugi
archiveFileName = 'ingest-geoip-shaded.jar'
destinationDirectory = file("${buildDir}/shaded")

relocate('com.fasterxml.jackson', "${ingestGeoipPluginShadeNamespace}.com.fasterxml.jackson")
relocate('com.maxmind', "${ingestGeoipPluginShadeNamespace}.com.maxmind")

mergeServiceFiles()

exclude '**/module-info.class'
}

def ingestGrokPluginShadeNamespace = "org.elasticsearch.grok.shaded"

task shadeElasticsearchGrokImplementation(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades embedded dependencies of the Elasticsearch Grok implementation"

Expand All @@ -325,13 +363,16 @@ task shadeElasticsearchGrokImplementation(type: com.github.jengelman.gradle.plug
destinationDirectory = file("${buildDir}/shaded")

mergeServiceFiles()
String shadeNamespace = "org.elasticsearch.grok.shaded"
relocate('org.joni', "${shadeNamespace}.org.joni")
relocate('org.jcodings', "${shadeNamespace}.org.jcodings")
relocate('org.joni', "${ingestGrokPluginShadeNamespace}.org.joni")
relocate('org.jcodings', "${ingestGrokPluginShadeNamespace}.org.jcodings")

exclude '**/module-info.class'
}

/**
* The x-pack redact plugin reaches into the grok plugin's implementation, so
* they both need to point to the same relocated shaded components.
*/
task shadeElasticsearchRedactPlugin(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades Elasticsearch Redact plugin to reference Grok's shaded dependencies"
dependsOn buildElasticsearchLocalDistro
Expand All @@ -343,24 +384,8 @@ task shadeElasticsearchRedactPlugin(type: com.github.jengelman.gradle.plugins.sh
destinationDirectory = file("${buildDir}/shaded")

// relocate elasticsearch-grok's dependencies to match
String shadeNamespace = "org.elasticsearch.grok.shaded"
relocate('org.joni', "${shadeNamespace}.org.joni")
relocate('org.jcodings', "${shadeNamespace}.org.jcodings")

exclude '**/module-info.class'
}

task shadeElasticsearchLogstashBridge(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
description "Shades the Elasticsearch logstash-bridge jar"

dependsOn buildElasticsearchLogstashBridge

from("${buildDir}/elasticsearch-source/libs/logstash-bridge/build/distributions") {
include "elasticsearch-logstash-bridge-*.jar"
}

archiveFileName = "elasticsearch-logstash-bridge-shaded.jar"
destinationDirectory = file("${buildDir}/shaded")
relocate('org.joni', "${ingestGrokPluginShadeNamespace}.org.joni")
relocate('org.jcodings', "${ingestGrokPluginShadeNamespace}.org.jcodings")

exclude '**/module-info.class'
}
Expand All @@ -369,11 +394,10 @@ task importMinimalElasticsearch() {
description "Imports minimal portions of Elasticsearch localDistro"

dependsOn buildElasticsearchLocalDistro
dependsOn buildElasticsearchLogstashBridge
dependsOn shadeElasticsearchStableBridge
dependsOn shadeElasticsearchIngestGeoIpModule
dependsOn shadeElasticsearchGrokImplementation
dependsOn shadeElasticsearchRedactPlugin
dependsOn shadeElasticsearchLogstashBridge

ext.jars = "${buildDir}/elasticsearch-minimal-jars"

Expand All @@ -392,7 +416,7 @@ task importMinimalElasticsearch() {
include jarPackageNamed("lucene-core")
include jarPackageNamed("lucene-analysis-common")
}
from(shadeElasticsearchLogstashBridge)
from(shadeElasticsearchStableBridge.outputs.files.singleFile)
from(shadeElasticsearchGrokImplementation)
from(buildElasticsearchLocalDistro.module("x-pack-core"))

Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
LOGSTASH_PATH=../../logstash
ELASTICSEARCH_REPO=mashhurs/elasticsearch
ELASTICSEARCH_TREEISH=logstash-bridge-geoip-interfaces
ELASTICSEARCH_REPO=yaauie/elasticsearch
ELASTICSEARCH_TREEISH=rye-bridge-refinement-progress
4 changes: 2 additions & 2 deletions lib/logstash/filters/elastic_integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,11 @@ def _elasticsearch_rest_client(config, &builder_interceptor)

def initialize_event_processor!
java_import('co.elastic.logstash.filters.elasticintegration.EventProcessorBuilder')
java_import('co.elastic.logstash.filters.elasticintegration.geoip.GeoIpProcessorFactory')
java_import('org.elasticsearch.logstashbridge.geoip.GeoIpProcessorBridge')

@event_processor = EventProcessorBuilder.fromElasticsearch(@elasticsearch_rest_client, extract_immutable_config)
.setFilterMatchListener(method(:filter_matched_java).to_proc)
.addProcessor("geoip") { GeoIpProcessorFactory.new(@geoip_database_provider) }
.addProcessor("geoip") { GeoIpProcessorBridge::newFactory(@geoip_database_provider) }
.build(@plugin_context)
rescue => exception
raise_config_error!("configuration did not produce an EventProcessor: #{exception}")
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
@@ -1 +1 @@
rootProject.name = 'logstash-filter-elastic_integration'
rootProject.name = 'logstash-filter-elastic_integration'
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.logstashbridge.core.FailProcessorExceptionBridge;
import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
import org.elasticsearch.logstashbridge.core.RefCountingRunnableBridge;
import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;
Expand Down Expand Up @@ -179,12 +178,11 @@ private void executePipeline(final IngestDocumentBridge ingestDocument, final In
// If no exception, then the original event is to be _replaced_ by the result
if (Objects.nonNull(ingestPipelineException)) {
// If we had an exception in the IngestPipeline, tag and emit the original Event
final Throwable unwrappedException = unwrapException(ingestPipelineException);
LOGGER.warn(() -> String.format("ingest pipeline `%s` failed", pipelineName), unwrappedException);
LOGGER.warn(() -> String.format("ingest pipeline `%s` failed", pipelineName), ingestPipelineException);
request.complete(incomingEvent -> {
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of(
"message", unwrappedException.getMessage(),
"exception", unwrappedException.getClass().getName()
"message", ingestPipelineException.getMessage(),
"exception", ingestPipelineException.getClass().getName()
));
});
} else if (Objects.isNull(resultIngestDocument)) {
Expand Down Expand Up @@ -256,13 +254,6 @@ static private void annotateIngestPipelineFailure(final Event event, final Strin
});
}

static private Throwable unwrapException(final Exception exception) {
if (FailProcessorExceptionBridge.isInstanceOf(exception.getCause())) {
return exception.getCause();
}
return exception;
}

static private String diff(final Event original, final Event changed) {
if (LOGGER.isTraceEnabled()) {
// dot notation less than ideal for LS-internal, but better than re-writing it ourselves.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.logstashbridge.core.CheckedBiFunctionBridge;
import org.elasticsearch.logstashbridge.geoip.IpDatabaseBridge;
import org.elasticsearch.logstashbridge.geoip.MaxMindDbBridge;
import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.CHMCache;
import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.NoCache;
import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.NodeCache;
import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.Reader;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this was my confusion that we fully move to the ES logstash-bridge w/o embedding the JARs but after analyzing the risk of conflict, as we took offline discussion, our original approach is the right one.

This indicates we DON'T NEED maxmind dependencies in

  • libs/logstash-bridge/build.gradle -> compileOnly('com.maxmind.db:maxmind-db:3.1.1')
  • and libs/logstash-bridge/src/main/java/module-info.java -> requires com.maxmind.db;

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove these dependencies, just noting here to remind myself while reviewing.


import java.io.File;
import java.io.IOException;
Expand All @@ -19,14 +23,14 @@
public class IpDatabaseAdapter extends IpDatabaseBridge.AbstractExternal {
private static final Logger LOGGER = LogManager.getLogger(IpDatabaseAdapter.class);

private final MaxMindDbBridge.Reader databaseReader;
private final Reader databaseReader;
private final String databaseType;

private volatile boolean isReaderClosed = false;

public IpDatabaseAdapter(final MaxMindDbBridge.Reader databaseReader) {
public IpDatabaseAdapter(final Reader databaseReader) {
this.databaseReader = databaseReader;
this.databaseType = databaseReader.getDatabaseType();
this.databaseType = databaseReader.getMetadata().getDatabaseType();
}

@Override
Expand All @@ -35,8 +39,19 @@ public String getDatabaseType() {
}

@Override
public MaxMindDbBridge.Reader getDatabaseReader() throws IOException {
return this.databaseReader;
public <RESPONSE> RESPONSE getResponse(String ipAddress, CheckedBiFunctionBridge<Reader, String, RESPONSE, Exception> responseProvider) {
try {
return responseProvider.apply(this.databaseReader, ipAddress);
} catch (Exception e) {
throw convertToRuntime(e);
}
}

private static RuntimeException convertToRuntime(final Exception e) {
if (e instanceof RuntimeException re) {
return re;
}
return new RuntimeException(e);
}

@Override
Expand All @@ -58,25 +73,25 @@ boolean isReaderClosed() {
}

public static IpDatabaseAdapter defaultForPath(final Path database) throws IOException {
return new Builder(database.toFile()).setCache(MaxMindDbBridge.NodeCache.get(10_000)).build();
return new Builder(database.toFile()).setCache(new CHMCache(10_000)).build();
}

public static class Builder {
private File databasePath;
private MaxMindDbBridge.NodeCache nodeCache;
private NodeCache nodeCache;

public Builder(final File databasePath) {
this.databasePath = databasePath;
}

public Builder setCache(final MaxMindDbBridge.NodeCache nodeCache) {
public Builder setCache(final NodeCache nodeCache) {
this.nodeCache = nodeCache;
return this;
}

public IpDatabaseAdapter build() throws IOException {
final MaxMindDbBridge.NodeCache nodeCache = Optional.ofNullable(this.nodeCache).orElseGet(MaxMindDbBridge.NodeCache::getInstance);
final MaxMindDbBridge.Reader databaseReader = new MaxMindDbBridge.Reader(this.databasePath, nodeCache);
final NodeCache nodeCache = Optional.ofNullable(this.nodeCache).orElseGet(NoCache::getInstance);
final Reader databaseReader = new Reader(this.databasePath, nodeCache);
return new IpDatabaseAdapter(databaseReader);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package co.elastic.logstash.filters.elasticintegration.geoip;

interface IpDatabaseHolder {
public interface IpDatabaseHolder {
boolean isValid();

IpDatabaseAdapter getDatabase();
Expand Down
Loading