3030import java .util .Set ;
3131import java .util .TreeMap ;
3232import java .util .TreeSet ;
33+ import java .util .concurrent .Callable ;
34+ import java .util .concurrent .ExecutionException ;
35+ import java .util .concurrent .ExecutorService ;
36+ import java .util .concurrent .Executors ;
37+ import java .util .concurrent .Future ;
3338import java .util .concurrent .TimeUnit ;
39+ import java .util .concurrent .TimeoutException ;
3440import java .util .concurrent .locks .Lock ;
3541import java .util .stream .Collectors ;
3642
@@ -433,7 +439,12 @@ private void syncTeamDataSources(ConnectionConfig connection) {
433439 Long currentProjectId = connection .getProjectId ();
434440 List <String > blockedDatabaseNames = listBlockedDatabaseNames (connection .getDialectType ());
435441 DataSource teamDataSource = new OBConsoleDataSourceFactory (connection , true , false ).getDataSource ();
436- try (Connection conn = teamDataSource .getConnection ()) {
442+ ExecutorService executorService = Executors .newFixedThreadPool (1 );
443+ Future <Connection > connectionFuture = executorService .submit (
444+ (Callable <Connection >) teamDataSource ::getConnection );
445+ Connection conn = null ;
446+ try {
447+ conn = connectionFuture .get (5 , TimeUnit .SECONDS );
437448 List <DatabaseEntity > latestDatabases = dbSchemaService .listDatabases (connection .getDialectType (), conn )
438449 .stream ().map (database -> {
439450 DatabaseEntity entity = new DatabaseEntity ();
@@ -512,16 +523,32 @@ private void syncTeamDataSources(ConnectionConfig connection) {
512523 "update connect_database set table_count=?, collation_name=?, charset_name=?, project_id=? where id = ?" ;
513524 jdbcTemplate .batchUpdate (update , toUpdate );
514525 }
515- } catch (SQLException e ) {
516- deleteDatabaseIfClusterNotExists (e , connection .getId (),
517- "update connect_database set is_existed = 0 where connection_id=?" );
518- throw new IllegalStateException (e );
526+ } catch (ExecutionException | InterruptedException | TimeoutException e ) {
527+ log .warn ("Failed to obtain the connection, errorMessage={}" , e .getMessage ());
528+ Throwable rootCause = e .getCause ();
529+ if (rootCause instanceof SQLException ) {
530+ deleteDatabaseIfClusterNotExists ((SQLException ) rootCause ,
531+ connection .getId (), "update connect_database set is_existed = 0 where connection_id=?" );
532+ throw new IllegalStateException (rootCause );
533+ }
519534 } finally {
535+ try {
536+ executorService .shutdownNow ();
537+ } catch (Exception e ) {
538+ // eat the exception
539+ }
540+ if (conn != null ) {
541+ try {
542+ conn .close ();
543+ } catch (Exception e ) {
544+ log .warn ("Close connection failed, errorMessage={}" , e .getMessage ());
545+ }
546+ }
520547 if (teamDataSource instanceof AutoCloseable ) {
521548 try {
522549 ((AutoCloseable ) teamDataSource ).close ();
523550 } catch (Exception e ) {
524- log .warn ("Failed to close datasource" , e );
551+ log .warn ("Failed to close datasource, errorMessgae={} " , e . getMessage () );
525552 }
526553 }
527554 }
@@ -542,7 +569,12 @@ private Long getProjectId(DatabaseEntity database, Long currentProjectId, List<S
542569
543570 private void syncIndividualDataSources (ConnectionConfig connection ) {
544571 DataSource individualDataSource = new OBConsoleDataSourceFactory (connection , true , false ).getDataSource ();
545- try (Connection conn = individualDataSource .getConnection ()) {
572+ ExecutorService executorService = Executors .newFixedThreadPool (1 );
573+ Future <Connection > connectionFuture = executorService .submit (
574+ (Callable <Connection >) individualDataSource ::getConnection );
575+ Connection conn = null ;
576+ try {
577+ conn = connectionFuture .get (5 , TimeUnit .SECONDS );
546578 Set <String > latestDatabaseNames = dbSchemaService .showDatabases (connection .getDialectType (), conn );
547579 List <DatabaseEntity > existedDatabasesInDb =
548580 databaseRepository .findByConnectionId (connection .getId ()).stream ()
@@ -578,16 +610,32 @@ private void syncIndividualDataSources(ConnectionConfig connection) {
578610 if (!CollectionUtils .isEmpty (toDelete )) {
579611 jdbcTemplate .batchUpdate ("delete from connect_database where id = ?" , toDelete );
580612 }
581- } catch (SQLException e ) {
582- deleteDatabaseIfClusterNotExists (e , connection .getId (),
583- "delete from connect_database where connection_id=?" );
584- throw new IllegalStateException (e );
613+ } catch (ExecutionException | InterruptedException | TimeoutException e ) {
614+ log .warn ("Failed to obtain the connection, errorMessage={}" , e .getMessage ());
615+ Throwable rootCause = e .getCause ();
616+ if (rootCause instanceof SQLException ) {
617+ deleteDatabaseIfClusterNotExists ((SQLException ) rootCause ,
618+ connection .getId (), "delete from connect_database where connection_id=?" );
619+ throw new IllegalStateException (rootCause );
620+ }
585621 } finally {
622+ try {
623+ executorService .shutdownNow ();
624+ } catch (Exception e ) {
625+ // eat the exception
626+ }
627+ if (conn != null ) {
628+ try {
629+ conn .close ();
630+ } catch (Exception e ) {
631+ log .warn ("Close connection failed, errorMessage={}" , e .getMessage ());
632+ }
633+ }
586634 if (individualDataSource instanceof AutoCloseable ) {
587635 try {
588636 ((AutoCloseable ) individualDataSource ).close ();
589637 } catch (Exception e ) {
590- log .warn ("Failed to close datasource" , e );
638+ log .warn ("Failed to close datasource, errorMessgae={} " , e . getMessage () );
591639 }
592640 }
593641 }
0 commit comments