 import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
 import datastax.astra.migrate.properties.KnownProperties;
-import org.apache.commons.lang.StringUtils;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,57 +34,45 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         this.originSessionSession = originSession;
         this.targetSession = targetSession;

-        batchSize = new Integer(Util.getSparkPropOr(sc, KnownProperties.SPARK_BATCH_SIZE, "5"));
-        fetchSizeInRows = new Integer(Util.getSparkPropOr(sc, KnownProperties.READ_FETCH_SIZE, "1000"));
-        printStatsAfter = new Integer(Util.getSparkPropOr(sc, KnownProperties.SPARK_STATS_AFTER, "100000"));
-        if (printStatsAfter < 1) {
-            printStatsAfter = 100000;
+        batchSize = propertyHelper.getInteger(KnownProperties.SPARK_BATCH_SIZE);
+        fetchSizeInRows = propertyHelper.getInteger(KnownProperties.READ_FETCH_SIZE);
+        printStatsAfter = propertyHelper.getInteger(KnownProperties.SPARK_STATS_AFTER);
+        if (!propertyHelper.meetsMinimum(KnownProperties.SPARK_STATS_AFTER, printStatsAfter, 1)) {
+            logger.warn(KnownProperties.SPARK_STATS_AFTER + " must be greater than 0. Setting to default value of " + KnownProperties.getDefaultAsString(KnownProperties.SPARK_STATS_AFTER));
+            propertyHelper.setProperty(KnownProperties.SPARK_STATS_AFTER, KnownProperties.getDefault(KnownProperties.SPARK_STATS_AFTER));
+            printStatsAfter = propertyHelper.getInteger(KnownProperties.SPARK_STATS_AFTER);
         }

-        readLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, KnownProperties.SPARK_LIMIT_READ, "20000")));
-        writeLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, KnownProperties.SPARK_LIMIT_WRITE, "40000")));
-        maxRetries = Integer.parseInt(sc.get(KnownProperties.SPARK_MAX_RETRIES, "0"));
+        readLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.SPARK_LIMIT_READ));
+        writeLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.SPARK_LIMIT_WRITE));
+        maxRetries = propertyHelper.getInteger(KnownProperties.SPARK_MAX_RETRIES);

-        originKeyspaceTable = Util.getSparkProp(sc, KnownProperties.ORIGIN_KEYSPACE_TABLE);
-        targetKeyspaceTable = Util.getSparkProp(sc, KnownProperties.TARGET_KEYSPACE_TABLE);
+        originKeyspaceTable = propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE);
+        targetKeyspaceTable = propertyHelper.getString(KnownProperties.TARGET_KEYSPACE_TABLE);

-        String ttlColsStr = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_TTL_COLS);
+        String ttlColsStr = propertyHelper.getAsString(KnownProperties.ORIGIN_TTL_COLS);
         if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
             for (String ttlCol : ttlColsStr.split(",")) {
                 ttlCols.add(Integer.parseInt(ttlCol));
             }
         }

-        String writeTimestampColsStr = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_WRITETIME_COLS);
+        String writeTimestampColsStr = propertyHelper.getAsString(KnownProperties.ORIGIN_WRITETIME_COLS);
         if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
             for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
                 writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
             }
         }

-        writeTimeStampFilter = Boolean
-                .parseBoolean(Util.getSparkPropOr(sc, KnownProperties.ORIGIN_FILTER_WRITETS_ENABLED, "false"));
+        writeTimeStampFilter = propertyHelper.getBoolean(KnownProperties.ORIGIN_FILTER_WRITETS_ENABLED);
         // batchsize set to 1 if there is a writeFilter
         if (writeTimeStampFilter) {
             batchSize = 1;
         }

-        String minWriteTimeStampFilterStr =
-                Util.getSparkPropOr(sc, KnownProperties.ORIGIN_FILTER_WRITETS_MIN, "0");
-        if (null != minWriteTimeStampFilterStr && minWriteTimeStampFilterStr.trim().length() > 1) {
-            minWriteTimeStampFilter = Long.parseLong(minWriteTimeStampFilterStr);
-        }
-        String maxWriteTimeStampFilterStr =
-                Util.getSparkPropOr(sc, KnownProperties.ORIGIN_FILTER_WRITETS_MAX, "0");
-        if (null != maxWriteTimeStampFilterStr && maxWriteTimeStampFilterStr.trim().length() > 1) {
-            maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
-        }
-
-        String customWriteTimeStr =
-                Util.getSparkPropOr(sc, KnownProperties.TARGET_CUSTOM_WRITETIME, "0");
-        if (null != customWriteTimeStr && customWriteTimeStr.trim().length() > 1 && StringUtils.isNumeric(customWriteTimeStr.trim())) {
-            customWritetime = Long.parseLong(customWriteTimeStr);
-        }
+        minWriteTimeStampFilter = propertyHelper.getLong(KnownProperties.ORIGIN_FILTER_WRITETS_MIN);
+        maxWriteTimeStampFilter = propertyHelper.getLong(KnownProperties.ORIGIN_FILTER_WRITETS_MAX);
+        customWritetime = propertyHelper.getLong(KnownProperties.TARGET_CUSTOM_WRITETIME);

         logger.info("PARAM -- Read Consistency: {}", readConsistencyLevel);
         logger.info("PARAM -- Write Consistency: {}", writeConsistencyLevel);
@@ -106,9 +93,9 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
                 Instant.ofEpochMilli(maxWriteTimeStampFilter / 1000));
         }

-        String selectCols = Util.getSparkProp(sc, KnownProperties.ORIGIN_COLUMN_NAMES);
-        String partitionKey = Util.getSparkProp(sc, KnownProperties.ORIGIN_PARTITION_KEY);
-        String originSelectCondition = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_FILTER_CONDITION);
+        String selectCols = propertyHelper.getAsString(KnownProperties.ORIGIN_COLUMN_NAMES);
+        String partitionKey = propertyHelper.getAsString(KnownProperties.ORIGIN_PARTITION_KEY);
+        String originSelectCondition = propertyHelper.getAsString(KnownProperties.ORIGIN_FILTER_CONDITION);
         if (!originSelectCondition.isEmpty() && !originSelectCondition.trim().toUpperCase().startsWith("AND")) {
             originSelectCondition = " AND " + originSelectCondition;
         }
@@ -121,11 +108,11 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         writeTimeStampCols.forEach(col -> {
             selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")");
         });
-        selectColTypes = getTypes(Util.getSparkProp(sc, KnownProperties.ORIGIN_COLUMN_TYPES));
-        String idCols = Util.getSparkPropOrEmpty(sc, KnownProperties.TARGET_PRIMARY_KEY);
+        selectColTypes = getTypes(propertyHelper.getAsString(KnownProperties.ORIGIN_COLUMN_TYPES));
+        String idCols = propertyHelper.getAsString(KnownProperties.TARGET_PRIMARY_KEY);
         idColTypes = selectColTypes.subList(0, idCols.split(",").length);

-        String insertCols = Util.getSparkPropOrEmpty(sc, KnownProperties.TARGET_COLUMN_NAMES);
+        String insertCols = propertyHelper.getAsString(KnownProperties.TARGET_COLUMN_NAMES);
         if (null == insertCols || insertCols.trim().isEmpty()) {
             insertCols = selectCols;
         }
@@ -153,15 +140,15 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
                 "select " + insertCols + " from " + targetKeyspaceTable
                         + " where " + insertBinds);

-        hasRandomPartitioner = Boolean.parseBoolean(Util.getSparkPropOr(sc, KnownProperties.ORIGIN_HAS_RANDOM_PARTITIONER, "false"));
-        isCounterTable = Boolean.parseBoolean(Util.getSparkPropOr(sc, KnownProperties.ORIGIN_IS_COUNTER, "false"));
+        hasRandomPartitioner = propertyHelper.getBoolean(KnownProperties.ORIGIN_HAS_RANDOM_PARTITIONER);
+        isCounterTable = propertyHelper.getBoolean(KnownProperties.ORIGIN_IS_COUNTER);
         if (isCounterTable) {
-            String updateSelectMappingStr = Util.getSparkPropOr(sc, KnownProperties.ORIGIN_COUNTER_INDEXES, "0");
+            String updateSelectMappingStr = propertyHelper.getString(KnownProperties.ORIGIN_COUNTER_INDEXES);
             for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
                 updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
             }

-            String counterTableUpdate = Util.getSparkProp(sc, KnownProperties.ORIGIN_COUNTER_CQL);
+            String counterTableUpdate = propertyHelper.getString(KnownProperties.ORIGIN_COUNTER_CQL);
             targetInsertStatement = targetSession.prepare(counterTableUpdate);
         } else {
             insertBinds = "";
@@ -186,7 +173,7 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         }

         // Handle rows with blank values for 'timestamp' data-type in primary-key fields
-        tsReplaceValStr = Util.getSparkPropOr(sc, KnownProperties.TARGET_REPLACE_MISSING_TS, "");
+        tsReplaceValStr = propertyHelper.getAsString(KnownProperties.TARGET_REPLACE_MISSING_TS);
         if (!tsReplaceValStr.isEmpty()) {
             tsReplaceVal = Long.parseLong(tsReplaceValStr);
         }
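Note on the pattern: this commit replaces per-call-site parsing of Spark properties (`Util.getSparkPropOr(sc, key, default)`, with string-literal defaults like "5" and "100000" scattered through the constructor) with a centralized `propertyHelper` that owns type conversion, defaults, and validation. The sketch below is only an illustration of that shape, not the repo's actual PropertyHelper: the backing maps, the constructor, and the exact `meetsMinimum` semantics are assumptions inferred from the calls visible in this diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a centralized property helper. It mirrors the
// accessors used in the diff (getAsString, getInteger, getLong, getBoolean,
// meetsMinimum, setProperty) but is NOT the project's real implementation.
public class PropertyHelperSketch {
    private final Map<String, String> values = new HashMap<>();   // e.g. loaded from SparkConf
    private final Map<String, String> defaults = new HashMap<>(); // known defaults, e.g. "5" for batch size

    public PropertyHelperSketch(Map<String, String> sparkProps, Map<String, String> knownDefaults) {
        values.putAll(sparkProps);
        defaults.putAll(knownDefaults);
    }

    // Raw string lookup, falling back to the registered default (empty if none).
    public String getAsString(String key) {
        String v = values.get(key);
        return (v != null) ? v : defaults.getOrDefault(key, "");
    }

    // Typed getters; in a sketch like this, a missing or malformed value
    // surfaces as a NumberFormatException rather than a silent fallback.
    public Integer getInteger(String key) {
        return Integer.parseInt(getAsString(key));
    }

    public Long getLong(String key) {
        return Long.parseLong(getAsString(key));
    }

    public Boolean getBoolean(String key) {
        return Boolean.parseBoolean(getAsString(key));
    }

    // Validation hook mirroring the meetsMinimum(...) call in the diff:
    // true if the current value exists and is at least the given minimum.
    public boolean meetsMinimum(String key, Integer current, Integer minimum) {
        return current != null && current >= minimum;
    }

    public void setProperty(String key, Object value) {
        values.put(key, String.valueOf(value));
    }
}
```

The design payoff is visible in the new SPARK_STATS_AFTER handling above: with defaults registered in one place (here, alongside `KnownProperties`), a bad value can be detected, logged, and reset to its documented default once, instead of every call site re-implementing its own ad-hoc fallback string.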