Skip to content

Commit fb94ed0

Browse files
committed
1 parent 2dda5e1 commit fb94ed0

File tree

11 files changed

+109
-113
lines changed

11 files changed

+109
-113
lines changed

build.gradle

Lines changed: 58 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,14 @@ def _requiredLogstashJar(pathPrefix, jarSpec, flavorSpec = null) {
165165
}
166166
}
167167

168+
static OutputStreamFunneler outputStreamFunneler(File logFile) {
169+
logFile.parentFile.mkdirs()
170+
logFile.delete()
171+
logFile.createNewFile()
172+
173+
return new OutputStreamFunneler(new LazyFileOutputStream(logFile))
174+
}
175+
168176
// https://docs.github.com/en/repositories/working-with-files/using-files/downloading-source-code-archives#source-code-archive-urls
169177
String githubArchivePath(repo, treeish="main", archiveFormat="zip") {
170178
def pathFragment = {
@@ -203,8 +211,10 @@ task downloadElasticsearchSourceZip(type: Download) {
203211
task unzipDownloadedElasticsearchSourceZip(dependsOn: downloadElasticsearchSourceZip, type: Copy) {
204212
description "extracts Elasticsearch source from a downloaded zip file"
205213

214+
ext.location = "${buildDir}/elasticsearch-source/"
215+
206216
from zipTree(downloadElasticsearchSourceZip.dest)
207-
into "${buildDir}/elasticsearch-source/"
217+
into ext.location
208218
eachFile {
209219
// strip top-level directory
210220
path = path.replaceFirst(/^.+?\//, "")
@@ -216,15 +226,14 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
216226

217227
def logFile = project.file("${buildDir}/elasticsearch-build.log")
218228
doFirst {
219-
def funneler = new OutputStreamFunneler(new LazyFileOutputStream(logFile))
229+
def funneler = outputStreamFunneler(logFile)
220230
standardOutput = funneler.funnelInstance
221231
errorOutput = funneler.funnelInstance
222232
}
223233

224-
def esSource = "${buildDir}/elasticsearch-source/"
234+
def esSource = "${unzipDownloadedElasticsearchSourceZip.outputs.files.singleFile}"
225235
def esBuildDir = "${esSource}/build"
226236

227-
inputs.dir esSource
228237
outputs.dir esBuildDir
229238

230239
ext.buildRoot = esBuildDir
@@ -238,7 +247,7 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
238247
ext.module = { moduleName -> localDistroResult.map { "${it}/modules/${moduleName}"} }
239248

240249
workingDir esSource
241-
commandLine "./gradlew", "localDistro"
250+
commandLine "./gradlew", "--stacktrace", "localDistro"
242251

243252
ignoreExitValue true // handled in doLast
244253
doLast {
@@ -260,20 +269,22 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
260269
task buildElasticsearchLogstashBridge(type: Exec) {
261270
description "builds logstash-bridge lib module"
262271

263-
dependsOn buildElasticsearchLocalDistro
272+
dependsOn unzipDownloadedElasticsearchSourceZip
273+
dependsOn buildElasticsearchLocalDistro // mustRunAfter?
264274

265275
def logFile = project.file("${buildDir}/logstash-bridge-build.log")
266276
doFirst {
267-
def funneler = new OutputStreamFunneler(new LazyFileOutputStream(logFile))
277+
def funneler = outputStreamFunneler(logFile)
268278
standardOutput = funneler.funnelInstance
269279
errorOutput = funneler.funnelInstance
270280
}
271281

272-
def esSource = "${buildDir}/elasticsearch-source/"
282+
def esSource = "${unzipDownloadedElasticsearchSourceZip.outputs.files.singleFile}"
273283
def esBuildDir = "${esSource}/build"
274284

275-
inputs.dir esSource
276-
outputs.dir "${esBuildDir}/libs/logstash-bridge"
285+
inputs.dir "${esSource}/libs/logstash-bridge"
286+
287+
outputs.dir("${esSource}/libs/logstash-bridge/build/distributions")
277288

278289
ext.buildRoot = esBuildDir
279290
workingDir esSource
@@ -295,6 +306,28 @@ task buildElasticsearchLogstashBridge(type: Exec) {
295306
}
296307
}
297308

309+
def ingestGeoipPluginShadeNamespace = "org.elasticsearch.ingest.geoip.shaded"
310+
311+
/**
312+
* The StableBridge exposes GeoIP plugin internals, so it needs to relocate references to
313+
* its bundled dependencies to match the shaded locations in our import of that plugin.
314+
*/
315+
task shadeElasticsearchStableBridge(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
316+
description "Shades Maxmind dependencies"
317+
318+
dependsOn buildElasticsearchLogstashBridge
319+
320+
from(buildElasticsearchLogstashBridge)
321+
322+
archiveFileName = "logstash-stable-bridge-shaded.jar"
323+
destinationDirectory = file("${buildDir}/shaded")
324+
325+
relocate('com.fasterxml.jackson', "${ingestGeoipPluginShadeNamespace}.com.fasterxml.jackson")
326+
relocate('com.maxmind', "${ingestGeoipPluginShadeNamespace}.com.maxmind")
327+
328+
mergeServiceFiles()
329+
}
330+
298331
task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
299332
description "Shades embedded dependencies of the Elasticsearch Ingest GeoIP module"
300333

@@ -305,11 +338,16 @@ task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugi
305338
archiveFileName = 'ingest-geoip-shaded.jar'
306339
destinationDirectory = file("${buildDir}/shaded")
307340

341+
relocate('com.fasterxml.jackson', "${ingestGeoipPluginShadeNamespace}.com.fasterxml.jackson")
342+
relocate('com.maxmind', "${ingestGeoipPluginShadeNamespace}.com.maxmind")
343+
308344
mergeServiceFiles()
309345

310346
exclude '**/module-info.class'
311347
}
312348

349+
def ingestGrokPluginShadeNamespace = "org.elasticsearch.grok.shaded"
350+
313351
task shadeElasticsearchGrokImplementation(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
314352
description "Shades embedded dependencies of the Elasticsearch Grok implementation"
315353

@@ -325,13 +363,16 @@ task shadeElasticsearchGrokImplementation(type: com.github.jengelman.gradle.plug
325363
destinationDirectory = file("${buildDir}/shaded")
326364

327365
mergeServiceFiles()
328-
String shadeNamespace = "org.elasticsearch.grok.shaded"
329-
relocate('org.joni', "${shadeNamespace}.org.joni")
330-
relocate('org.jcodings', "${shadeNamespace}.org.jcodings")
366+
relocate('org.joni', "${ingestGrokPluginShadeNamespace}.org.joni")
367+
relocate('org.jcodings', "${ingestGrokPluginShadeNamespace}.org.jcodings")
331368

332369
exclude '**/module-info.class'
333370
}
334371

372+
/**
373+
* The x-pack redact plugin reaches into the grok plugin's implementation, so
374+
* they both need to point to the same relocated shaded components.
375+
*/
335376
task shadeElasticsearchRedactPlugin(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
336377
description "Shades Elasticsearch Redact plugin to reference Grok's shaded dependencies"
337378
dependsOn buildElasticsearchLocalDistro
@@ -343,24 +384,8 @@ task shadeElasticsearchRedactPlugin(type: com.github.jengelman.gradle.plugins.sh
343384
destinationDirectory = file("${buildDir}/shaded")
344385

345386
// relocate elasticsearch-grok's dependencies to match
346-
String shadeNamespace = "org.elasticsearch.grok.shaded"
347-
relocate('org.joni', "${shadeNamespace}.org.joni")
348-
relocate('org.jcodings', "${shadeNamespace}.org.jcodings")
349-
350-
exclude '**/module-info.class'
351-
}
352-
353-
task shadeElasticsearchLogstashBridge(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
354-
description "Shades the Elasticsearch logstash-bridge jar"
355-
356-
dependsOn buildElasticsearchLogstashBridge
357-
358-
from("${buildDir}/elasticsearch-source/libs/logstash-bridge/build/distributions") {
359-
include "elasticsearch-logstash-bridge-*.jar"
360-
}
361-
362-
archiveFileName = "elasticsearch-logstash-bridge-shaded.jar"
363-
destinationDirectory = file("${buildDir}/shaded")
387+
relocate('org.joni', "${ingestGrokPluginShadeNamespace}.org.joni")
388+
relocate('org.jcodings', "${ingestGrokPluginShadeNamespace}.org.jcodings")
364389

365390
exclude '**/module-info.class'
366391
}
@@ -369,11 +394,10 @@ task importMinimalElasticsearch() {
369394
description "Imports minimal portions of Elasticsearch localDistro"
370395

371396
dependsOn buildElasticsearchLocalDistro
372-
dependsOn buildElasticsearchLogstashBridge
397+
dependsOn shadeElasticsearchStableBridge
373398
dependsOn shadeElasticsearchIngestGeoIpModule
374399
dependsOn shadeElasticsearchGrokImplementation
375400
dependsOn shadeElasticsearchRedactPlugin
376-
dependsOn shadeElasticsearchLogstashBridge
377401

378402
ext.jars = "${buildDir}/elasticsearch-minimal-jars"
379403

@@ -392,7 +416,7 @@ task importMinimalElasticsearch() {
392416
include jarPackageNamed("lucene-core")
393417
include jarPackageNamed("lucene-analysis-common")
394418
}
395-
from(shadeElasticsearchLogstashBridge)
419+
from(shadeElasticsearchStableBridge.outputs.files.singleFile)
396420
from(shadeElasticsearchGrokImplementation)
397421
from(buildElasticsearchLocalDistro.module("x-pack-core"))
398422

gradle.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
LOGSTASH_PATH=../../logstash
2-
ELASTICSEARCH_REPO=mashhurs/elasticsearch
3-
ELASTICSEARCH_TREEISH=logstash-bridge-geoip-interfaces
2+
ELASTICSEARCH_REPO=yaauie/elasticsearch
3+
ELASTICSEARCH_TREEISH=rye-bridge-refinement-progress

lib/logstash/filters/elastic_integration.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,11 +368,11 @@ def _elasticsearch_rest_client(config, &builder_interceptor)
368368

369369
def initialize_event_processor!
370370
java_import('co.elastic.logstash.filters.elasticintegration.EventProcessorBuilder')
371-
java_import('co.elastic.logstash.filters.elasticintegration.geoip.GeoIpProcessorFactory')
371+
java_import('org.elasticsearch.logstashbridge.geoip.GeoIpProcessorBridge')
372372

373373
@event_processor = EventProcessorBuilder.fromElasticsearch(@elasticsearch_rest_client, extract_immutable_config)
374374
.setFilterMatchListener(method(:filter_matched_java).to_proc)
375-
.addProcessor("geoip") { GeoIpProcessorFactory.new(@geoip_database_provider) }
375+
.addProcessor("geoip") { GeoIpProcessorBridge::newFactory(@geoip_database_provider) }
376376
.build(@plugin_context)
377377
rescue => exception
378378
raise_config_error!("configuration did not produce an EventProcessor: #{exception}")

settings.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
rootProject.name = 'logstash-filter-elastic_integration'
1+
rootProject.name = 'logstash-filter-elastic_integration'

src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessor.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import com.google.common.collect.Maps;
1414
import org.apache.logging.log4j.LogManager;
1515
import org.apache.logging.log4j.Logger;
16-
import org.elasticsearch.logstashbridge.core.FailProcessorExceptionBridge;
1716
import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
1817
import org.elasticsearch.logstashbridge.core.RefCountingRunnableBridge;
1918
import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;
@@ -179,12 +178,11 @@ private void executePipeline(final IngestDocumentBridge ingestDocument, final In
179178
// If no exception, then the original event is to be _replaced_ by the result
180179
if (Objects.nonNull(ingestPipelineException)) {
181180
// If we had an exception in the IngestPipeline, tag and emit the original Event
182-
final Throwable unwrappedException = unwrapException(ingestPipelineException);
183-
LOGGER.warn(() -> String.format("ingest pipeline `%s` failed", pipelineName), unwrappedException);
181+
LOGGER.warn(() -> String.format("ingest pipeline `%s` failed", pipelineName), ingestPipelineException);
184182
request.complete(incomingEvent -> {
185183
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of(
186-
"message", unwrappedException.getMessage(),
187-
"exception", unwrappedException.getClass().getName()
184+
"message", ingestPipelineException.getMessage(),
185+
"exception", ingestPipelineException.getClass().getName()
188186
));
189187
});
190188
} else if (Objects.isNull(resultIngestDocument)) {
@@ -256,13 +254,6 @@ static private void annotateIngestPipelineFailure(final Event event, final Strin
256254
});
257255
}
258256

259-
static private Throwable unwrapException(final Exception exception) {
260-
if (FailProcessorExceptionBridge.isInstanceOf(exception.getCause())) {
261-
return exception.getCause();
262-
}
263-
return exception;
264-
}
265-
266257
static private String diff(final Event original, final Event changed) {
267258
if (LOGGER.isTraceEnabled()) {
268259
// dot notation less than ideal for LS-internal, but better than re-writing it ourselves.

src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/GeoIpProcessorFactory.java

Lines changed: 0 additions & 37 deletions
This file was deleted.

src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseAdapter.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@
88

99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
11+
import org.elasticsearch.logstashbridge.core.CheckedBiFunctionBridge;
1112
import org.elasticsearch.logstashbridge.geoip.IpDatabaseBridge;
12-
import org.elasticsearch.logstashbridge.geoip.MaxMindDbBridge;
13+
import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.CHMCache;
14+
import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.NoCache;
15+
import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.NodeCache;
16+
import org.elasticsearch.ingest.geoip.shaded.com.maxmind.db.Reader;
1317

1418
import java.io.File;
1519
import java.io.IOException;
@@ -19,14 +23,14 @@
1923
public class IpDatabaseAdapter extends IpDatabaseBridge.AbstractExternal {
2024
private static final Logger LOGGER = LogManager.getLogger(IpDatabaseAdapter.class);
2125

22-
private final MaxMindDbBridge.Reader databaseReader;
26+
private final Reader databaseReader;
2327
private final String databaseType;
2428

2529
private volatile boolean isReaderClosed = false;
2630

27-
public IpDatabaseAdapter(final MaxMindDbBridge.Reader databaseReader) {
31+
public IpDatabaseAdapter(final Reader databaseReader) {
2832
this.databaseReader = databaseReader;
29-
this.databaseType = databaseReader.getDatabaseType();
33+
this.databaseType = databaseReader.getMetadata().getDatabaseType();
3034
}
3135

3236
@Override
@@ -35,8 +39,19 @@ public String getDatabaseType() {
3539
}
3640

3741
@Override
38-
public MaxMindDbBridge.Reader getDatabaseReader() throws IOException {
39-
return this.databaseReader;
42+
public <RESPONSE> RESPONSE getResponse(String ipAddress, CheckedBiFunctionBridge<Reader, String, RESPONSE, Exception> responseProvider) {
43+
try {
44+
return responseProvider.apply(this.databaseReader, ipAddress);
45+
} catch (Exception e) {
46+
throw convertToRuntime(e);
47+
}
48+
}
49+
50+
private static RuntimeException convertToRuntime(final Exception e) {
51+
if (e instanceof RuntimeException re) {
52+
return re;
53+
}
54+
return new RuntimeException(e);
4055
}
4156

4257
@Override
@@ -58,25 +73,25 @@ boolean isReaderClosed() {
5873
}
5974

6075
public static IpDatabaseAdapter defaultForPath(final Path database) throws IOException {
61-
return new Builder(database.toFile()).setCache(MaxMindDbBridge.NodeCache.get(10_000)).build();
76+
return new Builder(database.toFile()).setCache(new CHMCache(10_000)).build();
6277
}
6378

6479
public static class Builder {
6580
private File databasePath;
66-
private MaxMindDbBridge.NodeCache nodeCache;
81+
private NodeCache nodeCache;
6782

6883
public Builder(final File databasePath) {
6984
this.databasePath = databasePath;
7085
}
7186

72-
public Builder setCache(final MaxMindDbBridge.NodeCache nodeCache) {
87+
public Builder setCache(final NodeCache nodeCache) {
7388
this.nodeCache = nodeCache;
7489
return this;
7590
}
7691

7792
public IpDatabaseAdapter build() throws IOException {
78-
final MaxMindDbBridge.NodeCache nodeCache = Optional.ofNullable(this.nodeCache).orElseGet(MaxMindDbBridge.NodeCache::getInstance);
79-
final MaxMindDbBridge.Reader databaseReader = new MaxMindDbBridge.Reader(this.databasePath, nodeCache);
93+
final NodeCache nodeCache = Optional.ofNullable(this.nodeCache).orElseGet(NoCache::getInstance);
94+
final Reader databaseReader = new Reader(this.databasePath, nodeCache);
8095
return new IpDatabaseAdapter(databaseReader);
8196
}
8297
}

src/main/java/co/elastic/logstash/filters/elasticintegration/geoip/IpDatabaseHolder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package co.elastic.logstash.filters.elasticintegration.geoip;
22

3-
interface IpDatabaseHolder {
3+
public interface IpDatabaseHolder {
44
boolean isValid();
55

66
IpDatabaseAdapter getDatabase();

0 commit comments

Comments
 (0)