19
19
import com .datastax .oss .quarkus .runtime .api .mapper .QuarkusGeneratedDaoBean ;
20
20
import com .datastax .oss .quarkus .runtime .api .mapper .QuarkusGeneratedMapperBean ;
21
21
import com .datastax .oss .quarkus .runtime .api .session .QuarkusCqlSession ;
22
+ import io .quarkus .arc .ClientProxy ;
22
23
import io .quarkus .runtime .StartupEvent ;
23
24
import io .smallrye .mutiny .Uni ;
24
25
import java .time .Duration ;
25
26
import java .util .concurrent .CompletionStage ;
26
27
import java .util .concurrent .ExecutionException ;
28
+ import java .util .concurrent .ExecutorService ;
29
+ import java .util .concurrent .Executors ;
30
+ import java .util .concurrent .Future ;
27
31
import java .util .concurrent .TimeUnit ;
28
32
import java .util .concurrent .TimeoutException ;
29
33
import javax .enterprise .context .Dependent ;
@@ -49,8 +53,6 @@ public class CassandraClientStarter {
49
53
@ Inject @ QuarkusGeneratedMapperBean Instance <Object > mappers ;
50
54
@ Inject @ QuarkusGeneratedDaoBean Instance <Object > daos ;
51
55
52
- private Duration timeout ;
53
-
54
56
@ SuppressWarnings ("unused" )
55
57
public void onStartup (@ Observes StartupEvent event )
56
58
throws ExecutionException , InterruptedException {
@@ -64,33 +66,39 @@ public void onStartup(@Observes StartupEvent event)
64
66
}
65
67
if (config .cassandraClientInitConfig .eagerInit ) {
66
68
LOG .info ("Eagerly initializing Quarkus Cassandra client." );
67
- timeout = config .cassandraClientInitConfig .eagerInitTimeout ;
68
- initializeBeans (sessions , "session" );
69
- initializeBeans (mappers , "mapper" );
70
- initializeBeans (daos , "DAO" );
69
+ Duration timeout = config .cassandraClientInitConfig .eagerInitTimeout ;
70
+ ExecutorService executor = Executors .newSingleThreadExecutor ();
71
+ Future <Void > initFuture =
72
+ executor .submit (
73
+ () -> {
74
+ initializeBeans (sessions );
75
+ initializeBeans (mappers );
76
+ initializeBeans (daos );
77
+ return null ;
78
+ });
79
+ try {
80
+ initFuture .get (timeout .toMillis (), TimeUnit .MILLISECONDS );
81
+ } catch (TimeoutException e ) {
82
+ initFuture .cancel (true );
83
+ LOG .warn (
84
+ "Eager initialization of Quarkus Cassandra client did not complete within {}; "
85
+ + "resuming application startup with an uninitialized client." ,
86
+ timeout );
87
+ }
88
+ executor .shutdownNow ();
71
89
} else {
72
90
LOG .debug (
73
91
"Eager initialization of Quarkus Cassandra client at startup is disabled by configuration." );
74
92
}
75
93
}
76
94
77
- private void initializeBeans (Instance <?> beans , String beanName )
78
- throws InterruptedException , ExecutionException {
95
+ private void initializeBeans (Instance <?> beans ) throws InterruptedException , ExecutionException {
79
96
for (Object bean : beans ) {
80
- try {
81
- if (bean instanceof CompletionStage ) {
82
- ((CompletionStage <?>) bean )
83
- .toCompletableFuture ()
84
- .get (timeout .toNanos (), TimeUnit .NANOSECONDS );
85
- } else if (bean instanceof Uni ) {
86
- ((Uni <?>) bean ).await ().atMost (timeout );
87
- }
88
- } catch (TimeoutException | io .smallrye .mutiny .TimeoutException e ) {
89
- LOG .warn (
90
- "Eager initialization of a {} bean did not complete within {}; "
91
- + "resuming application startup with uninitialized bean." ,
92
- beanName ,
93
- timeout );
97
+ ClientProxy .unwrap (bean );
98
+ if (bean instanceof CompletionStage ) {
99
+ ((CompletionStage <?>) bean ).toCompletableFuture ().get ();
100
+ } else if (bean instanceof Uni ) {
101
+ ((Uni <?>) bean ).await ().indefinitely ();
94
102
}
95
103
}
96
104
}
0 commit comments