@@ -39,7 +39,8 @@ public abstract class AbstractJobSession {
     protected Integer batchSize = 1;
     protected Integer printStatsAfter = 100000;
 
-    protected Boolean writeTimeStampFilter = false;
+    protected Boolean isPreserveTTLWritetime = Boolean.FALSE;
+    protected Boolean writeTimeStampFilter = Boolean.FALSE;
 
     protected Long minWriteTimeStampFilter = 0l;
     protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
@@ -51,11 +52,9 @@ public abstract class AbstractJobSession {
     protected String sourceKeyspaceTable;
     protected String astraKeyspaceTable;
 
-
     protected Boolean hasRandomPartitioner;
 
     protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
-
         this.sourceSession = sourceSession;
         this.astraSession = astraSession;
@@ -72,22 +71,43 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         sourceKeyspaceTable = sparkConf.get("spark.migrate.source.keyspaceTable");
         astraKeyspaceTable = sparkConf.get("spark.migrate.astra.keyspaceTable");
 
+        isPreserveTTLWritetime = Boolean.parseBoolean(sparkConf.get("spark.migrate.preserveTTLWriteTime", "false"));
+        if (isPreserveTTLWritetime) {
+            String ttlColsStr = sparkConf.get("spark.migrate.source.ttl.cols");
+            if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
+                for (String ttlCol : ttlColsStr.split(",")) {
+                    ttlCols.add(Integer.parseInt(ttlCol));
+                }
+            }
+        }
+
         writeTimeStampFilter = Boolean
                 .parseBoolean(sparkConf.get("spark.migrate.source.writeTimeStampFilter", "false"));
-        minWriteTimeStampFilter = new Long(
-                sparkConf.get("spark.migrate.source.minWriteTimeStampFilter", "0"));
-        maxWriteTimeStampFilter = new Long(
-                sparkConf.get("spark.migrate.source.maxWriteTimeStampFilter", "" + Long.MAX_VALUE));
         // batchsize set to 1 if there is a writeFilter
         if (writeTimeStampFilter) {
             batchSize = 1;
+            String writeTimestampColsStr = sparkConf.get("spark.migrate.source.writeTimeStampFilter.cols");
+            if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
+                for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
+                    writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
+                }
+            }
         }
+
+        minWriteTimeStampFilter = new Long(
+                sparkConf.get("spark.migrate.source.minWriteTimeStampFilter", "0"));
+        maxWriteTimeStampFilter = new Long(
+                sparkConf.get("spark.migrate.source.maxWriteTimeStampFilter", "" + Long.MAX_VALUE));
+
         logger.info(" DEFAULT -- Write Batch Size: " + batchSize);
         logger.info(" DEFAULT -- Source Keyspace Table: " + sourceKeyspaceTable);
         logger.info(" DEFAULT -- Astra Keyspace Table: " + astraKeyspaceTable);
         logger.info(" DEFAULT -- ReadRateLimit: " + readLimiter.getRate());
         logger.info(" DEFAULT -- WriteRateLimit: " + writeLimiter.getRate());
         logger.info(" DEFAULT -- WriteTimestampFilter: " + writeTimeStampFilter);
+        logger.info(" DEFAULT -- WriteTimestampFilterCols: " + writeTimeStampCols);
+        logger.info(" DEFAULT -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
+        logger.info(" DEFAULT -- TTLCols: " + ttlCols);
 
         hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.migrate.source.hasRandomPartitioner", "false"));
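
Note: a minimal sketch of how a job could exercise the new properties wired up above. The property names come from this diff; the column-index values and the use of toDebugString are illustrative assumptions, not part of this change.

    import org.apache.spark.SparkConf;

    // Sketch only: index values below are made up for illustration.
    public class MigrateConfExample {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .set("spark.migrate.preserveTTLWriteTime", "true")
                    .set("spark.migrate.source.ttl.cols", "2,3")                  // assumed indexes
                    .set("spark.migrate.source.writeTimeStampFilter", "true")
                    .set("spark.migrate.source.writeTimeStampFilter.cols", "2,3") // assumed indexes
                    .set("spark.migrate.source.minWriteTimeStampFilter", "0")
                    .set("spark.migrate.source.maxWriteTimeStampFilter", String.valueOf(Long.MAX_VALUE));
            System.out.println(conf.toDebugString());
        }
    }
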
@@ -96,20 +116,6 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         counterDeltaMaxIndex = Integer
                 .parseInt(sparkConf.get("spark.migrate.source.counterTable.update.max.counter.index", "0"));
 
-        String writeTimestampColsStr = sparkConf.get("spark.migrate.source.writeTimeStampFilter.cols");
-        if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
-            for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
-                writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
-            }
-        }
-
-        String ttlColsStr = sparkConf.get("spark.migrate.source.ttl.cols");
-        if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
-            for (String ttlCol : ttlColsStr.split(",")) {
-                ttlCols.add(Integer.parseInt(ttlCol));
-            }
-        }
-
         String partionKey = sparkConf.get("spark.migrate.query.cols.partitionKey");
         String idCols = sparkConf.get("spark.migrate.query.cols.id");
         idColTypes = getTypes(sparkConf.get("spark.migrate.query.cols.id.types"));
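
Note: the two parsing loops deleted here were moved into the constructor earlier in this diff, gated behind their respective flags. One hedged observation: Integer.parseInt throws NumberFormatException on a stray space ("2, 3"), so a hardened variant might trim each token. Trimming is an assumption, not part of this change.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical hardening of the relocated column-index parsing.
    public class ColIndexParser {
        static List<Integer> parseCols(String csv) {
            List<Integer> cols = new ArrayList<>();
            if (csv != null && !csv.trim().isEmpty()) {
                for (String col : csv.split(",")) {
                    cols.add(Integer.parseInt(col.trim()));  // tolerates "2, 3"
                }
            }
            return cols;
        }

        public static void main(String[] args) {
            System.out.println(parseCols("2, 3"));  // prints [2, 3]
        }
    }
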
@@ -128,15 +134,13 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         }
 
         sourceSelectCondition = sparkConf.get("spark.migrate.query.cols.select.condition", "");
-
         sourceSelectStatement = sourceSession.prepare(
                 "select " + selectCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
                         + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING");
 
         astraSelectStatement = astraSession.prepare(
                 "select " + selectCols + " from " + astraKeyspaceTable
                         + " where " + idBinds);
-
     }
 
     public List<MigrateDataType> getTypes(String types) {
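
Note: the source select prepared above is a token-range scan. For reference, with assumed values (selectCols "key,val", partionKey "key", table "ks.tbl", empty condition; none of these names come from the diff), the prepared text renders as shown by this self-contained sketch:

    // Illustrative only; the variable values are assumptions.
    public class SelectRender {
        public static void main(String[] args) {
            String selectCols = "key,val", partionKey = "key";
            String sourceKeyspaceTable = "ks.tbl", sourceSelectCondition = "";
            String cql = "select " + selectCols + " from " + sourceKeyspaceTable
                    + " where token(" + partionKey.trim() + ") >= ? and token("
                    + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING";
            System.out.println(cql);
            // select key,val from ks.tbl where token(key) >= ? and token(key) <= ?  ALLOW FILTERING
        }
    }
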
@@ -146,7 +150,6 @@ public List<MigrateDataType> getTypes(String types) {
         }
 
         return dataTypes;
-
     }
 
     public int getLargestTTL(Row sourceRow) {
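
Note: the body of getLargestTTL falls outside this hunk. A plausible sketch, assuming it scans the ttlCols indexes parsed in the constructor (the driver returns ttl(col) as an int); this is a guess at the shape, not the confirmed implementation:

    import com.datastax.oss.driver.api.core.cql.Row;
    import java.util.List;

    // Hypothetical sketch; the real method body is not shown in this diff.
    class TtlExample {
        static int getLargestTTL(Row sourceRow, List<Integer> ttlCols) {
            int largest = 0;
            for (Integer idx : ttlCols) {
                largest = Math.max(largest, sourceRow.getInt(idx));  // ttl(col) is an int
            }
            return largest;
        }
    }
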
@@ -177,7 +180,6 @@ public BoundStatement selectFromAstra(PreparedStatement selectStatement, Row sou
     }
 
     public Object getData(MigrateDataType dataType, int index, Row sourceRow) {
-
         if (dataType.typeClass == Map.class) {
             return sourceRow.getMap(index, dataType.subTypes.get(0), dataType.subTypes.get(1));
         } else if (dataType.typeClass == List.class) {
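
Note: the hunk cuts off at the List branch. A hedged sketch of how the type dispatch likely continues, assuming driver 4.x accessors (getList, getObject); only the Map branch above is confirmed by this diff, and MigrateDataType is the project's own descriptor class:

    import com.datastax.oss.driver.api.core.cql.Row;
    import java.util.List;
    import java.util.Map;

    // Hypothetical continuation of getData; everything past the Map branch is assumed.
    class GetDataSketch {
        static Object getData(MigrateDataType dataType, int index, Row sourceRow) {
            if (dataType.typeClass == Map.class) {
                return sourceRow.getMap(index, dataType.subTypes.get(0), dataType.subTypes.get(1));
            } else if (dataType.typeClass == List.class) {
                return sourceRow.getList(index, dataType.subTypes.get(0));
            }
            return sourceRow.getObject(index);  // generic accessor for scalar types
        }
    }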