import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

- import java.time.Duration;
import java.time.Instant;
- import java.util.List;
import java.util.Optional;
- import java.util.stream.IntStream;

public class AbstractJobSession extends BaseJobSession {

    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
+    protected CqlHelper cqlHelper;

    protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
        this(sourceSession, astraSession, sc, false);
@@ -33,6 +31,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,

        this.sourceSession = sourceSession;
        this.astraSession = astraSession;
+        this.cqlHelper = new CqlHelper(this.propertyHelper, this.sourceSession, this.astraSession, isJobMigrateRowsFromFile, this);

        batchSize = propertyHelper.getInteger(KnownProperties.SPARK_BATCH_SIZE);
        fetchSizeInRows = propertyHelper.getInteger(KnownProperties.READ_FETCH_SIZE);
@@ -47,9 +46,6 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
        writeLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.SPARK_LIMIT_WRITE));
        maxRetries = propertyHelper.getInteger(KnownProperties.SPARK_MAX_RETRIES);

-        sourceKeyspaceTable = propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE);
-        targetKeyspaceTable = propertyHelper.getString(KnownProperties.TARGET_KEYSPACE_TABLE);
-
        ttlCols = propertyHelper.getIntegerList(KnownProperties.ORIGIN_TTL_COLS);
        writeTimeStampCols = propertyHelper.getIntegerList(KnownProperties.ORIGIN_WRITETIME_COLS);

@@ -62,6 +58,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
        maxWriteTimeStampFilter = propertyHelper.getLong(KnownProperties.ORIGIN_FILTER_WRITETS_MAX);

        customWritetime = propertyHelper.getLong(KnownProperties.TARGET_CUSTOM_WRITETIME);
+        isCounterTable = propertyHelper.getBoolean(KnownProperties.ORIGIN_IS_COUNTER);

        logger.info("PARAM -- Read Consistency: {}", readConsistencyLevel);
        logger.info("PARAM -- Write Consistency: {}", writeConsistencyLevel);
@@ -82,197 +79,30 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
                    Instant.ofEpochMilli(maxWriteTimeStampFilter / 1000));
        }

-        String selectCols = String.join(",", propertyHelper.getStringList(KnownProperties.ORIGIN_COLUMN_NAMES));
-        String partitionKey = String.join(",", propertyHelper.getStringList(KnownProperties.ORIGIN_PARTITION_KEY));
-        String sourceSelectCondition = propertyHelper.getString(KnownProperties.ORIGIN_FILTER_CONDITION);
-        if (null != sourceSelectCondition && !sourceSelectCondition.isEmpty() && !sourceSelectCondition.trim().toUpperCase().startsWith("AND")) {
-            sourceSelectCondition = " AND " + sourceSelectCondition;
-            propertyHelper.setProperty(KnownProperties.ORIGIN_FILTER_CONDITION, sourceSelectCondition);
-        }

-        final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
-        allCols = propertyHelper.getStringList(KnownProperties.ORIGIN_COLUMN_NAMES);
-        if (null != ttlCols) {
-            ttlCols.forEach(col -> {
-                selectTTLWriteTimeCols.append(",ttl(" + allCols.get(col) + ")");
-            });
-        }
-        if (null != writeTimeStampCols) {
-            writeTimeStampCols.forEach(col -> {
-                selectTTLWriteTimeCols.append(",writetime(" + allCols.get(col) + ")");
-            });
-        }
-        selectColTypes = propertyHelper.getMigrationTypeList(KnownProperties.ORIGIN_COLUMN_TYPES);
-        String idCols = String.join(",", propertyHelper.getStringList(KnownProperties.TARGET_PRIMARY_KEY));
-        idColTypes = selectColTypes.subList(0, idCols.split(",").length);

-        String insertCols = String.join(",", propertyHelper.getStringList(KnownProperties.TARGET_COLUMN_NAMES));
-        if (null == insertCols || insertCols.trim().isEmpty()) {
-            insertCols = selectCols;
-            propertyHelper.setProperty(KnownProperties.TARGET_COLUMN_NAMES, propertyHelper.getStringList(KnownProperties.ORIGIN_COLUMN_NAMES));
-        }
-        String insertBinds = "";
-        for (String str : idCols.split(",")) {
-            if (insertBinds.isEmpty()) {
-                insertBinds = str + "= ?";
-            } else {
-                insertBinds += " and " + str + "= ?";
-            }
-        }
-
-        String fullSelectQuery;
-        if (!isJobMigrateRowsFromFile) {
-            fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable +
-                    " where token(" + partitionKey.trim() + ") >= ? and token(" + partitionKey.trim() + ") <= ? " +
-                    sourceSelectCondition + " ALLOW FILTERING";
-        } else {
-            fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable + " where " + insertBinds;
-        }
+        cqlHelper.initialize();
+        String fullSelectQuery = cqlHelper.getCqlString(CqlHelper.CqlStatementType.ORIGIN_SELECT);
        logger.info("PARAM -- ORIGIN SELECT Query used: {}", fullSelectQuery);
        sourceSelectStatement = sourceSession.prepare(fullSelectQuery);

-        astraSelectStatement = astraSession.prepare(
-                "select " + insertCols + " from " + targetKeyspaceTable
-                        + " where " + insertBinds);
+        astraSelectStatement = astraSession.prepare(cqlHelper.getCqlString(CqlHelper.CqlStatementType.TARGET_SELECT_BY_PK));

        hasRandomPartitioner = propertyHelper.getBoolean(KnownProperties.ORIGIN_HAS_RANDOM_PARTITIONER);
-        isCounterTable = propertyHelper.getBoolean(KnownProperties.ORIGIN_IS_COUNTER);
-        if (isCounterTable) {
-            updateSelectMapping = propertyHelper.getIntegerList(KnownProperties.ORIGIN_COUNTER_INDEX);
-            logger.info("PARAM -- TARGET INSERT Query used: {}", KnownProperties.ORIGIN_COUNTER_CQL);
-            astraInsertStatement = astraSession.prepare(propertyHelper.getString(KnownProperties.ORIGIN_COUNTER_CQL));
-        } else {
-            insertBinds = "";
-            for (String str : insertCols.split(",")) {
-                if (insertBinds.isEmpty()) {
-                    insertBinds += "?";
-                } else {
-                    insertBinds += ", ?";
-                }
-            }

-            String fullInsertQuery = "insert into " + targetKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")";
-            if (null != ttlCols && !ttlCols.isEmpty()) {
-                fullInsertQuery += " USING TTL ?";
-                if (null != writeTimeStampCols && !writeTimeStampCols.isEmpty()) {
-                    fullInsertQuery += " AND TIMESTAMP ?";
-                }
-            } else if (null != writeTimeStampCols && !writeTimeStampCols.isEmpty()) {
-                fullInsertQuery += " USING TIMESTAMP ?";
-            }
-            logger.info("PARAM -- TARGET INSERT Query used: {}", fullInsertQuery);
-            astraInsertStatement = astraSession.prepare(fullInsertQuery);
-        }
+        astraInsertStatement = astraSession.prepare(cqlHelper.getCqlString(CqlHelper.CqlStatementType.TARGET_INSERT));

        // Handle rows with blank values for 'timestamp' data-type in primary-key fields
        if (null != propertyHelper.getLong(KnownProperties.TARGET_REPLACE_MISSING_TS))
            tsReplaceVal = propertyHelper.getLong(KnownProperties.TARGET_REPLACE_MISSING_TS);
    }

    public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow, Row astraRow) {
-        BoundStatement boundInsertStatement = insertStatement.bind().setConsistencyLevel(writeConsistencyLevel);
-
-        if (isCounterTable) {
-            for (int index = 0; index < selectColTypes.size(); index++) {
-                MigrateDataType dataType = selectColTypes.get(updateSelectMapping.get(index));
-                // compute the counter delta if reading from astra for the difference
-                if (astraRow != null && index < (selectColTypes.size() - idColTypes.size())) {
-                    boundInsertStatement = boundInsertStatement.set(index, (sourceRow.getLong(updateSelectMapping.get(index)) - astraRow.getLong(updateSelectMapping.get(index))), Long.class);
-                } else {
-                    boundInsertStatement = boundInsertStatement.set(index, getData(dataType, updateSelectMapping.get(index), sourceRow), dataType.typeClass);
-                }
-            }
-        } else {
-            int index = 0;
-            for (index = 0; index < selectColTypes.size(); index++) {
-                boundInsertStatement = getBoundStatement(sourceRow, boundInsertStatement, index, selectColTypes);
-                if (boundInsertStatement == null) return null;
-            }
-
-            if (null != ttlCols && !ttlCols.isEmpty()) {
-                boundInsertStatement = boundInsertStatement.set(index, getLargestTTL(sourceRow), Integer.class);
-                index++;
-            }
-            if (null != writeTimeStampCols && !writeTimeStampCols.isEmpty()) {
-                if (customWritetime > 0) {
-                    boundInsertStatement = boundInsertStatement.set(index, customWritetime, Long.class);
-                } else {
-                    boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
-                }
-            }
-        }
-
-        // Batch insert for large records may take longer, hence 10 secs to avoid timeout errors
-        return boundInsertStatement.setTimeout(Duration.ofSeconds(10));
-    }
-
-    public int getLargestTTL(Row sourceRow) {
-        return IntStream.range(0, ttlCols.size())
-                .map(i -> sourceRow.getInt(selectColTypes.size() + i)).max().getAsInt();
-    }
-
-    public long getLargestWriteTimeStamp(Row sourceRow) {
-        return IntStream.range(0, writeTimeStampCols.size())
-                .mapToLong(i -> sourceRow.getLong(selectColTypes.size() + ttlCols.size() + i)).max().getAsLong();
+        return cqlHelper.bindInsert(insertStatement, sourceRow, astraRow);
    }

    public BoundStatement selectFromAstra(PreparedStatement selectStatement, Row sourceRow) {
-        BoundStatement boundSelectStatement = selectStatement.bind().setConsistencyLevel(readConsistencyLevel);
-        for (int index = 0; index < idColTypes.size(); index++) {
-            boundSelectStatement = getBoundStatement(sourceRow, boundSelectStatement, index, idColTypes);
-            if (boundSelectStatement == null) return null;
-        }
-
-        return boundSelectStatement;
-    }
-
-    private BoundStatement getBoundStatement(Row sourceRow, BoundStatement boundSelectStatement, int index,
-                                             List<MigrateDataType> cols) {
-        MigrateDataType dataTypeObj = cols.get(index);
-        Object colData = getData(dataTypeObj, index, sourceRow);
-
-        // Handle rows with blank values in primary-key fields
-        if (index < idColTypes.size()) {
-            Optional<Object> optionalVal = handleBlankInPrimaryKey(index, colData, dataTypeObj.typeClass, sourceRow);
-            if (!optionalVal.isPresent()) {
-                return null;
-            }
-            colData = optionalVal.get();
-        }
-        boundSelectStatement = boundSelectStatement.set(index, colData, dataTypeObj.typeClass);
-        return boundSelectStatement;
-    }
-
-    protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row sourceRow) {
-        return handleBlankInPrimaryKey(index, colData, dataType, sourceRow, true);
-    }
-
-    protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row sourceRow, boolean logWarn) {
-        // Handle rows with blank values for 'String' data-type in primary-key fields
-        if (index < idColTypes.size() && colData == null && dataType == String.class) {
-            if (logWarn) {
-                logger.warn("For row with Key: {}, found String primary-key column {} with blank value",
-                        getKey(sourceRow), allCols.get(index));
-            }
-            return Optional.of("");
-        }
-
-        // Handle rows with blank values for 'timestamp' data-type in primary-key fields
-        if (index < idColTypes.size() && colData == null && dataType == Instant.class) {
-            if (tsReplaceValStr.isEmpty()) {
-                logger.error("Skipping row with Key: {} as Timestamp primary-key column {} has invalid blank value. " +
-                        "Alternatively rerun the job with --conf spark.target.replace.blankTimestampKeyUsingEpoch=\"<fixed-epoch-value>\" " +
-                        "option to replace the blanks with a fixed timestamp value", getKey(sourceRow), allCols.get(index));
-                return Optional.empty();
-            }
-            if (logWarn) {
-                logger.warn("For row with Key: {}, found Timestamp primary-key column {} with invalid blank value. " +
-                        "Using value {} instead", getKey(sourceRow), allCols.get(index), Instant.ofEpochSecond(tsReplaceVal));
-            }
-            return Optional.of(Instant.ofEpochSecond(tsReplaceVal));
-        }
-
-        return Optional.of(colData);
+        return cqlHelper.selectFromTargetByPK(selectStatement, sourceRow);
    }

}
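
Reviewer note: after this change, AbstractJobSession no longer assembles CQL text or binds values itself; both concerns move into CqlHelper. A minimal sketch of the resulting call pattern, using only the constructor and method signatures visible in this diff (CqlHelper's internals are assumed, not shown here):

    // Hypothetical caller, mirroring what the constructor above now does.
    CqlHelper cqlHelper = new CqlHelper(propertyHelper, sourceSession, astraSession,
            isJobMigrateRowsFromFile, this);
    cqlHelper.initialize(); // assumed to build all statement text from KnownProperties

    // Statement text is requested by type instead of being concatenated inline:
    String originSelect = cqlHelper.getCqlString(CqlHelper.CqlStatementType.ORIGIN_SELECT);
    String targetSelect = cqlHelper.getCqlString(CqlHelper.CqlStatementType.TARGET_SELECT_BY_PK);
    String targetInsert = cqlHelper.getCqlString(CqlHelper.CqlStatementType.TARGET_INSERT);

    // Binding is delegated too, replacing the removed bindInsert/getBoundStatement bodies:
    BoundStatement boundInsert = cqlHelper.bindInsert(astraInsertStatement, sourceRow, astraRow);
    BoundStatement boundSelect = cqlHelper.selectFromTargetByPK(astraSelectStatement, sourceRow);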