@@ -30,8 +30,8 @@ public class AbstractJobSession extends BaseJobSession {
30
30
protected List <String > ttlWTCols ;
31
31
protected String tsReplaceValStr ;
32
32
protected long tsReplaceVal ;
33
- protected Long customWritetime = 0l ;
34
- protected Long incrWritetime = 0l ;
33
+ protected long customWriteTime = 0l ;
34
+ protected long incrementWriteTime = 0l ;
35
35
36
36
protected AbstractJobSession (CqlSession sourceSession , CqlSession astraSession , SparkConf sc ) {
37
37
this (sourceSession , astraSession , sc , false );
@@ -98,19 +98,19 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
98
98
String customWriteTimeStr =
99
99
Util .getSparkPropOr (sc , "spark.target.writeTime.fixedValue" , "0" );
100
100
if (null != customWriteTimeStr && customWriteTimeStr .trim ().length () > 0 && StringUtils .isNumeric (customWriteTimeStr .trim ())) {
101
- customWritetime = Long .parseLong (customWriteTimeStr );
101
+ customWriteTime = Long .parseLong (customWriteTimeStr );
102
102
}
103
103
104
104
String incrWriteTimeStr =
105
105
Util .getSparkPropOr (sc , "spark.target.writeTime.incrementBy" , "0" );
106
106
if (null != incrWriteTimeStr && incrWriteTimeStr .trim ().length () > 0 && StringUtils .isNumeric (incrWriteTimeStr .trim ())) {
107
- incrWritetime = Long .parseLong (incrWriteTimeStr );
107
+ incrementWriteTime = Long .parseLong (incrWriteTimeStr );
108
108
}
109
109
110
110
logger .info ("PARAM -- TTL-WriteTime Columns: {}" , ttlWTCols );
111
111
logger .info ("PARAM -- WriteTimes Filter: {}" , writeTimeStampFilter );
112
- logger .info ("PARAM -- WriteTime Custom Value: {}" , customWritetime );
113
- logger .info ("PARAM -- WriteTime increment Value: {}" , incrWritetime );
112
+ logger .info ("PARAM -- WriteTime Custom Value: {}" , customWriteTime );
113
+ logger .info ("PARAM -- WriteTime Increment Value: {}" , incrementWriteTime );
114
114
if (writeTimeStampFilter ) {
115
115
logger .info ("PARAM -- minWriteTimeStampFilter: {} datetime is {}" , minWriteTimeStampFilter ,
116
116
Instant .ofEpochMilli (minWriteTimeStampFilter / 1000 ));
@@ -202,10 +202,10 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
202
202
if (!ttlWTCols .isEmpty ()) {
203
203
boundInsertStatement = boundInsertStatement .set (index , getLargestTTL (sourceRow ), Integer .class );
204
204
index ++;
205
- if (customWritetime > 0 ) {
206
- boundInsertStatement = boundInsertStatement .set (index , customWritetime , Long .class );
205
+ if (customWriteTime > 0 ) {
206
+ boundInsertStatement = boundInsertStatement .set (index , customWriteTime , Long .class );
207
207
} else {
208
- boundInsertStatement = boundInsertStatement .set (index , getLargestWriteTimeStamp (sourceRow ) + incrWritetime , Long .class );
208
+ boundInsertStatement = boundInsertStatement .set (index , getLargestWriteTimeStamp (sourceRow ) + incrementWriteTime , Long .class );
209
209
}
210
210
}
211
211
}
0 commit comments