Skip to content

Commit bdec1b1

Browse files
authored
Merge pull request #139 from datastax/feature/duplicate-values-in-list
Implements workaround for C* bug (duplicates in list on insert/update)
2 parents c333263 + 727cd34 commit bdec1b1

File tree

7 files changed

+41
-48
lines changed

7 files changed

+41
-48
lines changed

pom.xml

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
<properties>
1010
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
11-
<revision>3.4.1</revision>
11+
<revision>3.4.2</revision>
1212
<scala.version>2.12.17</scala.version>
1313
<scala.main.version>2.12</scala.main.version>
1414
<spark.version>3.3.1</spark.version>
@@ -48,21 +48,7 @@
4848
<artifactId>spark-sql_${scala.main.version}</artifactId>
4949
<version>${spark.version}</version>
5050
</dependency>
51-
<dependency>
52-
<groupId>org.apache.spark</groupId>
53-
<artifactId>spark-hive_${scala.main.version}</artifactId>
54-
<version>${spark.version}</version>
55-
<exclusions>
56-
<exclusion>
57-
<groupId>log4j</groupId>
58-
<artifactId>log4j</artifactId>
59-
</exclusion>
60-
<exclusion>
61-
<groupId>log4j</groupId>
62-
<artifactId>apache-log4j-extras</artifactId>
63-
</exclusion>
64-
</exclusions>
65-
</dependency>
51+
6652
<dependency>
6753
<groupId>com.datastax.spark</groupId>
6854
<artifactId>spark-cassandra-connector_${scala.main.version}</artifactId>
@@ -74,11 +60,6 @@
7460
<version>3.1.15</version>
7561
</dependency>
7662

77-
<dependency>
78-
<groupId>org.apache.logging.log4j</groupId>
79-
<artifactId>log4j-api</artifactId>
80-
<version>2.19.0</version>
81-
</dependency>
8263
<dependency>
8364
<groupId>org.apache.logging.log4j</groupId>
8465
<artifactId>log4j-core</artifactId>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import datastax.astra.migrate.schema.ColumnInfo;
99
import datastax.astra.migrate.schema.TableInfo;
1010
import datastax.astra.migrate.schema.TypeInfo;
11-
import org.apache.commons.lang.StringUtils;
11+
import org.apache.commons.lang3.StringUtils;
1212
import org.apache.spark.SparkConf;
1313
import org.slf4j.Logger;
1414
import org.slf4j.LoggerFactory;
@@ -30,6 +30,8 @@ public class AbstractJobSession extends BaseJobSession {
3030
protected List<String> ttlWTCols;
3131
protected String tsReplaceValStr;
3232
protected long tsReplaceVal;
33+
protected long customWriteTime = 0l;
34+
protected long incrementWriteTime = 0l;
3335

3436
protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
3537
this(sourceSession, astraSession, sc, false);
@@ -67,7 +69,6 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
6769
logger.info("PARAM -- Destination Table: {}", astraKeyspaceTable.split("\\.")[1]);
6870
logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());
6971
logger.info("PARAM -- WriteRateLimit: {}", writeLimiter.getRate());
70-
logger.info("PARAM -- WriteTimestampFilter: {}", writeTimeStampFilter);
7172

7273
tableInfo = TableInfo.getInstance(sourceSession, sourceKeyspaceTable.split("\\.")[0],
7374
sourceKeyspaceTable.split("\\.")[1], Util.getSparkPropOrEmpty(sc, "spark.query.origin"));
@@ -90,18 +91,26 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
9091
}
9192
String maxWriteTimeStampFilterStr =
9293
Util.getSparkPropOr(sc, "spark.origin.maxWriteTimeStampFilter", "0");
93-
if (null != maxWriteTimeStampFilterStr && maxWriteTimeStampFilterStr.trim().length() > 1) {
94+
if (StringUtils.isNotBlank(maxWriteTimeStampFilterStr)) {
9495
maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
9596
}
9697

9798
String customWriteTimeStr =
98-
Util.getSparkPropOr(sc, "spark.target.custom.writeTime", "0");
99-
if (null != customWriteTimeStr && customWriteTimeStr.trim().length() > 1 && StringUtils.isNumeric(customWriteTimeStr.trim())) {
100-
customWritetime = Long.parseLong(customWriteTimeStr);
99+
Util.getSparkPropOr(sc, "spark.target.writeTime.fixedValue", "0");
100+
if (StringUtils.isNotBlank(customWriteTimeStr) && StringUtils.isNumeric(customWriteTimeStr)) {
101+
customWriteTime = Long.parseLong(customWriteTimeStr);
102+
}
103+
104+
String incrWriteTimeStr =
105+
Util.getSparkPropOr(sc, "spark.target.writeTime.incrementBy", "0");
106+
if (StringUtils.isNotBlank(incrWriteTimeStr) && StringUtils.isNumeric(incrWriteTimeStr)) {
107+
incrementWriteTime = Long.parseLong(incrWriteTimeStr);
101108
}
102109

103110
logger.info("PARAM -- TTL-WriteTime Columns: {}", ttlWTCols);
104-
logger.info("PARAM -- WriteTimestampFilter: {}", writeTimeStampFilter);
111+
logger.info("PARAM -- WriteTimes Filter: {}", writeTimeStampFilter);
112+
logger.info("PARAM -- WriteTime Custom Value: {}", customWriteTime);
113+
logger.info("PARAM -- WriteTime Increment Value: {}", incrementWriteTime);
105114
if (writeTimeStampFilter) {
106115
logger.info("PARAM -- minWriteTimeStampFilter: {} datetime is {}", minWriteTimeStampFilter,
107116
Instant.ofEpochMilli(minWriteTimeStampFilter / 1000));
@@ -193,10 +202,10 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
193202
if (!ttlWTCols.isEmpty()) {
194203
boundInsertStatement = boundInsertStatement.set(index, getLargestTTL(sourceRow), Integer.class);
195204
index++;
196-
if (customWritetime > 0) {
197-
boundInsertStatement = boundInsertStatement.set(index, customWritetime, Long.class);
205+
if (customWriteTime > 0) {
206+
boundInsertStatement = boundInsertStatement.set(index, customWriteTime, Long.class);
198207
} else {
199-
boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
208+
boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow) + incrementWriteTime, Long.class);
200209
}
201210
}
202211
}

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,27 +31,26 @@ public abstract class BaseJobSession {
3131
// Rate = Total Throughput (write/read per sec) / Total Executors
3232
protected RateLimiter readLimiter;
3333
protected RateLimiter writeLimiter;
34-
protected Integer maxRetries = 10;
34+
protected int maxRetries = 10;
3535
protected AtomicLong readCounter = new AtomicLong(0);
36-
protected Integer batchSize = 1;
37-
protected Integer fetchSizeInRows = 1000;
38-
protected Integer printStatsAfter = 100000;
36+
protected int batchSize = 1;
37+
protected int fetchSizeInRows = 1000;
38+
protected int printStatsAfter;
3939

40-
protected Boolean writeTimeStampFilter = Boolean.FALSE;
41-
protected Long minWriteTimeStampFilter = 0l;
42-
protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
43-
protected Long customWritetime = 0l;
40+
protected boolean writeTimeStampFilter;
41+
protected long minWriteTimeStampFilter = 0l;
42+
protected long maxWriteTimeStampFilter = Long.MAX_VALUE;
4443

45-
protected Boolean isCounterTable = false;
44+
protected boolean isCounterTable;
4645

4746
protected String sourceKeyspaceTable;
4847
protected String astraKeyspaceTable;
4948

50-
protected Boolean hasRandomPartitioner;
51-
protected Boolean filterData;
49+
protected boolean hasRandomPartitioner;
50+
protected boolean filterData;
5251
protected String filterColName;
5352
protected String filterColType;
54-
protected Integer filterColIndex;
53+
protected int filterColIndex;
5554
protected String filterColValue;
5655
protected String sourceSelectCondition;
5756

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ public class DiffJobSession extends CopyJobSession {
3030
private final AtomicLong validCounter = new AtomicLong(0);
3131
private final AtomicLong skippedCounter = new AtomicLong(0);
3232
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
33-
protected Boolean autoCorrectMissing = false;
34-
protected Boolean autoCorrectMismatch = false;
33+
protected boolean autoCorrectMissing;
34+
protected boolean autoCorrectMismatch;
3535

3636
private DiffJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
3737
super(sourceSession, astraSession, sc);

src/main/java/datastax/astra/migrate/GuardrailJobSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class GuardrailJobSession extends BaseJobSession {
1919
protected AtomicLong readCounter = new AtomicLong(0);
2020
protected AtomicLong largeRowCounter = new AtomicLong(0);
2121
protected AtomicLong largeFieldCounter = new AtomicLong(0);
22-
protected Integer guardrailColSizeInKB;
22+
protected int guardrailColSizeInKB;
2323

2424
protected GuardrailJobSession(CqlSession session, SparkConf sc) {
2525
super(sc);

src/main/java/datastax/astra/migrate/Util.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package datastax.astra.migrate;
22

33
import com.datastax.oss.driver.api.core.ConsistencyLevel;
4-
import org.apache.commons.lang.StringUtils;
4+
import org.apache.commons.lang3.StringUtils;
55
import org.apache.spark.SparkConf;
66

77
import java.io.BufferedReader;

src/resources/cdm.properties

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,11 @@ spark.batchSize 10
9191
#spark.read.fetch.sizeInRows 1000
9292

9393
# ENABLE ONLY IF YOU WANT TO USE CUSTOM FIXED WRITETIME VALUE ON TARGET
94-
#spark.target.custom.writeTime 0
94+
#spark.target.writeTime.fixedValue 0
95+
96+
# ENABLE ONLY IF YOU WANT TO INCREMENT SOURCE WRITETIME VALUE
97+
# DUPLICATES IN LIST FIELDS: USE THIS WORKAROUND FOR CASSANDRA BUG https://issues.apache.org/jira/browse/CASSANDRA-11368
98+
#spark.target.writeTime.incrementBy 0
9599

96100
# ONLY USE when running in Guardrail mode to identify large fields
97101
#spark.guardrail.colSizeInKB 1024

0 commit comments

Comments (0)