Skip to content

Commit 94d58d4

Browse files
committed
add synchronous ProcessorBridge#execute
1 parent 100f1fc commit 94d58d4

File tree

1 file changed

+13
-14
lines changed

1 file changed

+13
-14
lines changed

libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.elasticsearch.logstashbridge.threadpool.ThreadPoolBridge;
2222

2323
import java.util.Map;
24-
import java.util.Objects;
2524
import java.util.function.BiConsumer;
2625

2726
/**
@@ -37,7 +36,18 @@ public interface ProcessorBridge extends StableBridgeAPI<Processor> {
3736

3837
boolean isAsync();
3938

40-
void execute(IngestDocumentBridge ingestDocumentBridge, BiConsumer<IngestDocumentBridge, Exception> handler);
39+
default void execute(IngestDocumentBridge ingestDocumentBridge, BiConsumer<IngestDocumentBridge, Exception> handler) {
40+
toInternal().execute(
41+
StableBridgeAPI.toInternalNullable(ingestDocumentBridge),
42+
(id, exception) -> handler.accept(IngestDocumentBridge.fromInternalNullable(id), exception)
43+
);
44+
}
45+
46+
default IngestDocumentBridge execute(IngestDocumentBridge ingestDocumentBridge) throws Exception {
47+
IngestDocument internalSourceIngestDocument = ingestDocumentBridge.toInternal();
48+
IngestDocument internalResultIngestDocument = toInternal().execute(internalSourceIngestDocument);
49+
return IngestDocumentBridge.fromInternalNullable(internalResultIngestDocument);
50+
}
4151

4252
static ProcessorBridge fromInternal(final Processor internalProcessor) {
4353
if (internalProcessor instanceof AbstractExternal.ProxyExternal externalProxy) {
@@ -83,10 +93,7 @@ public String getDescription() {
8393
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
8494
AbstractExternal.this.execute(
8595
IngestDocumentBridge.fromInternalNullable(ingestDocument),
86-
(ingestDocumentBridge, e) -> handler.accept(
87-
Objects.isNull(ingestDocumentBridge) ? null : ingestDocumentBridge.toInternal(),
88-
e
89-
)
96+
(ingestDocumentBridge, exception) -> handler.accept(StableBridgeAPI.toInternalNullable(ingestDocumentBridge), exception)
9097
);
9198
}
9299

@@ -128,14 +135,6 @@ public String getDescription() {
128135
public boolean isAsync() {
129136
return toInternal().isAsync();
130137
}
131-
132-
@Override
133-
public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer<IngestDocumentBridge, Exception> handler) {
134-
internalDelegate.execute(
135-
StableBridgeAPI.toInternalNullable(ingestDocumentBridge),
136-
(id, e) -> handler.accept(IngestDocumentBridge.fromInternalNullable(id), e)
137-
);
138-
}
139138
}
140139

141140
/**

0 commit comments

Comments
 (0)