Skip to content

Commit e9161ba

Browse files
authored
Merge pull request #4 from datastax/feature/cleanup-config
Feature/cleanup config
2 parents 8eb105b + 86632b4 commit e9161ba

File tree

11 files changed

+233
-274
lines changed

11 files changed

+233
-274
lines changed

.idea/libraries/Maven__org_scala_lang_scala_library_2_11_8.xml

Lines changed: 0 additions & 23 deletions
This file was deleted.

README.md

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ tar -xvzf <spark downloaded file name>
2525
```
2626
./spark-submit --properties-file sparkConf.properties /
2727
--master "local[*]" /
28-
--conf spark.migrate.source.minPartition=-9223372036854775808 /
29-
--conf spark.migrate.source.maxPartition=9223372036854775807 /
3028
--class datastax.astra.migrate.Migrate cassandra-data-migrator-1.x.jar &> logfile_name.txt
3129
```
3230

@@ -40,8 +38,6 @@ Note: Above command also generates a log file `logfile_name.txt` to avoid log ou
4038
```
4139
./spark-submit --properties-file sparkConf.properties /
4240
--master "local[*]" /
43-
--conf spark.migrate.source.minPartition=-9223372036854775808 /
44-
--conf spark.migrate.source.maxPartition=9223372036854775807 /
4541
--class datastax.astra.migrate.DiffData cassandra-data-migrator-1.x.jar &> logfile_name.txt
4642
```
4743

@@ -60,8 +56,8 @@ Note: Above command also generates a log file `logfile_name.txt` to avoid log ou
6056
- Enable/disable this feature using one or both of the below setting in the config file
6157

6258
```
63-
spark.migrate.destination.autocorrect.missing true|false
64-
spark.migrate.destination.autocorrect.mismatch true|false
59+
spark.destination.autocorrect.missing true|false
60+
spark.destination.autocorrect.mismatch true|false
6561
```
6662

6763
# Additional features

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
<groupId>datastax.astra.migrate</groupId>
55
<artifactId>cassandra-data-migrator</artifactId>
6-
<version>1.0</version>
6+
<version>1.2</version>
77
<packaging>jar</packaging>
88

99
<properties>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public abstract class AbstractJobSession {
3434

3535
protected CqlSession sourceSession;
3636
protected CqlSession astraSession;
37+
protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
3738
protected List<MigrateDataType> idColTypes = new ArrayList<MigrateDataType>();
3839

3940
protected Integer batchSize = 1;
@@ -47,7 +48,6 @@ public abstract class AbstractJobSession {
4748
protected List<Integer> writeTimeStampCols = new ArrayList<Integer>();
4849
protected List<Integer> ttlCols = new ArrayList<Integer>();
4950
protected Boolean isCounterTable;
50-
protected Integer counterDeltaMaxIndex = 0;
5151

5252
protected String sourceKeyspaceTable;
5353
protected String astraKeyspaceTable;
@@ -58,22 +58,22 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
5858
this.sourceSession = sourceSession;
5959
this.astraSession = astraSession;
6060

61-
batchSize = new Integer(sparkConf.get("spark.migrate.batchSize", "1"));
62-
printStatsAfter = new Integer(sparkConf.get("spark.migrate.printStatsAfter", "100000"));
61+
batchSize = new Integer(sparkConf.get("spark.batchSize", "1"));
62+
printStatsAfter = new Integer(sparkConf.get("spark.printStatsAfter", "100000"));
6363
if (printStatsAfter < 1) {
6464
printStatsAfter = 100000;
6565
}
6666

67-
readLimiter = RateLimiter.create(new Integer(sparkConf.get("spark.migrate.readRateLimit", "20000")));
68-
writeLimiter = RateLimiter.create(new Integer(sparkConf.get("spark.migrate.writeRateLimit", "40000")));
69-
maxRetries = Integer.parseInt(sparkConf.get("spark.migrate.maxRetries", "10"));
67+
readLimiter = RateLimiter.create(new Integer(sparkConf.get("spark.readRateLimit", "20000")));
68+
writeLimiter = RateLimiter.create(new Integer(sparkConf.get("spark.writeRateLimit", "40000")));
69+
maxRetries = Integer.parseInt(sparkConf.get("spark.maxRetries", "10"));
7070

71-
sourceKeyspaceTable = sparkConf.get("spark.migrate.source.keyspaceTable");
72-
astraKeyspaceTable = sparkConf.get("spark.migrate.destination.keyspaceTable");
71+
sourceKeyspaceTable = sparkConf.get("spark.source.keyspaceTable");
72+
astraKeyspaceTable = sparkConf.get("spark.destination.keyspaceTable");
7373

74-
isPreserveTTLWritetime = Boolean.parseBoolean(sparkConf.get("spark.migrate.preserveTTLWriteTime", "false"));
74+
isPreserveTTLWritetime = Boolean.parseBoolean(sparkConf.get("spark.preserveTTLWriteTime", "false"));
7575
if (isPreserveTTLWritetime) {
76-
String ttlColsStr = sparkConf.get("spark.migrate.source.ttl.cols");
76+
String ttlColsStr = sparkConf.get("spark.source.ttl.cols");
7777
if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
7878
for (String ttlCol : ttlColsStr.split(",")) {
7979
ttlCols.add(Integer.parseInt(ttlCol));
@@ -82,11 +82,11 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
8282
}
8383

8484
writeTimeStampFilter = Boolean
85-
.parseBoolean(sparkConf.get("spark.migrate.source.writeTimeStampFilter", "false"));
85+
.parseBoolean(sparkConf.get("spark.source.writeTimeStampFilter", "false"));
8686
// batchsize set to 1 if there is a writeFilter
8787
if (writeTimeStampFilter) {
8888
batchSize = 1;
89-
String writeTimestampColsStr = sparkConf.get("spark.migrate.source.writeTimeStampFilter.cols");
89+
String writeTimestampColsStr = sparkConf.get("spark.source.writeTimeStampFilter.cols");
9090
if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
9191
for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
9292
writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
@@ -95,12 +95,12 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
9595
}
9696

9797
String minWriteTimeStampFilterStr =
98-
sparkConf.get("spark.migrate.source.minWriteTimeStampFilter", "0");
98+
sparkConf.get("spark.source.minWriteTimeStampFilter", "0");
9999
if (null != minWriteTimeStampFilterStr && minWriteTimeStampFilterStr.trim().length() > 1) {
100100
minWriteTimeStampFilter = Long.parseLong(minWriteTimeStampFilterStr);
101101
}
102102
String maxWriteTimeStampFilterStr =
103-
sparkConf.get("spark.migrate.source.maxWriteTimeStampFilter", "0");
103+
sparkConf.get("spark.source.maxWriteTimeStampFilter", "0");
104104
if (null != maxWriteTimeStampFilterStr && maxWriteTimeStampFilterStr.trim().length() > 1) {
105105
maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
106106
}
@@ -115,18 +115,15 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
115115
logger.info(" DEFAULT -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
116116
logger.info(" DEFAULT -- TTLCols: " + ttlCols);
117117

118-
hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.migrate.source.hasRandomPartitioner", "false"));
118+
hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.source.hasRandomPartitioner", "false"));
119119

120-
isCounterTable = Boolean.parseBoolean(sparkConf.get("spark.migrate.source.counterTable", "false"));
120+
isCounterTable = Boolean.parseBoolean(sparkConf.get("spark.counterTable", "false"));
121+
selectColTypes = getTypes(sparkConf.get("spark.diff.select.types"));
122+
String partionKey = sparkConf.get("spark.query.cols.partitionKey");
123+
String idCols = sparkConf.get("spark.query.cols.id");
124+
idColTypes = getTypes(sparkConf.get("spark.query.cols.id.types"));
121125

122-
counterDeltaMaxIndex = Integer
123-
.parseInt(sparkConf.get("spark.migrate.source.counterTable.update.max.counter.index", "0"));
124-
125-
String partionKey = sparkConf.get("spark.migrate.query.cols.partitionKey");
126-
String idCols = sparkConf.get("spark.migrate.query.cols.id");
127-
idColTypes = getTypes(sparkConf.get("spark.migrate.query.cols.id.types"));
128-
129-
String selectCols = sparkConf.get("spark.migrate.query.cols.select");
126+
String selectCols = sparkConf.get("spark.query.cols.select");
130127

131128
String idBinds = "";
132129
int count = 1;
@@ -139,7 +136,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
139136
count++;
140137
}
141138

142-
sourceSelectCondition = sparkConf.get("spark.migrate.query.cols.select.condition", "");
139+
sourceSelectCondition = sparkConf.get("spark.query.cols.select.condition", "");
143140
sourceSelectStatement = sourceSession.prepare(
144141
"select " + selectCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
145142
+ ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING");

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession as
4040
protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
4141
super(sourceSession, astraSession, sparkConf);
4242

43-
String insertCols = sparkConf.get("spark.migrate.query.cols.insert");
44-
insertColTypes = getTypes(sparkConf.get("spark.migrate.query.cols.insert.types"));
43+
String insertCols = sparkConf.get("spark.query.cols.insert");
44+
insertColTypes = getTypes(sparkConf.get("spark.query.cols.insert.types"));
4545
String insertBinds = "";
4646
int count = 1;
4747
for (String str : insertCols.split(",")) {
@@ -54,12 +54,12 @@ protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, Spar
5454
}
5555

5656
if (isCounterTable) {
57-
String updateSelectMappingStr = sparkConf.get("spark.migrate.source.counterTable.update.select.index", "0");
57+
String updateSelectMappingStr = sparkConf.get("spark.counterTable.cql.index", "0");
5858
for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
5959
updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
6060
}
6161

62-
String counterTableUpdate = sparkConf.get("spark.migrate.source.counterTable.update.cql");
62+
String counterTableUpdate = sparkConf.get("spark.counterTable.cql");
6363
astraInsertStatement = astraSession.prepare(counterTableUpdate);
6464
} else {
6565
if (isPreserveTTLWritetime) {
@@ -181,8 +181,8 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
181181
for (int index = 0; index < insertColTypes.size(); index++) {
182182
MigrateDataType dataType = insertColTypes.get(index);
183183
// compute the counter delta if reading from astra for the difference
184-
if (astraRow != null && isCounterTable && index <= counterDeltaMaxIndex) {
185-
boundInsertStatement = boundInsertStatement.set(index, getCounterDelta(sourceRow.getLong(updateSelectMapping.get(index)), astraRow.getLong(updateSelectMapping.get(index))), Long.class);
184+
if (astraRow != null && index < (selectColTypes.size() - idColTypes.size())) {
185+
boundInsertStatement = boundInsertStatement.set(index, (sourceRow.getLong(updateSelectMapping.get(index)) - astraRow.getLong(updateSelectMapping.get(index))), Long.class);
186186
} else {
187187
boundInsertStatement = boundInsertStatement.set(index, getData(dataType, updateSelectMapping.get(index), sourceRow), dataType.typeClass);
188188
}
@@ -217,8 +217,4 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
217217
return boundInsertStatement;
218218
}
219219

220-
public Long getCounterDelta(Long sourceRow, Long astraRow) {
221-
return sourceRow - astraRow;
222-
}
223-
224220
}

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,6 @@
1515
import java.util.stream.IntStream;
1616
import java.util.stream.StreamSupport;
1717

18-
/*
19-
(
20-
data_id text,
21-
cylinder text,
22-
value blob,
23-
PRIMARY KEY (data_id, cylinder)
24-
)
25-
*/
2618
public class DiffJobSession extends CopyJobSession {
2719

2820
public static Logger logger = Logger.getLogger(DiffJobSession.class);
@@ -36,7 +28,6 @@ public class DiffJobSession extends CopyJobSession {
3628
private AtomicLong validCounter = new AtomicLong(0);
3729
private AtomicLong skippedCounter = new AtomicLong(0);
3830

39-
protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
4031
protected Boolean autoCorrectMissing = false;
4132
protected Boolean autoCorrectMismatch = false;
4233

@@ -55,9 +46,8 @@ public static DiffJobSession getInstance(CqlSession sourceSession, CqlSession as
5546
private DiffJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
5647
super(sourceSession, astraSession, sparkConf);
5748

58-
selectColTypes = getTypes(sparkConf.get("spark.migrate.diff.select.types"));
59-
autoCorrectMissing = Boolean.parseBoolean(sparkConf.get("spark.migrate.destination.autocorrect.missing", "false"));
60-
autoCorrectMismatch = Boolean.parseBoolean(sparkConf.get("spark.migrate.destination.autocorrect.mismatch", "false"));
49+
autoCorrectMissing = Boolean.parseBoolean(sparkConf.get("spark.destination.autocorrect.missing", "false"));
50+
autoCorrectMismatch = Boolean.parseBoolean(sparkConf.get("spark.destination.autocorrect.mismatch", "false"));
6151
}
6252

6353
public void getDataAndDiff(BigInteger min, BigInteger max) {
@@ -142,7 +132,11 @@ private void diff(Row sourceRow, Row astraRow) {
142132
logger.error("Data mismatch found - Key: " + getKey(sourceRow) + " Data: " + diffData);
143133

144134
if (autoCorrectMismatch) {
145-
astraSession.execute(bindInsert(astraInsertStatement, sourceRow, null));
135+
if (isCounterTable) {
136+
astraSession.execute(bindInsert(astraInsertStatement, sourceRow, astraRow));
137+
} else {
138+
astraSession.execute(bindInsert(astraInsertStatement, sourceRow, null));
139+
}
146140
correctedMismatchCounter.incrementAndGet();
147141
logger.error("Corrected mismatch data in Astra: " + getKey(sourceRow));
148142
}

0 commit comments

Comments
 (0)