Skip to content

Commit 86632b4

Browse files
committed
Fix counter-table bug when run in Diff mode & simplify counter-table config
1 parent d985c1c commit 86632b4

File tree

7 files changed

+18
-51
lines changed

7 files changed

+18
-51
lines changed

.idea/libraries/Maven__org_scala_lang_scala_library_2_11_8.xml

Lines changed: 0 additions & 23 deletions
This file was deleted.

README.md

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ tar -xvzf <spark downloaded file name>
2525
```
2626
./spark-submit --properties-file sparkConf.properties /
2727
--master "local[*]" /
28-
--conf spark.migrate.source.minPartition=-9223372036854775808 /
29-
--conf spark.migrate.source.maxPartition=9223372036854775807 /
3028
--class datastax.astra.migrate.Migrate cassandra-data-migrator-1.x.jar &> logfile_name.txt
3129
```
3230

@@ -40,8 +38,6 @@ Note: Above command also generates a log file `logfile_name.txt` to avoid log ou
4038
```
4139
./spark-submit --properties-file sparkConf.properties /
4240
--master "local[*]" /
43-
--conf spark.migrate.source.minPartition=-9223372036854775808 /
44-
--conf spark.migrate.source.maxPartition=9223372036854775807 /
4541
--class datastax.astra.migrate.DiffData cassandra-data-migrator-1.x.jar &> logfile_name.txt
4642
```
4743

@@ -60,8 +56,8 @@ Note: Above command also generates a log file `logfile_name.txt` to avoid log ou
6056
- Enable/disable this feature using one or both of the below setting in the config file
6157

6258
```
63-
spark.migrate.destination.autocorrect.missing true|false
64-
spark.migrate.destination.autocorrect.mismatch true|false
59+
spark.destination.autocorrect.missing true|false
60+
spark.destination.autocorrect.mismatch true|false
6561
```
6662

6763
# Additional features

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
<groupId>datastax.astra.migrate</groupId>
55
<artifactId>cassandra-data-migrator</artifactId>
6-
<version>1.1</version>
6+
<version>1.2</version>
77
<packaging>jar</packaging>
88

99
<properties>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public abstract class AbstractJobSession {
3434

3535
protected CqlSession sourceSession;
3636
protected CqlSession astraSession;
37+
protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
3738
protected List<MigrateDataType> idColTypes = new ArrayList<MigrateDataType>();
3839

3940
protected Integer batchSize = 1;
@@ -47,7 +48,6 @@ public abstract class AbstractJobSession {
4748
protected List<Integer> writeTimeStampCols = new ArrayList<Integer>();
4849
protected List<Integer> ttlCols = new ArrayList<Integer>();
4950
protected Boolean isCounterTable;
50-
protected Integer counterDeltaMaxIndex = 0;
5151

5252
protected String sourceKeyspaceTable;
5353
protected String astraKeyspaceTable;
@@ -117,11 +117,8 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
117117

118118
hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.source.hasRandomPartitioner", "false"));
119119

120-
isCounterTable = Boolean.parseBoolean(sparkConf.get("spark.source.counterTable", "false"));
121-
122-
counterDeltaMaxIndex = Integer
123-
.parseInt(sparkConf.get("spark.source.counterTable.update.max.counter.index", "0"));
124-
120+
isCounterTable = Boolean.parseBoolean(sparkConf.get("spark.counterTable", "false"));
121+
selectColTypes = getTypes(sparkConf.get("spark.diff.select.types"));
125122
String partionKey = sparkConf.get("spark.query.cols.partitionKey");
126123
String idCols = sparkConf.get("spark.query.cols.id");
127124
idColTypes = getTypes(sparkConf.get("spark.query.cols.id.types"));

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@ protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, Spar
5454
}
5555

5656
if (isCounterTable) {
57-
String updateSelectMappingStr = sparkConf.get("spark.source.counterTable.update.select.index", "0");
57+
String updateSelectMappingStr = sparkConf.get("spark.counterTable.cql.index", "0");
5858
for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
5959
updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
6060
}
6161

62-
String counterTableUpdate = sparkConf.get("spark.source.counterTable.update.cql");
62+
String counterTableUpdate = sparkConf.get("spark.counterTable.cql");
6363
astraInsertStatement = astraSession.prepare(counterTableUpdate);
6464
} else {
6565
if (isPreserveTTLWritetime) {
@@ -181,8 +181,8 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
181181
for (int index = 0; index < insertColTypes.size(); index++) {
182182
MigrateDataType dataType = insertColTypes.get(index);
183183
// compute the counter delta if reading from astra for the difference
184-
if (astraRow != null && isCounterTable && index <= counterDeltaMaxIndex) {
185-
boundInsertStatement = boundInsertStatement.set(index, getCounterDelta(sourceRow.getLong(updateSelectMapping.get(index)), astraRow.getLong(updateSelectMapping.get(index))), Long.class);
184+
if (astraRow != null && index < (selectColTypes.size() - idColTypes.size())) {
185+
boundInsertStatement = boundInsertStatement.set(index, (sourceRow.getLong(updateSelectMapping.get(index)) - astraRow.getLong(updateSelectMapping.get(index))), Long.class);
186186
} else {
187187
boundInsertStatement = boundInsertStatement.set(index, getData(dataType, updateSelectMapping.get(index), sourceRow), dataType.typeClass);
188188
}
@@ -217,8 +217,4 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
217217
return boundInsertStatement;
218218
}
219219

220-
public Long getCounterDelta(Long sourceRow, Long astraRow) {
221-
return sourceRow - astraRow;
222-
}
223-
224220
}

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ public class DiffJobSession extends CopyJobSession {
2828
private AtomicLong validCounter = new AtomicLong(0);
2929
private AtomicLong skippedCounter = new AtomicLong(0);
3030

31-
protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
3231
protected Boolean autoCorrectMissing = false;
3332
protected Boolean autoCorrectMismatch = false;
3433

@@ -47,7 +46,6 @@ public static DiffJobSession getInstance(CqlSession sourceSession, CqlSession as
4746
private DiffJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
4847
super(sourceSession, astraSession, sparkConf);
4948

50-
selectColTypes = getTypes(sparkConf.get("spark.diff.select.types"));
5149
autoCorrectMissing = Boolean.parseBoolean(sparkConf.get("spark.destination.autocorrect.missing", "false"));
5250
autoCorrectMismatch = Boolean.parseBoolean(sparkConf.get("spark.destination.autocorrect.mismatch", "false"));
5351
}
@@ -134,7 +132,11 @@ private void diff(Row sourceRow, Row astraRow) {
134132
logger.error("Data mismatch found - Key: " + getKey(sourceRow) + " Data: " + diffData);
135133

136134
if (autoCorrectMismatch) {
137-
astraSession.execute(bindInsert(astraInsertStatement, sourceRow, null));
135+
if (isCounterTable) {
136+
astraSession.execute(bindInsert(astraInsertStatement, sourceRow, astraRow));
137+
} else {
138+
astraSession.execute(bindInsert(astraInsertStatement, sourceRow, null));
139+
}
138140
correctedMismatchCounter.incrementAndGet();
139141
logger.error("Corrected mismatch data in Astra: " + getKey(sourceRow));
140142
}

src/resources/sparkConf.properties

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,9 @@ spark.query.cols.partitionKey partition-key
3131
spark.query.cols.insert partition-key,clustering-key,order-date,amount
3232
spark.query.cols.insert.types 9,1,4,3
3333

34-
spark.source.counterTable false
35-
spark.source.counterTable.update.cql
36-
spark.source.counterTable.update.max.counter.index 0
37-
spark.source.counterTable.update.select.index 0
34+
spark.counterTable false
35+
spark.counterTable.cql
36+
spark.counterTable.cql.index 0
3837

3938
spark.preserveTTLWriteTime true
4039
spark.source.ttl.cols 6,7

0 commit comments

Comments
 (0)