17
17
public class AbstractJobSession extends BaseJobSession {
18
18
19
19
public Logger logger = LoggerFactory .getLogger (this .getClass ().getName ());
20
-
20
+
21
21
protected AbstractJobSession (CqlSession sourceSession , CqlSession astraSession , SparkConf sparkConf ) {
22
22
this .sourceSession = sourceSession ;
23
23
this .astraSession = astraSession ;
@@ -37,25 +37,26 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
37
37
38
38
isPreserveTTLWritetime = Boolean .parseBoolean (sparkConf .get ("spark.preserveTTLWriteTime" , "false" ));
39
39
if (isPreserveTTLWritetime ) {
40
- String ttlColsStr = sparkConf .get ("spark.source .ttl.cols" );
40
+ String ttlColsStr = sparkConf .get ("spark.preserveTTLWriteTime .ttl.cols" );
41
41
if (null != ttlColsStr && ttlColsStr .trim ().length () > 0 ) {
42
42
for (String ttlCol : ttlColsStr .split ("," )) {
43
43
ttlCols .add (Integer .parseInt (ttlCol ));
44
44
}
45
45
}
46
+
47
+ String writeTimestampColsStr = sparkConf .get ("spark.preserveTTLWriteTime.writetime.cols" );
48
+ if (null != writeTimestampColsStr && writeTimestampColsStr .trim ().length () > 0 ) {
49
+ for (String writeTimeStampCol : writeTimestampColsStr .split ("," )) {
50
+ writeTimeStampCols .add (Integer .parseInt (writeTimeStampCol ));
51
+ }
52
+ }
46
53
}
47
54
48
55
writeTimeStampFilter = Boolean
49
56
.parseBoolean (sparkConf .get ("spark.source.writeTimeStampFilter" , "false" ));
50
57
// batchsize set to 1 if there is a writeFilter
51
58
if (writeTimeStampFilter ) {
52
59
batchSize = 1 ;
53
- String writeTimestampColsStr = sparkConf .get ("spark.source.writeTimeStampFilter.cols" );
54
- if (null != writeTimestampColsStr && writeTimestampColsStr .trim ().length () > 0 ) {
55
- for (String writeTimeStampCol : writeTimestampColsStr .split ("," )) {
56
- writeTimeStampCols .add (Integer .parseInt (writeTimeStampCol ));
57
- }
58
- }
59
60
}
60
61
61
62
String minWriteTimeStampFilterStr =
@@ -82,14 +83,27 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
82
83
83
84
String selectCols = sparkConf .get ("spark.query.source" );
84
85
String partionKey = sparkConf .get ("spark.query.source.partitionKey" );
86
+ String sourceSelectCondition = sparkConf .get ("spark.query.condition" , "" );
87
+
88
+ final StringBuilder selectTTLWriteTimeCols = new StringBuilder ();
89
+ if (isPreserveTTLWritetime ) {
90
+ String [] allCols = selectCols .split ("," );
91
+ ttlCols .forEach (col -> {
92
+ selectTTLWriteTimeCols .append (",ttl(" + allCols [col ] + ")" );
93
+ });
94
+ writeTimeStampCols .forEach (col -> {
95
+ selectTTLWriteTimeCols .append (",writetime(" + allCols [col ] + ")" );
96
+ });
97
+ }
98
+ String fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols .toString () + " from " + sourceKeyspaceTable + " where token(" + partionKey .trim ()
99
+ + ") >= ? and token(" + partionKey .trim () + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING" ;
100
+ sourceSelectStatement = sourceSession .prepare (fullSelectQuery );
101
+ logger .info ("PARAM -- Query used: " + fullSelectQuery );
102
+
85
103
selectColTypes = getTypes (sparkConf .get ("spark.query.types" ));
86
104
String idCols = sparkConf .get ("spark.query.destination.id" , "" );
87
105
idColTypes = selectColTypes .subList (0 , idCols .split ("," ).length );
88
- String sourceSelectCondition = sparkConf .get ("spark.query.condition" , "" );
89
- sourceSelectStatement = sourceSession .prepare (
90
- "select " + selectCols + " from " + sourceKeyspaceTable + " where token(" + partionKey .trim ()
91
- + ") >= ? and token(" + partionKey .trim () + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING" );
92
-
106
+
93
107
String insertCols = sparkConf .get ("spark.query.destination" , "" );
94
108
if (null == insertCols || insertCols .trim ().isEmpty ()) {
95
109
insertCols = selectCols ;
@@ -146,15 +160,15 @@ public List<MigrateDataType> getTypes(String types) {
146
160
public int getLargestTTL (Row sourceRow ) {
147
161
int ttl = 0 ;
148
162
for (Integer ttlCol : ttlCols ) {
149
- ttl = Math .max (ttl , sourceRow .getInt (ttlCol ));
163
+ ttl = Math .max (ttl , sourceRow .getInt (selectColTypes . size () + ttlCol - 1 ));
150
164
}
151
165
return ttl ;
152
166
}
153
167
154
168
public long getLargestWriteTimeStamp (Row sourceRow ) {
155
169
long writeTimestamp = 0 ;
156
170
for (Integer writeTimeStampCol : writeTimeStampCols ) {
157
- writeTimestamp = Math .max (writeTimestamp , sourceRow .getLong (writeTimeStampCol ));
171
+ writeTimestamp = Math .max (writeTimestamp , sourceRow .getLong (selectColTypes . size () + ttlCols . size () + writeTimeStampCol - 1 ));
158
172
}
159
173
return writeTimestamp ;
160
174
}
0 commit comments