@@ -20,19 +20,19 @@ public class AbstractJobSession extends BaseJobSession {
 
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
 
-    protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
-        this(sourceSession, astraSession, sc, false);
+    protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
+        this(originSession, targetSession, sc, false);
     }
 
-    protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
+    protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
         super(sc);
 
-        if (sourceSession == null) {
+        if (originSession == null) {
             return;
         }
 
-        this.sourceSession = sourceSession;
-        this.astraSession = astraSession;
+        this.originSessionSession = originSession;
+        this.targetSession = targetSession;
 
         batchSize = new Integer(Util.getSparkPropOr(sc, "spark.batchSize", "5"));
         fetchSizeInRows = new Integer(Util.getSparkPropOr(sc, "spark.read.fetch.sizeInRows", "1000"));
@@ -45,8 +45,8 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         writeLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.writeRateLimit", "40000")));
         maxRetries = Integer.parseInt(sc.get("spark.maxRetries", "0"));
 
-        sourceKeyspaceTable = Util.getSparkProp(sc, "spark.origin.keyspaceTable");
-        astraKeyspaceTable = Util.getSparkProp(sc, "spark.target.keyspaceTable");
+        originKeyspaceTable = Util.getSparkProp(sc, "spark.origin.keyspaceTable");
+        targetKeyspaceTable = Util.getSparkProp(sc, "spark.target.keyspaceTable");
 
         String ttlColsStr = Util.getSparkPropOrEmpty(sc, "spark.query.ttl.cols");
         if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
@@ -91,8 +91,8 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         logger.info("PARAM -- Write Batch Size: {}", batchSize);
         logger.info("PARAM -- Max Retries: {}", maxRetries);
         logger.info("PARAM -- Read Fetch Size: {}", fetchSizeInRows);
-        logger.info("PARAM -- Source Keyspace Table: {}", sourceKeyspaceTable);
-        logger.info("PARAM -- Destination Keyspace Table: {}", astraKeyspaceTable);
+        logger.info("PARAM -- Source Keyspace Table: {}", originKeyspaceTable);
+        logger.info("PARAM -- Destination Keyspace Table: {}", targetKeyspaceTable);
         logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());
         logger.info("PARAM -- WriteRateLimit: {}", writeLimiter.getRate());
         logger.info("PARAM -- TTLCols: {}", ttlCols);
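The constructor reads all of its tuning and table settings from SparkConf. A minimal sketch of supplying the property names that appear in this diff (the class name and the keyspace.table values are hypothetical placeholders):

import org.apache.spark.SparkConf;

public class JobConfSketch {
    public static SparkConf build() {
        return new SparkConf()
                .set("spark.batchSize", "5")                          // write batch size (diff default: 5)
                .set("spark.read.fetch.sizeInRows", "1000")           // read page size (diff default: 1000)
                .set("spark.writeRateLimit", "40000")                 // writes/sec budget for the RateLimiter
                .set("spark.maxRetries", "0")
                .set("spark.origin.keyspaceTable", "demo_ks.orders")  // hypothetical origin keyspace.table
                .set("spark.target.keyspaceTable", "demo_ks.orders"); // hypothetical target keyspace.table
    }
}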
@@ -107,9 +107,9 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
 
         String selectCols = Util.getSparkProp(sc, "spark.query.origin");
         String partitionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
-        String sourceSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");
-        if (!sourceSelectCondition.isEmpty() && !sourceSelectCondition.trim().toUpperCase().startsWith("AND")) {
-            sourceSelectCondition = " AND " + sourceSelectCondition;
+        String originSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");
+        if (!originSelectCondition.isEmpty() && !originSelectCondition.trim().toUpperCase().startsWith("AND")) {
+            originSelectCondition = " AND " + originSelectCondition;
         }
 
         final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
@@ -139,17 +139,17 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
 
         String fullSelectQuery;
         if (!isJobMigrateRowsFromFile) {
-            fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable +
+            fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + originKeyspaceTable +
                     " where token(" + partitionKey.trim() + ") >= ? and token(" + partitionKey.trim() + ") <= ? " +
-                    sourceSelectCondition + " ALLOW FILTERING";
+                    originSelectCondition + " ALLOW FILTERING";
         } else {
-            fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable + " where " + insertBinds;
+            fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + originKeyspaceTable + " where " + insertBinds;
         }
-        sourceSelectStatement = sourceSession.prepare(fullSelectQuery);
+        originSelectStatement = originSession.prepare(fullSelectQuery);
         logger.info("PARAM -- Query used: {}", fullSelectQuery);
 
-        astraSelectStatement = astraSession.prepare(
-                "select " + insertCols + " from " + astraKeyspaceTable
+        targetSelectStatement = targetSession.prepare(
+                "select " + insertCols + " from " + targetKeyspaceTable
                 + " where " + insertBinds);
 
         hasRandomPartitioner = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.origin.hasRandomPartitioner", "false"));
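With hypothetical inputs, the token-range branch above renders a query of roughly this shape (column and table names invented; the double space before ALLOW FILTERING comes from an empty spark.query.condition):

public class SelectQueryShape {
    public static void main(String[] args) {
        // Assuming selectCols = "id,val", partitionKey = "id",
        // originKeyspaceTable = "demo_ks.orders", no TTL/writetime columns,
        // and no spark.query.condition set:
        String exampleSelect = "select id,val from demo_ks.orders"
                + " where token(id) >= ? and token(id) <= ? "
                + " ALLOW FILTERING";
        System.out.println(exampleSelect);
    }
}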
@@ -161,7 +161,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
             }
 
             String counterTableUpdate = Util.getSparkProp(sc, "spark.counterTable.cql");
-            astraInsertStatement = astraSession.prepare(counterTableUpdate);
+            targetInsertStatement = targetSession.prepare(counterTableUpdate);
         } else {
             insertBinds = "";
             for (String str : insertCols.split(",")) {
@@ -172,7 +172,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
                 }
             }
 
-            String fullInsertQuery = "insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")";
+            String fullInsertQuery = "insert into " + targetKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")";
             if (!ttlCols.isEmpty()) {
                 fullInsertQuery += " USING TTL ?";
                 if (!writeTimeStampCols.isEmpty()) {
@@ -181,7 +181,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
             } else if (!writeTimeStampCols.isEmpty()) {
                 fullInsertQuery += " USING TIMESTAMP ?";
             }
-            astraInsertStatement = astraSession.prepare(fullInsertQuery);
+            targetInsertStatement = targetSession.prepare(fullInsertQuery);
         }
 
         // Handle rows with blank values for 'timestamp' data-type in primary-key fields
@@ -191,35 +191,35 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         }
     }
 
-    public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow, Row astraRow) {
+    public BoundStatement bindInsert(PreparedStatement insertStatement, Row originRow, Row targetRow) {
         BoundStatement boundInsertStatement = insertStatement.bind().setConsistencyLevel(writeConsistencyLevel);
 
         if (isCounterTable) {
             for (int index = 0; index < selectColTypes.size(); index++) {
                 MigrateDataType dataType = selectColTypes.get(updateSelectMapping.get(index));
-                // compute the counter delta if reading from astra for the difference
-                if (astraRow != null && index < (selectColTypes.size() - idColTypes.size())) {
-                    boundInsertStatement = boundInsertStatement.set(index, (sourceRow.getLong(updateSelectMapping.get(index)) - astraRow.getLong(updateSelectMapping.get(index))), Long.class);
+                // compute the counter delta if reading from target for the difference
+                if (targetRow != null && index < (selectColTypes.size() - idColTypes.size())) {
+                    boundInsertStatement = boundInsertStatement.set(index, (originRow.getLong(updateSelectMapping.get(index)) - targetRow.getLong(updateSelectMapping.get(index))), Long.class);
                 } else {
-                    boundInsertStatement = boundInsertStatement.set(index, getData(dataType, updateSelectMapping.get(index), sourceRow), dataType.typeClass);
+                    boundInsertStatement = boundInsertStatement.set(index, getData(dataType, updateSelectMapping.get(index), originRow), dataType.typeClass);
                 }
             }
         } else {
             int index = 0;
             for (index = 0; index < selectColTypes.size(); index++) {
-                boundInsertStatement = getBoundStatement(sourceRow, boundInsertStatement, index, selectColTypes);
+                boundInsertStatement = getBoundStatement(originRow, boundInsertStatement, index, selectColTypes);
                 if (boundInsertStatement == null) return null;
             }
 
             if (!ttlCols.isEmpty()) {
-                boundInsertStatement = boundInsertStatement.set(index, getLargestTTL(sourceRow), Integer.class);
+                boundInsertStatement = boundInsertStatement.set(index, getLargestTTL(originRow), Integer.class);
                 index++;
             }
             if (!writeTimeStampCols.isEmpty()) {
                 if (customWritetime > 0) {
                     boundInsertStatement = boundInsertStatement.set(index, customWritetime, Long.class);
                 } else {
-                    boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
+                    boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(originRow), Long.class);
                 }
             }
         }
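The counter branch binds a delta rather than the raw origin value because Cassandra counter columns can only be incremented or decremented, never set. A small worked example of that arithmetic (values invented for illustration):

public class CounterDeltaExample {
    public static void main(String[] args) {
        long originCount = 100L;  // hypothetical counter value read from the origin row
        long targetCount = 70L;   // hypothetical counter value already on the target row
        long delta = originCount - targetCount;
        // Binding 30 (not 100) brings the target counter up to the origin's total
        // without double-counting what was already migrated.
        System.out.println("delta bound into the counter update: " + delta);
    }
}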
@@ -228,34 +228,34 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
         return boundInsertStatement.setTimeout(Duration.ofSeconds(10));
     }
 
-    public int getLargestTTL(Row sourceRow) {
+    public int getLargestTTL(Row row) {
         return IntStream.range(0, ttlCols.size())
-                .map(i -> sourceRow.getInt(selectColTypes.size() + i)).max().getAsInt();
+                .map(i -> row.getInt(selectColTypes.size() + i)).max().getAsInt();
     }
 
-    public long getLargestWriteTimeStamp(Row sourceRow) {
+    public long getLargestWriteTimeStamp(Row row) {
         return IntStream.range(0, writeTimeStampCols.size())
-                .mapToLong(i -> sourceRow.getLong(selectColTypes.size() + ttlCols.size() + i)).max().getAsLong();
+                .mapToLong(i -> row.getLong(selectColTypes.size() + ttlCols.size() + i)).max().getAsLong();
     }
 
-    public BoundStatement selectFromAstra(PreparedStatement selectStatement, Row sourceRow) {
+    public BoundStatement selectFromTarget(PreparedStatement selectStatement, Row originRow) {
         BoundStatement boundSelectStatement = selectStatement.bind().setConsistencyLevel(readConsistencyLevel);
         for (int index = 0; index < idColTypes.size(); index++) {
-            boundSelectStatement = getBoundStatement(sourceRow, boundSelectStatement, index, idColTypes);
+            boundSelectStatement = getBoundStatement(originRow, boundSelectStatement, index, idColTypes);
             if (boundSelectStatement == null) return null;
         }
 
         return boundSelectStatement;
     }
 
-    private BoundStatement getBoundStatement(Row sourceRow, BoundStatement boundSelectStatement, int index,
+    private BoundStatement getBoundStatement(Row row, BoundStatement boundSelectStatement, int index,
                                              List<MigrateDataType> cols) {
         MigrateDataType dataTypeObj = cols.get(index);
-        Object colData = getData(dataTypeObj, index, sourceRow);
+        Object colData = getData(dataTypeObj, index, row);
 
         // Handle rows with blank values in primary-key fields
         if (index < idColTypes.size()) {
-            Optional<Object> optionalVal = handleBlankInPrimaryKey(index, colData, dataTypeObj.typeClass, sourceRow);
+            Optional<Object> optionalVal = handleBlankInPrimaryKey(index, colData, dataTypeObj.typeClass, row);
             if (!optionalVal.isPresent()) {
                 return null;
             }
@@ -265,16 +265,16 @@ private BoundStatement getBoundStatement(Row sourceRow, BoundStatement boundSele
         return boundSelectStatement;
     }
 
-    protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row sourceRow) {
-        return handleBlankInPrimaryKey(index, colData, dataType, sourceRow, true);
+    protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row originRow) {
+        return handleBlankInPrimaryKey(index, colData, dataType, originRow, true);
     }
 
-    protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row sourceRow, boolean logWarn) {
+    protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row originRow, boolean logWarn) {
         // Handle rows with blank values for 'String' data-type in primary-key fields
         if (index < idColTypes.size() && colData == null && dataType == String.class) {
             if (logWarn) {
                 logger.warn("For row with Key: {}, found String primary-key column {} with blank value",
-                        getKey(sourceRow), allCols[index]);
+                        getKey(originRow), allCols[index]);
             }
             return Optional.of("");
         }
@@ -284,12 +284,12 @@ protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Cl
             if (tsReplaceValStr.isEmpty()) {
                 logger.error("Skipping row with Key: {} as Timestamp primary-key column {} has invalid blank value. " +
                         "Alternatively rerun the job with --conf spark.target.replace.blankTimestampKeyUsingEpoch=\"<fixed-epoch-value>\" " +
-                        "option to replace the blanks with a fixed timestamp value", getKey(sourceRow), allCols[index]);
+                        "option to replace the blanks with a fixed timestamp value", getKey(originRow), allCols[index]);
                 return Optional.empty();
             }
             if (logWarn) {
                 logger.warn("For row with Key: {}, found Timestamp primary-key column {} with invalid blank value. " +
-                        "Using value {} instead", getKey(sourceRow), allCols[index], Instant.ofEpochSecond(tsReplaceVal));
+                        "Using value {} instead", getKey(originRow), allCols[index], Instant.ofEpochSecond(tsReplaceVal));
             }
             return Optional.of(Instant.ofEpochSecond(tsReplaceVal));
         }
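Taken together, the renamed methods keep the read-check-write flow unchanged: read a row from the origin, look it up on the target with selectFromTarget, then bind an insert with bindInsert. A minimal sketch assuming the prepared statements and targetSession are inherited fields visible to a subclass (class and method names here are hypothetical):

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import org.apache.spark.SparkConf;

class CopyRowSketch extends AbstractJobSession {
    CopyRowSketch(CqlSession origin, CqlSession target, SparkConf sc) {
        super(origin, target, sc);
    }

    void copyOneRow(Row originRow) {
        BoundStatement select = selectFromTarget(targetSelectStatement, originRow);
        if (select == null) return;                           // skipped: blank primary-key value
        Row targetRow = targetSession.execute(select).one();  // null when the key is absent on the target
        BoundStatement insert = bindInsert(targetInsertStatement, originRow, targetRow);
        if (insert != null) {
            targetSession.execute(insert);
        }
    }
}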