@@ -20,57 +20,57 @@ public class AbstractJobSession extends BaseJobSession {

    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());

-    protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
+    protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
        this.sourceSession = sourceSession;
        this.astraSession = astraSession;

-        batchSize = new Integer(sparkConf.get("spark.batchSize", "1"));
-        printStatsAfter = new Integer(sparkConf.get("spark.printStatsAfter", "100000"));
+        batchSize = new Integer(Util.getSparkPropOr(sc, "spark.batchSize", "1"));
+        printStatsAfter = new Integer(Util.getSparkPropOr(sc, "spark.printStatsAfter", "100000"));
        if (printStatsAfter < 1) {
            printStatsAfter = 100000;
        }

-        readLimiter = RateLimiter.create(new Integer(sparkConf.get("spark.readRateLimit", "20000")));
-        writeLimiter = RateLimiter.create(new Integer(sparkConf.get("spark.writeRateLimit", "40000")));
-        maxRetries = Integer.parseInt(sparkConf.get("spark.maxRetries", "10"));
+        readLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.readRateLimit", "20000")));
+        writeLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.writeRateLimit", "40000")));
+        maxRetries = Integer.parseInt(sc.get("spark.maxRetries", "10"));

-        sourceKeyspaceTable = sparkConf.get("spark.source.keyspaceTable");
-        astraKeyspaceTable = sparkConf.get("spark.destination.keyspaceTable");
+        sourceKeyspaceTable = Util.getSparkProp(sc, "spark.origin.keyspaceTable");
+        astraKeyspaceTable = Util.getSparkProp(sc, "spark.target.keyspaceTable");

-        String ttlColsStr = sparkConf.get("spark.query.ttl.cols", "");
+        String ttlColsStr = Util.getSparkPropOrEmpty(sc, "spark.query.ttl.cols");
        if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
            for (String ttlCol : ttlColsStr.split(",")) {
                ttlCols.add(Integer.parseInt(ttlCol));
            }
        }

-        String writeTimestampColsStr = sparkConf.get("spark.query.writetime.cols", "");
+        String writeTimestampColsStr = Util.getSparkPropOrEmpty(sc, "spark.query.writetime.cols");
        if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
            for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
                writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
            }
        }

        writeTimeStampFilter = Boolean
-                .parseBoolean(sparkConf.get("spark.source.writeTimeStampFilter", "false"));
+                .parseBoolean(Util.getSparkPropOr(sc, "spark.origin.writeTimeStampFilter", "false"));
        // batchsize set to 1 if there is a writeFilter
        if (writeTimeStampFilter) {
            batchSize = 1;
        }

        String minWriteTimeStampFilterStr =
-                sparkConf.get("spark.source.minWriteTimeStampFilter", "0");
+                Util.getSparkPropOr(sc, "spark.origin.minWriteTimeStampFilter", "0");
        if (null != minWriteTimeStampFilterStr && minWriteTimeStampFilterStr.trim().length() > 1) {
            minWriteTimeStampFilter = Long.parseLong(minWriteTimeStampFilterStr);
        }
        String maxWriteTimeStampFilterStr =
-                sparkConf.get("spark.source.maxWriteTimeStampFilter", "0");
+                Util.getSparkPropOr(sc, "spark.origin.maxWriteTimeStampFilter", "0");
        if (null != maxWriteTimeStampFilterStr && maxWriteTimeStampFilterStr.trim().length() > 1) {
            maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
        }

        String customWriteTimeStr =
-                sparkConf.get("spark.destination.custom.writeTime", "0");
+                Util.getSparkPropOr(sc, "spark.target.custom.writeTime", "0");
        if (null != customWriteTimeStr && customWriteTimeStr.trim().length() > 1 && StringUtils.isNumeric(customWriteTimeStr.trim())) {
            customWritetime = Long.parseLong(customWriteTimeStr);
        }
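
Note: besides switching to the Util helpers, the hunk above renames the configuration keys from spark.source.* / spark.destination.* to spark.origin.* / spark.target.*. Purely as an illustration (class name and keyspace/table values below are hypothetical, not part of this change), a job would now populate the SparkConf it passes in as sc with the renamed keys:

import org.apache.spark.SparkConf;

public class MigrationConfExample {
    public static void main(String[] args) {
        // Renamed keys read by the constructor above; values here are placeholders.
        SparkConf sc = new SparkConf()
                .set("spark.origin.keyspaceTable", "origin_ks.origin_tbl")   // was spark.source.keyspaceTable
                .set("spark.target.keyspaceTable", "target_ks.target_tbl")   // was spark.destination.keyspaceTable
                .set("spark.origin.writeTimeStampFilter", "false")           // was spark.source.writeTimeStampFilter
                .set("spark.batchSize", "5")
                .set("spark.readRateLimit", "20000")
                .set("spark.writeRateLimit", "40000");
    }
}

Unless the Util helpers also accept the old names, existing jobs that set spark.source.* / spark.destination.* keys would need to switch to the new names.
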
@@ -84,9 +84,9 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
        logger.info("PARAM -- WriteTimestampFilterCols: " + writeTimeStampCols);
        logger.info("PARAM -- WriteTimestampFilter: " + writeTimeStampFilter);

-        String selectCols = sparkConf.get("spark.query.source");
-        String partionKey = sparkConf.get("spark.query.source.partitionKey");
-        String sourceSelectCondition = sparkConf.get("spark.query.condition", "");
+        String selectCols = Util.getSparkProp(sc, "spark.query.origin");
+        String partionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
+        String sourceSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");

        final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
        String[] allCols = selectCols.split(",");
@@ -96,16 +96,16 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
        writeTimeStampCols.forEach(col -> {
            selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")");
        });
-        String fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols.toString() + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
+        String fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
                + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING";
        sourceSelectStatement = sourceSession.prepare(fullSelectQuery);
        logger.info("PARAM -- Query used: " + fullSelectQuery);

-        selectColTypes = getTypes(sparkConf.get("spark.query.types"));
-        String idCols = sparkConf.get("spark.query.destination.id", "");
+        selectColTypes = getTypes(Util.getSparkProp(sc, "spark.query.types"));
+        String idCols = Util.getSparkPropOrEmpty(sc, "spark.query.target.id");
        idColTypes = selectColTypes.subList(0, idCols.split(",").length);

-        String insertCols = sparkConf.get("spark.query.destination", "");
+        String insertCols = Util.getSparkPropOrEmpty(sc, "spark.query.target");
        if (null == insertCols || insertCols.trim().isEmpty()) {
            insertCols = selectCols;
        }
@@ -121,15 +121,15 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
                        "select " + insertCols + " from " + astraKeyspaceTable
                        + " where " + insertBinds);

-        hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.source.hasRandomPartitioner", "false"));
-        isCounterTable = Boolean.parseBoolean(sparkConf.get("spark.counterTable", "false"));
+        hasRandomPartitioner = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.origin.hasRandomPartitioner", "false"));
+        isCounterTable = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.counterTable", "false"));
        if (isCounterTable) {
-            String updateSelectMappingStr = sparkConf.get("spark.counterTable.cql.index", "0");
+            String updateSelectMappingStr = Util.getSparkPropOr(sc, "spark.counterTable.cql.index", "0");
            for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
                updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
            }

-            String counterTableUpdate = sparkConf.get("spark.counterTable.cql");
+            String counterTableUpdate = Util.getSparkProp(sc, "spark.counterTable.cql");
            astraInsertStatement = astraSession.prepare(counterTableUpdate);
        } else {
            insertBinds = "";
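
The diff calls Util.getSparkProp, Util.getSparkPropOr and Util.getSparkPropOrEmpty, but the Util class itself is not part of these hunks. A minimal sketch of what those helpers are assumed to do, treating them as thin wrappers over SparkConf.get (the class actually introduced by this change may behave differently, for example by also falling back to the old property names):

import org.apache.spark.SparkConf;

public class Util {
    // Required property: SparkConf.get(key) throws NoSuchElementException if the key is absent.
    public static String getSparkProp(SparkConf sc, String key) {
        return sc.get(key);
    }

    // Optional property with an explicit default, mirroring the replaced sparkConf.get(key, default) calls.
    public static String getSparkPropOr(SparkConf sc, String key, String defaultVal) {
        return sc.get(key, defaultVal);
    }

    // Optional property that falls back to an empty string.
    public static String getSparkPropOrEmpty(SparkConf sc, String key) {
        return getSparkPropOr(sc, key, "");
    }
}

Under that assumption the constructor keeps the previous default-value behaviour for optional settings, while required settings such as spark.origin.keyspaceTable fail fast when missing.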