Skip to content

Commit 1681266

Browse files
committed
Merge branch 'main' into feature/process-errored-partitions
* main: Implement PR comments; refactoring changes; added custom writetime property. When set, it will be used as the writetime for target writes. This can be set to a value such as the historic migration start time to mitigate issues related to overwriting real-time updates.
2 parents f319f75 + 4b770f6 commit 1681266

File tree

5 files changed

+17
-1
lines changed

5 files changed

+17
-1
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,4 @@ This mode is specifically useful to process a subset of partition-ranges that
8787
- SSL Support (including custom cipher algorithms)
8888
- Migrate from any Cassandra source ([Apache Cassandra](https://cassandra.apache.org)/[DataStax Enterprise (DSE)](https://www.datastax.com/products/datastax-enterprise)/[DataStax Astra DB](https://www.datastax.com/products/datastax-astra)) to any Cassandra target ([Apache Cassandra](https://cassandra.apache.org)/[DataStax Enterprise (DSE)](https://www.datastax.com/products/datastax-enterprise)/[DataStax Astra DB](https://www.datastax.com/products/datastax-astra))
8989
- Validate migration accuracy and performance using a smaller randomized data-set
90+
- Custom writetime

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
66
import com.datastax.oss.driver.api.core.cql.Row;
77
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
8+
import org.apache.commons.lang.StringUtils;
89
import org.apache.spark.SparkConf;
910
import org.slf4j.Logger;
1011
import org.slf4j.LoggerFactory;
@@ -68,6 +69,12 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
6869
maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
6970
}
7071

72+
String customWriteTimeStr =
73+
sparkConf.get("spark.destination.custom.writeTime", "0");
74+
if (null != customWriteTimeStr && customWriteTimeStr.trim().length() > 1 && StringUtils.isNumeric(customWriteTimeStr.trim())) {
75+
customWritetime = Long.parseLong(customWriteTimeStr);
76+
}
77+
7178
logger.info("PARAM -- Write Batch Size: " + batchSize);
7279
logger.info("PARAM -- Source Keyspace Table: " + sourceKeyspaceTable);
7380
logger.info("PARAM -- Destination Keyspace Table: " + astraKeyspaceTable);

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public abstract class BaseJobSession {
3535
protected Boolean writeTimeStampFilter = Boolean.FALSE;
3636
protected Long minWriteTimeStampFilter = 0l;
3737
protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
38+
protected Long customWritetime = 0l;
3839

3940
protected List<Integer> writeTimeStampCols = new ArrayList<Integer>();
4041
protected List<Integer> ttlCols = new ArrayList<Integer>();

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,11 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
178178
index++;
179179
}
180180
if (!writeTimeStampCols.isEmpty()) {
181-
boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
181+
if (customWritetime > 0) {
182+
boundInsertStatement = boundInsertStatement.set(index, customWritetime, Long.class);
183+
} else {
184+
boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
185+
}
182186
}
183187
}
184188

src/resources/sparkConf.properties

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ spark.destination.read.consistency.level LOCAL_QUORUM
1313
spark.destination.keyspaceTable test.a2
1414
spark.destination.autocorrect.missing false
1515
spark.destination.autocorrect.mismatch false
16+
spark.destination.custom.writeTime 0
1617

1718
spark.maxRetries 10
1819
spark.readRateLimit 20000
@@ -85,6 +86,8 @@ spark.source.maxWriteTimeStampFilter 9223372036854775807
8586
# "spark.source.writeTimeStampFilter.cols" - Comma separated column indexes from "spark.query.cols.select".
8687
# Script will only use the largest value per row.
8788
#
89+
# "spark.destination.custom.writeTime" - User-specified write time. When set, it will be used as the writetime for target writes.
90+
#
8891
# Default value for "spark.source.maxWriteTimeStampFilter" is always "9223372036854775807" (max long value)
8992
#
9093
# Properties "spark.query.cols.insert" and "spark.query.cols.insert.types" are required for "Migrate" job,

0 commit comments

Comments
 (0)