@@ -46,23 +46,23 @@ protected OriginCountJobSession(CqlSession sourceSession, SparkConf sparkConf) {
46
46
}
47
47
48
48
readLimiter = RateLimiter .create (new Integer (sparkConf .get ("spark.readRateLimit" , "20000" )));
49
- sourceKeyspaceTable = sparkConf .get ("spark.source .keyspaceTable" );
49
+ sourceKeyspaceTable = sparkConf .get ("spark.origin .keyspaceTable" );
50
50
51
- hasRandomPartitioner = Boolean .parseBoolean (sparkConf .get ("spark.source .hasRandomPartitioner" , "false" ));
52
- isCounterTable = Boolean .parseBoolean (sparkConf .get ("spark.source. counterTable" , "false" ));
51
+ hasRandomPartitioner = Boolean .parseBoolean (sparkConf .get ("spark.origin .hasRandomPartitioner" , "false" ));
52
+ isCounterTable = Boolean .parseBoolean (sparkConf .get ("spark.counterTable" , "false" ));
53
53
54
- checkTableforColSize = Boolean .parseBoolean (sparkConf .get ("spark.source .checkTableforColSize" , "false" ));
55
- checkTableforselectCols = sparkConf .get ("spark.source .checkTableforColSize.cols" );
56
- checkTableforColSizeTypes = getTypes (sparkConf .get ("spark.source .checkTableforColSize.cols.types" ));
57
- filterColName = sparkConf .get ("spark.source .FilterColumn" );
58
- filterColType = sparkConf .get ("spark.source .FilterColumnType" );
59
- filterColIndex = Integer .parseInt (sparkConf .get ("spark.source .FilterColumnIndex" , "0" ));
54
+ checkTableforColSize = Boolean .parseBoolean (sparkConf .get ("spark.origin .checkTableforColSize" , "false" ));
55
+ checkTableforselectCols = sparkConf .get ("spark.origin .checkTableforColSize.cols" );
56
+ checkTableforColSizeTypes = getTypes (sparkConf .get ("spark.origin .checkTableforColSize.cols.types" ));
57
+ filterColName = sparkConf .get ("spark.origin .FilterColumn" );
58
+ filterColType = sparkConf .get ("spark.origin .FilterColumnType" );
59
+ filterColIndex = Integer .parseInt (sparkConf .get ("spark.origin .FilterColumnIndex" , "0" ));
60
60
61
61
String partionKey = sparkConf .get ("spark.query.cols.partitionKey" );
62
62
idColTypes = getTypes (sparkConf .get ("spark.query.cols.id.types" ));
63
63
64
64
String selectCols = sparkConf .get ("spark.query.cols.select" );
65
- String updateSelectMappingStr = sparkConf .get ("spark.source. counterTable.update.select .index" , "0" );
65
+ String updateSelectMappingStr = sparkConf .get ("spark.counterTable.cql .index" , "0" );
66
66
for (String updateSelectIndex : updateSelectMappingStr .split ("," )) {
67
67
updateSelectMapping .add (Integer .parseInt (updateSelectIndex ));
68
68
}
0 commit comments