Skip to content

Move to ES bridge API #336

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,6 @@ task shadeElasticsearchIngestGeoIpModule(type: com.github.jengelman.gradle.plugi

mergeServiceFiles()

String shadeNamespace = "org.elasticsearch.ingest.geoip.shaded"
relocate('com.fasterxml.jackson', "${shadeNamespace}.com.fasterxml.jackson")
relocate('com.maxmind', "${shadeNamespace}.com.maxmind")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We get "No X class found" exceptions if we relocate them and I am not seeing org.elasticsearch.ingest.geoip.shaded.com.fasterxml.jackson.* usages.


exclude '**/module-info.class'
}

Expand Down Expand Up @@ -358,14 +354,14 @@ task shadeElasticsearchLogstashBridge(type: com.github.jengelman.gradle.plugins.
description "Shades the Elasticsearch logstash-bridge jar"

dependsOn buildElasticsearchLogstashBridge

from("${buildDir}/elasticsearch-source/libs/logstash-bridge/build/distributions") {
include "elasticsearch-logstash-bridge-*.jar"
}

archiveFileName = "elasticsearch-logstash-bridge-shaded.jar"
destinationDirectory = file("${buildDir}/shaded")

exclude '**/module-info.class'
}

Expand Down
3 changes: 2 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
LOGSTASH_PATH=../../logstash
ELASTICSEARCH_TREEISH=main
ELASTICSEARCH_REPO=mashhurs/elasticsearch
ELASTICSEARCH_TREEISH=logstash-bridge-geoip-interfaces
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: this will be removed once upstream PR is merged.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.logstashbridge.ingest.PipelineConfigurationBridge;

import java.util.Optional;

Expand All @@ -24,7 +24,7 @@
* that retrieves pipelines from Elasticsearch.
*/
public class ElasticsearchPipelineConfigurationResolver
extends AbstractSimpleResolver<String,PipelineConfiguration>
extends AbstractSimpleResolver<String, PipelineConfigurationBridge>
implements PipelineConfigurationResolver {
private final RestClient elasticsearchRestClient;
private final PipelineConfigurationFactory pipelineConfigurationFactory;
Expand All @@ -37,13 +37,13 @@ public ElasticsearchPipelineConfigurationResolver(final RestClient elasticsearch
}

@Override
public Optional<PipelineConfiguration> resolveSafely(String pipelineName) throws Exception {
public Optional<PipelineConfigurationBridge> resolveSafely(String pipelineName) throws Exception {
final Response response;
try {
final Request request = new Request("GET", URLEncodedUtils.formatSegments("_ingest", "pipeline", pipelineName));
response = elasticsearchRestClient.performRequest(request);
final String jsonEncodedPayload = EntityUtils.toString(response.getEntity());
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationFactory.parseNamedObject(jsonEncodedPayload);
final PipelineConfigurationBridge pipelineConfiguration = pipelineConfigurationFactory.parseNamedObject(jsonEncodedPayload);
return Optional.of(pipelineConfiguration);
} catch (ResponseException re) {
if (re.getResponse().getStatusLine().getStatusCode() == 404) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.LogstashInternalBridge;
import org.elasticsearch.ingest.common.FailProcessorException;
import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -32,7 +32,6 @@

import static co.elastic.logstash.filters.elasticintegration.util.EventUtil.eventAsMap;
import static co.elastic.logstash.filters.elasticintegration.util.EventUtil.serializeEventForLog;
import static org.elasticsearch.core.Strings.format;

/**
* An {@link EventProcessor} processes {@link Event}s by:
Expand Down Expand Up @@ -151,7 +150,7 @@ void processRequest(final IntegrationRequest request) {

final IngestPipeline ingestPipeline = loadedPipeline.get();
LOGGER.trace(() -> String.format("Using loaded pipeline `%s` (%s)", pipelineName, System.identityHashCode(ingestPipeline)));
final IngestDocument ingestDocument = eventMarshaller.toIngestDocument(request.event());
final IngestDocumentBridge ingestDocument = eventMarshaller.toIngestDocument(request.event());

resolvedIndexName.ifPresent(indexName -> {
ingestDocument.getMetadata().setIndex(indexName);
Expand All @@ -170,7 +169,7 @@ void processRequest(final IntegrationRequest request) {
}
}

private void executePipeline(final IngestDocument ingestDocument, final IngestPipeline ingestPipeline, final IntegrationRequest request) {
private void executePipeline(final IngestDocumentBridge ingestDocument, final IngestPipeline ingestPipeline, final IntegrationRequest request) {
final String pipelineName = ingestPipeline.getId();
final String originalIndex = ingestDocument.getMetadata().getIndex();
ingestPipeline.execute(ingestDocument, (resultIngestDocument, ingestPipelineException) -> {
Expand All @@ -193,17 +192,17 @@ private void executePipeline(final IngestDocument ingestDocument, final IngestPi
} else {

final String newIndex = resultIngestDocument.getMetadata().getIndex();
if (!Objects.equals(originalIndex, newIndex) && LogstashInternalBridge.isReroute(resultIngestDocument)) {
LogstashInternalBridge.resetReroute(resultIngestDocument);
if (!Objects.equals(originalIndex, newIndex) && ingestDocument.isReroute()) {
ingestDocument.resetReroute();
boolean cycle = !resultIngestDocument.updateIndexHistory(newIndex);
if (cycle) {
request.complete(incomingEvent -> {
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message", format(
"index cycle detected while processing pipeline [%s]: %s + %s",
pipelineName,
resultIngestDocument.getIndexHistory(),
newIndex
)));
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message",
String.format(Locale.ROOT, "index cycle detected while processing pipeline [%s]: %s + %s",
pipelineName,
resultIngestDocument.getIndexHistory(),
newIndex)
));
});
return;
}
Expand All @@ -214,12 +213,14 @@ private void executePipeline(final IngestDocument ingestDocument, final IngestPi
final Optional<IngestPipeline> reroutePipeline = resolve(reroutePipelineName.get(), internalPipelineProvider);
if (reroutePipeline.isEmpty()) {
request.complete(incomingEvent -> {
annotateIngestPipelineFailure(incomingEvent, pipelineName, Map.of("message", format(
"reroute failed to load next pipeline [%s]: %s -> %s",
annotateIngestPipelineFailure(
incomingEvent,
pipelineName,
resultIngestDocument.getIndexHistory(),
reroutePipelineName.get()
)));
Map.of("message",
String.format(Locale.ROOT, "reroute failed to load next pipeline [%s]: %s -> %s",
pipelineName,
resultIngestDocument.getIndexHistory(),
reroutePipelineName.get())));
});
} else {
executePipeline(resultIngestDocument, reroutePipeline.get(), request);
Expand Down Expand Up @@ -277,6 +278,6 @@ static private <T,R> Optional<R> resolve(T resolvable, Resolver<T,R> resolver) {

@Override
public void close() throws IOException {
IOUtils.closeWhileHandlingException(this.resourcesToClose);
IOUtilsBridge.closeWhileHandlingException(this.resourcesToClose);
}
}
Loading