Skip to content

Commit 7c0b6c7

Browse files
authored
Merge pull request #1 from yaauie/bridge-alignment-review
align bridge to yaauie/elasticsearch@94d58d47cd
2 parents 2dda5e1 + bdef04b commit 7c0b6c7

19 files changed

+155
-157
lines changed

build.gradle

Lines changed: 58 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,14 @@ def _requiredLogstashJar(pathPrefix, jarSpec, flavorSpec = null) {
165165
}
166166
}
167167

168+
static OutputStreamFunneler outputStreamFunneler(File logFile) {
169+
logFile.parentFile.mkdirs()
170+
logFile.delete()
171+
logFile.createNewFile()
172+
173+
return new OutputStreamFunneler(new LazyFileOutputStream(logFile))
174+
}
175+
168176
// https://docs.github.com/en/repositories/working-with-files/using-files/downloading-source-code-archives#source-code-archive-urls
169177
String githubArchivePath(repo, treeish="main", archiveFormat="zip") {
170178
def pathFragment = {
@@ -203,8 +211,10 @@ task downloadElasticsearchSourceZip(type: Download) {
203211
task unzipDownloadedElasticsearchSourceZip(dependsOn: downloadElasticsearchSourceZip, type: Copy) {
204212
description "extracts Elasticsearch source from a downloaded zip file"
205213

214+
ext.location = "${buildDir}/elasticsearch-source/"
215+
206216
from zipTree(downloadElasticsearchSourceZip.dest)
207-
into "${buildDir}/elasticsearch-source/"
217+
into ext.location
208218
eachFile {
209219
// strip top-level directory
210220
path = path.replaceFirst(/^.+?\//, "")
@@ -216,15 +226,14 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
216226

217227
def logFile = project.file("${buildDir}/elasticsearch-build.log")
218228
doFirst {
219-
def funneler = new OutputStreamFunneler(new LazyFileOutputStream(logFile))
229+
def funneler = outputStreamFunneler(logFile)
220230
standardOutput = funneler.funnelInstance
221231
errorOutput = funneler.funnelInstance
222232
}
223233

224-
def esSource = "${buildDir}/elasticsearch-source/"
234+
def esSource = "${unzipDownloadedElasticsearchSourceZip.outputs.files.singleFile}"
225235
def esBuildDir = "${esSource}/build"
226236

227-
inputs.dir esSource
228237
outputs.dir esBuildDir
229238

230239
ext.buildRoot = esBuildDir
@@ -238,7 +247,7 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
238247
ext.module = { moduleName -> localDistroResult.map { "${it}/modules/${moduleName}"} }
239248

240249
workingDir esSource
241-
commandLine "./gradlew", "localDistro"
250+
commandLine "./gradlew", "--stacktrace", "localDistro"
242251

243252
ignoreExitValue true // handled in doLast
244253
doLast {
@@ -260,20 +269,22 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
260269
task buildElasticsearchLogstashBridge(type: Exec) {
261270
description "builds logstash-bridge lib module"
262271

263-
dependsOn buildElasticsearchLocalDistro
272+
dependsOn unzipDownloadedElasticsearchSourceZip
273+
dependsOn buildElasticsearchLocalDistro // mustRunAfter?
264274

265275
def logFile = project.file("${buildDir}/logstash-bridge-build.log")
266276
doFirst {
267-
def funneler = new OutputStreamFunneler(new LazyFileOutputStream(logFile))
277+
def funneler = outputStreamFunneler(logFile)
268278
standardOutput = funneler.funnelInstance
269279
errorOutput = funneler.funnelInstance
270280
}
271281

272-
def esSource = "${buildDir}/elasticsearch-source/"
282+
def esSource = "${unzipDownloadedElasticsearchSourceZip.outputs.files.singleFile}"
273283
def esBuildDir = "${esSource}/build"
274284

275-
inputs.dir esSource
276-
outputs.dir "${esBuildDir}/libs/logstash-bridge"
285+
inputs.dir "${esSource}/libs/logstash-bridge"
286+
287+
outputs.dir("${esSource}/libs/logstash-bridge/build/distributions")
277288

278289
ext.buildRoot = esBuildDir
279290
workingDir esSource
@@ -295,6 +306,28 @@ task buildElasticsearchLogstashBridge(type: Exec) {
295306
}
296307
}
297308

309+
def ingestGeoipPluginShadeNamespace = "org.elasticsearch.ingest.geoip.shaded"
310+
311+
/**
312+
* The StableBridge exposes GeoIP plugin internals, so it needs to relocate references to
313+
* its bundled dependencies to match the shaded locations in our import of that plugin.
314+
*/
315+
task shadeElasticsearchStableBridge(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
316+
description "Shades Maxmind dependencies"
317+
318+
dependsOn buildElasticsearchLogstashBridge
319+
320+
from(buildElasticsearchLogstashBridge)
321+
322+
archiveFileName = "logstash-stable-bridge-shaded.jar"
323+
destinationDirectory = file("${buildDir}/shaded")
324+
325+
relocate('com.fasterxml.jackson', "${ingestGeoipPluginShadeNamespace}.com.fasterxml.jackson")
326+
relocate('com.maxmind', "${ingestGeoipPluginShadeNamespace}.com.maxmind")
327+
328+
mergeServiceFiles()
329+
}
330+
298331
task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
299332
description "Shades embedded dependencies of the Elasticsearch Ingest GeoIP module"
300333

@@ -305,11 +338,16 @@ task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugi
305338
archiveFileName = 'ingest-geoip-shaded.jar'
306339
destinationDirectory = file("${buildDir}/shaded")
307340

341+
relocate('com.fasterxml.jackson', "${ingestGeoipPluginShadeNamespace}.com.fasterxml.jackson")
342+
relocate('com.maxmind', "${ingestGeoipPluginShadeNamespace}.com.maxmind")
343+
308344
mergeServiceFiles()
309345

310346
exclude '**/module-info.class'
311347
}
312348

349+
def ingestGrokPluginShadeNamespace = "org.elasticsearch.grok.shaded"
350+
313351
task shadeElasticsearchGrokImplementation(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
314352
description "Shades embedded dependencies of the Elasticsearch Grok implementation"
315353

@@ -325,13 +363,16 @@ task shadeElasticsearchGrokImplementation(type: com.github.jengelman.gradle.plug
325363
destinationDirectory = file("${buildDir}/shaded")
326364

327365
mergeServiceFiles()
328-
String shadeNamespace = "org.elasticsearch.grok.shaded"
329-
relocate('org.joni', "${shadeNamespace}.org.joni")
330-
relocate('org.jcodings', "${shadeNamespace}.org.jcodings")
366+
relocate('org.joni', "${ingestGrokPluginShadeNamespace}.org.joni")
367+
relocate('org.jcodings', "${ingestGrokPluginShadeNamespace}.org.jcodings")
331368

332369
exclude '**/module-info.class'
333370
}
334371

372+
/**
373+
* The x-pack redact plugin reaches into the grok plugin's implementation, so
374+
* they both need to point to the same relocated shaded components.
375+
*/
335376
task shadeElasticsearchRedactPlugin(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
336377
description "Shades Elasticsearch Redact plugin to reference Grok's shaded dependencies"
337378
dependsOn buildElasticsearchLocalDistro
@@ -343,24 +384,8 @@ task shadeElasticsearchRedactPlugin(type: com.github.jengelman.gradle.plugins.sh
343384
destinationDirectory = file("${buildDir}/shaded")
344385

345386
// relocate elasticsearch-grok's dependencies to match
346-
String shadeNamespace = "org.elasticsearch.grok.shaded"
347-
relocate('org.joni', "${shadeNamespace}.org.joni")
348-
relocate('org.jcodings', "${shadeNamespace}.org.jcodings")
349-
350-
exclude '**/module-info.class'
351-
}
352-
353-
task shadeElasticsearchLogstashBridge(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
354-
description "Shades the Elasticsearch logstash-bridge jar"
355-
356-
dependsOn buildElasticsearchLogstashBridge
357-
358-
from("${buildDir}/elasticsearch-source/libs/logstash-bridge/build/distributions") {
359-
include "elasticsearch-logstash-bridge-*.jar"
360-
}
361-
362-
archiveFileName = "elasticsearch-logstash-bridge-shaded.jar"
363-
destinationDirectory = file("${buildDir}/shaded")
387+
relocate('org.joni', "${ingestGrokPluginShadeNamespace}.org.joni")
388+
relocate('org.jcodings', "${ingestGrokPluginShadeNamespace}.org.jcodings")
364389

365390
exclude '**/module-info.class'
366391
}
@@ -369,11 +394,10 @@ task importMinimalElasticsearch() {
369394
description "Imports minimal portions of Elasticsearch localDistro"
370395

371396
dependsOn buildElasticsearchLocalDistro
372-
dependsOn buildElasticsearchLogstashBridge
397+
dependsOn shadeElasticsearchStableBridge
373398
dependsOn shadeElasticsearchIngestGeoIpModule
374399
dependsOn shadeElasticsearchGrokImplementation
375400
dependsOn shadeElasticsearchRedactPlugin
376-
dependsOn shadeElasticsearchLogstashBridge
377401

378402
ext.jars = "${buildDir}/elasticsearch-minimal-jars"
379403

@@ -392,7 +416,7 @@ task importMinimalElasticsearch() {
392416
include jarPackageNamed("lucene-core")
393417
include jarPackageNamed("lucene-analysis-common")
394418
}
395-
from(shadeElasticsearchLogstashBridge)
419+
from(shadeElasticsearchStableBridge.outputs.files.singleFile)
396420
from(shadeElasticsearchGrokImplementation)
397421
from(buildElasticsearchLocalDistro.module("x-pack-core"))
398422

gradle.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
LOGSTASH_PATH=../../logstash
2-
ELASTICSEARCH_REPO=mashhurs/elasticsearch
3-
ELASTICSEARCH_TREEISH=logstash-bridge-geoip-interfaces
2+
ELASTICSEARCH_REPO=yaauie/elasticsearch
3+
ELASTICSEARCH_TREEISH=rye-bridge-refinement-moar

lib/logstash/filters/elastic_integration.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,11 +368,11 @@ def _elasticsearch_rest_client(config, &builder_interceptor)
368368

369369
def initialize_event_processor!
370370
java_import('co.elastic.logstash.filters.elasticintegration.EventProcessorBuilder')
371-
java_import('co.elastic.logstash.filters.elasticintegration.geoip.GeoIpProcessorFactory')
371+
java_import('org.elasticsearch.logstashbridge.geoip.GeoIpProcessorFactoryBridge')
372372

373373
@event_processor = EventProcessorBuilder.fromElasticsearch(@elasticsearch_rest_client, extract_immutable_config)
374374
.setFilterMatchListener(method(:filter_matched_java).to_proc)
375-
.addProcessor("geoip") { GeoIpProcessorFactory.new(@geoip_database_provider) }
375+
.addProcessor("geoip") { GeoIpProcessorFactoryBridge::create(@geoip_database_provider) }
376376
.build(@plugin_context)
377377
rescue => exception
378378
raise_config_error!("configuration did not produce an EventProcessor: #{exception}")

settings.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
rootProject.name = 'logstash-filter-elastic_integration'
1+
rootProject.name = 'logstash-filter-elastic_integration'

src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessor.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import com.google.common.collect.Maps;
1414
import org.apache.logging.log4j.LogManager;
1515
import org.apache.logging.log4j.Logger;
16-
import org.elasticsearch.logstashbridge.core.FailProcessorExceptionBridge;
1716
import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
1817
import org.elasticsearch.logstashbridge.core.RefCountingRunnableBridge;
1918
import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;
@@ -93,7 +92,7 @@ public Collection<Event> processEvents(final Collection<Event> incomingEvents) t
9392
final CountDownLatch latch = new CountDownLatch(1);
9493
final IntegrationBatch batch = new IntegrationBatch(incomingEvents);
9594

96-
RefCountingRunnableBridge ref = new RefCountingRunnableBridge(latch::countDown);
95+
RefCountingRunnableBridge ref = RefCountingRunnableBridge.create(latch::countDown);
9796
try {
9897
batch.eachRequest(ref::acquire, this::processRequest);
9998
} finally {
@@ -179,12 +178,11 @@ private void executePipeline(final IngestDocumentBridge ingestDocument, final In
179178
// If no exception, then the original event is to be _replaced_ by the result
180179
if (Objects.nonNull(ingestPipelineException)) {
181180
// If we had an exception in the IngestPipeline, tag and emit the original Event
182-
final Throwable unwrappedException = unwrapException(ingestPipelineException);
183-
LOGGER.warn(() -> String.format("ingest pipeline `%s` failed", pipelineName), unwrappedException);
181+
LOGGER.warn(() -> String.format("ingest pipeline `%s` failed", pipelineName), ingestPipelineException);
184182
request.complete(incomingEvent -> {
185183
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of(
186-
"message", unwrappedException.getMessage(),
187-
"exception", unwrappedException.getClass().getName()
184+
"message", ingestPipelineException.getMessage(),
185+
"exception", ingestPipelineException.getClass().getName()
188186
));
189187
});
190188
} else if (Objects.isNull(resultIngestDocument)) {
@@ -256,13 +254,6 @@ static private void annotateIngestPipelineFailure(final Event event, final Strin
256254
});
257255
}
258256

259-
static private Throwable unwrapException(final Exception exception) {
260-
if (FailProcessorExceptionBridge.isInstanceOf(exception.getCause())) {
261-
return exception.getCause();
262-
}
263-
return exception;
264-
}
265-
266257
static private String diff(final Event original, final Event changed) {
267258
if (LOGGER.isTraceEnabled()) {
268259
// dot notation less than ideal for LS-internal, but better than re-writing it ourselves.

src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
import org.elasticsearch.logstashbridge.common.SettingsBridge;
2323
import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
2424
import org.elasticsearch.logstashbridge.env.EnvironmentBridge;
25-
import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;
25+
import org.elasticsearch.logstashbridge.ingest.ProcessorFactoryBridge;
26+
import org.elasticsearch.logstashbridge.ingest.ProcessorParametersBridge;
2627
import org.elasticsearch.logstashbridge.plugins.IngestCommonPluginBridge;
2728
import org.elasticsearch.logstashbridge.plugins.IngestPluginBridge;
2829
import org.elasticsearch.logstashbridge.plugins.IngestUserAgentPluginBridge;
@@ -196,7 +197,7 @@ private synchronized EventProcessorBuilder setFilterMatchListener(final FilterMa
196197
return this;
197198
}
198199

199-
public EventProcessorBuilder addProcessor(final String type, final Supplier<ProcessorBridge.Factory> processorFactorySupplier) {
200+
public EventProcessorBuilder addProcessor(final String type, final Supplier<ProcessorFactoryBridge> processorFactorySupplier) {
200201
return this.addProcessorsFromPlugin(SingleProcessorIngestPlugin.of(type, processorFactorySupplier));
201202
}
202203

@@ -226,22 +227,22 @@ public synchronized EventProcessor build(final PluginContext pluginContext) {
226227
try {
227228
final ArrayList<Service> services = new ArrayList<>();
228229

229-
final ThreadPoolBridge threadPool = new ThreadPoolBridge(settings);
230-
resourcesToClose.add(() -> ThreadPoolBridge.terminate(threadPool, 10, TimeUnit.SECONDS));
230+
final ThreadPoolBridge threadPool = ThreadPoolBridge.create(settings);
231+
resourcesToClose.add(() -> threadPool.terminate(10, TimeUnit.SECONDS));
231232

232-
final ScriptServiceBridge scriptService = new ScriptServiceBridge(settings, threadPool::absoluteTimeInMillis);
233+
final ScriptServiceBridge scriptService = ScriptServiceBridge.create(settings, threadPool::absoluteTimeInMillis);
233234
resourcesToClose.add(scriptService);
234235

235-
final EnvironmentBridge env = new EnvironmentBridge(settings, null);
236-
final ProcessorBridge.Parameters processorParameters = new ProcessorBridge.Parameters(env, scriptService, threadPool);
236+
final EnvironmentBridge env = EnvironmentBridge.create(settings, null);
237+
final ProcessorParametersBridge processorParameters = ProcessorParametersBridge.create(env, scriptService, threadPool);
237238

238239
IngestPipelineFactory ingestPipelineFactory = new IngestPipelineFactory(scriptService);
239240
for (Supplier<IngestPluginBridge> ingestPluginSupplier : ingestPlugins) {
240241
final IngestPluginBridge ingestPlugin = ingestPluginSupplier.get();
241242
if (ingestPlugin instanceof Closeable closeableIngestPlugin) {
242243
resourcesToClose.add(closeableIngestPlugin);
243244
}
244-
final Map<String, ProcessorBridge.Factory> processorFactories = ingestPlugin.getProcessors(processorParameters);
245+
final Map<String, ProcessorFactoryBridge> processorFactories = ingestPlugin.getProcessors(processorParameters);
245246
ingestPipelineFactory = ingestPipelineFactory.withProcessors(processorFactories);
246247
}
247248

src/main/java/co/elastic/logstash/filters/elasticintegration/IngestDuplexMarshaller.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public IngestDocumentBridge toIngestDocument(final Event event) {
103103
final Timestamp eventTimestamp = safeTimestampFrom(event.getField(org.logstash.Event.TIMESTAMP));
104104
Map<String, Object> ingestMetadata = Map.of(INGEST_METADATA_TIMESTAMP_FIELD, Objects.requireNonNullElseGet(eventTimestamp, Timestamp::now).toString());
105105

106-
return new IngestDocumentBridge(sourceAndMetadata, ingestMetadata);
106+
return IngestDocumentBridge.create(sourceAndMetadata, ingestMetadata);
107107
}
108108

109109
/**

src/main/java/co/elastic/logstash/filters/elasticintegration/IngestPipelineFactory.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.logstashbridge.ingest.PipelineBridge;
1313
import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge;
1414
import org.elasticsearch.logstashbridge.ingest.ProcessorBridge;
15+
import org.elasticsearch.logstashbridge.ingest.ProcessorFactoryBridge;
1516
import org.elasticsearch.logstashbridge.script.ScriptServiceBridge;
1617

1718
import java.util.HashMap;
@@ -24,7 +25,7 @@
2425
*/
2526
public class IngestPipelineFactory {
2627
private final ScriptServiceBridge scriptService;
27-
private final Map<String, ProcessorBridge.Factory> processorFactories;
28+
private final Map<String, ProcessorFactoryBridge> processorFactories;
2829

2930
private static final Logger LOGGER = LogManager.getLogger(IngestPipelineFactory.class);
3031

@@ -33,13 +34,13 @@ public IngestPipelineFactory(final ScriptServiceBridge scriptService) {
3334
}
3435

3536
private IngestPipelineFactory(final ScriptServiceBridge scriptService,
36-
final Map<String, ProcessorBridge.Factory> processorFactories) {
37+
final Map<String, ProcessorFactoryBridge> processorFactories) {
3738
this.scriptService = scriptService;
3839
this.processorFactories = Map.copyOf(processorFactories);
3940
}
4041

41-
public IngestPipelineFactory withProcessors(final Map<String, ProcessorBridge.Factory> processorFactories) {
42-
final Map<String, ProcessorBridge.Factory> intermediate = new HashMap<>(this.processorFactories);
42+
public IngestPipelineFactory withProcessors(final Map<String, ProcessorFactoryBridge> processorFactories) {
43+
final Map<String, ProcessorFactoryBridge> intermediate = new HashMap<>(this.processorFactories);
4344
intermediate.putAll(processorFactories);
4445
return new IngestPipelineFactory(scriptService, intermediate);
4546
}
@@ -63,7 +64,7 @@ public Optional<IngestPipeline> create(final PipelineConfigurationBridge pipelin
6364
* resolve pipelines through the provided {@link IngestPipelineResolver}.
6465
*/
6566
public IngestPipelineFactory withIngestPipelineResolver(final IngestPipelineResolver ingestPipelineResolver) {
66-
final Map<String, ProcessorBridge.Factory> modifiedProcessorFactories = new HashMap<>(this.processorFactories);
67+
final Map<String, ProcessorFactoryBridge> modifiedProcessorFactories = new HashMap<>(this.processorFactories);
6768
modifiedProcessorFactories.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(ingestPipelineResolver, this.scriptService));
6869
return new IngestPipelineFactory(scriptService, modifiedProcessorFactories);
6970
}

src/main/java/co/elastic/logstash/filters/elasticintegration/PipelineConfigurationFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public PipelineConfigurationBridge parseNamedObject(final String json) throws Ex
4646
}
4747

4848
public PipelineConfigurationBridge parseConfigOnly(final String pipelineId, final String jsonEncodedConfig) {
49-
return new PipelineConfigurationBridge(pipelineId, jsonEncodedConfig);
49+
return PipelineConfigurationBridge.create(pipelineId, jsonEncodedConfig);
5050
}
5151

5252

@@ -66,7 +66,7 @@ public List<PipelineConfigurationBridge> get(){
6666
}
6767

6868
private static PipelineConfigurationBridge init(final String id, final String json) {
69-
return new PipelineConfigurationBridge(id, json);
69+
return PipelineConfigurationBridge.create(id, json);
7070
}
7171
}
7272
}

0 commit comments

Comments
 (0)