Skip to content

Commit fd524b7

Browse files
mashhursyaauie
andauthored
Move to ES bridge API (elastic#336)
* Replace ES classes with their Bridge Stable API pairs. * Points to personal ES repo which provides Stable API interface updates. Removes required upwrap interface since upstream moved to default method. Utilizes metadata version field since upstream made it accessible. * Plugins and processors moved to the bridge * Apply conceptually modification (internal, external and bridge processor) changes. * Utilize moved processor definition constants in the IngestCommonPluginBridge * Use logstash-bridge GeoIP interfaces. * Apply interface changes in unit tests. * Move RefCountingRunnable, FailProcessorException and Releasable usages to logstash-bridge. * align bridge to yaauie/elasticsearch@94d58d47cd * catch up to upstream@e3ba1cba268 - `new ${BRIDGE}(...)` -> `${BRIDGE}.create(...)`: use bridge-provided factory methods - `${INTERNAL}Bridge.${NESTED}` -> `${INTERNAL}${NESTED}Bridge`: extract nested bridges to top-level - `${BRIDGE}.AbstractExternal` -> `AbstractExternal${BRIDGE}`: extract nested "AbstractExternal" base implementations to top-level * Point to PR base ES upstream repo and branch, minor source simplifications. * Point to ES main after merging upstream ES changes. --------- Co-authored-by: Ry Biesemeyer <[email protected]>
1 parent 6f75475 commit fd524b7

32 files changed

+424
-546
lines changed

build.gradle

Lines changed: 58 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,14 @@ def _requiredLogstashJar(pathPrefix, jarSpec, flavorSpec = null) {
165165
}
166166
}
167167

168+
static OutputStreamFunneler outputStreamFunneler(File logFile) {
169+
logFile.parentFile.mkdirs()
170+
logFile.delete()
171+
logFile.createNewFile()
172+
173+
return new OutputStreamFunneler(new LazyFileOutputStream(logFile))
174+
}
175+
168176
// https://docs.github.com/en/repositories/working-with-files/using-files/downloading-source-code-archives#source-code-archive-urls
169177
String githubArchivePath(repo, treeish="main", archiveFormat="zip") {
170178
def pathFragment = {
@@ -203,8 +211,10 @@ task downloadElasticsearchSourceZip(type: Download) {
203211
task unzipDownloadedElasticsearchSourceZip(dependsOn: downloadElasticsearchSourceZip, type: Copy) {
204212
description "extracts Elasticsearch source from a downloaded zip file"
205213

214+
ext.location = "${buildDir}/elasticsearch-source/"
215+
206216
from zipTree(downloadElasticsearchSourceZip.dest)
207-
into "${buildDir}/elasticsearch-source/"
217+
into ext.location
208218
eachFile {
209219
// strip top-level directory
210220
path = path.replaceFirst(/^.+?\//, "")
@@ -216,15 +226,14 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
216226

217227
def logFile = project.file("${buildDir}/elasticsearch-build.log")
218228
doFirst {
219-
def funneler = new OutputStreamFunneler(new LazyFileOutputStream(logFile))
229+
def funneler = outputStreamFunneler(logFile)
220230
standardOutput = funneler.funnelInstance
221231
errorOutput = funneler.funnelInstance
222232
}
223233

224-
def esSource = "${buildDir}/elasticsearch-source/"
234+
def esSource = "${unzipDownloadedElasticsearchSourceZip.outputs.files.singleFile}"
225235
def esBuildDir = "${esSource}/build"
226236

227-
inputs.dir esSource
228237
outputs.dir esBuildDir
229238

230239
ext.buildRoot = esBuildDir
@@ -238,7 +247,7 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
238247
ext.module = { moduleName -> localDistroResult.map { "${it}/modules/${moduleName}"} }
239248

240249
workingDir esSource
241-
commandLine "./gradlew", "localDistro"
250+
commandLine "./gradlew", "--stacktrace", "localDistro"
242251

243252
ignoreExitValue true // handled in doLast
244253
doLast {
@@ -260,20 +269,22 @@ task buildElasticsearchLocalDistro(dependsOn: unzipDownloadedElasticsearchSource
260269
task buildElasticsearchLogstashBridge(type: Exec) {
261270
description "builds logstash-bridge lib module"
262271

263-
dependsOn buildElasticsearchLocalDistro
272+
dependsOn unzipDownloadedElasticsearchSourceZip
273+
dependsOn buildElasticsearchLocalDistro // mustRunAfter?
264274

265275
def logFile = project.file("${buildDir}/logstash-bridge-build.log")
266276
doFirst {
267-
def funneler = new OutputStreamFunneler(new LazyFileOutputStream(logFile))
277+
def funneler = outputStreamFunneler(logFile)
268278
standardOutput = funneler.funnelInstance
269279
errorOutput = funneler.funnelInstance
270280
}
271281

272-
def esSource = "${buildDir}/elasticsearch-source/"
282+
def esSource = "${unzipDownloadedElasticsearchSourceZip.outputs.files.singleFile}"
273283
def esBuildDir = "${esSource}/build"
274284

275-
inputs.dir esSource
276-
outputs.dir "${esBuildDir}/libs/logstash-bridge"
285+
inputs.dir "${esSource}/libs/logstash-bridge"
286+
287+
outputs.dir("${esSource}/libs/logstash-bridge/build/distributions")
277288

278289
ext.buildRoot = esBuildDir
279290
workingDir esSource
@@ -295,6 +306,28 @@ task buildElasticsearchLogstashBridge(type: Exec) {
295306
}
296307
}
297308

309+
def ingestGeoipPluginShadeNamespace = "org.elasticsearch.ingest.geoip.shaded"
310+
311+
/**
312+
* The StableBridge exposes GeoIP plugin internals, so it needs to relocate references to
313+
* its bundled dependencies to match the shaded locations in our import of that plugin.
314+
*/
315+
task shadeElasticsearchStableBridge(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
316+
description "Shades Maxmind dependencies"
317+
318+
dependsOn buildElasticsearchLogstashBridge
319+
320+
from(buildElasticsearchLogstashBridge)
321+
322+
archiveFileName = "logstash-stable-bridge-shaded.jar"
323+
destinationDirectory = file("${buildDir}/shaded")
324+
325+
relocate('com.fasterxml.jackson', "${ingestGeoipPluginShadeNamespace}.com.fasterxml.jackson")
326+
relocate('com.maxmind', "${ingestGeoipPluginShadeNamespace}.com.maxmind")
327+
328+
mergeServiceFiles()
329+
}
330+
298331
task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
299332
description "Shades embedded dependencies of the Elasticsearch Ingest GeoIP module"
300333

@@ -305,15 +338,16 @@ task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugi
305338
archiveFileName = 'ingest-geoip-shaded.jar'
306339
destinationDirectory = file("${buildDir}/shaded")
307340

308-
mergeServiceFiles()
341+
relocate('com.fasterxml.jackson', "${ingestGeoipPluginShadeNamespace}.com.fasterxml.jackson")
342+
relocate('com.maxmind', "${ingestGeoipPluginShadeNamespace}.com.maxmind")
309343

310-
String shadeNamespace = "org.elasticsearch.ingest.geoip.shaded"
311-
relocate('com.fasterxml.jackson', "${shadeNamespace}.com.fasterxml.jackson")
312-
relocate('com.maxmind', "${shadeNamespace}.com.maxmind")
344+
mergeServiceFiles()
313345

314346
exclude '**/module-info.class'
315347
}
316348

349+
def ingestGrokPluginShadeNamespace = "org.elasticsearch.grok.shaded"
350+
317351
task shadeElasticsearchGrokImplementation(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
318352
description "Shades embedded dependencies of the Elasticsearch Grok implementation"
319353

@@ -329,13 +363,16 @@ task shadeElasticsearchGrokImplementation(type: com.github.jengelman.gradle.plug
329363
destinationDirectory = file("${buildDir}/shaded")
330364

331365
mergeServiceFiles()
332-
String shadeNamespace = "org.elasticsearch.grok.shaded"
333-
relocate('org.joni', "${shadeNamespace}.org.joni")
334-
relocate('org.jcodings', "${shadeNamespace}.org.jcodings")
366+
relocate('org.joni', "${ingestGrokPluginShadeNamespace}.org.joni")
367+
relocate('org.jcodings', "${ingestGrokPluginShadeNamespace}.org.jcodings")
335368

336369
exclude '**/module-info.class'
337370
}
338371

372+
/**
373+
* The x-pack redact plugin reaches into the grok plugin's implementation, so
374+
* they both need to point to the same relocated shaded components.
375+
*/
339376
task shadeElasticsearchRedactPlugin(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
340377
description "Shades Elasticsearch Redact plugin to reference Grok's shaded dependencies"
341378
dependsOn buildElasticsearchLocalDistro
@@ -347,37 +384,20 @@ task shadeElasticsearchRedactPlugin(type: com.github.jengelman.gradle.plugins.sh
347384
destinationDirectory = file("${buildDir}/shaded")
348385

349386
// relocate elasticsearch-grok's dependencies to match
350-
String shadeNamespace = "org.elasticsearch.grok.shaded"
351-
relocate('org.joni', "${shadeNamespace}.org.joni")
352-
relocate('org.jcodings', "${shadeNamespace}.org.jcodings")
387+
relocate('org.joni', "${ingestGrokPluginShadeNamespace}.org.joni")
388+
relocate('org.jcodings', "${ingestGrokPluginShadeNamespace}.org.jcodings")
353389

354390
exclude '**/module-info.class'
355391
}
356392

357-
task shadeElasticsearchLogstashBridge(type: com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar) {
358-
description "Shades the Elasticsearch logstash-bridge jar"
359-
360-
dependsOn buildElasticsearchLogstashBridge
361-
362-
from("${buildDir}/elasticsearch-source/libs/logstash-bridge/build/distributions") {
363-
include "elasticsearch-logstash-bridge-*.jar"
364-
}
365-
366-
archiveFileName = "elasticsearch-logstash-bridge-shaded.jar"
367-
destinationDirectory = file("${buildDir}/shaded")
368-
369-
exclude '**/module-info.class'
370-
}
371-
372393
task importMinimalElasticsearch() {
373394
description "Imports minimal portions of Elasticsearch localDistro"
374395

375396
dependsOn buildElasticsearchLocalDistro
376-
dependsOn buildElasticsearchLogstashBridge
397+
dependsOn shadeElasticsearchStableBridge
377398
dependsOn shadeElasticsearchIngestGeoIpModule
378399
dependsOn shadeElasticsearchGrokImplementation
379400
dependsOn shadeElasticsearchRedactPlugin
380-
dependsOn shadeElasticsearchLogstashBridge
381401

382402
ext.jars = "${buildDir}/elasticsearch-minimal-jars"
383403

@@ -396,7 +416,7 @@ task importMinimalElasticsearch() {
396416
include jarPackageNamed("lucene-core")
397417
include jarPackageNamed("lucene-analysis-common")
398418
}
399-
from(shadeElasticsearchLogstashBridge)
419+
from(shadeElasticsearchStableBridge.outputs.files.singleFile)
400420
from(shadeElasticsearchGrokImplementation)
401421
from(buildElasticsearchLocalDistro.module("x-pack-core"))
402422

lib/logstash/filters/elastic_integration.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,11 +368,11 @@ def _elasticsearch_rest_client(config, &builder_interceptor)
368368

369369
def initialize_event_processor!
370370
java_import('co.elastic.logstash.filters.elasticintegration.EventProcessorBuilder')
371-
java_import('co.elastic.logstash.filters.elasticintegration.geoip.GeoIpProcessorFactory')
371+
java_import('org.elasticsearch.logstashbridge.geoip.GeoIpProcessorFactoryBridge')
372372

373373
@event_processor = EventProcessorBuilder.fromElasticsearch(@elasticsearch_rest_client, extract_immutable_config)
374374
.setFilterMatchListener(method(:filter_matched_java).to_proc)
375-
.addProcessor("geoip") { GeoIpProcessorFactory.new(@geoip_database_provider) }
375+
.addProcessor("geoip") { GeoIpProcessorFactoryBridge::create(@geoip_database_provider) }
376376
.build(@plugin_context)
377377
rescue => exception
378378
raise_config_error!("configuration did not produce an EventProcessor: #{exception}")

settings.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
rootProject.name = 'logstash-filter-elastic_integration'
1+
rootProject.name = 'logstash-filter-elastic_integration'

src/main/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchPipelineConfigurationResolver.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import org.elasticsearch.client.Response;
1616
import org.elasticsearch.client.ResponseException;
1717
import org.elasticsearch.client.RestClient;
18-
import org.elasticsearch.ingest.PipelineConfiguration;
18+
import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge;
1919

2020
import java.util.Optional;
2121

@@ -24,7 +24,7 @@
2424
* that retrieves pipelines from Elasticsearch.
2525
*/
2626
public class ElasticsearchPipelineConfigurationResolver
27-
extends AbstractSimpleResolver<String,PipelineConfiguration>
27+
extends AbstractSimpleResolver<String, PipelineConfigurationBridge>
2828
implements PipelineConfigurationResolver {
2929
private final RestClient elasticsearchRestClient;
3030
private final PipelineConfigurationFactory pipelineConfigurationFactory;
@@ -37,13 +37,13 @@ public ElasticsearchPipelineConfigurationResolver(final RestClient elasticsearch
3737
}
3838

3939
@Override
40-
public Optional<PipelineConfiguration> resolveSafely(String pipelineName) throws Exception {
40+
public Optional<PipelineConfigurationBridge> resolveSafely(String pipelineName) throws Exception {
4141
final Response response;
4242
try {
4343
final Request request = new Request("GET", URLEncodedUtils.formatSegments("_ingest", "pipeline", pipelineName));
4444
response = elasticsearchRestClient.performRequest(request);
4545
final String jsonEncodedPayload = EntityUtils.toString(response.getEntity());
46-
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationFactory.parseNamedObject(jsonEncodedPayload);
46+
final PipelineConfigurationBridge pipelineConfiguration = pipelineConfigurationFactory.parseNamedObject(jsonEncodedPayload);
4747
return Optional.of(pipelineConfiguration);
4848
} catch (ResponseException re) {
4949
if (re.getResponse().getStatusLine().getStatusCode() == 404) {

src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessor.java

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,15 @@
1313
import com.google.common.collect.Maps;
1414
import org.apache.logging.log4j.LogManager;
1515
import org.apache.logging.log4j.Logger;
16-
import org.elasticsearch.action.support.RefCountingRunnable;
17-
import org.elasticsearch.core.IOUtils;
18-
import org.elasticsearch.ingest.IngestDocument;
19-
import org.elasticsearch.ingest.LogstashInternalBridge;
20-
import org.elasticsearch.ingest.common.FailProcessorException;
16+
import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
17+
import org.elasticsearch.logstashbridge.core.RefCountingRunnableBridge;
18+
import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;
2119

2220
import java.io.Closeable;
2321
import java.io.IOException;
2422
import java.util.Collection;
2523
import java.util.List;
24+
import java.util.Locale;
2625
import java.util.Map;
2726
import java.util.Objects;
2827
import java.util.Optional;
@@ -32,7 +31,6 @@
3231

3332
import static co.elastic.logstash.filters.elasticintegration.util.EventUtil.eventAsMap;
3433
import static co.elastic.logstash.filters.elasticintegration.util.EventUtil.serializeEventForLog;
35-
import static org.elasticsearch.core.Strings.format;
3634

3735
/**
3836
* An {@link EventProcessor} processes {@link Event}s by:
@@ -94,7 +92,7 @@ public Collection<Event> processEvents(final Collection<Event> incomingEvents) t
9492
final CountDownLatch latch = new CountDownLatch(1);
9593
final IntegrationBatch batch = new IntegrationBatch(incomingEvents);
9694

97-
try (RefCountingRunnable ref = new RefCountingRunnable(latch::countDown)) {
95+
try (RefCountingRunnableBridge ref = RefCountingRunnableBridge.create(latch::countDown)) {
9896
batch.eachRequest(ref::acquire, this::processRequest);
9997
}
10098

@@ -151,7 +149,7 @@ void processRequest(final IntegrationRequest request) {
151149

152150
final IngestPipeline ingestPipeline = loadedPipeline.get();
153151
LOGGER.trace(() -> String.format("Using loaded pipeline `%s` (%s)", pipelineName, System.identityHashCode(ingestPipeline)));
154-
final IngestDocument ingestDocument = eventMarshaller.toIngestDocument(request.event());
152+
final IngestDocumentBridge ingestDocument = eventMarshaller.toIngestDocument(request.event());
155153

156154
resolvedIndexName.ifPresent(indexName -> {
157155
ingestDocument.getMetadata().setIndex(indexName);
@@ -170,19 +168,18 @@ void processRequest(final IntegrationRequest request) {
170168
}
171169
}
172170

173-
private void executePipeline(final IngestDocument ingestDocument, final IngestPipeline ingestPipeline, final IntegrationRequest request) {
171+
private void executePipeline(final IngestDocumentBridge ingestDocument, final IngestPipeline ingestPipeline, final IntegrationRequest request) {
174172
final String pipelineName = ingestPipeline.getId();
175173
final String originalIndex = ingestDocument.getMetadata().getIndex();
176174
ingestPipeline.execute(ingestDocument, (resultIngestDocument, ingestPipelineException) -> {
177175
// If no exception, then the original event is to be _replaced_ by the result
178176
if (Objects.nonNull(ingestPipelineException)) {
179177
// If we had an exception in the IngestPipeline, tag and emit the original Event
180-
final Throwable unwrappedException = unwrapException(ingestPipelineException);
181-
LOGGER.warn(() -> String.format("ingest pipeline `%s` failed", pipelineName), unwrappedException);
178+
LOGGER.warn(() -> String.format("ingest pipeline `%s` failed", pipelineName), ingestPipelineException);
182179
request.complete(incomingEvent -> {
183180
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of(
184-
"message", unwrappedException.getMessage(),
185-
"exception", unwrappedException.getClass().getName()
181+
"message", ingestPipelineException.getMessage(),
182+
"exception", ingestPipelineException.getClass().getName()
186183
));
187184
});
188185
} else if (Objects.isNull(resultIngestDocument)) {
@@ -193,17 +190,17 @@ private void executePipeline(final IngestDocument ingestDocument, final IngestPi
193190
} else {
194191

195192
final String newIndex = resultIngestDocument.getMetadata().getIndex();
196-
if (!Objects.equals(originalIndex, newIndex) && LogstashInternalBridge.isReroute(resultIngestDocument)) {
197-
LogstashInternalBridge.resetReroute(resultIngestDocument);
193+
if (!Objects.equals(originalIndex, newIndex) && ingestDocument.isReroute()) {
194+
ingestDocument.resetReroute();
198195
boolean cycle = !resultIngestDocument.updateIndexHistory(newIndex);
199196
if (cycle) {
200197
request.complete(incomingEvent -> {
201-
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message", format(
202-
"index cycle detected while processing pipeline [%s]: %s + %s",
203-
pipelineName,
204-
resultIngestDocument.getIndexHistory(),
205-
newIndex
206-
)));
198+
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message",
199+
String.format(Locale.ROOT, "index cycle detected while processing pipeline [%s]: %s + %s",
200+
pipelineName,
201+
resultIngestDocument.getIndexHistory(),
202+
newIndex)
203+
));
207204
});
208205
return;
209206
}
@@ -214,12 +211,14 @@ private void executePipeline(final IngestDocument ingestDocument, final IngestPi
214211
final Optional<IngestPipeline> reroutePipeline = resolve(reroutePipelineName.get(), internalPipelineProvider);
215212
if (reroutePipeline.isEmpty()) {
216213
request.complete(incomingEvent -> {
217-
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message", format(
218-
"reroute failed to load next pipeline [%s]: %s -> %s",
214+
annotateIngestPipelineFailure(
215+
incomingEvent,
219216
pipelineName,
220-
resultIngestDocument.getIndexHistory(),
221-
reroutePipelineName.get()
222-
)));
217+
Map.of("message",
218+
String.format(Locale.ROOT, "reroute failed to load next pipeline [%s]: %s -> %s",
219+
pipelineName,
220+
resultIngestDocument.getIndexHistory(),
221+
reroutePipelineName.get())));
223222
});
224223
} else {
225224
executePipeline(resultIngestDocument, reroutePipeline.get(), request);
@@ -252,11 +251,6 @@ static private void annotateIngestPipelineFailure(final Event event, final Strin
252251
});
253252
}
254253

255-
static private Throwable unwrapException(final Exception exception) {
256-
if (exception.getCause() instanceof FailProcessorException) { return exception.getCause(); }
257-
return exception;
258-
}
259-
260254
static private String diff(final Event original, final Event changed) {
261255
if (LOGGER.isTraceEnabled()) {
262256
// dot notation less than ideal for LS-internal, but better than re-writing it ourselves.
@@ -277,6 +271,6 @@ static private <T,R> Optional<R> resolve(T resolvable, Resolver<T,R> resolver) {
277271

278272
@Override
279273
public void close() throws IOException {
280-
IOUtils.closeWhileHandlingException(this.resourcesToClose);
274+
IOUtilsBridge.closeWhileHandlingException(this.resourcesToClose);
281275
}
282276
}

0 commit comments

Comments
 (0)