@@ -26,7 +26,6 @@ public abstract class AbstractJobSession {
26
26
protected final RateLimiter writeLimiter ;
27
27
public Logger logger = LoggerFactory .getLogger (this .getClass ().getName ());
28
28
protected PreparedStatement sourceSelectStatement ;
29
- protected String sourceSelectCondition ;
30
29
protected PreparedStatement astraSelectStatement ;
31
30
protected PreparedStatement astraInsertStatement ;
32
31
protected Integer maxRetries = 10 ;
@@ -73,25 +72,26 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
73
72
74
73
isPreserveTTLWritetime = Boolean .parseBoolean (sparkConf .get ("spark.preserveTTLWriteTime" , "false" ));
75
74
if (isPreserveTTLWritetime ) {
76
- String ttlColsStr = sparkConf .get ("spark.source .ttl.cols" );
75
+ String ttlColsStr = sparkConf .get ("spark.preserveTTLWriteTime .ttl.cols" );
77
76
if (null != ttlColsStr && ttlColsStr .trim ().length () > 0 ) {
78
77
for (String ttlCol : ttlColsStr .split ("," )) {
79
78
ttlCols .add (Integer .parseInt (ttlCol ));
80
79
}
81
80
}
81
+
82
+ String writeTimestampColsStr = sparkConf .get ("spark.preserveTTLWriteTime.writetime.cols" );
83
+ if (null != writeTimestampColsStr && writeTimestampColsStr .trim ().length () > 0 ) {
84
+ for (String writeTimeStampCol : writeTimestampColsStr .split ("," )) {
85
+ writeTimeStampCols .add (Integer .parseInt (writeTimeStampCol ));
86
+ }
87
+ }
82
88
}
83
89
84
90
writeTimeStampFilter = Boolean
85
91
.parseBoolean (sparkConf .get ("spark.source.writeTimeStampFilter" , "false" ));
86
92
// batchsize set to 1 if there is a writeFilter
87
93
if (writeTimeStampFilter ) {
88
94
batchSize = 1 ;
89
- String writeTimestampColsStr = sparkConf .get ("spark.source.writeTimeStampFilter.cols" );
90
- if (null != writeTimestampColsStr && writeTimestampColsStr .trim ().length () > 0 ) {
91
- for (String writeTimeStampCol : writeTimestampColsStr .split ("," )) {
92
- writeTimeStampCols .add (Integer .parseInt (writeTimeStampCol ));
93
- }
94
- }
95
95
}
96
96
97
97
String minWriteTimeStampFilterStr =
@@ -118,13 +118,27 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
118
118
119
119
String selectCols = sparkConf .get ("spark.query.source" );
120
120
String partionKey = sparkConf .get ("spark.query.source.partitionKey" );
121
+ String sourceSelectCondition = sparkConf .get ("spark.query.condition" , "" );
122
+
123
+
124
+ final StringBuilder selectTTLWriteTimeCols = new StringBuilder ();
125
+ if (isPreserveTTLWritetime ) {
126
+ String [] allCols = selectCols .split ("," );
127
+ ttlCols .forEach (col -> {
128
+ selectTTLWriteTimeCols .append (",ttl(" + allCols [col ] + ")" );
129
+ });
130
+ writeTimeStampCols .forEach (col -> {
131
+ selectTTLWriteTimeCols .append (",writetime(" + allCols [col ] + ")" );
132
+ });
133
+ }
134
+ String fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols .toString () + " from " + sourceKeyspaceTable + " where token(" + partionKey .trim ()
135
+ + ") >= ? and token(" + partionKey .trim () + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING" ;
136
+ sourceSelectStatement = sourceSession .prepare (fullSelectQuery );
137
+ logger .info ("PARAM -- Query used: " + fullSelectQuery );
138
+
121
139
selectColTypes = getTypes (sparkConf .get ("spark.query.types" ));
122
140
String idCols = sparkConf .get ("spark.query.destination.id" , "" );
123
141
idColTypes = selectColTypes .subList (0 , idCols .split ("," ).length );
124
- sourceSelectCondition = sparkConf .get ("spark.query.condition" , "" );
125
- sourceSelectStatement = sourceSession .prepare (
126
- "select " + selectCols + " from " + sourceKeyspaceTable + " where token(" + partionKey .trim ()
127
- + ") >= ? and token(" + partionKey .trim () + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING" );
128
142
129
143
String insertCols = sparkConf .get ("spark.query.destination" , "" );
130
144
if (null == insertCols || insertCols .trim ().isEmpty ()) {
@@ -182,15 +196,15 @@ public List<MigrateDataType> getTypes(String types) {
182
196
public int getLargestTTL (Row sourceRow ) {
183
197
int ttl = 0 ;
184
198
for (Integer ttlCol : ttlCols ) {
185
- ttl = Math .max (ttl , sourceRow .getInt (ttlCol ));
199
+ ttl = Math .max (ttl , sourceRow .getInt (selectColTypes . size () + ttlCol - 1 ));
186
200
}
187
201
return ttl ;
188
202
}
189
203
190
204
public long getLargestWriteTimeStamp (Row sourceRow ) {
191
205
long writeTimestamp = 0 ;
192
206
for (Integer writeTimeStampCol : writeTimeStampCols ) {
193
- writeTimestamp = Math .max (writeTimestamp , sourceRow .getLong (writeTimeStampCol ));
207
+ writeTimestamp = Math .max (writeTimestamp , sourceRow .getLong (selectColTypes . size () + ttlCols . size () + writeTimeStampCol - 1 ));
194
208
}
195
209
return writeTimestamp ;
196
210
}
0 commit comments