@@ -74,7 +74,7 @@ public EventPersistor(@NotNull ErrorCollector errorCollector,
7474 this .persisotrTerminationTimeout = config .getPersisotrTerminationTimeout ();
7575 this .cradleStorage = requireNonNull (cradleStorage , "Cradle storage can't be null" );
7676 this .taskQueue = new BlockingScheduledRetryableTaskQueue <>(config .getMaxTaskCount (), config .getMaxTaskDataSize (), scheduler );
77- this .futures = new FutureTracker <> ();
77+ this .futures = FutureTracker . createUnlimited ();
7878 this .metrics = new EventPersistorMetrics <>(taskQueue );
7979 this .executor = Executors .newScheduledThreadPool (config .getProcessingThreads (), THREAD_FACTORY );
8080 }
@@ -117,7 +117,7 @@ public void run() {
117117 resolveTaskError (task , e );
118118 }
119119 } catch (InterruptedException ie ) {
120- LOGGER .debug ("Received InterruptedException. aborting" );
120+ LOGGER .debug ("Received InterruptedException. aborting" , ie );
121121 break ;
122122 }
123123 }
@@ -193,7 +193,7 @@ public void close () {
193193 }
194194
195195
196- void processTask (ScheduledRetryableTask <PersistenceTask > task ) throws IOException , CradleStorageException {
196+ void processTask (ScheduledRetryableTask <PersistenceTask > task ) throws IOException , CradleStorageException , InterruptedException {
197197
198198 final TestEventToStore event = task .getPayload ().eventBatch ;
199199 final Histogram .Timer timer = metrics .startMeasuringPersistenceLatency ();
@@ -213,7 +213,9 @@ void processTask(ScheduledRetryableTask<PersistenceTask> task) throws IOExceptio
213213 executor
214214 );
215215
216- futures .track (result );
216+ if (!futures .track (result )) {
217+ LOGGER .warn ("Store test even future isn't tracked" );
218+ }
217219 }
218220
219221
0 commit comments