30
30
31
31
import com .datastax .driver .core .exceptions .BusyConnectionException ;
32
32
import com .datastax .driver .core .exceptions .ConnectionException ;
33
+ import com .datastax .driver .core .exceptions .InvalidQueryException ;
33
34
import com .google .common .collect .Lists ;
34
35
import com .google .common .collect .Maps ;
35
36
import java .util .ArrayList ;
36
37
import java .util .Collections ;
37
38
import java .util .Comparator ;
38
39
import java .util .HashMap ;
40
+ import java .util .HashSet ;
39
41
import java .util .Iterator ;
40
42
import java .util .LinkedHashMap ;
41
43
import java .util .List ;
42
44
import java .util .Map ;
43
45
import java .util .Map .Entry ;
46
+ import java .util .Set ;
44
47
import java .util .concurrent .ExecutionException ;
45
48
import org .slf4j .Logger ;
46
49
import org .slf4j .LoggerFactory ;
@@ -56,6 +59,9 @@ abstract class SchemaParser {
56
59
private static final SchemaParser V3_PARSER = new V3SchemaParser ();
57
60
private static final SchemaParser V4_PARSER = new V4SchemaParser ();
58
61
62
+ private static final String SELECT_SCYLLA_KEYSPACES =
63
+ "SELECT * FROM system_schema.scylla_keyspaces" ;
64
+
59
65
static SchemaParser forVersion (VersionNumber cassandraVersion ) {
60
66
if (cassandraVersion .getMajor () >= 4 ) return V4_PARSER ;
61
67
if (cassandraVersion .getMajor () >= 3 ) return V3_PARSER ;
@@ -197,7 +203,6 @@ void refresh(
197
203
198
204
private Map <String , KeyspaceMetadata > buildKeyspaces (
199
205
SystemRows rows , VersionNumber cassandraVersion , Cluster cluster ) {
200
-
201
206
Map <String , KeyspaceMetadata > keyspaces = new LinkedHashMap <String , KeyspaceMetadata >();
202
207
for (Row keyspaceRow : rows .keyspaces ) {
203
208
KeyspaceMetadata keyspace = KeyspaceMetadata .build (keyspaceRow , cassandraVersion );
@@ -239,6 +244,13 @@ private Map<String, KeyspaceMetadata> buildKeyspaces(
239
244
for (MaterializedViewMetadata view : views .values ()) {
240
245
keyspace .add (view );
241
246
}
247
+ Row scyllaKeyspacesRow = rows .scyllaKeyspaces .getOrDefault (keyspace .getName (), null );
248
+ if (scyllaKeyspacesRow != null ) {
249
+ if (scyllaKeyspacesRow .getColumnDefinitions ().contains ("initial_tablets" )
250
+ && !scyllaKeyspacesRow .isNull ("initial_tablets" )) {
251
+ keyspace .setUsesTablets (true );
252
+ }
253
+ }
242
254
keyspaces .put (keyspace .getName (), keyspace );
243
255
}
244
256
if (rows .virtualKeyspaces != null ) {
@@ -619,6 +631,29 @@ private void updateViews(
619
631
}
620
632
}
621
633
634
+ static Set <String > toKeyspaceSet (ResultSet rs ) {
635
+ if (rs == null ) return Collections .emptySet ();
636
+
637
+ Set <String > result = new HashSet <>();
638
+ for (Row row : rs ) {
639
+ result .add (row .getString (KeyspaceMetadata .KS_NAME ));
640
+ }
641
+ return result ;
642
+ }
643
+
644
+ static Map <String , Row > groupByKeyspacePk (ResultSet rs ) {
645
+ // Assumes keyspace name is full primary key, therefore
646
+ // each keyspace name identifies at most one row
647
+ if (rs == null ) return Collections .emptyMap ();
648
+
649
+ Map <String , Row > result = new HashMap <String , Row >();
650
+ for (Row row : rs ) {
651
+ String ksName = row .getString (KeyspaceMetadata .KS_NAME );
652
+ result .put (ksName , row );
653
+ }
654
+ return result ;
655
+ }
656
+
622
657
static Map <String , List <Row >> groupByKeyspace (ResultSet rs ) {
623
658
if (rs == null ) return Collections .emptyMap ();
624
659
@@ -696,6 +731,25 @@ private static ResultSet get(ResultSetFuture future)
696
731
return (future == null ) ? null : future .get ();
697
732
}
698
733
734
+ private static ResultSet getIfExists (ResultSetFuture future )
735
+ throws InterruptedException , ExecutionException {
736
+ // Some of Scylla specific tables/columns may not exist depending on version.
737
+ // This method is meant to try to get results without failing whole schema parse
738
+ // if something additional does not exist.
739
+ if (future == null ) return null ;
740
+ try {
741
+ ResultSet resultSet = future .get ();
742
+ return resultSet ;
743
+ } catch (ExecutionException ex ) {
744
+ if (ex .getCause () instanceof InvalidQueryException ) {
745
+ // meant to handle keyspace/table does not exist exceptions
746
+ return null ;
747
+ }
748
+ // rethrow if it's something else
749
+ throw ex ;
750
+ }
751
+ }
752
+
699
753
/**
700
754
* The rows from the system tables that we want to parse to metadata classes. The format of these
701
755
* rows depends on the Cassandra version, but our parsing code knows how to handle the
@@ -713,6 +767,7 @@ private static class SystemRows {
713
767
final ResultSet virtualKeyspaces ;
714
768
final Map <String , List <Row >> virtualTables ;
715
769
final Map <String , Map <String , Map <String , ColumnMetadata .Raw >>> virtualColumns ;
770
+ final Map <String , Row > scyllaKeyspaces ;
716
771
717
772
public SystemRows (
718
773
ResultSet keyspaces ,
@@ -725,7 +780,8 @@ public SystemRows(
725
780
Map <String , Map <String , List <Row >>> indexes ,
726
781
ResultSet virtualKeyspaces ,
727
782
Map <String , List <Row >> virtualTables ,
728
- Map <String , Map <String , Map <String , ColumnMetadata .Raw >>> virtualColumns ) {
783
+ Map <String , Map <String , Map <String , ColumnMetadata .Raw >>> virtualColumns ,
784
+ Map <String , Row > scyllaKeyspaces ) {
729
785
this .keyspaces = keyspaces ;
730
786
this .tables = tables ;
731
787
this .columns = columns ;
@@ -737,6 +793,7 @@ public SystemRows(
737
793
this .virtualKeyspaces = virtualKeyspaces ;
738
794
this .virtualTables = virtualTables ;
739
795
this .virtualColumns = virtualColumns ;
796
+ this .scyllaKeyspaces = scyllaKeyspaces ;
740
797
}
741
798
}
742
799
@@ -790,7 +847,8 @@ else if (targetType == AGGREGATE)
790
847
cfFuture = null ,
791
848
colsFuture = null ,
792
849
functionsFuture = null ,
793
- aggregatesFuture = null ;
850
+ aggregatesFuture = null ,
851
+ scyllaKsFuture = null ;
794
852
795
853
ProtocolVersion protocolVersion =
796
854
cluster .getConfiguration ().getProtocolOptions ().getProtocolVersion ();
@@ -812,6 +870,21 @@ else if (targetType == AGGREGATE)
812
870
if (isSchemaOrKeyspace && supportsUdfs (cassandraVersion ) || targetType == AGGREGATE )
813
871
aggregatesFuture = queryAsync (SELECT_AGGREGATES + whereClause , connection , protocolVersion );
814
872
873
+ if (isSchemaOrKeyspace ) {
874
+ if (targetType == KEYSPACE ) {
875
+ scyllaKsFuture =
876
+ queryAsync (
877
+ SELECT_SCYLLA_KEYSPACES
878
+ + " WHERE keyspace_name = '"
879
+ + targetKeyspace
880
+ + "' LIMIT 1;" ,
881
+ connection ,
882
+ protocolVersion );
883
+ } else {
884
+ scyllaKsFuture = queryAsync (SELECT_SCYLLA_KEYSPACES , connection , protocolVersion );
885
+ }
886
+ }
887
+
815
888
return new SystemRows (
816
889
get (ksFuture ),
817
890
groupByKeyspace (get (cfFuture )),
@@ -824,7 +897,8 @@ else if (targetType == AGGREGATE)
824
897
Collections .<String , Map <String , List <Row >>>emptyMap (),
825
898
null ,
826
899
Collections .<String , List <Row >>emptyMap (),
827
- Collections .<String , Map <String , Map <String , ColumnMetadata .Raw >>>emptyMap ());
900
+ Collections .<String , Map <String , Map <String , ColumnMetadata .Raw >>>emptyMap (),
901
+ groupByKeyspacePk (getIfExists (scyllaKsFuture )));
828
902
}
829
903
830
904
@ Override
@@ -1197,9 +1271,19 @@ private Map<String, KeyspaceMetadata> buildSchema(
1197
1271
cluster .getConfiguration ().getProtocolOptions ().getProtocolVersion ();
1198
1272
1199
1273
Map <String , KeyspaceMetadata > keyspaces = new LinkedHashMap <String , KeyspaceMetadata >();
1274
+ ResultSetFuture scyllaKeyspacesFuture =
1275
+ queryAsync (SELECT_SCYLLA_KEYSPACES , connection , protocolVersion );
1200
1276
ResultSet keyspacesData = queryAsync (SELECT_KEYSPACES , connection , protocolVersion ).get ();
1277
+ Map <String , Row > scyllaKeyspacesData = groupByKeyspacePk (getIfExists (scyllaKeyspacesFuture ));
1201
1278
for (Row keyspaceRow : keyspacesData ) {
1202
1279
KeyspaceMetadata keyspace = KeyspaceMetadata .build (keyspaceRow , cassandraVersion );
1280
+ Row scyllaKeyspacesRow = scyllaKeyspacesData .getOrDefault (keyspace .getName (), null );
1281
+ if (scyllaKeyspacesRow != null ) {
1282
+ if (scyllaKeyspacesRow .getColumnDefinitions ().contains ("initial_tablets" )
1283
+ && !scyllaKeyspacesRow .isNull ("initial_tablets" )) {
1284
+ keyspace .setUsesTablets (true );
1285
+ }
1286
+ }
1203
1287
keyspaces .put (keyspace .getName (), keyspace );
1204
1288
}
1205
1289
@@ -1288,7 +1372,8 @@ SystemRows fetchSystemRows(
1288
1372
functionsFuture = null ,
1289
1373
aggregatesFuture = null ,
1290
1374
indexesFuture = null ,
1291
- viewsFuture = null ;
1375
+ viewsFuture = null ,
1376
+ scyllaKsFuture = null ;
1292
1377
1293
1378
ProtocolVersion protocolVersion =
1294
1379
cluster .getConfiguration ().getProtocolOptions ().getProtocolVersion ();
@@ -1356,6 +1441,21 @@ SystemRows fetchSystemRows(
1356
1441
connection ,
1357
1442
protocolVersion );
1358
1443
1444
+ if (isSchemaOrKeyspace ) {
1445
+ if (targetType == KEYSPACE ) {
1446
+ scyllaKsFuture =
1447
+ queryAsync (
1448
+ SELECT_SCYLLA_KEYSPACES
1449
+ + " WHERE keyspace_name = '"
1450
+ + targetKeyspace
1451
+ + "' LIMIT 1;" ,
1452
+ connection ,
1453
+ protocolVersion );
1454
+ } else {
1455
+ scyllaKsFuture = queryAsync (SELECT_SCYLLA_KEYSPACES , connection , protocolVersion );
1456
+ }
1457
+ }
1458
+
1359
1459
return new SystemRows (
1360
1460
get (ksFuture ),
1361
1461
groupByKeyspace (get (cfFuture )),
@@ -1367,7 +1467,8 @@ SystemRows fetchSystemRows(
1367
1467
groupByKeyspaceAndCf (get (indexesFuture ), TABLE_NAME ),
1368
1468
null ,
1369
1469
Collections .<String , List <Row >>emptyMap (),
1370
- Collections .<String , Map <String , Map <String , ColumnMetadata .Raw >>>emptyMap ());
1470
+ Collections .<String , Map <String , Map <String , ColumnMetadata .Raw >>>emptyMap (),
1471
+ groupByKeyspacePk (getIfExists (scyllaKsFuture )));
1371
1472
}
1372
1473
1373
1474
@ Override
@@ -1499,7 +1600,8 @@ SystemRows fetchSystemRows(
1499
1600
viewsFuture = null ,
1500
1601
virtualKeyspacesFuture = null ,
1501
1602
virtualTableFuture = null ,
1502
- virtualColumnsFuture = null ;
1603
+ virtualColumnsFuture = null ,
1604
+ scyllaKsFuture = null ;
1503
1605
1504
1606
ProtocolVersion protocolVersion =
1505
1607
cluster .getConfiguration ().getProtocolOptions ().getProtocolVersion ();
@@ -1589,6 +1691,21 @@ SystemRows fetchSystemRows(
1589
1691
protocolVersion );
1590
1692
}
1591
1693
1694
+ if (isSchemaOrKeyspace ) {
1695
+ if (targetType == KEYSPACE ) {
1696
+ scyllaKsFuture =
1697
+ queryAsync (
1698
+ SELECT_SCYLLA_KEYSPACES
1699
+ + " WHERE keyspace_name = '"
1700
+ + targetKeyspace
1701
+ + "' LIMIT 1;" ,
1702
+ connection ,
1703
+ protocolVersion );
1704
+ } else {
1705
+ scyllaKsFuture = queryAsync (SELECT_SCYLLA_KEYSPACES , connection , protocolVersion );
1706
+ }
1707
+ }
1708
+
1592
1709
return new SystemRows (
1593
1710
get (ksFuture ),
1594
1711
groupByKeyspace (get (cfFuture )),
@@ -1600,7 +1717,8 @@ SystemRows fetchSystemRows(
1600
1717
groupByKeyspaceAndCf (get (indexesFuture ), TABLE_NAME ),
1601
1718
get (virtualKeyspacesFuture ),
1602
1719
groupByKeyspace (get (virtualTableFuture )),
1603
- groupByKeyspaceAndCf (get (virtualColumnsFuture ), cassandraVersion , TABLE_NAME ));
1720
+ groupByKeyspaceAndCf (get (virtualColumnsFuture ), cassandraVersion , TABLE_NAME ),
1721
+ groupByKeyspacePk (getIfExists (scyllaKsFuture )));
1604
1722
}
1605
1723
}
1606
1724
}
0 commit comments