
Commit 0279656

Implements workaround for C* bug (duplicates in list on insert/update with same timestamp) https://issues.apache.org/jira/browse/CASSANDRA-11368
1 parent c333263 commit 0279656
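For context, CASSANDRA-11368 manifests when a list column is written more than once with the same explicit timestamp, which is exactly what happens when a migration job that preserves source WRITETIME values is re-run against the same target rows. A minimal sketch of the collision using the DataStax Java driver follows; the keyspace, table, and values are illustrative, not taken from this repository:

import com.datastax.oss.driver.api.core.CqlSession;

public class ListDuplicateRepro {
    public static void main(String[] args) {
        try (CqlSession session = CqlSession.builder().build()) {
            long ts = 1700000000000000L; // WRITETIME values are microseconds since epoch

            // Writing the same list value twice with an identical explicit
            // timestamp can leave duplicate elements behind (CASSANDRA-11368).
            session.execute(
                "INSERT INTO ks.tbl (id, vals) VALUES (1, [1, 2]) USING TIMESTAMP " + ts);
            session.execute(
                "INSERT INTO ks.tbl (id, vals) VALUES (1, [1, 2]) USING TIMESTAMP " + ts);

            // Nudging the repeated write's timestamp forward avoids the
            // collision, which is the workaround this commit makes configurable.
            session.execute(
                "INSERT INTO ks.tbl (id, vals) VALUES (1, [1, 2]) USING TIMESTAMP " + (ts + 1));
        }
    }
}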

File tree

4 files changed: +20 -8 lines

4 files changed

+20
-8
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion

@@ -8,7 +8,7 @@

     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <revision>3.4.1</revision>
+        <revision>3.4.2</revision>
         <scala.version>2.12.17</scala.version>
         <scala.main.version>2.12</scala.main.version>
         <spark.version>3.3.1</spark.version>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 14 additions & 5 deletions

@@ -30,6 +30,8 @@ public class AbstractJobSession extends BaseJobSession {
     protected List<String> ttlWTCols;
     protected String tsReplaceValStr;
     protected long tsReplaceVal;
+    protected Long customWritetime = 0l;
+    protected Long incrWritetime = 0l;

     protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
         this(sourceSession, astraSession, sc, false);
@@ -67,7 +69,6 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         logger.info("PARAM -- Destination Table: {}", astraKeyspaceTable.split("\\.")[1]);
         logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());
         logger.info("PARAM -- WriteRateLimit: {}", writeLimiter.getRate());
-        logger.info("PARAM -- WriteTimestampFilter: {}", writeTimeStampFilter);

         tableInfo = TableInfo.getInstance(sourceSession, sourceKeyspaceTable.split("\\.")[0],
                 sourceKeyspaceTable.split("\\.")[1], Util.getSparkPropOrEmpty(sc, "spark.query.origin"));
@@ -95,13 +96,21 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         }

         String customWriteTimeStr =
-                Util.getSparkPropOr(sc, "spark.target.custom.writeTime", "0");
-        if (null != customWriteTimeStr && customWriteTimeStr.trim().length() > 1 && StringUtils.isNumeric(customWriteTimeStr.trim())) {
+                Util.getSparkPropOr(sc, "spark.target.writeTime.fixedValue", "0");
+        if (null != customWriteTimeStr && customWriteTimeStr.trim().length() > 0 && StringUtils.isNumeric(customWriteTimeStr.trim())) {
             customWritetime = Long.parseLong(customWriteTimeStr);
         }

+        String incrWriteTimeStr =
+                Util.getSparkPropOr(sc, "spark.target.writeTime.incrementBy", "0");
+        if (null != incrWriteTimeStr && incrWriteTimeStr.trim().length() > 0 && StringUtils.isNumeric(incrWriteTimeStr.trim())) {
+            incrWritetime = Long.parseLong(incrWriteTimeStr);
+        }
+
         logger.info("PARAM -- TTL-WriteTime Columns: {}", ttlWTCols);
-        logger.info("PARAM -- WriteTimestampFilter: {}", writeTimeStampFilter);
+        logger.info("PARAM -- WriteTimes Filter: {}", writeTimeStampFilter);
+        logger.info("PARAM -- WriteTime Custom Value: {}", customWritetime);
+        logger.info("PARAM -- WriteTime increment Value: {}", incrWritetime);
         if (writeTimeStampFilter) {
             logger.info("PARAM -- minWriteTimeStampFilter: {} datetime is {}", minWriteTimeStampFilter,
                     Instant.ofEpochMilli(minWriteTimeStampFilter / 1000));
@@ -196,7 +205,7 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
             if (customWritetime > 0) {
                 boundInsertStatement = boundInsertStatement.set(index, customWritetime, Long.class);
             } else {
-                boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
+                boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow) + incrWritetime, Long.class);
             }
         }
     }
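Taken together, the bind logic now resolves the target writetime as sketched below. This is a simplified restatement of the diff above, not the project's exact code; the method and parameter names are illustrative:

// How the effective target writetime appears to be chosen after this change:
// a configured fixed value wins outright; otherwise the largest writetime
// found on the source row is shifted by the configured increment.
static long resolveTargetWriteTime(long customWritetime, long incrWritetime,
                                   long largestSourceWriteTime) {
    if (customWritetime > 0) {
        // spark.target.writeTime.fixedValue overrides everything when set.
        return customWritetime;
    }
    // spark.target.writeTime.incrementBy defaults to 0, i.e. a no-op.
    return largestSourceWriteTime + incrWritetime;
}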

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 0 additions & 1 deletion

@@ -40,7 +40,6 @@ public abstract class BaseJobSession {
     protected Boolean writeTimeStampFilter = Boolean.FALSE;
     protected Long minWriteTimeStampFilter = 0l;
     protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
-    protected Long customWritetime = 0l;

     protected Boolean isCounterTable = false;


src/resources/cdm.properties

Lines changed: 5 additions & 1 deletion

@@ -91,7 +91,11 @@ spark.batchSize 10
 #spark.read.fetch.sizeInRows 1000

 # ENABLE ONLY IF YOU WANT TO USE CUSTOM FIXED WRITETIME VALUE ON TARGET
-#spark.target.custom.writeTime 0
+#spark.target.writeTime.fixedValue 0
+
+# ENABLE ONLY IF YOU WANT TO INCREMENT SOURCE WRITETIME VALUE
+# DUPLICATES IN LIST FIELDS: USE THIS WORKAROUND FOR CASSANDRA BUG https://issues.apache.org/jira/browse/CASSANDRA-11368
+#spark.target.writeTime.incrementBy 0

 # ONLY USE when running in Guardrail mode to identify large fields
 #spark.guardrail.colSizeInKB 1024
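To apply the workaround, one would presumably uncomment the new property and set a positive increment, for example:

spark.target.writeTime.incrementBy 1

Since Cassandra WRITETIME values are microseconds since the epoch, this shifts every migrated row's timestamp forward by one microsecond relative to the source, enough to avoid colliding with the original write's timestamp on list columns.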
