|
13 | 13 | import com.google.common.collect.Maps; |
14 | 14 | import org.apache.logging.log4j.LogManager; |
15 | 15 | import org.apache.logging.log4j.Logger; |
16 | | -import org.elasticsearch.action.support.RefCountingRunnable; |
17 | | -import org.elasticsearch.ingest.common.FailProcessorException; |
| 16 | +import org.elasticsearch.logstashbridge.core.FailProcessorExceptionBridge; |
18 | 17 | import org.elasticsearch.logstashbridge.core.IOUtilsBridge; |
| 18 | +import org.elasticsearch.logstashbridge.core.RefCountingRunnableBridge; |
19 | 19 | import org.elasticsearch.logstashbridge.ingest.IngestDocumentBridge; |
20 | 20 |
|
21 | 21 | import java.io.Closeable; |
@@ -93,8 +93,11 @@ public Collection<Event> processEvents(final Collection<Event> incomingEvents) t |
93 | 93 | final CountDownLatch latch = new CountDownLatch(1); |
94 | 94 | final IntegrationBatch batch = new IntegrationBatch(incomingEvents); |
95 | 95 |
|
96 | | - try (RefCountingRunnable ref = new RefCountingRunnable(latch::countDown)) { |
| 96 | + RefCountingRunnableBridge ref = new RefCountingRunnableBridge(latch::countDown); |
| 97 | + try { |
97 | 98 | batch.eachRequest(ref::acquire, this::processRequest); |
| 99 | + } finally { |
| 100 | + ref.close(); |
98 | 101 | } |
99 | 102 |
|
100 | 103 | // await on work that has gone async |
@@ -254,7 +257,9 @@ static private void annotateIngestPipelineFailure(final Event event, final Strin |
254 | 257 | } |
255 | 258 |
|
256 | 259 | static private Throwable unwrapException(final Exception exception) { |
257 | | - if (exception.getCause() instanceof FailProcessorException) { return exception.getCause(); } |
| 260 | + if (FailProcessorExceptionBridge.isInstanceOf(exception.getCause())) { |
| 261 | + return exception.getCause(); |
| 262 | + } |
258 | 263 | return exception; |
259 | 264 | } |
260 | 265 |
|
|
0 commit comments