1414import org .apache .logging .log4j .LogManager ;
1515import org .apache .logging .log4j .Logger ;
1616import org .elasticsearch .action .support .RefCountingRunnable ;
17- import org .elasticsearch .core .IOUtils ;
18- import org .elasticsearch .ingest .IngestDocument ;
19- import org .elasticsearch .ingest .LogstashInternalBridge ;
2017import org .elasticsearch .ingest .common .FailProcessorException ;
18+ import org .elasticsearch .logstashbridge .core .IOUtilsBridge ;
19+ import org .elasticsearch .logstashbridge .ingest .IngestDocumentBridge ;
2120
2221import java .io .Closeable ;
2322import java .io .IOException ;
2423import java .util .Collection ;
2524import java .util .List ;
25+ import java .util .Locale ;
2626import java .util .Map ;
2727import java .util .Objects ;
2828import java .util .Optional ;
3232
3333import static co .elastic .logstash .filters .elasticintegration .util .EventUtil .eventAsMap ;
3434import static co .elastic .logstash .filters .elasticintegration .util .EventUtil .serializeEventForLog ;
35- import static org .elasticsearch .core .Strings .format ;
3635
3736/**
3837 * An {@link EventProcessor} processes {@link Event}s by:
@@ -151,7 +150,7 @@ void processRequest(final IntegrationRequest request) {
151150
152151 final IngestPipeline ingestPipeline = loadedPipeline .get ();
153152 LOGGER .trace (() -> String .format ("Using loaded pipeline `%s` (%s)" , pipelineName , System .identityHashCode (ingestPipeline )));
154- final IngestDocument ingestDocument = eventMarshaller .toIngestDocument (request .event ());
153+ final IngestDocumentBridge ingestDocument = eventMarshaller .toIngestDocument (request .event ());
155154
156155 resolvedIndexName .ifPresent (indexName -> {
157156 ingestDocument .getMetadata ().setIndex (indexName );
@@ -170,7 +169,7 @@ void processRequest(final IntegrationRequest request) {
170169 }
171170 }
172171
173- private void executePipeline (final IngestDocument ingestDocument , final IngestPipeline ingestPipeline , final IntegrationRequest request ) {
172+ private void executePipeline (final IngestDocumentBridge ingestDocument , final IngestPipeline ingestPipeline , final IntegrationRequest request ) {
174173 final String pipelineName = ingestPipeline .getId ();
175174 final String originalIndex = ingestDocument .getMetadata ().getIndex ();
176175 ingestPipeline .execute (ingestDocument , (resultIngestDocument , ingestPipelineException ) -> {
@@ -193,17 +192,17 @@ private void executePipeline(final IngestDocument ingestDocument, final IngestPi
193192 } else {
194193
195194 final String newIndex = resultIngestDocument .getMetadata ().getIndex ();
196- if (!Objects .equals (originalIndex , newIndex ) && LogstashInternalBridge .isReroute (resultIngestDocument )) {
197- LogstashInternalBridge .resetReroute (resultIngestDocument );
195+ if (!Objects .equals (originalIndex , newIndex ) && ingestDocument .isReroute ()) {
196+ ingestDocument .resetReroute ();
198197 boolean cycle = !resultIngestDocument .updateIndexHistory (newIndex );
199198 if (cycle ) {
200199 request .complete (incomingEvent -> {
201- annotateIngestPipelineFailure (incomingEvent , pipelineName , Map .of ("message" , format (
202- "index cycle detected while processing pipeline [%s]: %s + %s" ,
203- pipelineName ,
204- resultIngestDocument .getIndexHistory (),
205- newIndex
206- ))) ;
200+ annotateIngestPipelineFailure (incomingEvent , pipelineName , Map .of ("message" ,
201+ String . format ( Locale . ROOT , "index cycle detected while processing pipeline [%s]: %s + %s" ,
202+ pipelineName ,
203+ resultIngestDocument .getIndexHistory (),
204+ newIndex )
205+ ));
207206 });
208207 return ;
209208 }
@@ -214,12 +213,14 @@ private void executePipeline(final IngestDocument ingestDocument, final IngestPi
214213 final Optional <IngestPipeline > reroutePipeline = resolve (reroutePipelineName .get (), internalPipelineProvider );
215214 if (reroutePipeline .isEmpty ()) {
216215 request .complete (incomingEvent -> {
217- annotateIngestPipelineFailure (incomingEvent , pipelineName , Map . of ( "message" , format (
218- "reroute failed to load next pipeline [%s]: %s -> %s" ,
216+ annotateIngestPipelineFailure (
217+ incomingEvent ,
219218 pipelineName ,
220- resultIngestDocument .getIndexHistory (),
221- reroutePipelineName .get ()
222- )));
219+ Map .of ("message" ,
220+ String .format (Locale .ROOT , "reroute failed to load next pipeline [%s]: %s -> %s" ,
221+ pipelineName ,
222+ resultIngestDocument .getIndexHistory (),
223+ reroutePipelineName .get ())));
223224 });
224225 } else {
225226 executePipeline (resultIngestDocument , reroutePipeline .get (), request );
@@ -277,6 +278,6 @@ static private <T,R> Optional<R> resolve(T resolvable, Resolver<T,R> resolver) {
277278
278279 @ Override
279280 public void close () throws IOException {
280- IOUtils .closeWhileHandlingException (this .resourcesToClose );
281+ IOUtilsBridge .closeWhileHandlingException (this .resourcesToClose );
281282 }
282283}
0 commit comments