File tree Expand file tree Collapse file tree 2 files changed +9
-4
lines changed
beam/core/src/main/java/cz/o2/proxima/beam/io
direct/io-pubsub/module/src/main/java/cz/o2/proxima/direct/io/pubsub Expand file tree Collapse file tree 2 files changed +9
-4
lines changed Original file line number Diff line number Diff line change @@ -124,7 +124,8 @@ public void finishBundle() {
124124 while (missingResponses .get () > 0 ) {
125125 long elapsed = System .currentTimeMillis () - startTime ;
126126 if (elapsed >= bundleFinalizeTimeoutMs ) {
127- throw new IllegalStateException ("Failed to flush bundle within timeout of 5s" );
127+ throw new IllegalStateException (
128+ "Failed to flush bundle within timeout of " + bundleFinalizeTimeoutMs );
128129 }
129130 // clone to avoid ConcurrentModificationException
130131 final Collection <CompletableFuture <Pair <Boolean , Throwable >>> unfinished ;
@@ -135,9 +136,12 @@ public void finishBundle() {
135136 Optional <Pair <Boolean , Throwable >> failedFuture =
136137 unfinished .stream ()
137138 .map (
138- f ->
139- ExceptionUtils .uncheckedFactory (
140- () -> f .get (bundleFinalizeTimeoutMs - elapsed , TimeUnit .MILLISECONDS )))
139+ f -> {
140+ long maxWaitMs =
141+ bundleFinalizeTimeoutMs - System .currentTimeMillis () + startTime ;
142+ return ExceptionUtils .uncheckedFactory (
143+ () -> f .get (maxWaitMs , TimeUnit .MILLISECONDS ));
144+ })
141145 .filter (p -> !p .getFirst ())
142146 // this will be retried
143147 .filter (p -> !(p .getSecond () instanceof TransactionRejectedException ))
Original file line number Diff line number Diff line change @@ -142,6 +142,7 @@ public synchronized void close() {
142142 }
143143 executor .shutdown ();
144144 executor .awaitTermination (10 , TimeUnit .SECONDS );
145+ executor = null ;
145146 publisher .shutdown ();
146147 } catch (Exception ex ) {
147148 log .warn ("Failed to shutdown publisher {}" , publisher , ex );
You can’t perform that action at this time.
0 commit comments