2929import org .slf4j .Logger ;
3030import org .slf4j .LoggerFactory ;
3131import org .springframework .scheduling .concurrent .ThreadPoolTaskExecutor ;
32- import org .springframework .util .concurrent .ListenableFutureCallback ;
33- import org .springframework .util .concurrent .ListenableFutureTask ;
3432
3533import java .util .Comparator ;
3634import java .util .List ;
4341import java .util .concurrent .CancellationException ;
4442import java .util .concurrent .ConcurrentHashMap ;
4543import java .util .concurrent .ConcurrentMap ;
44+ import java .util .concurrent .ExecutionException ;
45+ import java .util .concurrent .FutureTask ;
4646import java .util .concurrent .LinkedBlockingQueue ;
4747import java .util .concurrent .PriorityBlockingQueue ;
4848import java .util .concurrent .atomic .AtomicBoolean ;
@@ -167,15 +167,15 @@ protected PrefetchingTask prepareNewDatabase(PipelineKey key, int priority) {
167167 databaseCount .decrementAndGet ();
168168
169169 if (databaseToRemove .getKey ().equals (key )) {
170- return executeTask (key , PrefetchingTask .withDatabase (databaseToRemove .getValue (), priority ));
170+ return executeTask (PrefetchingTask .withDatabase (key , databaseToRemove .getValue (), priority ));
171171 } else {
172172 databaseToRemove .getValue ().close ();
173173 DatabasePipeline pipeline = pipelines .get (databaseToRemove .getKey ());
174174 logger .trace ("Prepared database has been cleaned: {}" , pipeline .key );
175175 }
176176 }
177177
178- return executeTask (key , PrefetchingTask .forPreparer (key .provider , key .preparer , priority ));
178+ return executeTask (PrefetchingTask .forPreparer (key , key .provider , key .preparer , priority ));
179179 }
180180
181181 protected Optional <PrefetchingTask > prepareExistingDatabase (PipelineKey key , int priority ) {
@@ -197,7 +197,7 @@ protected Optional<PrefetchingTask> prepareExistingDatabase(PipelineKey key, int
197197 if (result != null ) {
198198 CompositeDatabasePreparer complementaryPreparer = new CompositeDatabasePreparer (preparers .subList (i , preparers .size ()));
199199 logger .trace ("Preparing existing database from {} pipeline by using the complementary preparer {}" , existingPipeline .key , complementaryPreparer );
200- PrefetchingTask task = executeTask (key , PrefetchingTask .withDatabase (result .get (), complementaryPreparer , priority ));
200+ PrefetchingTask task = executeTask (PrefetchingTask .withDatabase (key , result .get (), complementaryPreparer , priority ));
201201
202202 prepareDatabase (pipelineKey , LOWEST_PRECEDENCE );
203203 reschedulePipeline (pipelineKey );
@@ -223,34 +223,12 @@ protected void reschedulePipeline(PipelineKey key) {
223223
224224 for (int i = 0 ; i < cancelledTasks .size (); i ++) {
225225 int priority = -1 * (int ) (invocationCount / cancelledTasks .size () * (i + 1 ));
226- executeTask (key , PrefetchingTask .fromTask (cancelledTasks .get (i ), priority ));
226+ executeTask (PrefetchingTask .fromTask (key , cancelledTasks .get (i ), priority ));
227227 }
228228 }
229229 }
230230
231- protected PrefetchingTask executeTask (PipelineKey key , PrefetchingTask task ) {
232- DatabasePipeline pipeline = pipelines .get (key );
233-
234- task .addCallback (new ListenableFutureCallback <EmbeddedDatabase >() {
235- @ Override
236- public void onSuccess (EmbeddedDatabase result ) {
237- if (task .type == NEW_DATABASE ) {
238- pipeline .state .set (INITIALIZED );
239- }
240- pipeline .tasks .remove (task );
241- pipeline .results .offer (PreparedResult .success (result ));
242- }
243-
244- @ Override
245- public void onFailure (Throwable error ) {
246- pipeline .tasks .remove (task );
247- if (!(error instanceof CancellationException )) {
248- pipeline .results .offer (PreparedResult .failure (error ));
249- }
250- }
251- });
252-
253- pipeline .tasks .add (task );
231+ protected PrefetchingTask executeTask (PrefetchingTask task ) {
254232 taskExecutor .execute (task );
255233 return task ;
256234 }
@@ -379,36 +357,38 @@ protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
379357 }
380358 }
381359
382- protected static class PrefetchingTask extends ListenableFutureTask <EmbeddedDatabase > implements Comparable <PrefetchingTask > {
360+ protected static class PrefetchingTask extends FutureTask <EmbeddedDatabase > implements Comparable <PrefetchingTask > {
383361
384362 private final AtomicBoolean executed = new AtomicBoolean (false );
385363
364+ public final PipelineKey key ;
386365 public final Callable <EmbeddedDatabase > action ;
387366 public final TaskType type ;
388367 public final int priority ;
389368
390- public static PrefetchingTask forPreparer (DatabaseProvider provider , DatabasePreparer preparer , int priority ) {
391- return new PrefetchingTask (priority , NEW_DATABASE , () -> provider .createDatabase (preparer ));
369+ public static PrefetchingTask forPreparer (PipelineKey key , DatabaseProvider provider , DatabasePreparer preparer , int priority ) {
370+ return new PrefetchingTask (key , priority , NEW_DATABASE , () -> provider .createDatabase (preparer ));
392371 }
393372
394- public static PrefetchingTask withDatabase (EmbeddedDatabase database , DatabasePreparer preparer , int priority ) {
395- return new PrefetchingTask (priority , EXISTING_DATABASE , () -> {
373+ public static PrefetchingTask withDatabase (PipelineKey key , EmbeddedDatabase database , DatabasePreparer preparer , int priority ) {
374+ return new PrefetchingTask (key , priority , EXISTING_DATABASE , () -> {
396375 preparer .prepare (database );
397376 return database ;
398377 });
399378 }
400379
401- public static PrefetchingTask withDatabase (EmbeddedDatabase database , int priority ) {
402- return new PrefetchingTask (priority , EXISTING_DATABASE , () -> database );
380+ public static PrefetchingTask withDatabase (PipelineKey key , EmbeddedDatabase database , int priority ) {
381+ return new PrefetchingTask (key , priority , EXISTING_DATABASE , () -> database );
403382 }
404383
405- public static PrefetchingTask fromTask (PrefetchingTask task , int priority ) {
406- return new PrefetchingTask (priority , task .type , task .action );
384+ public static PrefetchingTask fromTask (PipelineKey key , PrefetchingTask task , int priority ) {
385+ return new PrefetchingTask (key , priority , task .type , task .action );
407386 }
408387
409- private PrefetchingTask (int priority , TaskType type , Callable <EmbeddedDatabase > action ) {
388+ private PrefetchingTask (PipelineKey key , int priority , TaskType type , Callable <EmbeddedDatabase > action ) {
410389 super (action );
411390
391+ this .key = key ;
412392 this .action = action ;
413393 this .type = type ;
414394 this .priority = priority ;
@@ -417,7 +397,40 @@ private PrefetchingTask(int priority, TaskType type, Callable<EmbeddedDatabase>
417397 @ Override
418398 public void run () {
419399 if (executed .compareAndSet (false , true )) {
420- super .run ();
400+ DatabasePipeline pipeline = pipelines .get (key );
401+ pipeline .tasks .add (this );
402+
403+ Throwable cause ;
404+ try {
405+ super .run ();
406+ EmbeddedDatabase result = get ();
407+
408+ if (type == NEW_DATABASE ) {
409+ pipeline .state .set (INITIALIZED );
410+ }
411+ pipeline .tasks .remove (this );
412+ pipeline .results .offer (PreparedResult .success (result ));
413+ return ;
414+ }
415+ catch (InterruptedException ex ) {
416+ Thread .currentThread ().interrupt ();
417+ pipeline .tasks .remove (this );
418+ return ;
419+ }
420+ catch (ExecutionException ex ) {
421+ cause = ex .getCause ();
422+ if (cause == null ) {
423+ cause = ex ;
424+ }
425+ }
426+ catch (Throwable ex ) {
427+ cause = ex ;
428+ }
429+
430+ pipeline .tasks .remove (this );
431+ if (!(cause instanceof CancellationException )) {
432+ pipeline .results .offer (PreparedResult .failure (cause ));
433+ }
421434 }
422435 }
423436
0 commit comments