3030
3131import com .datastax .driver .core .exceptions .BusyConnectionException ;
3232import com .datastax .driver .core .exceptions .ConnectionException ;
33+ import com .datastax .driver .core .exceptions .InvalidQueryException ;
3334import com .google .common .collect .Lists ;
3435import com .google .common .collect .Maps ;
3536import java .util .ArrayList ;
3637import java .util .Collections ;
3738import java .util .Comparator ;
3839import java .util .HashMap ;
40+ import java .util .HashSet ;
3941import java .util .Iterator ;
4042import java .util .LinkedHashMap ;
4143import java .util .List ;
4244import java .util .Map ;
4345import java .util .Map .Entry ;
46+ import java .util .Set ;
4447import java .util .concurrent .ExecutionException ;
4548import org .slf4j .Logger ;
4649import org .slf4j .LoggerFactory ;
@@ -56,6 +59,9 @@ abstract class SchemaParser {
5659 private static final SchemaParser V3_PARSER = new V3SchemaParser ();
5760 private static final SchemaParser V4_PARSER = new V4SchemaParser ();
5861
62+ private static final String SELECT_SCYLLA_KEYSPACES =
63+ "SELECT * FROM system_schema.scylla_keyspaces" ;
64+
5965 static SchemaParser forVersion (VersionNumber cassandraVersion ) {
6066 if (cassandraVersion .getMajor () >= 4 ) return V4_PARSER ;
6167 if (cassandraVersion .getMajor () >= 3 ) return V3_PARSER ;
@@ -197,7 +203,6 @@ void refresh(
197203
198204 private Map <String , KeyspaceMetadata > buildKeyspaces (
199205 SystemRows rows , VersionNumber cassandraVersion , Cluster cluster ) {
200-
201206 Map <String , KeyspaceMetadata > keyspaces = new LinkedHashMap <String , KeyspaceMetadata >();
202207 for (Row keyspaceRow : rows .keyspaces ) {
203208 KeyspaceMetadata keyspace = KeyspaceMetadata .build (keyspaceRow , cassandraVersion );
@@ -239,6 +244,13 @@ private Map<String, KeyspaceMetadata> buildKeyspaces(
239244 for (MaterializedViewMetadata view : views .values ()) {
240245 keyspace .add (view );
241246 }
247+ Row scyllaKeyspacesRow = rows .scyllaKeyspaces .getOrDefault (keyspace .getName (), null );
248+ if (scyllaKeyspacesRow != null ) {
249+ if (scyllaKeyspacesRow .getColumnDefinitions ().contains ("initial_tablets" )
250+ && !scyllaKeyspacesRow .isNull ("initial_tablets" )) {
251+ keyspace .setUsesTablets (true );
252+ }
253+ }
242254 keyspaces .put (keyspace .getName (), keyspace );
243255 }
244256 if (rows .virtualKeyspaces != null ) {
@@ -619,6 +631,29 @@ private void updateViews(
619631 }
620632 }
621633
634+ static Set <String > toKeyspaceSet (ResultSet rs ) {
635+ if (rs == null ) return Collections .emptySet ();
636+
637+ Set <String > result = new HashSet <>();
638+ for (Row row : rs ) {
639+ result .add (row .getString (KeyspaceMetadata .KS_NAME ));
640+ }
641+ return result ;
642+ }
643+
644+ static Map <String , Row > groupByKeyspacePk (ResultSet rs ) {
645+ // Assumes keyspace name is full primary key, therefore
646+ // each keyspace name identifies at most one row
647+ if (rs == null ) return Collections .emptyMap ();
648+
649+ Map <String , Row > result = new HashMap <String , Row >();
650+ for (Row row : rs ) {
651+ String ksName = row .getString (KeyspaceMetadata .KS_NAME );
652+ result .put (ksName , row );
653+ }
654+ return result ;
655+ }
656+
622657 static Map <String , List <Row >> groupByKeyspace (ResultSet rs ) {
623658 if (rs == null ) return Collections .emptyMap ();
624659
@@ -696,6 +731,25 @@ private static ResultSet get(ResultSetFuture future)
696731 return (future == null ) ? null : future .get ();
697732 }
698733
734+ private static ResultSet getIfExists (ResultSetFuture future )
735+ throws InterruptedException , ExecutionException {
736+ // Some of Scylla specific tables/columns may not exist depending on version.
737+ // This method is meant to try to get results without failing whole schema parse
738+ // if something additional does not exist.
739+ if (future == null ) return null ;
740+ try {
741+ ResultSet resultSet = future .get ();
742+ return resultSet ;
743+ } catch (ExecutionException ex ) {
744+ if (ex .getCause () instanceof InvalidQueryException ) {
745+ // meant to handle keyspace/table does not exist exceptions
746+ return null ;
747+ }
748+ // rethrow if it's something else
749+ throw ex ;
750+ }
751+ }
752+
699753 /**
700754 * The rows from the system tables that we want to parse to metadata classes. The format of these
701755 * rows depends on the Cassandra version, but our parsing code knows how to handle the
@@ -713,6 +767,7 @@ private static class SystemRows {
713767 final ResultSet virtualKeyspaces ;
714768 final Map <String , List <Row >> virtualTables ;
715769 final Map <String , Map <String , Map <String , ColumnMetadata .Raw >>> virtualColumns ;
770+ final Map <String , Row > scyllaKeyspaces ;
716771
717772 public SystemRows (
718773 ResultSet keyspaces ,
@@ -725,7 +780,8 @@ public SystemRows(
725780 Map <String , Map <String , List <Row >>> indexes ,
726781 ResultSet virtualKeyspaces ,
727782 Map <String , List <Row >> virtualTables ,
728- Map <String , Map <String , Map <String , ColumnMetadata .Raw >>> virtualColumns ) {
783+ Map <String , Map <String , Map <String , ColumnMetadata .Raw >>> virtualColumns ,
784+ Map <String , Row > scyllaKeyspaces ) {
729785 this .keyspaces = keyspaces ;
730786 this .tables = tables ;
731787 this .columns = columns ;
@@ -737,6 +793,7 @@ public SystemRows(
737793 this .virtualKeyspaces = virtualKeyspaces ;
738794 this .virtualTables = virtualTables ;
739795 this .virtualColumns = virtualColumns ;
796+ this .scyllaKeyspaces = scyllaKeyspaces ;
740797 }
741798 }
742799
@@ -790,7 +847,8 @@ else if (targetType == AGGREGATE)
790847 cfFuture = null ,
791848 colsFuture = null ,
792849 functionsFuture = null ,
793- aggregatesFuture = null ;
850+ aggregatesFuture = null ,
851+ scyllaKsFuture = null ;
794852
795853 ProtocolVersion protocolVersion =
796854 cluster .getConfiguration ().getProtocolOptions ().getProtocolVersion ();
@@ -812,6 +870,21 @@ else if (targetType == AGGREGATE)
812870 if (isSchemaOrKeyspace && supportsUdfs (cassandraVersion ) || targetType == AGGREGATE )
813871 aggregatesFuture = queryAsync (SELECT_AGGREGATES + whereClause , connection , protocolVersion );
814872
873+ if (isSchemaOrKeyspace ) {
874+ if (targetType == KEYSPACE ) {
875+ scyllaKsFuture =
876+ queryAsync (
877+ SELECT_SCYLLA_KEYSPACES
878+ + " WHERE keyspace_name = '"
879+ + targetKeyspace
880+ + "' LIMIT 1;" ,
881+ connection ,
882+ protocolVersion );
883+ } else {
884+ scyllaKsFuture = queryAsync (SELECT_SCYLLA_KEYSPACES , connection , protocolVersion );
885+ }
886+ }
887+
815888 return new SystemRows (
816889 get (ksFuture ),
817890 groupByKeyspace (get (cfFuture )),
@@ -824,7 +897,8 @@ else if (targetType == AGGREGATE)
824897 Collections .<String , Map <String , List <Row >>>emptyMap (),
825898 null ,
826899 Collections .<String , List <Row >>emptyMap (),
827- Collections .<String , Map <String , Map <String , ColumnMetadata .Raw >>>emptyMap ());
900+ Collections .<String , Map <String , Map <String , ColumnMetadata .Raw >>>emptyMap (),
901+ groupByKeyspacePk (getIfExists (scyllaKsFuture )));
828902 }
829903
830904 @ Override
@@ -1197,9 +1271,19 @@ private Map<String, KeyspaceMetadata> buildSchema(
11971271 cluster .getConfiguration ().getProtocolOptions ().getProtocolVersion ();
11981272
11991273 Map <String , KeyspaceMetadata > keyspaces = new LinkedHashMap <String , KeyspaceMetadata >();
1274+ ResultSetFuture scyllaKeyspacesFuture =
1275+ queryAsync (SELECT_SCYLLA_KEYSPACES , connection , protocolVersion );
12001276 ResultSet keyspacesData = queryAsync (SELECT_KEYSPACES , connection , protocolVersion ).get ();
1277+ Map <String , Row > scyllaKeyspacesData = groupByKeyspacePk (getIfExists (scyllaKeyspacesFuture ));
12011278 for (Row keyspaceRow : keyspacesData ) {
12021279 KeyspaceMetadata keyspace = KeyspaceMetadata .build (keyspaceRow , cassandraVersion );
1280+ Row scyllaKeyspacesRow = scyllaKeyspacesData .getOrDefault (keyspace .getName (), null );
1281+ if (scyllaKeyspacesRow != null ) {
1282+ if (scyllaKeyspacesRow .getColumnDefinitions ().contains ("initial_tablets" )
1283+ && !scyllaKeyspacesRow .isNull ("initial_tablets" )) {
1284+ keyspace .setUsesTablets (true );
1285+ }
1286+ }
12031287 keyspaces .put (keyspace .getName (), keyspace );
12041288 }
12051289
@@ -1288,7 +1372,8 @@ SystemRows fetchSystemRows(
12881372 functionsFuture = null ,
12891373 aggregatesFuture = null ,
12901374 indexesFuture = null ,
1291- viewsFuture = null ;
1375+ viewsFuture = null ,
1376+ scyllaKsFuture = null ;
12921377
12931378 ProtocolVersion protocolVersion =
12941379 cluster .getConfiguration ().getProtocolOptions ().getProtocolVersion ();
@@ -1356,6 +1441,21 @@ SystemRows fetchSystemRows(
13561441 connection ,
13571442 protocolVersion );
13581443
1444+ if (isSchemaOrKeyspace ) {
1445+ if (targetType == KEYSPACE ) {
1446+ scyllaKsFuture =
1447+ queryAsync (
1448+ SELECT_SCYLLA_KEYSPACES
1449+ + " WHERE keyspace_name = '"
1450+ + targetKeyspace
1451+ + "' LIMIT 1;" ,
1452+ connection ,
1453+ protocolVersion );
1454+ } else {
1455+ scyllaKsFuture = queryAsync (SELECT_SCYLLA_KEYSPACES , connection , protocolVersion );
1456+ }
1457+ }
1458+
13591459 return new SystemRows (
13601460 get (ksFuture ),
13611461 groupByKeyspace (get (cfFuture )),
@@ -1367,7 +1467,8 @@ SystemRows fetchSystemRows(
13671467 groupByKeyspaceAndCf (get (indexesFuture ), TABLE_NAME ),
13681468 null ,
13691469 Collections .<String , List <Row >>emptyMap (),
1370- Collections .<String , Map <String , Map <String , ColumnMetadata .Raw >>>emptyMap ());
1470+ Collections .<String , Map <String , Map <String , ColumnMetadata .Raw >>>emptyMap (),
1471+ groupByKeyspacePk (getIfExists (scyllaKsFuture )));
13711472 }
13721473
13731474 @ Override
@@ -1499,7 +1600,8 @@ SystemRows fetchSystemRows(
14991600 viewsFuture = null ,
15001601 virtualKeyspacesFuture = null ,
15011602 virtualTableFuture = null ,
1502- virtualColumnsFuture = null ;
1603+ virtualColumnsFuture = null ,
1604+ scyllaKsFuture = null ;
15031605
15041606 ProtocolVersion protocolVersion =
15051607 cluster .getConfiguration ().getProtocolOptions ().getProtocolVersion ();
@@ -1589,6 +1691,21 @@ SystemRows fetchSystemRows(
15891691 protocolVersion );
15901692 }
15911693
1694+ if (isSchemaOrKeyspace ) {
1695+ if (targetType == KEYSPACE ) {
1696+ scyllaKsFuture =
1697+ queryAsync (
1698+ SELECT_SCYLLA_KEYSPACES
1699+ + " WHERE keyspace_name = '"
1700+ + targetKeyspace
1701+ + "' LIMIT 1;" ,
1702+ connection ,
1703+ protocolVersion );
1704+ } else {
1705+ scyllaKsFuture = queryAsync (SELECT_SCYLLA_KEYSPACES , connection , protocolVersion );
1706+ }
1707+ }
1708+
15921709 return new SystemRows (
15931710 get (ksFuture ),
15941711 groupByKeyspace (get (cfFuture )),
@@ -1600,7 +1717,8 @@ SystemRows fetchSystemRows(
16001717 groupByKeyspaceAndCf (get (indexesFuture ), TABLE_NAME ),
16011718 get (virtualKeyspacesFuture ),
16021719 groupByKeyspace (get (virtualTableFuture )),
1603- groupByKeyspaceAndCf (get (virtualColumnsFuture ), cassandraVersion , TABLE_NAME ));
1720+ groupByKeyspaceAndCf (get (virtualColumnsFuture ), cassandraVersion , TABLE_NAME ),
1721+ groupByKeyspacePk (getIfExists (scyllaKsFuture )));
16041722 }
16051723 }
16061724}
0 commit comments