@@ -205,14 +205,18 @@ private void mergeDone(MergeTask mergeTask) {
205205 // when one merge is done, maybe a backlogged one can now execute
206206 enqueueBackloggedTasks ();
207207 // signal here, because, when closing, we wait for all currently running merges to finish
208- if (closed && currentlyRunningMergeTasks .isEmpty ()) {
209- closedWithNoCurrentlyRunningMerges .countDown ();
210- }
208+ maybeSignalAllMergesDoneAfterClose ();
211209 }
212210 doneMergeTaskCount .incrementAndGet ();
213211 checkMergeTaskThrottling ();
214212 }
215213
214+ private synchronized void maybeSignalAllMergesDoneAfterClose () {
215+ if (closed && currentlyRunningMergeTasks .isEmpty () && closedWithNoCurrentlyRunningMerges .getCount () > 0 ) {
216+ closedWithNoCurrentlyRunningMerges .countDown ();
217+ }
218+ }
219+
216220 private synchronized void enqueueBackloggedTasks () {
217221 int maxBackloggedTasksToEnqueue = config .getMaxThreadCount () - currentlyRunningMergeTasks .size ();
218222 // enqueue all backlogged tasks when closing, as the queue expects all backlogged tasks to always be enqueued back
@@ -415,31 +419,16 @@ protected void message(String message) {
415419 @ Override
416420 public void close () throws IOException {
417421 closed = true ;
418- boolean interrupted = false ;
422+ // enqueue any backlogged merge tasks, because the merge queue assumes that the backlogged tasks are always re-enqueued
423+ enqueueBackloggedTasks ();
424+ maybeSignalAllMergesDoneAfterClose ();
419425 try {
420- synchronized (this ) {
421- // enqueue any backlogged merge tasks, because the merge queue assumes that the backlogged tasks are always re-enqueued
422- enqueueBackloggedTasks ();
423- // wait until all running merges are done
424- while (currentlyRunningMergeTasks .isEmpty () == false ) {
425- try {
426- // wait with a timeout, just to cover for something that failed to notify
427- if (closedWithNoCurrentlyRunningMerges .await (1 , TimeUnit .SECONDS )) {
428- assert currentlyRunningMergeTasks .isEmpty ();
429- }
430- } catch (InterruptedException e ) {
431- // ignore interruption, we will retry until all currently running merge tasks are done
432- interrupted = true ;
433- }
434- }
435- ;
436- }
426+ closedWithNoCurrentlyRunningMerges .await ();
427+ } catch (InterruptedException e ) {
428+ Thread .currentThread ().interrupt ();
437429 } finally {
438- // this closes an executor that may be used by ongoing merges, so close it only after all running merges finished
430+ // this closes an executor that may be used by ongoing merges, so better close it only after all running merges finished
439431 super .close ();
440- if (interrupted ) {
441- Thread .currentThread ().interrupt ();
442- }
443432 }
444433 }
445434
0 commit comments