
Commit ebf69ed

Merge pull request #14 from datastax/feature/custom_writetime
Feature/custom writetime
2 parents: befca05 + 12d3184

File tree

- README.md
- src/main/java/datastax/astra/migrate/AbstractJobSession.java
- src/main/java/datastax/astra/migrate/BaseJobSession.java
- src/main/java/datastax/astra/migrate/CopyJobSession.java
- src/resources/sparkConf.properties

5 files changed: +17 −1 lines changed

README.md

Lines changed: 1 addition & 0 deletions
@@ -70,3 +70,4 @@ spark.destination.autocorrect.mismatch true|false
 - SSL Support (including custom cipher algorithms)
 - Migrate from any Cassandra source ([Apache Cassandra](https://cassandra.apache.org)/[DataStax Enterprise (DSE)](https://www.datastax.com/products/datastax-enterprise)/[DataStax Astra DB](https://www.datastax.com/products/datastax-astra)) to any Cassandra target ([Apache Cassandra](https://cassandra.apache.org)/[DataStax Enterprise (DSE)](https://www.datastax.com/products/datastax-enterprise)/[DataStax Astra DB](https://www.datastax.com/products/datastax-astra))
 - Validate migration accuracy and performance using a smaller randomized data-set
+- Custom writetime

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 7 additions & 0 deletions
@@ -5,6 +5,7 @@
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
+import org.apache.commons.lang.StringUtils;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -70,6 +71,12 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
             maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
         }

+        String customWriteTimeStr =
+                sparkConf.get("spark.destination.custom.writeTime", "0");
+        if (null != customWriteTimeStr && customWriteTimeStr.trim().length() > 1 && StringUtils.isNumeric(customWriteTimeStr.trim())) {
+            customWritetime = Long.parseLong(customWriteTimeStr);
+        }
+
         logger.info("PARAM -- Write Batch Size: " + batchSize);
         logger.info("PARAM -- Source Keyspace Table: " + sourceKeyspaceTable);
         logger.info("PARAM -- Destination Keyspace Table: " + astraKeyspaceTable);
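For context on the guard above: the property value must be non-null, numeric, and longer than one character, so the default "0" leaves the feature disabled (customWritetime stays 0). A minimal standalone sketch of that behavior, assuming commons-lang on the classpath; the class and method names are illustrative, not part of the commit:

import org.apache.commons.lang.StringUtils;

public class CustomWritetimeParseSketch {

    // Mirrors the parsing logic added in AbstractJobSession: returns 0 ("disabled")
    // unless the string is numeric and longer than one character.
    static long parseCustomWritetime(String customWriteTimeStr) {
        long customWritetime = 0L; // same default as the BaseJobSession field
        if (null != customWriteTimeStr && customWriteTimeStr.trim().length() > 1
                && StringUtils.isNumeric(customWriteTimeStr.trim())) {
            customWritetime = Long.parseLong(customWriteTimeStr);
        }
        return customWritetime;
    }

    public static void main(String[] args) {
        System.out.println(parseCustomWritetime("0"));                // 0: a single character fails the length check
        System.out.println(parseCustomWritetime("1664983200000000")); // parsed and used as the fixed writetime
        System.out.println(parseCustomWritetime("abc"));              // non-numeric: stays 0 (disabled)
    }
}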

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 1 addition & 0 deletions
@@ -36,6 +36,7 @@ public abstract class BaseJobSession {
     protected Boolean writeTimeStampFilter = Boolean.FALSE;
     protected Long minWriteTimeStampFilter = 0l;
     protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
+    protected Long customWritetime = 0l;

     protected List<Integer> writeTimeStampCols = new ArrayList<Integer>();
     protected List<Integer> ttlCols = new ArrayList<Integer>();

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 5 additions & 1 deletion
@@ -176,7 +176,11 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
         if (isPreserveTTLWritetime) {
             boundInsertStatement = boundInsertStatement.set(index, getLargestTTL(sourceRow), Integer.class);
             index++;
-            boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
+            if (customWritetime > 0) {
+                boundInsertStatement = boundInsertStatement.set(index, customWritetime, Long.class);
+            } else {
+                boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
+            }
         }
     }
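The two bind positions touched here sit after the regular column values; presumably the prepared insert ends with a USING TTL ? AND TIMESTAMP ? clause. A hedged, self-contained sketch of that pattern with the same Java driver API (the query text, table, and column names are illustrative, not the repo's generated statement):

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;

public class WritetimeBindSketch {
    public static void main(String[] args) {
        long customWritetime = 1664983200000000L; // writetime values are microseconds since epoch
        try (CqlSession session = CqlSession.builder().build()) { // defaults to localhost:9042
            // Hypothetical statement; the migrator builds its own insert from configured columns.
            PreparedStatement ps = session.prepare(
                    "INSERT INTO test.a2 (key, value) VALUES (?, ?) USING TTL ? AND TIMESTAMP ?");
            BoundStatement bs = ps.bind()
                    .setString(0, "k1")
                    .setString(1, "v1")
                    .setInt(2, 0)                 // TTL 0 = no expiry
                    .setLong(3, customWritetime); // fixed writetime instead of the source row's largest
            session.execute(bs);
        }
    }
}

With customWritetime > 0, the new branch pins every migrated row to the one configured writetime; otherwise the largest writetime found on the source row is carried over as before.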

src/resources/sparkConf.properties

Lines changed: 3 additions & 0 deletions
@@ -13,6 +13,7 @@ spark.destination.read.consistency.level LOCAL_QUORUM
 spark.destination.keyspaceTable test.a2
 spark.destination.autocorrect.missing false
 spark.destination.autocorrect.mismatch false
+spark.destination.custom.writeTime 0

 spark.maxRetries 10
 spark.readRateLimit 20000

@@ -87,6 +88,8 @@ spark.source.maxWriteTimeStampFilter 9223372036854775807
 # "spark.source.writeTimeStampFilter.cols" - Comma separated column indexes from "spark.query.cols.select".
 # Script will only use the largest value per row.
 #
+# "spark.destination.custom.writeTime" - User specified write time. When set it will be used as writetime for target writes.
+#
 # Default value for "spark.source.maxWriteTimeStampFilter" is alway "9223372036854775807" (max long value)
 #
 # Properties "spark.query.cols.insert" and "spark.query.cols.insert.types" are required for "Migrate" job,
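As a usage sketch: pinning the writetime for all target writes just means setting the new property to a nonzero microsecond timestamp (the value below is illustrative), either here in sparkConf.properties or as a --conf override on spark-submit:

spark.destination.custom.writeTime 1664983200000000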
