
Commit 97da1f4

Added a custom writetime property. When set, it is used as the writetime for all target writes. Setting it to a value such as the historic migration start time mitigates the risk of overwriting real-time updates that arrive during the migration.
1 parent 350e998 commit 97da1f4
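Why a fixed historic writetime helps: Cassandra resolves conflicting writes per cell by last-write-wins on the writetime, so migrated rows stamped with the migration start time always lose to real-time updates written after that instant. Below is a minimal sketch of that rule; the class and timestamps are illustrative, not part of this commit.

import java.time.Instant;

public class WritetimeExample {
    public static void main(String[] args) {
        // Cassandra writetimes are microseconds since the Unix epoch.
        long migrationStart = Instant.parse("2022-10-05T14:36:40Z").toEpochMilli() * 1000L;
        long liveUpdate = Instant.parse("2022-10-05T15:00:00Z").toEpochMilli() * 1000L;

        // Last-write-wins: the cell with the larger writetime survives, so a
        // real-time update made after the migration started beats the migrated
        // copy stamped with the (older) migration start time.
        long winner = Math.max(migrationStart, liveUpdate);
        System.out.println(winner == liveUpdate); // true
    }
}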

4 files changed: 15 additions, 1 deletion

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 6 additions & 0 deletions

@@ -68,6 +68,12 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
             maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
         }

+        String customWriteTimeStr =
+                sparkConf.get("spark.migrate.custom.writeTime", "0");
+        if (null != customWriteTimeStr && customWriteTimeStr.trim().length() > 1) {
+            customWritetime = Long.parseLong(customWriteTimeStr);
+        }
+
         logger.info(" DEFAULT -- Write Batch Size: " + batchSize);
         logger.info(" DEFAULT -- Source Keyspace Table: " + sourceKeyspaceTable);
         logger.info(" DEFAULT -- Destination Keyspace Table: " + astraKeyspaceTable);

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 1 addition & 0 deletions

@@ -40,6 +40,7 @@ public abstract class BaseJobSession {
     protected Boolean writeTimeStampFilter = Boolean.FALSE;
     protected Long minWriteTimeStampFilter = 0l;
     protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
+    protected Long customWritetime = 0l;

     protected List<Integer> writeTimeStampCols = new ArrayList<Integer>();
     protected List<Integer> ttlCols = new ArrayList<Integer>();

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 5 additions & 1 deletion

@@ -211,7 +211,11 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow
             if (isPreserveTTLWritetime) {
                 boundInsertStatement = boundInsertStatement.set(index, getLargestTTL(sourceRow), Integer.class);
                 index++;
-                boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
+                if (customWritetime > 0) {
+                    boundInsertStatement = boundInsertStatement.set(index, customWritetime, Long.class);
+                } else {
+                    boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
+                }
             }
         }
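This bind only makes sense if the prepared INSERT carries USING TTL ? AND TIMESTAMP ? placeholders at positions index and index + 1; that statement is built elsewhere in the job and is not shown in this diff. A hedged sketch of the wiring with the DataStax java-driver, using a made-up table and columns:

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;

public class InsertWithWritetime {
    static BoundStatement bind(CqlSession session, int ttl, long writetime) {
        // Hypothetical statement; the real one is assembled from the configured insert columns.
        PreparedStatement ps = session.prepare(
                "INSERT INTO ks.tbl (id, val) VALUES (?, ?) USING TTL ? AND TIMESTAMP ?");
        return ps.bind()
                .setInt(0, 1)           // id
                .setString(1, "value")  // val
                .setInt(2, ttl)         // TTL placeholder
                .setLong(3, writetime); // TIMESTAMP placeholder: customWritetime or largest source writetime
    }
}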

src/resources/sparkConf.properties

Lines changed: 3 additions & 0 deletions

@@ -42,6 +42,7 @@ spark.source.writeTimeStampFilter false
 spark.source.writeTimeStampFilter.cols 4,5
 spark.source.minWriteTimeStampFilter 0
 spark.source.maxWriteTimeStampFilter 9223372036854775807
+spark.migrate.custom.writeTime 1664980600000000

 ########################## ONLY USE if SSL clientAuth is enabled on source Cassandra/DSE ###############################
 #spark.source.trustStore.path

@@ -90,6 +91,8 @@ spark.source.maxWriteTimeStampFilter 9223372036854775807
 # "spark.source.writeTimeStampFilter.cols" - Comma separated column indexes from "spark.query.cols.select".
 #                                            Script will only use the largest value per row.
 #
+# "spark.migrate.custom.writeTime" - User-specified writetime. When set, it is used as the writetime for target writes.
+#
 # Default value for "spark.source.maxWriteTimeStampFilter" is always "9223372036854775807" (max long value)
 #
 # Properties "spark.query.cols.insert" and "spark.query.cols.insert.types" are required for "Migrate" job,
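The sample value 1664980600000000 above is a Cassandra writetime, i.e. microseconds since the Unix epoch (2022-10-05T14:36:40Z). A small sketch of deriving such a value for a migration start time; the class is illustrative and only the property name comes from this commit:

import java.time.Instant;

public class MigrationStartWritetime {
    public static void main(String[] args) {
        // Writetimes are microsecond-precision, so scale epoch millis by 1000.
        long micros = Instant.parse("2022-10-05T14:36:40Z").toEpochMilli() * 1000L;
        System.out.println(micros); // 1664980600000000
        // Pass it to the job, e.g.: --conf spark.migrate.custom.writeTime=1664980600000000
    }
}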
