Skip to content

Commit 2dda5e1

Browse files
committed
Move RefCountingRunnable, FailProcessorException and Releasable usages to logstash-bridge.
1 parent 16dfbdb commit 2dda5e1

File tree

2 files changed

+13
-8
lines changed

2 files changed

+13
-8
lines changed

src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessor.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
import com.google.common.collect.Maps;
1414
import org.apache.logging.log4j.LogManager;
1515
import org.apache.logging.log4j.Logger;
16-
import org.elasticsearch.action.support.RefCountingRunnable;
17-
import org.elasticsearch.ingest.common.FailProcessorException;
16+
import org.elasticsearch.logstashbridge.core.FailProcessorExceptionBridge;
1817
import org.elasticsearch.logstashbridge.core.IOUtilsBridge;
18+
import org.elasticsearch.logstashbridge.core.RefCountingRunnableBridge;
1919
import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge;
2020

2121
import java.io.Closeable;
@@ -93,8 +93,11 @@ public Collection<Event> processEvents(final Collection<Event> incomingEvents) t
9393
final CountDownLatch latch = new CountDownLatch(1);
9494
final IntegrationBatch batch = new IntegrationBatch(incomingEvents);
9595

96-
try (RefCountingRunnable ref = new RefCountingRunnable(latch::countDown)) {
96+
RefCountingRunnableBridge ref = new RefCountingRunnableBridge(latch::countDown);
97+
try {
9798
batch.eachRequest(ref::acquire, this::processRequest);
99+
} finally {
100+
ref.close();
98101
}
99102

100103
// await on work that has gone async
@@ -254,7 +257,9 @@ static private void annotateIngestPipelineFailure(final Event event, final Strin
254257
}
255258

256259
static private Throwable unwrapException(final Exception exception) {
257-
if (exception.getCause() instanceof FailProcessorException) { return exception.getCause(); }
260+
if (FailProcessorExceptionBridge.isInstanceOf(exception.getCause())) {
261+
return exception.getCause();
262+
}
258263
return exception;
259264
}
260265

src/main/java/co/elastic/logstash/filters/elasticintegration/IntegrationBatch.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
package co.elastic.logstash.filters.elasticintegration;
88

99
import co.elastic.logstash.api.Event;
10-
import org.elasticsearch.core.Releasable;
10+
import org.elasticsearch.logstashbridge.core.ReleasableBridge;
1111

1212
import java.util.ArrayList;
1313
import java.util.Collection;
@@ -22,17 +22,17 @@ public IntegrationBatch(Collection<Event> events) {
2222
this.events = new ArrayList<>(events);
2323
}
2424

25-
void eachRequest(Supplier<Releasable> releasableSupplier, Consumer<IntegrationRequest> consumer) {
25+
void eachRequest(Supplier<ReleasableBridge> releasableSupplier, Consumer<IntegrationRequest> consumer) {
2626
for (int i = 0; i < this.events.size(); i++) {
2727
consumer.accept(new Request(i, releasableSupplier.get()));
2828
}
2929
}
3030

3131
private class Request implements IntegrationRequest {
3232
private final int idx;
33-
private final Releasable handle;
33+
private final ReleasableBridge handle;
3434

35-
public Request(final int idx, final Releasable releasable) {
35+
public Request(final int idx, final ReleasableBridge releasable) {
3636
this.idx = idx;
3737
this.handle = releasable;
3838
}

0 commit comments

Comments
 (0)