@@ -36,20 +36,17 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         sourceKeyspaceTable = sparkConf.get("spark.source.keyspaceTable");
         astraKeyspaceTable = sparkConf.get("spark.destination.keyspaceTable");
 
-        isPreserveTTLWritetime = Boolean.parseBoolean(sparkConf.get("spark.preserveTTLWriteTime", "false"));
-        if (isPreserveTTLWritetime) {
-            String ttlColsStr = sparkConf.get("spark.preserveTTLWriteTime.ttl.cols");
-            if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
-                for (String ttlCol : ttlColsStr.split(",")) {
-                    ttlCols.add(Integer.parseInt(ttlCol));
-                }
+        String ttlColsStr = sparkConf.get("spark.query.ttl.cols");
+        if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
+            for (String ttlCol : ttlColsStr.split(",")) {
+                ttlCols.add(Integer.parseInt(ttlCol));
             }
+        }
 
-            String writeTimestampColsStr = sparkConf.get("spark.preserveTTLWriteTime.writetime.cols");
-            if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
-                for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
-                    writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
-                }
+        String writeTimestampColsStr = sparkConf.get("spark.query.writetime.cols");
+        if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
+            for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
+                writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
             }
         }
 
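Note: with this hunk, `spark.query.ttl.cols` and `spark.query.writetime.cols` hold comma-separated column indexes (into the `spark.query.source` column list) and are parsed unconditionally, rather than behind the removed `spark.preserveTTLWriteTime` flag. A minimal, self-contained sketch of the parsing; the property value "2,3" is a hypothetical example, not taken from this change:

import java.util.ArrayList;
import java.util.List;

public class TtlColsParseSketch {
    public static void main(String[] args) {
        // Hypothetical value of spark.query.ttl.cols: zero-based indexes
        // into the comma-separated spark.query.source column list.
        String ttlColsStr = "2,3";
        List<Integer> ttlCols = new ArrayList<>();
        if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
            for (String ttlCol : ttlColsStr.split(",")) {
                ttlCols.add(Integer.parseInt(ttlCol));
            }
        }
        System.out.println(ttlCols); // prints [2, 3]
    }
}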
@@ -82,26 +79,22 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         logger.info("PARAM -- Destination Keyspace Table: " + astraKeyspaceTable);
         logger.info("PARAM -- ReadRateLimit: " + readLimiter.getRate());
         logger.info("PARAM -- WriteRateLimit: " + writeLimiter.getRate());
-        logger.info("PARAM -- WriteTimestampFilter: " + writeTimeStampFilter);
-        logger.info("PARAM -- WriteTimestampFilterCols: " + writeTimeStampCols);
-        logger.info("PARAM -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
-        logger.info("PARAM -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
         logger.info("PARAM -- TTLCols: " + ttlCols);
+        logger.info("PARAM -- WriteTimestampFilterCols: " + writeTimeStampCols);
+        logger.info("PARAM -- WriteTimestampFilter: " + writeTimeStampFilter);
 
         String selectCols = sparkConf.get("spark.query.source");
         String partionKey = sparkConf.get("spark.query.source.partitionKey");
         String sourceSelectCondition = sparkConf.get("spark.query.condition", "");
 
         final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
-        if (isPreserveTTLWritetime) {
-            String[] allCols = selectCols.split(",");
-            ttlCols.forEach(col -> {
-                selectTTLWriteTimeCols.append(",ttl(" + allCols[col] + ")");
-            });
-            writeTimeStampCols.forEach(col -> {
-                selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")");
-            });
-        }
+        String[] allCols = selectCols.split(",");
+        ttlCols.forEach(col -> {
+            selectTTLWriteTimeCols.append(",ttl(" + allCols[col] + ")");
+        });
+        writeTimeStampCols.forEach(col -> {
+            selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")");
+        });
         String fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols.toString() + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
                 + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING";
         sourceSelectStatement = sourceSession.prepare(fullSelectQuery);
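For illustration, here is what the select construction above produces for a hypothetical config (keyspace/table `test.tbl`, columns `id,name,value`, TTL and writetime both read from column index 2). This is a standalone sketch mirroring the diff's variables (including its `partionKey` spelling), not code from this change:

import java.util.Arrays;
import java.util.List;

public class SelectQuerySketch {
    public static void main(String[] args) {
        // Hypothetical config values mirroring the diff's variables.
        String selectCols = "id,name,value";
        String partionKey = "id";
        String sourceSelectCondition = "";                   // spark.query.condition default
        List<Integer> ttlCols = Arrays.asList(2);            // -> ttl(value)
        List<Integer> writeTimeStampCols = Arrays.asList(2); // -> writetime(value)

        final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
        String[] allCols = selectCols.split(",");
        ttlCols.forEach(col -> selectTTLWriteTimeCols.append(",ttl(" + allCols[col] + ")"));
        writeTimeStampCols.forEach(col -> selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")"));

        String fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols
                + " from test.tbl where token(" + partionKey.trim()
                + ") >= ? and token(" + partionKey.trim() + ") <= ? "
                + sourceSelectCondition + " ALLOW FILTERING";
        System.out.println(fullSelectQuery);
        // select id,name,value,ttl(value),writetime(value) from test.tbl
        //   where token(id) >= ? and token(id) <= ?  ALLOW FILTERING
    }
}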
@@ -110,7 +103,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         selectColTypes = getTypes(sparkConf.get("spark.query.types"));
         String idCols = sparkConf.get("spark.query.destination.id", "");
         idColTypes = selectColTypes.subList(0, idCols.split(",").length);
-
+
         String insertCols = sparkConf.get("spark.query.destination", "");
         if (null == insertCols || insertCols.trim().isEmpty()) {
             insertCols = selectCols;
@@ -147,11 +140,16 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
             }
         }
 
-        if (isPreserveTTLWritetime) {
-            astraInsertStatement = astraSession.prepare("insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ") using TTL ? and TIMESTAMP ?");
-        } else {
-            astraInsertStatement = astraSession.prepare("insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")");
+        String fullInsertQuery = "insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")";
+        if (!ttlCols.isEmpty()) {
+            fullInsertQuery += " USING TTL ?";
+            if (!writeTimeStampCols.isEmpty()) {
+                fullInsertQuery += " AND TIMESTAMP ?";
+            }
+        } else if (!writeTimeStampCols.isEmpty()) {
+            fullInsertQuery += " USING TIMESTAMP ?";
         }
+        astraInsertStatement = astraSession.prepare(fullInsertQuery);
     }
 }
 
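The rewritten insert is now prepared once, and its USING clause depends on which column lists are populated: TTL only, TTL and TIMESTAMP, TIMESTAMP only, or neither. A standalone sketch of that branching; the keyspace/table and bind columns are hypothetical:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InsertQuerySketch {
    // Mirrors the diff's branching: bind markers for TTL and/or TIMESTAMP
    // are appended only when the corresponding column lists are non-empty.
    static String buildInsert(List<Integer> ttlCols, List<Integer> writeTimeStampCols) {
        String fullInsertQuery = "insert into test.tbl (id,name) VALUES (?,?)";
        if (!ttlCols.isEmpty()) {
            fullInsertQuery += " USING TTL ?";
            if (!writeTimeStampCols.isEmpty()) {
                fullInsertQuery += " AND TIMESTAMP ?";
            }
        } else if (!writeTimeStampCols.isEmpty()) {
            fullInsertQuery += " USING TIMESTAMP ?";
        }
        return fullInsertQuery;
    }

    public static void main(String[] args) {
        System.out.println(buildInsert(Arrays.asList(2), Arrays.asList(2)));
        // insert into test.tbl (id,name) VALUES (?,?) USING TTL ? AND TIMESTAMP ?
        System.out.println(buildInsert(Collections.emptyList(), Collections.emptyList()));
        // insert into test.tbl (id,name) VALUES (?,?)
    }
}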