
Commit 7a524a7

Merge pull request #56 from datastax/feature/consistency-bug
Use applicable read/write consistency from properties.
2 parents f0a6360 + d0dc34b commit 7a524a7
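
In effect, this PR replaces the previously hard-coded ConsistencyLevel.LOCAL_QUORUM with levels resolved from two new properties, spark.consistency.read and spark.consistency.write. A minimal sketch of that flow, assuming the Util helpers added in this commit are on the classpath (the class and method names below are illustrative only, not part of the commit):

    package datastax.astra.migrate; // same package only so the Util helper resolves

    import com.datastax.oss.driver.api.core.ConsistencyLevel;
    import com.datastax.oss.driver.api.core.cql.BoundStatement;
    import com.datastax.oss.driver.api.core.cql.PreparedStatement;
    import org.apache.spark.SparkConf;

    // Hypothetical, stand-alone illustration of the consistency flow introduced by this PR.
    public class ConsistencyFlowSketch {

        public static BoundStatement bindRead(SparkConf sc, PreparedStatement selectStatement) {
            // Resolve the read consistency from the new property; empty/unknown values
            // fall back to LOCAL_QUORUM inside Util.mapToConsistencyLevel.
            ConsistencyLevel readCL =
                    Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.read"));
            // BoundStatement is immutable in driver 4.x; setConsistencyLevel returns a new statement.
            return selectStatement.bind().setConsistencyLevel(readCL);
        }

        public static BoundStatement bindWrite(SparkConf sc, PreparedStatement insertStatement) {
            ConsistencyLevel writeCL =
                    Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.write"));
            return insertStatement.bind().setConsistencyLevel(writeCL);
        }
    }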

File tree

9 files changed, +115 −44 lines


pom.xml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 
     <groupId>datastax.astra.migrate</groupId>
     <artifactId>cassandra-data-migrator</artifactId>
-    <version>2.10.1</version>
+    <version>2.11.0</version>
     <packaging>jar</packaging>
 
     <properties>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 8 additions & 2 deletions
@@ -24,6 +24,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
     }
 
     protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
+        super(sc);
         this.sourceSession = sourceSession;
         this.astraSession = astraSession;
 
@@ -79,6 +80,8 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
             customWritetime = Long.parseLong(customWriteTimeStr);
         }
 
+        logger.info("PARAM -- Read Consistency: {}", readConsistencyLevel);
+        logger.info("PARAM -- Write Consistency: {}", writeConsistencyLevel);
         logger.info("PARAM -- Write Batch Size: {}", batchSize);
         logger.info("PARAM -- Read Fetch Size: {}", fetchSizeInRows);
         logger.info("PARAM -- Source Keyspace Table: {}", sourceKeyspaceTable);
@@ -98,6 +101,9 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         String selectCols = Util.getSparkProp(sc, "spark.query.origin");
         String partionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
         String sourceSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");
+        if (!sourceSelectCondition.isEmpty() && !sourceSelectCondition.trim().toUpperCase().startsWith("AND")) {
+            sourceSelectCondition = " AND " + sourceSelectCondition;
+        }
 
         final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
         String[] allCols = selectCols.split(",");
@@ -172,7 +178,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
     }
 
     public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow, Row astraRow) {
-        BoundStatement boundInsertStatement = insertStatement.bind();
+        BoundStatement boundInsertStatement = insertStatement.bind().setConsistencyLevel(writeConsistencyLevel);
 
         if (isCounterTable) {
             for (int index = 0; index < selectColTypes.size(); index++) {
@@ -232,7 +238,7 @@ public long getLargestWriteTimeStamp(Row sourceRow) {
     }
 
     public BoundStatement selectFromAstra(PreparedStatement selectStatement, Row sourceRow) {
-        BoundStatement boundSelectStatement = selectStatement.bind();
+        BoundStatement boundSelectStatement = selectStatement.bind().setConsistencyLevel(readConsistencyLevel);
         for (int index = 0; index < idColTypes.size(); index++) {
             MigrateDataType dataType = idColTypes.get(index);
             boundSelectStatement = boundSelectStatement.set(index, getData(dataType, index, sourceRow),

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 9 additions & 0 deletions
@@ -1,9 +1,11 @@
 package datastax.astra.migrate;
 
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
+import org.apache.spark.SparkConf;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -15,6 +17,8 @@ public abstract class BaseJobSession {
     protected PreparedStatement sourceSelectStatement;
     protected PreparedStatement astraSelectStatement;
     protected PreparedStatement astraInsertStatement;
+    protected ConsistencyLevel readConsistencyLevel;
+    protected ConsistencyLevel writeConsistencyLevel;
 
     // Read/Write Rate limiter
     // Determine the total throughput for the entire cluster in terms of wries/sec,
@@ -55,6 +59,11 @@ public abstract class BaseJobSession {
     protected Integer filterColIndex;
     protected String filterColValue;
 
+    protected BaseJobSession(SparkConf sc) {
+        readConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.read"));
+        writeConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.write"));
+    }
+
     public String getKey(Row sourceRow) {
         StringBuffer key = new StringBuffer();
         for (int index = 0; index < idColTypes.size(); index++) {

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 4 additions & 2 deletions
@@ -47,8 +47,10 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
         for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
 
             try {
-                ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(),
-                        hasRandomPartitioner ? max : max.longValueExact()).setPageSize(fetchSizeInRows));
+                ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ?
+                        min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact())
+                        .setConsistencyLevel(readConsistencyLevel).setPageSize(fetchSizeInRows));
+
                 Collection<CompletionStage<AsyncResultSet>> writeResults = new ArrayList<CompletionStage<AsyncResultSet>>();
 
                 // cannot do batching if the writeFilter is greater than 0 or

src/main/java/datastax/astra/migrate/CopyPKJobSession.java

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ public void getRowAndInsert(List<SplitPartitions.PKRows> rowsList) {
             readCounter.incrementAndGet();
             String[] pkFields = row.split(" %% ");
             int idx = 0;
-            BoundStatement bspk = sourceSelectStatement.bind();
+            BoundStatement bspk = sourceSelectStatement.bind().setConsistencyLevel(readConsistencyLevel);
             for (MigrateDataType tp : idColTypes) {
                 bspk = bspk.set(idx, convert(tp.typeClass, pkFields[idx]), tp.typeClass);
                 idx++;

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 3 additions & 4 deletions
@@ -1,6 +1,5 @@
 package datastax.astra.migrate;
 
-import com.datastax.oss.driver.api.core.ConsistencyLevel;
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
 import com.datastax.oss.driver.api.core.cql.ResultSet;
@@ -61,9 +60,9 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
 
             try {
                 // cannot do batching if the writeFilter is greater than 0
-                ResultSet resultSet = sourceSession.execute(
-                        sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact())
-                                .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setPageSize(fetchSizeInRows));
+                ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ?
+                        min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact())
+                        .setConsistencyLevel(readConsistencyLevel).setPageSize(fetchSizeInRows));
 
                 Map<Row, CompletionStage<AsyncResultSet>> srcToTargetRowMap = new HashMap<Row, CompletionStage<AsyncResultSet>>();
                 StreamSupport.stream(resultSet.spliterator(), false).forEach(srcRow -> {

src/main/java/datastax/astra/migrate/OriginCountJobSession.java

Lines changed: 24 additions & 23 deletions
@@ -26,41 +26,41 @@ public class OriginCountJobSession extends BaseJobSession {
     protected Integer fieldGuardraillimitMB;
     protected List<MigrateDataType> checkTableforColSizeTypes = new ArrayList<MigrateDataType>();
 
-    protected OriginCountJobSession(CqlSession sourceSession, SparkConf sparkConf) {
+    protected OriginCountJobSession(CqlSession sourceSession, SparkConf sc) {
+        super(sc);
         this.sourceSession = sourceSession;
-        batchSize = new Integer(sparkConf.get("spark.batchSize", "1"));
-        printStatsAfter = new Integer(sparkConf.get("spark.printStatsAfter", "100000"));
+        batchSize = new Integer(sc.get("spark.batchSize", "1"));
+        printStatsAfter = new Integer(sc.get("spark.printStatsAfter", "100000"));
         if (printStatsAfter < 1) {
             printStatsAfter = 100000;
         }
 
-        readLimiter = RateLimiter.create(new Integer(sparkConf.get("spark.readRateLimit", "20000")));
-        sourceKeyspaceTable = sparkConf.get("spark.origin.keyspaceTable");
+        readLimiter = RateLimiter.create(new Integer(sc.get("spark.readRateLimit", "20000")));
+        sourceKeyspaceTable = sc.get("spark.origin.keyspaceTable");
 
-        hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.origin.hasRandomPartitioner", "false"));
-        isCounterTable = Boolean.parseBoolean(sparkConf.get("spark.counterTable", "false"));
+        hasRandomPartitioner = Boolean.parseBoolean(sc.get("spark.origin.hasRandomPartitioner", "false"));
+        isCounterTable = Boolean.parseBoolean(sc.get("spark.counterTable", "false"));
 
-        checkTableforColSize = Boolean.parseBoolean(sparkConf.get("spark.origin.checkTableforColSize", "false"));
-        checkTableforselectCols = sparkConf.get("spark.origin.checkTableforColSize.cols");
-        checkTableforColSizeTypes = getTypes(sparkConf.get("spark.origin.checkTableforColSize.cols.types"));
-        filterColName = Util.getSparkPropOrEmpty(sparkConf, "spark.origin.FilterColumn");
-        filterColType = Util.getSparkPropOrEmpty(sparkConf, "spark.origin.FilterColumnType");
-        filterColIndex = Integer.parseInt(sparkConf.get("spark.origin.FilterColumnIndex", "0"));
-        fieldGuardraillimitMB = Integer.parseInt(sparkConf.get("spark.fieldGuardraillimitMB", "0"));
+        checkTableforColSize = Boolean.parseBoolean(sc.get("spark.origin.checkTableforColSize", "false"));
+        checkTableforselectCols = sc.get("spark.origin.checkTableforColSize.cols");
+        checkTableforColSizeTypes = getTypes(sc.get("spark.origin.checkTableforColSize.cols.types"));
+        filterColName = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumn");
+        filterColType = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumnType");
+        filterColIndex = Integer.parseInt(sc.get("spark.origin.FilterColumnIndex", "0"));
+        fieldGuardraillimitMB = Integer.parseInt(sc.get("spark.fieldGuardraillimitMB", "0"));
 
-        String partionKey = sparkConf.get("spark.query.cols.partitionKey");
-        idColTypes = getTypes(sparkConf.get("spark.query.cols.id.types"));
+        String partionKey = sc.get("spark.query.cols.partitionKey");
+        idColTypes = getTypes(sc.get("spark.query.cols.id.types"));
 
-        String selectCols = sparkConf.get("spark.query.cols.select");
-        String updateSelectMappingStr = sparkConf.get("spark.counterTable.cql.index", "0");
+        String selectCols = sc.get("spark.query.cols.select");
+        String updateSelectMappingStr = sc.get("spark.counterTable.cql.index", "0");
         for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
             updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
         }
-        String sourceSelectCondition = sparkConf.get("spark.query.cols.select.condition", "");
+        String sourceSelectCondition = sc.get("spark.query.cols.select.condition", "");
         sourceSelectStatement = sourceSession.prepare(
                 "select " + selectCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
                         + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING");
-
     }
 
     public static OriginCountJobSession getInstance(CqlSession sourceSession, SparkConf sparkConf) {
@@ -81,7 +81,10 @@ public void getData(BigInteger min, BigInteger max) {
         for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
 
             try {
-                ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact()));
+                ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ?
+                        min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact())
+                        .setConsistencyLevel(readConsistencyLevel).setPageSize(fetchSizeInRows));
+
                 Collection<CompletionStage<AsyncResultSet>> writeResults = new ArrayList<CompletionStage<AsyncResultSet>>();
 
                 // cannot do batching if the writeFilter is greater than 0 or
@@ -106,7 +109,6 @@ public void getData(BigInteger min, BigInteger max) {
                             }
                         }
                     }
-
                 } else {
                     BatchStatement batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
                     for (Row sourceRow : resultSet) {
@@ -143,7 +145,6 @@ public void getData(BigInteger min, BigInteger max) {
                         Thread.currentThread().getId(), min, max, retryCount);
             }
         }
-
     }
 
     private int GetRowColumnLength(Row sourceRow, String filterColType, Integer filterColIndex) {

src/main/java/datastax/astra/migrate/Util.java

Lines changed: 42 additions & 0 deletions
@@ -1,5 +1,7 @@
 package datastax.astra.migrate;
 
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import org.apache.commons.lang.StringUtils;
 import org.apache.spark.SparkConf;
 
 import java.io.BufferedReader;
@@ -39,4 +41,44 @@ public static BufferedReader getfileReader(String fileName) {
         }
     }
 
+    public static ConsistencyLevel mapToConsistencyLevel(String level) {
+        ConsistencyLevel retVal = ConsistencyLevel.LOCAL_QUORUM;
+        if (StringUtils.isNotEmpty(level)) {
+            switch (level.toUpperCase()) {
+                case "ANY":
+                    retVal = ConsistencyLevel.ANY;
+                    break;
+                case "ONE":
+                    retVal = ConsistencyLevel.ONE;
+                    break;
+                case "TWO":
+                    retVal = ConsistencyLevel.TWO;
+                    break;
+                case "THREE":
+                    retVal = ConsistencyLevel.THREE;
+                    break;
+                case "QUORUM":
+                    retVal = ConsistencyLevel.QUORUM;
+                    break;
+                case "LOCAL_ONE":
+                    retVal = ConsistencyLevel.LOCAL_ONE;
+                    break;
+                case "EACH_QUORUM":
+                    retVal = ConsistencyLevel.EACH_QUORUM;
+                    break;
+                case "SERIAL":
+                    retVal = ConsistencyLevel.SERIAL;
+                    break;
+                case "LOCAL_SERIAL":
+                    retVal = ConsistencyLevel.LOCAL_SERIAL;
+                    break;
+                case "ALL":
+                    retVal = ConsistencyLevel.ALL;
+                    break;
+            }
+        }
+
+        return retVal;
+    }
+
 }
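
The new helper is defensive: matching is case-insensitive, and an empty or unrecognized value falls back to LOCAL_QUORUM. A quick illustrative check of that behavior (not part of the commit; class name is hypothetical and placed in the same package only so Util resolves):

    package datastax.astra.migrate;

    import com.datastax.oss.driver.api.core.ConsistencyLevel;

    // Illustrative check of the mapping and its fallback behavior.
    public class ConsistencyMappingDemo {
        public static void main(String[] args) {
            ConsistencyLevel fromProp = Util.mapToConsistencyLevel("local_one"); // LOCAL_ONE (case-insensitive)
            ConsistencyLevel empty = Util.mapToConsistencyLevel("");             // LOCAL_QUORUM (default)
            ConsistencyLevel unknown = Util.mapToConsistencyLevel("BOGUS");      // LOCAL_QUORUM (unrecognized)
            System.out.println(fromProp + " " + empty + " " + unknown);
        }
    }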

src/resources/sparkConf.properties

Lines changed: 23 additions & 11 deletions
@@ -18,13 +18,24 @@ spark.batchSize 5
 
 spark.query.origin partition-key,clustering-key,order-date,amount
 spark.query.origin.partitionKey partition-key
-spark.query.target partition-key,clustering-key,order-date,amount
 spark.query.target.id partition-key,clustering-key
 spark.query.types 9,1,4,3
 spark.query.ttl.cols 2,3
 spark.query.writetime.cols 2,3
 
-################################## ENABLE ONLY IF IT IS A COUNTER TABLE #####################################
+##### ENABLE ONLY IF COLUMN NAMES ON TARGET IS DIFFERENT FROM ORIGIN (SCHEMA & DATA-TYPES MUST BE SAME) #####
+#spark.query.target partition-key,clustering-key,order-date,amount
+
+################# ENABLE ONLY IF YOU WANT TO MIGRATE/VALIDATE SOME DATA BASED ON CQL FILTER #################
+#spark.query.condition
+
+################# ENABLE ONLY IF YOU WANT TO MIGRATE/VALIDATE SOME % (NOT 100%) DATA ######################
+#spark.coveragePercent 10
+
+#################### ENABLE ONLY IF WANT LOG STATS MORE OR LESS FREQUENTLY THAN DEFAULT #####################
+#spark.printStatsAfter 100000
+
+################################# ENABLE ONLY IF IT IS A COUNTER TABLE ######################################
 #spark.counterTable false
 #spark.counterTable.cql
 #spark.counterTable.cql.index 0
@@ -34,12 +45,17 @@ spark.query.writetime.cols 2,3
 #spark.origin.minWriteTimeStampFilter 0
 #spark.origin.maxWriteTimeStampFilter 4102444800000000
 
-############################### ONLY CHANGE IF YOU KNOW WHAT YOU ARE DOING ##################################
-#spark.coveragePercent 100
-#spark.printStatsAfter 100000
-#spark.read.consistency.level LOCAL_QUORUM
+######## ENABLE ONLY IF YOU WANT TO USE READ AND/OR WRITE CONSISTENCY OTHER THAN LOCAL_QUORUM ##############
+#spark.consistency.read LOCAL_QUORUM
+#spark.consistency.write LOCAL_QUORUM
+
+############# ENABLE ONLY IF YOU WANT TO REDUCE FETCH-SIZE TO AVOID FrameTooLongException ##################
 #spark.read.fetch.sizeInRows 1000
+
+############### ENABLE ONLY IF YOU WANT TO USE CUSTOM FIXED WRITETIME VALUE ON TARGET ######################
 #spark.target.custom.writeTime 0
+
+#################### ONLY USE if SKIPPING recs greater than 10MB from Origin needed #########################
 #spark.fieldGuardraillimitMB 10
 
 #################### ONLY USE if count of recs greater than 10MB from Origin needed #########################
@@ -98,8 +114,4 @@ spark.query.writetime.cols 2,3
 # Note: The tool migrates TTL & Writetimes at row-level and not field-level.
 # Migration will use the largest TTL & Writetimes value per row.
 #
-# "spark.target.custom.writeTime" - User specified writetime. When set, this static value will be used as target writetime.
-#
-# Default value for "spark.origin.maxWriteTimeStampFilter" is "9223372036854775807" (max long value)
-#
-#############################################################################################################
+#############################################################################################################
