@@ -384,16 +384,30 @@ private void init(CqlIdentifier keyspace) {
384
384
.getTopologyMonitor ()
385
385
.init ()
386
386
.thenCompose (v -> metadataManager .refreshNodes ())
387
- .thenAccept (v -> afterInitialNodeListRefresh (keyspace ))
388
- .exceptionally (
389
- error -> {
390
- initFuture .completeExceptionally (error );
391
- RunOrSchedule .on (adminExecutor , this ::close );
392
- return null ;
387
+ .thenCompose (v -> checkProtocolVersion ())
388
+ .thenCompose (v -> initialSchemaRefresh ())
389
+ .thenCompose (v -> initializePools (keyspace ))
390
+ .whenComplete (
391
+ (v , error ) -> {
392
+ if (error == null ) {
393
+ LOG .debug ("[{}] Initialization complete, ready" , logPrefix );
394
+ notifyListeners ();
395
+ initFuture .complete (DefaultSession .this );
396
+ } else {
397
+ LOG .debug ("[{}] Initialization failed, force closing" , logPrefix , error );
398
+ forceCloseAsync ()
399
+ .whenComplete (
400
+ (v1 , error1 ) -> {
401
+ if (error1 != null ) {
402
+ error .addSuppressed (error1 );
403
+ }
404
+ initFuture .completeExceptionally (error );
405
+ });
406
+ }
393
407
});
394
408
}
395
409
396
- private void afterInitialNodeListRefresh ( CqlIdentifier keyspace ) {
410
+ private CompletionStage < Void > checkProtocolVersion ( ) {
397
411
try {
398
412
boolean protocolWasForced =
399
413
context .getConfig ().getDefaultProfile ().isDefined (DefaultDriverOption .PROTOCOL_VERSION );
@@ -426,48 +440,39 @@ private void afterInitialNodeListRefresh(CqlIdentifier keyspace) {
426
440
bestVersion );
427
441
}
428
442
}
429
- metadataManager
443
+ return CompletableFuture .completedFuture (null );
444
+ } catch (Throwable throwable ) {
445
+ return CompletableFutures .failedFuture (throwable );
446
+ }
447
+ }
448
+
449
+ private CompletionStage <RefreshSchemaResult > initialSchemaRefresh () {
450
+ try {
451
+ return metadataManager
430
452
.refreshSchema (null , false , true )
431
- .whenComplete (
432
- (metadata , error ) -> {
433
- if (error != null ) {
434
- Loggers .warnWithException (
435
- LOG ,
436
- "[{}] Unexpected error while refreshing schema during initialization, "
437
- + "keeping previous version" ,
438
- logPrefix ,
439
- error );
440
- }
441
- afterInitialSchemaRefresh (keyspace );
453
+ .exceptionally (
454
+ error -> {
455
+ Loggers .warnWithException (
456
+ LOG ,
457
+ "[{}] Unexpected error while refreshing schema during initialization, "
458
+ + "proceeding without schema metadata" ,
459
+ logPrefix ,
460
+ error );
461
+ return null ;
442
462
});
443
463
} catch (Throwable throwable ) {
444
- initFuture . completeExceptionally (throwable );
464
+ return CompletableFutures . failedFuture (throwable );
445
465
}
446
466
}
447
467
448
- private void afterInitialSchemaRefresh (CqlIdentifier keyspace ) {
468
+ private CompletionStage < Void > initializePools (CqlIdentifier keyspace ) {
449
469
try {
450
470
nodeStateManager .markInitialized ();
451
471
context .getLoadBalancingPolicyWrapper ().init ();
452
472
context .getConfigLoader ().onDriverInit (context );
453
- LOG .debug ("[{}] Initialization complete, ready" , logPrefix );
454
- poolManager
455
- .init (keyspace )
456
- .whenComplete (
457
- (v , error ) -> {
458
- if (error != null ) {
459
- initFuture .completeExceptionally (error );
460
- } else {
461
- notifyListeners ();
462
- initFuture .complete (DefaultSession .this );
463
- }
464
- });
473
+ return poolManager .init (keyspace );
465
474
} catch (Throwable throwable ) {
466
- forceCloseAsync ()
467
- .whenComplete (
468
- (v , error ) -> {
469
- initFuture .completeExceptionally (throwable );
470
- });
475
+ return CompletableFutures .failedFuture (throwable );
471
476
}
472
477
}
473
478
0 commit comments