 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
+import datastax.astra.migrate.properties.KnownProperties;
 import org.apache.commons.lang.StringUtils;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
@@ -34,54 +35,54 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         this.originSessionSession = originSession;
         this.targetSession = targetSession;

-        batchSize = new Integer(Util.getSparkPropOr(sc, "spark.batchSize", "5"));
-        fetchSizeInRows = new Integer(Util.getSparkPropOr(sc, "spark.read.fetch.sizeInRows", "1000"));
-        printStatsAfter = new Integer(Util.getSparkPropOr(sc, "spark.printStatsAfter", "100000"));
+        batchSize = new Integer(Util.getSparkPropOr(sc, KnownProperties.SPARK_BATCH_SIZE, "5"));
+        fetchSizeInRows = new Integer(Util.getSparkPropOr(sc, KnownProperties.READ_FETCH_SIZE, "1000"));
+        printStatsAfter = new Integer(Util.getSparkPropOr(sc, KnownProperties.SPARK_STATS_AFTER, "100000"));
         if (printStatsAfter < 1) {
             printStatsAfter = 100000;
         }

-        readLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.readRateLimit", "20000")));
-        writeLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.writeRateLimit", "40000")));
-        maxRetries = Integer.parseInt(sc.get("spark.maxRetries", "0"));
+        readLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, KnownProperties.SPARK_LIMIT_READ, "20000")));
+        writeLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, KnownProperties.SPARK_LIMIT_WRITE, "40000")));
+        maxRetries = Integer.parseInt(sc.get(KnownProperties.SPARK_MAX_RETRIES, "0"));

-        originKeyspaceTable = Util.getSparkProp(sc, "spark.origin.keyspaceTable");
-        targetKeyspaceTable = Util.getSparkProp(sc, "spark.target.keyspaceTable");
+        originKeyspaceTable = Util.getSparkProp(sc, KnownProperties.ORIGIN_KEYSPACE_TABLE);
+        targetKeyspaceTable = Util.getSparkProp(sc, KnownProperties.TARGET_KEYSPACE_TABLE);

-        String ttlColsStr = Util.getSparkPropOrEmpty(sc, "spark.query.ttl.cols");
+        String ttlColsStr = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_TTL_COLS);
         if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
             for (String ttlCol : ttlColsStr.split(",")) {
                 ttlCols.add(Integer.parseInt(ttlCol));
             }
         }

-        String writeTimestampColsStr = Util.getSparkPropOrEmpty(sc, "spark.query.writetime.cols");
+        String writeTimestampColsStr = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_WRITETIME_COLS);
         if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
             for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
                 writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
             }
         }

         writeTimeStampFilter = Boolean
-                .parseBoolean(Util.getSparkPropOr(sc, "spark.origin.writeTimeStampFilter", "false"));
+                .parseBoolean(Util.getSparkPropOr(sc, KnownProperties.ORIGIN_FILTER_WRITETS_ENABLED, "false"));
         // batchsize set to 1 if there is a writeFilter
         if (writeTimeStampFilter) {
             batchSize = 1;
         }

         String minWriteTimeStampFilterStr =
-                Util.getSparkPropOr(sc, "spark.origin.minWriteTimeStampFilter", "0");
+                Util.getSparkPropOr(sc, KnownProperties.ORIGIN_FILTER_WRITETS_MIN, "0");
         if (null != minWriteTimeStampFilterStr && minWriteTimeStampFilterStr.trim().length() > 1) {
             minWriteTimeStampFilter = Long.parseLong(minWriteTimeStampFilterStr);
         }
         String maxWriteTimeStampFilterStr =
-                Util.getSparkPropOr(sc, "spark.origin.maxWriteTimeStampFilter", "0");
+                Util.getSparkPropOr(sc, KnownProperties.ORIGIN_FILTER_WRITETS_MAX, "0");
         if (null != maxWriteTimeStampFilterStr && maxWriteTimeStampFilterStr.trim().length() > 1) {
             maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
         }

         String customWriteTimeStr =
-                Util.getSparkPropOr(sc, "spark.target.custom.writeTime", "0");
+                Util.getSparkPropOr(sc, KnownProperties.TARGET_CUSTOM_WRITETIME, "0");
         if (null != customWriteTimeStr && customWriteTimeStr.trim().length() > 1 && StringUtils.isNumeric(customWriteTimeStr.trim())) {
             customWritetime = Long.parseLong(customWriteTimeStr);
         }
@@ -105,9 +106,9 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
                     Instant.ofEpochMilli(maxWriteTimeStampFilter / 1000));
         }

-        String selectCols = Util.getSparkProp(sc, "spark.query.origin");
-        String partitionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
-        String originSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");
+        String selectCols = Util.getSparkProp(sc, KnownProperties.ORIGIN_COLUMN_NAMES);
+        String partitionKey = Util.getSparkProp(sc, KnownProperties.ORIGIN_PARTITION_KEY);
+        String originSelectCondition = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_FILTER_CONDITION);
         if (!originSelectCondition.isEmpty() && !originSelectCondition.trim().toUpperCase().startsWith("AND")) {
             originSelectCondition = " AND " + originSelectCondition;
         }
@@ -120,11 +121,11 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         writeTimeStampCols.forEach(col -> {
             selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")");
         });
-        selectColTypes = getTypes(Util.getSparkProp(sc, "spark.query.types"));
-        String idCols = Util.getSparkPropOrEmpty(sc, "spark.query.target.id");
+        selectColTypes = getTypes(Util.getSparkProp(sc, KnownProperties.ORIGIN_COLUMN_TYPES));
+        String idCols = Util.getSparkPropOrEmpty(sc, KnownProperties.TARGET_PRIMARY_KEY);
         idColTypes = selectColTypes.subList(0, idCols.split(",").length);

-        String insertCols = Util.getSparkPropOrEmpty(sc, "spark.query.target");
+        String insertCols = Util.getSparkPropOrEmpty(sc, KnownProperties.TARGET_COLUMN_NAMES);
         if (null == insertCols || insertCols.trim().isEmpty()) {
             insertCols = selectCols;
         }
@@ -152,15 +153,15 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
                 "select " + insertCols + " from " + targetKeyspaceTable
                         + " where " + insertBinds);

-        hasRandomPartitioner = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.origin.hasRandomPartitioner", "false"));
-        isCounterTable = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.counterTable", "false"));
+        hasRandomPartitioner = Boolean.parseBoolean(Util.getSparkPropOr(sc, KnownProperties.ORIGIN_HAS_RANDOM_PARTITIONER, "false"));
+        isCounterTable = Boolean.parseBoolean(Util.getSparkPropOr(sc, KnownProperties.ORIGIN_IS_COUNTER, "false"));
         if (isCounterTable) {
-            String updateSelectMappingStr = Util.getSparkPropOr(sc, "spark.counterTable.cql.index", "0");
+            String updateSelectMappingStr = Util.getSparkPropOr(sc, KnownProperties.ORIGIN_COUNTER_INDEXES, "0");
             for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
                 updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
             }

-            String counterTableUpdate = Util.getSparkProp(sc, "spark.counterTable.cql");
+            String counterTableUpdate = Util.getSparkProp(sc, KnownProperties.ORIGIN_COUNTER_CQL);
             targetInsertStatement = targetSession.prepare(counterTableUpdate);
         } else {
             insertBinds = "";
@@ -185,7 +186,7 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         }

         // Handle rows with blank values for 'timestamp' data-type in primary-key fields
-        tsReplaceValStr = Util.getSparkPropOr(sc, "spark.target.replace.blankTimestampKeyUsingEpoch", "");
+        tsReplaceValStr = Util.getSparkPropOr(sc, KnownProperties.TARGET_REPLACE_MISSING_TS, "");
         if (!tsReplaceValStr.isEmpty()) {
             tsReplaceVal = Long.parseLong(tsReplaceValStr);
         }
@@ -283,7 +284,7 @@ protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Cl
         if (index < idColTypes.size() && colData == null && dataType == Instant.class) {
             if (tsReplaceValStr.isEmpty()) {
                 logger.error("Skipping row with Key: {} as Timestamp primary-key column {} has invalid blank value. " +
-                        "Alternatively rerun the job with --conf spark.target.replace.blankTimestampKeyUsingEpoch=\"<fixed-epoch-value>\" " +
+                        "Alternatively rerun the job with --conf " + KnownProperties.TARGET_REPLACE_MISSING_TS + "=\"<fixed-epoch-value>\" " +
                         "option to replace the blanks with a fixed timestamp value", getKey(originRow), allCols[index]);
                 return Optional.empty();
             }
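Note: the diff assumes a new datastax.astra.migrate.properties.KnownProperties class that centralizes these property keys. A minimal sketch of what such a class could look like follows; the constant names come from this diff, but the class body and the assumption that each constant keeps its original property-key value are not confirmed by this commit.

package datastax.astra.migrate.properties;

// Hypothetical sketch only: centralizes property keys that AbstractJobSession
// previously hardcoded as string literals. The key values below assume the
// refactor preserved the original "spark.*" names, which this diff does not show.
public final class KnownProperties {
    public static final String SPARK_BATCH_SIZE      = "spark.batchSize";
    public static final String READ_FETCH_SIZE       = "spark.read.fetch.sizeInRows";
    public static final String SPARK_STATS_AFTER     = "spark.printStatsAfter";
    public static final String SPARK_LIMIT_READ      = "spark.readRateLimit";
    public static final String SPARK_LIMIT_WRITE     = "spark.writeRateLimit";
    public static final String SPARK_MAX_RETRIES     = "spark.maxRetries";
    public static final String ORIGIN_KEYSPACE_TABLE = "spark.origin.keyspaceTable";
    public static final String TARGET_KEYSPACE_TABLE = "spark.target.keyspaceTable";
    // ... remaining constants referenced in AbstractJobSession would follow the same pattern

    private KnownProperties() {
        // constants holder, never instantiated
    }
}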