 public class AbstractJobSession extends BaseJobSession {

+    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());

     protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
         this.sourceSession = sourceSession;
@@ -36,25 +37,26 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         isPreserveTTLWritetime = Boolean.parseBoolean(sparkConf.get("spark.preserveTTLWriteTime", "false"));
         if (isPreserveTTLWritetime) {
-            String ttlColsStr = sparkConf.get("spark.source.ttl.cols");
+            String ttlColsStr = sparkConf.get("spark.preserveTTLWriteTime.ttl.cols");
             if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
                 for (String ttlCol : ttlColsStr.split(",")) {
                     ttlCols.add(Integer.parseInt(ttlCol));
                 }
             }
+
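+            // These indexes are assumed to be positions of columns within the
+            // spark.query.source column list (used later as allCols[col]).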
+            String writeTimestampColsStr = sparkConf.get("spark.preserveTTLWriteTime.writetime.cols");
+            if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
+                for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
+                    writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
+                }
+            }
         }

         writeTimeStampFilter = Boolean
                 .parseBoolean(sparkConf.get("spark.source.writeTimeStampFilter", "false"));
         // batchsize set to 1 if there is a writeFilter
         if (writeTimeStampFilter) {
             batchSize = 1;
-            String writeTimestampColsStr = sparkConf.get("spark.source.writeTimeStampFilter.cols");
-            if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
-                for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
-                    writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
-                }
-            }
         }

         String minWriteTimeStampFilterStr =
@@ -74,45 +76,82 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
             customWritetime = Long.parseLong(customWriteTimeStr);
         }

-        logger.info(" DEFAULT -- Write Batch Size: " + batchSize);
-        logger.info(" DEFAULT -- Source Keyspace Table: " + sourceKeyspaceTable);
-        logger.info(" DEFAULT -- Destination Keyspace Table: " + astraKeyspaceTable);
-        logger.info(" DEFAULT -- ReadRateLimit: " + readLimiter.getRate());
-        logger.info(" DEFAULT -- WriteRateLimit: " + writeLimiter.getRate());
-        logger.info(" DEFAULT -- WriteTimestampFilter: " + writeTimeStampFilter);
-        logger.info(" DEFAULT -- WriteTimestampFilterCols: " + writeTimeStampCols);
-        logger.info(" DEFAULT -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
-        logger.info(" DEFAULT -- TTLCols: " + ttlCols);
+        logger.info("PARAM -- Write Batch Size: " + batchSize);
+        logger.info("PARAM -- Source Keyspace Table: " + sourceKeyspaceTable);
+        logger.info("PARAM -- Destination Keyspace Table: " + astraKeyspaceTable);
+        logger.info("PARAM -- ReadRateLimit: " + readLimiter.getRate());
+        logger.info("PARAM -- WriteRateLimit: " + writeLimiter.getRate());
+        logger.info("PARAM -- WriteTimestampFilter: " + writeTimeStampFilter);
+        logger.info("PARAM -- WriteTimestampFilterCols: " + writeTimeStampCols);
+        logger.info("PARAM -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
+        logger.info("PARAM -- TTLCols: " + ttlCols);
+
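+        // Build the source-side SELECT from user-supplied config: the projection
+        // list, the partition key (for token-range paging), and an optional
+        // extra filter condition.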
+        String selectCols = sparkConf.get("spark.query.source");
+        String partionKey = sparkConf.get("spark.query.source.partitionKey");
+        String sourceSelectCondition = sparkConf.get("spark.query.condition", "");
+
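+        // When preserving TTL/writetime, append ttl(col) and writetime(col)
+        // projections after the regular columns; getLargestTTL() and
+        // getLargestWriteTimeStamp() below rely on this ordering.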
+        final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
+        if (isPreserveTTLWritetime) {
+            String[] allCols = selectCols.split(",");
+            ttlCols.forEach(col -> {
+                selectTTLWriteTimeCols.append(",ttl(" + allCols[col] + ")");
+            });
+            writeTimeStampCols.forEach(col -> {
+                selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")");
+            });
+        }
+        String fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols.toString() + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
+                + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING";
+        sourceSelectStatement = sourceSession.prepare(fullSelectQuery);
+        logger.info("PARAM -- Query used: " + fullSelectQuery);
+
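+        // The destination id columns are taken to be the leading columns of the
+        // select list, so their types are a prefix of selectColTypes.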
+        selectColTypes = getTypes(sparkConf.get("spark.query.types"));
+        String idCols = sparkConf.get("spark.query.destination.id", "");
+        idColTypes = selectColTypes.subList(0, idCols.split(",").length);
+
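+        // Destination columns default to the source projection when
+        // spark.query.destination is not set.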
+        String insertCols = sparkConf.get("spark.query.destination", "");
+        if (null == insertCols || insertCols.trim().isEmpty()) {
+            insertCols = selectCols;
+        }
+        String insertBinds = "";
+        for (String str : idCols.split(",")) {
+            if (insertBinds.isEmpty()) {
+                insertBinds = str + "= ?";
+            } else {
+                insertBinds += " and " + str + "= ?";
+            }
+        }
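+        // Primary-key lookup on the destination table; insertBinds at this point
+        // is a WHERE clause of the form "key1= ? and key2= ?".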
+        astraSelectStatement = astraSession.prepare(
+                "select " + insertCols + " from " + astraKeyspaceTable
+                        + " where " + insertBinds);

         hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.source.hasRandomPartitioner", "false"));
-
         isCounterTable = Boolean.parseBoolean(sparkConf.get("spark.counterTable", "false"));
-        selectColTypes = getTypes(sparkConf.get("spark.diff.select.types"));
-        String partionKey = sparkConf.get("spark.query.cols.partitionKey");
-        String idCols = sparkConf.get("spark.query.cols.id");
-        idColTypes = getTypes(sparkConf.get("spark.query.cols.id.types"));
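+        // Counter columns cannot be written with INSERT; for counter tables the
+        // write is a user-supplied UPDATE (spark.counterTable.cql), with
+        // spark.counterTable.cql.index apparently holding the column-index
+        // mapping used when binding it (usage not shown in this diff).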
+        if (isCounterTable) {
+            String updateSelectMappingStr = sparkConf.get("spark.counterTable.cql.index", "0");
+            for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
+                updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
+            }

-        String selectCols = sparkConf.get("spark.query.cols.select");
+            String counterTableUpdate = sparkConf.get("spark.counterTable.cql");
+            astraInsertStatement = astraSession.prepare(counterTableUpdate);
+        } else {
+            insertBinds = "";
+            for (String str : insertCols.split(",")) {
+                if (insertBinds.isEmpty()) {
+                    insertBinds += "?";
+                } else {
+                    insertBinds += ", ?";
+                }
+            }

-        String idBinds = "";
-        int count = 1;
-        for (String str : idCols.split(",")) {
-            if (count > 1) {
-                idBinds = idBinds + " and " + str + "= ?";
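+            // The TTL ? and TIMESTAMP ? markers are expected to be bound from
+            // getLargestTTL() / getLargestWriteTimeStamp() below (the call site
+            // is not part of this diff).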
+            if (isPreserveTTLWritetime) {
+                astraInsertStatement = astraSession.prepare("insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ") using TTL ? and TIMESTAMP ?");
             } else {
-                idBinds = str + "= ?";
+                astraInsertStatement = astraSession.prepare("insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")");
             }
-            count++;
         }
-
-        sourceSelectCondition = sparkConf.get("spark.query.cols.select.condition", "");
-        sourceSelectStatement = sourceSession.prepare(
-                "select " + selectCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
-                        + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING");
-
-        astraSelectStatement = astraSession.prepare(
-                "select " + selectCols + " from " + astraKeyspaceTable
-                        + " where " + idBinds);
     }

     public List<MigrateDataType> getTypes(String types) {
@@ -127,15 +166,15 @@ public List<MigrateDataType> getTypes(String types) {
     public int getLargestTTL(Row sourceRow) {
         int ttl = 0;
         for (Integer ttlCol : ttlCols) {
-            ttl = Math.max(ttl, sourceRow.getInt(ttlCol));
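+            // ttl(col) projections were appended after the selectColTypes columns
+            // in the source SELECT, hence the offset into the result row.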
+            ttl = Math.max(ttl, sourceRow.getInt(selectColTypes.size() + ttlCol - 1));
         }
         return ttl;
     }

     public long getLargestWriteTimeStamp(Row sourceRow) {
         long writeTimestamp = 0;
         for (Integer writeTimeStampCol : writeTimeStampCols) {
-            writeTimestamp = Math.max(writeTimestamp, sourceRow.getLong(writeTimeStampCol));
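+            // writetime(col) projections follow the ttl(col) projections, hence
+            // the extra ttlCols.size() in the offset.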
+            writeTimestamp = Math.max(writeTimestamp, sourceRow.getLong(selectColTypes.size() + ttlCols.size() + writeTimeStampCol - 1));
         }
         return writeTimestamp;
     }
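
A rough sketch of the Spark properties this change reads, with illustrative
(hypothetical) values; the spark.query.types format in particular is parsed by
getTypes() and its exact encoding is not shown in this diff:

    spark.query.source                          partition-key,id,data-col1,data-col2
    spark.query.source.partitionKey             partition-key
    spark.query.condition
    spark.query.types                           9,1,0,3
    spark.query.destination                     partition-key,id,data-col1,data-col2
    spark.query.destination.id                  partition-key,id
    spark.preserveTTLWriteTime                  true
    spark.preserveTTLWriteTime.ttl.cols         2,3
    spark.preserveTTLWriteTime.writetime.cols   2,3
    spark.source.writeTimeStampFilter           false
    spark.counterTable                          false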