2929import org .slf4j .Logger ;
3030import org .slf4j .LoggerFactory ;
3131import org .springframework .scheduling .concurrent .ThreadPoolTaskExecutor ;
32- import org .springframework .util .concurrent .ListenableFutureCallback ;
33- import org .springframework .util .concurrent .ListenableFutureTask ;
3432
3533import java .util .Comparator ;
3634import java .util .List ;
4341import java .util .concurrent .CancellationException ;
4442import java .util .concurrent .ConcurrentHashMap ;
4543import java .util .concurrent .ConcurrentMap ;
44+ import java .util .concurrent .ExecutionException ;
45+ import java .util .concurrent .FutureTask ;
4646import java .util .concurrent .LinkedBlockingQueue ;
4747import java .util .concurrent .PriorityBlockingQueue ;
4848import java .util .concurrent .atomic .AtomicBoolean ;
@@ -167,15 +167,15 @@ protected PrefetchingTask prepareNewDatabase(PipelineKey key, int priority) {
167167 databaseCount .decrementAndGet ();
168168
169169 if (databaseToRemove .getKey ().equals (key )) {
170- return executeTask (key , PrefetchingTask .withDatabase (databaseToRemove .getValue (), priority ));
170+ return executeTask (PrefetchingTask .withDatabase (key , databaseToRemove .getValue (), priority ));
171171 } else {
172172 databaseToRemove .getValue ().close ();
173173 DatabasePipeline pipeline = pipelines .get (databaseToRemove .getKey ());
174174 logger .trace ("Prepared database has been cleaned: {}" , pipeline .key );
175175 }
176176 }
177177
178- return executeTask (key , PrefetchingTask .forPreparer (key .provider , key .preparer , priority ));
178+ return executeTask (PrefetchingTask .forPreparer (key , key .provider , key .preparer , priority ));
179179 }
180180
181181 protected Optional <PrefetchingTask > prepareExistingDatabase (PipelineKey key , int priority ) {
@@ -197,7 +197,7 @@ protected Optional<PrefetchingTask> prepareExistingDatabase(PipelineKey key, int
197197 if (result != null ) {
198198 CompositeDatabasePreparer complementaryPreparer = new CompositeDatabasePreparer (preparers .subList (i , preparers .size ()));
199199 logger .trace ("Preparing existing database from {} pipeline by using the complementary preparer {}" , existingPipeline .key , complementaryPreparer );
200- PrefetchingTask task = executeTask (key , PrefetchingTask .withDatabase (result .get (), complementaryPreparer , priority ));
200+ PrefetchingTask task = executeTask (PrefetchingTask .withDatabase (key , result .get (), complementaryPreparer , priority ));
201201
202202 prepareDatabase (pipelineKey , LOWEST_PRECEDENCE );
203203 reschedulePipeline (pipelineKey );
@@ -223,33 +223,13 @@ protected void reschedulePipeline(PipelineKey key) {
223223
224224 for (int i = 0 ; i < cancelledTasks .size (); i ++) {
225225 int priority = -1 * (int ) (invocationCount / cancelledTasks .size () * (i + 1 ));
226- executeTask (key , PrefetchingTask .fromTask (cancelledTasks .get (i ), priority ));
226+ executeTask (PrefetchingTask .fromTask (key , cancelledTasks .get (i ), priority ));
227227 }
228228 }
229229 }
230230
231- protected PrefetchingTask executeTask (PipelineKey key , PrefetchingTask task ) {
232- DatabasePipeline pipeline = pipelines .get (key );
233-
234- task .addCallback (new ListenableFutureCallback <EmbeddedDatabase >() {
235- @ Override
236- public void onSuccess (EmbeddedDatabase result ) {
237- if (task .type == NEW_DATABASE ) {
238- pipeline .state .set (INITIALIZED );
239- }
240- pipeline .tasks .remove (task );
241- pipeline .results .offer (PreparedResult .success (result ));
242- }
243-
244- @ Override
245- public void onFailure (Throwable error ) {
246- pipeline .tasks .remove (task );
247- if (!(error instanceof CancellationException )) {
248- pipeline .results .offer (PreparedResult .failure (error ));
249- }
250- }
251- });
252-
231+ protected PrefetchingTask executeTask (PrefetchingTask task ) {
232+ DatabasePipeline pipeline = pipelines .get (task .key );
253233 pipeline .tasks .add (task );
254234 taskExecutor .execute (task );
255235 return task ;
@@ -379,36 +359,38 @@ protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
379359 }
380360 }
381361
382- protected static class PrefetchingTask extends ListenableFutureTask <EmbeddedDatabase > implements Comparable <PrefetchingTask > {
362+ protected static class PrefetchingTask extends FutureTask <EmbeddedDatabase > implements Comparable <PrefetchingTask > {
383363
384364 private final AtomicBoolean executed = new AtomicBoolean (false );
385365
366+ public final PipelineKey key ;
386367 public final Callable <EmbeddedDatabase > action ;
387368 public final TaskType type ;
388369 public final int priority ;
389370
390- public static PrefetchingTask forPreparer (DatabaseProvider provider , DatabasePreparer preparer , int priority ) {
391- return new PrefetchingTask (priority , NEW_DATABASE , () -> provider .createDatabase (preparer ));
371+ public static PrefetchingTask forPreparer (PipelineKey key , DatabaseProvider provider , DatabasePreparer preparer , int priority ) {
372+ return new PrefetchingTask (key , priority , NEW_DATABASE , () -> provider .createDatabase (preparer ));
392373 }
393374
394- public static PrefetchingTask withDatabase (EmbeddedDatabase database , DatabasePreparer preparer , int priority ) {
395- return new PrefetchingTask (priority , EXISTING_DATABASE , () -> {
375+ public static PrefetchingTask withDatabase (PipelineKey key , EmbeddedDatabase database , DatabasePreparer preparer , int priority ) {
376+ return new PrefetchingTask (key , priority , EXISTING_DATABASE , () -> {
396377 preparer .prepare (database );
397378 return database ;
398379 });
399380 }
400381
401- public static PrefetchingTask withDatabase (EmbeddedDatabase database , int priority ) {
402- return new PrefetchingTask (priority , EXISTING_DATABASE , () -> database );
382+ public static PrefetchingTask withDatabase (PipelineKey key , EmbeddedDatabase database , int priority ) {
383+ return new PrefetchingTask (key , priority , EXISTING_DATABASE , () -> database );
403384 }
404385
405- public static PrefetchingTask fromTask (PrefetchingTask task , int priority ) {
406- return new PrefetchingTask (priority , task .type , task .action );
386+ public static PrefetchingTask fromTask (PipelineKey key , PrefetchingTask task , int priority ) {
387+ return new PrefetchingTask (key , priority , task .type , task .action );
407388 }
408389
409- private PrefetchingTask (int priority , TaskType type , Callable <EmbeddedDatabase > action ) {
390+ private PrefetchingTask (PipelineKey key , int priority , TaskType type , Callable <EmbeddedDatabase > action ) {
410391 super (action );
411392
393+ this .key = key ;
412394 this .action = action ;
413395 this .type = type ;
414396 this .priority = priority ;
@@ -421,6 +403,37 @@ public void run() {
421403 }
422404 }
423405
406+ @ Override
407+ protected void done () {
408+ DatabasePipeline pipeline = pipelines .get (key );
409+ Throwable cause ;
410+
411+ try {
412+ EmbeddedDatabase result = get ();
413+
414+ if (type == NEW_DATABASE ) {
415+ pipeline .state .set (INITIALIZED );
416+ }
417+ pipeline .tasks .remove (this );
418+ pipeline .results .offer (PreparedResult .success (result ));
419+ return ;
420+ }
421+ catch (ExecutionException ex ) {
422+ cause = ex .getCause ();
423+ if (cause == null ) {
424+ cause = ex ;
425+ }
426+ }
427+ catch (Throwable ex ) {
428+ cause = ex ;
429+ }
430+
431+ pipeline .tasks .remove (this );
432+ if (!(cause instanceof CancellationException )) {
433+ pipeline .results .offer (PreparedResult .failure (cause ));
434+ }
435+ }
436+
424437 @ Override
425438 public boolean cancel (boolean mayInterruptIfRunning ) {
426439 if (mayInterruptIfRunning || executed .compareAndSet (false , true )) {
0 commit comments