2121import com .datastax .driver .core .ResultSet ;
2222import com .datastax .driver .core .Row ;
2323import com .datastax .driver .core .Session ;
24+ import com .datastax .driver .core .exceptions .NoHostAvailableException ;
2425import com .datastax .driver .core .policies .ConstantReconnectionPolicy ;
2526import com .datastax .driver .mapping .Mapper ;
2627import com .datastax .driver .mapping .MappingManager ;
@@ -205,6 +206,7 @@ public void close()
205206 {
206207 if ( cluster != null ) // close only if the session and cluster were built by self
207208 {
209+ logger .info ( "Close cassandra client" );
208210 asyncJobExecutor .shutdownAndWaitTermination ();
209211 session .close ();
210212 cluster .close ();
@@ -229,7 +231,7 @@ public Set<String> getFileSystemContaining( Collection<String> candidates, Strin
229231 String filename = PathMapUtils .getFilename ( path );
230232
231233 BoundStatement bound = preparedContainingQuery .bind ( candidates , parentPath , filename );
232- ResultSet result = session . execute ( bound );
234+ ResultSet result = executeSession ( bound );
233235 return result .all ().stream ().map ( row -> row .get ( 0 , String .class ) ).collect ( Collectors .toSet () );
234236 }
235237
@@ -287,7 +289,7 @@ public List<PathMap> list( String fileSystem, String path, boolean recursive, in
287289 private Result <DtxPathMap > boundAndRunListQuery ( String fileSystem , String parentPath )
288290 {
289291 BoundStatement bound = preparedListQuery .bind ( fileSystem , parentPath );
290- ResultSet result = session . execute ( bound );
292+ ResultSet result = executeSession ( bound );
291293 return pathMapMapper .map ( result );
292294 }
293295
@@ -435,7 +437,7 @@ public FileType exists( String fileSystem, String path )
435437 {
436438 bound = preparedExistQuery .bind ( fileSystem , parentPath , Arrays .asList ( filename , filename + "/" ) );
437439 }
438- ResultSet result = session . execute ( bound );
440+ ResultSet result = executeSession ( bound );
439441 FileType ret = getFileTypeOrNull ( result );
440442 if ( ret != null )
441443 {
@@ -455,7 +457,7 @@ public boolean existsFile( String fileSystem, String path )
455457 String filename = PathMapUtils .getFilename ( path );
456458
457459 BoundStatement bound = preparedExistFileQuery .bind ( fileSystem , parentPath , filename );
458- ResultSet result = session . execute ( bound );
460+ ResultSet result = executeSession ( bound );
459461 Row row = result .one ();
460462 boolean exists = false ;
461463 if ( row != null )
@@ -590,7 +592,7 @@ public boolean isDirectory( String fileSystem, String path )
590592 String filename = PathMapUtils .getFilename ( path );
591593
592594 BoundStatement bound = preparedExistQuery .bind ( fileSystem , parentPath , Arrays .asList ( filename ) );
593- ResultSet result = session . execute ( bound );
595+ ResultSet result = executeSession ( bound );
594596 return notNull ( result );
595597 }
596598
@@ -605,7 +607,7 @@ public boolean isFile( String fileSystem, String path )
605607 String filename = PathMapUtils .getFilename ( path );
606608
607609 BoundStatement bound = preparedExistQuery .bind ( fileSystem , parentPath , Arrays .asList ( filename ) );
608- ResultSet result = session . execute ( bound );
610+ ResultSet result = executeSession ( bound );
609611 return notNull ( result );
610612 }
611613
@@ -685,7 +687,7 @@ private boolean isEmptyDirectory( String fileSystem, String path )
685687 {
686688 path = PathMapUtils .normalizeParentPath ( path );
687689 BoundStatement bound = preparedListCheckEmpty .bind ( fileSystem , path );
688- ResultSet result = session . execute ( bound );
690+ ResultSet result = executeSession ( bound );
689691 Row row = result .one ();
690692 boolean empty = false ;
691693 if ( row != null )
@@ -705,7 +707,7 @@ private void deleteFromReverseMap( String fileId, String path )
705707 reduction .add ( path );
706708 bound .setSet ( 0 , reduction );
707709 bound .setString ( 1 , fileId );
708- session . execute ( bound );
710+ executeSession ( bound );
709711 }
710712
711713 private void addToReverseMap ( String fileId , String path )
@@ -716,7 +718,7 @@ private void addToReverseMap( String fileId, String path )
716718 increment .add ( path );
717719 bound .setSet ( 0 , increment );
718720 bound .setString ( 1 , fileId );
719- session . execute ( bound );
721+ executeSession ( bound );
720722 }
721723
722724 private void updateFilesystemIncrease (String filesystem , long count , long size )
@@ -726,7 +728,7 @@ private void updateFilesystemIncrease(String filesystem, long count, long size)
726728 bound .setLong ( 0 , count );
727729 bound .setLong ( 1 , size );
728730 bound .setString ( 2 , filesystem );
729- session . execute ( bound );
731+ executeSession ( bound );
730732 }
731733
732734 private void updateFilesystemDecrease (String filesystem , long count , long size )
@@ -736,7 +738,7 @@ private void updateFilesystemDecrease(String filesystem, long count, long size)
736738 bound .setLong ( 0 , count );
737739 bound .setLong ( 1 , size );
738740 bound .setString ( 2 , filesystem );
739- session . execute ( bound );
741+ executeSession ( bound );
740742 }
741743
742744 private void reclaim ( String fileId , String fileStorage , String checksum )
@@ -833,7 +835,7 @@ public void expire(String fileSystem, String path, Date expiration)
833835 bound .setString ( 1 , fileSystem );
834836 bound .setString ( 2 , parentPath );
835837 bound .setString ( 3 , filename );
836- session . execute ( bound );
838+ executeSession ( bound );
837839 }
838840
839841 @ Override
@@ -854,7 +856,7 @@ public void makeDirs( String fileSystem, String path )
854856 String filename = PathMapUtils .getFilename ( path );
855857
856858 BoundStatement bound = preparedExistQuery .bind ( fileSystem , parentPath , Arrays .asList ( filename ) );
857- ResultSet result = session . execute ( bound );
859+ ResultSet result = executeSession ( bound );
858860 if ( notNull ( result ) )
859861 {
860862 logger .debug ( "Dir already exists, fileSystem: {}, path: {}" , fileSystem , path );
@@ -930,7 +932,7 @@ public Filesystem getFilesystem(String filesystem)
930932 @ Override
931933 public List <? extends Filesystem > getFilesystems ()
932934 {
933- ResultSet result = session . execute ( preparedFilesystemList .bind () );
935+ ResultSet result = executeSession ( preparedFilesystemList .bind () );
934936 return filesystemMapper .map (result ).all ();
935937 }
936938
@@ -961,4 +963,34 @@ public Set<String> getPathsByFileId( String fileId )
961963 }
962964 return emptySet ();
963965 }
966+
967+ private ResultSet executeSession ( BoundStatement bind )
968+ {
969+ boolean exception = false ;
970+ ResultSet trackingRecord = null ;
971+ try
972+ {
973+ if ( session == null || session .isClosed () )
974+ {
975+ close ();
976+ new CassandraPathDB ( config );
977+ }
978+ trackingRecord = session .execute ( bind );
979+ }
980+ catch ( NoHostAvailableException e )
981+ {
982+ exception = true ;
983+ logger .error ( "Cannot connect to host, reconnect once more with new session." , e );
984+ }
985+ finally
986+ {
987+ if ( exception )
988+ {
989+ close ();
990+ new CassandraPathDB ( config );
991+ trackingRecord = session .execute ( bind );
992+ }
993+ }
994+ return trackingRecord ;
995+ }
964996}
0 commit comments