Skip to content

Commit bb59c36

Browse files
faizalrub-datastax and msmygit
authored and committed
Exception handler - Partition level
1 parent f6747ff commit bb59c36

File tree

7 files changed

+25
-14
lines changed

7 files changed

+25
-14
lines changed

SIT/smoke/01_basic_kvp/cdm.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ EOF
1010
spark-submit \
1111
--properties-file /smoke/01_basic_kvp/migrate.properties \
1212
--master "local[*]" \
13-
--class datastax.astra.migrate.Migrate /local/cassandra-data-migrator.jar
13+
--class datastax.cdm.job.Migrate /local/cassandra-data-migrator.jar
1414

1515
cat <<EOF
1616
!!!!!!!!
@@ -21,4 +21,4 @@ EOF
2121
spark-submit \
2222
--properties-file /smoke/01_basic_kvp/migrate.properties \
2323
--master "local[*]" \
24-
--class datastax.astra.migrate.DiffData /local/cassandra-data-migrator.jar
24+
--class datastax.cdm.job.DiffData /local/cassandra-data-migrator.jar

src/main/java/datastax/cdm/job/AbstractJobSession.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
4343
logger.info("PARAM -- Max Retries: {}", maxRetries);
4444
logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());
4545
logger.info("PARAM -- WriteRateLimit: {}", writeLimiter.getRate());
46+
logger.info("PARAM -- Token range exception dir: {}", tokenRangeExceptionDir);
47+
logger.info("PARAM -- Token range exception file name: {}", exceptionFileName);
4648

4749
cqlHelper.initialize();
4850
}

src/main/java/datastax/cdm/job/DiffJobSession.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ private void diff(Record record) {
205205
//correct data
206206
if (autoCorrectMissing) {
207207
writeLimiter.acquire(1);
208+
logger.info("Inserting missing row in target: {}", record.getPk());
208209
if (isCounterTable) cqlHelper.getTargetUpdateStatement().putRecord(record);
209210
else cqlHelper.getTargetInsertStatement().putRecord(record);
210211
correctedMissingCounter.incrementAndGet();
@@ -220,6 +221,7 @@ private void diff(Record record) {
220221

221222
if (autoCorrectMismatch) {
222223
writeLimiter.acquire(1);
224+
logger.info("Correcting mismatch row in target: {}", record.getPk());
223225
if (isCounterTable) cqlHelper.getTargetUpdateStatement().putRecord(record);
224226
else cqlHelper.getTargetInsertStatement().putRecord(record);
225227
correctedMismatchCounter.incrementAndGet();

src/main/java/datastax/cdm/properties/KnownProperties.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ public enum PropertyType {
172172
//==========================================================================
173173
// Error handling
174174
//==========================================================================
175-
public static final String TOKEN_RANGE_EXCEPTION_DIR = "spark.tokenRange.exceptionDir"; // file:///aaa/bbb/
176-
public static final String TOKEN_RANGE_EXCEPTION_FILE = "spark.input.partitionFile"; // file:///aaa/bbb/partitions.csv
175+
public static final String TOKEN_RANGE_EXCEPTION_DIR = "spark.tokenRange.exceptionDir"; // aaa/bbb
176+
public static final String TOKEN_RANGE_EXCEPTION_FILE = "spark.input.partitionFile"; // aaa/bbb/filename
177177
static {
178178
types.put(TOKEN_RANGE_EXCEPTION_DIR, PropertyType.STRING);
179179
types.put(TOKEN_RANGE_EXCEPTION_FILE, PropertyType.STRING);
Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package datastax.cdm.job
22

33
import com.datastax.spark.connector.cql.CassandraConnector
4+
import org.apache.spark.SparkConf
45
import org.slf4j.LoggerFactory
56

67
import scala.collection.JavaConversions._
@@ -10,11 +11,11 @@ object DiffDataFailedPartitionsFromFile extends AbstractJob {
1011
val logger = LoggerFactory.getLogger(this.getClass.getName)
1112
logger.info("Started MigratePartitionsFromFile App")
1213

13-
migrateTable(originConnection, targetConnection)
14+
migrateTable(originConnection, targetConnection, sc)
1415

1516
exitSpark
1617

17-
private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector) = {
18+
private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector, config: SparkConf) = {
1819
val partitions = SplitPartitions.getFailedSubPartitionsFromFile(numSplits, tokenRangeFile)
1920
logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
2021
val parts = sContext.parallelize(partitions.toSeq, partitions.size);
@@ -23,11 +24,11 @@ object DiffDataFailedPartitionsFromFile extends AbstractJob {
2324
parts.foreach(part => {
2425
sourceConnection.withSessionDo(sourceSession =>
2526
destinationConnection.withSessionDo(destinationSession =>
26-
DiffJobSession.getInstance(sourceSession, destinationSession, sc)
27+
DiffJobSession.getInstance(sourceSession, destinationSession, config)
2728
.getDataAndDiff(part.getMin, part.getMax)))
2829
})
2930

30-
DiffJobSession.getInstance(null, null, sc).printCounts(true);
31+
DiffJobSession.getInstance(null, null, config).printCounts(true);
3132
}
3233

3334
}

src/resources/runCommands.txt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ curl -OL https://downloads.datastax.com/enterprise/cqlsh-astra.tar.gz
44
wget https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
55

66
// Migrate
7-
spark-submit --properties-file /<path>/sparkConf.properties --verbose --master "local[*]" --class datastax.astra.migrate.Migrate /<path>/cassandra-data-migrator-3.*.jar
8-
spark-submit --properties-file /<path>/sparkConf.properties --master "local[*]" --driver-memory 25G --executor-memory 25G --class datastax.astra.migrate.Migrate /<path>/cassandra-data-migrator-3.*.jar &> table_out.log
7+
spark-submit --properties-file /<path>/sparkConf.properties --verbose --master "local[*]" --class datastax.cdm.job.Migrate /<path>/cassandra-data-migrator-3.*.jar
8+
spark-submit --properties-file /<path>/sparkConf.properties --master "local[*]" --driver-memory 25G --executor-memory 25G --class datastax.cdm.job.Migrate /<path>/cassandra-data-migrator-3.*.jar &> table_out.log
99

1010
// Random Partitioner Run Command
11-
spark-submit --properties-file /<path>/sparkConf.properties --verbose --master "local[*]" --conf spark.origin.minPartition=-1 --conf spark.origin.maxPartition=170141183460469231731687303715884105728 --class datastax.astra.migrate.Migrate /<path>/cassandra-data-migrator-3.*.jar
11+
spark-submit --properties-file /<path>/sparkConf.properties --verbose --master "local[*]" --conf spark.origin.minPartition=-1 --conf spark.origin.maxPartition=170141183460469231731687303715884105728 --class datastax.cdm.job.Migrate /<path>/cassandra-data-migrator-3.*.jar
1212

1313
// Validate
14-
spark-submit --properties-file /<path>/sparkConf.properties --master "local[*]" --driver-memory 25G --executor-memory 25G --class datastax.astra.migrate.DiffData /<path>/cassandra-data-migrator-3.*.jar &> table_out.log
14+
spark-submit --properties-file /<path>/sparkConf.properties --master "local[*]" --driver-memory 25G --executor-memory 25G --class datastax.cdm.job.DiffData /<path>/cassandra-data-migrator-3.*.jar &> table_out.log
15+
16+
// Rerun validate with failed partition ranges
17+
spark-submit --properties-file /<path>/sparkConf.properties --master "local[*]" --conf spark.input.partitionFile=<path>/<fileName> --driver-memory 25G --executor-memory 25G --class datastax.cdm.job.DiffDataFailedPartitionsFromFile /<path>/cassandra-data-migrator-3.*.jar &> table_out.log

src/resources/sparkConf.properties

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,12 +164,15 @@ spark.cdm.schema.target.column.id.names partition_col1,partition_col2,
164164
# 5323. The corresponding counter in Origin is also 5323. At some point, the Target
165165
# counter gets DELETEd. Should the .missing record be re-inserted before
166166
# the DELETE gets tombstoned, the counter will zombie back to life, and the
167-
# counter will become 5323+5323 = 10646.
167+
# counter will become 5323+5323 = 10646.
168+
# spark.tokenRange
169+
# .exceptionDir : When set, partition ranges corresponding to the exceptions will be logged to a
170+
# file named <keyspace>.<table_name> in this directory.
168171
#-----------------------------------------------------------------------------------------------------------
169172
spark.cdm.autocorrect.missing false
170173
spark.cdm.autocorrect.mismatch false
171174
#spark.cdm.autocorrect.missing.counter false
172-
175+
#spark.tokenRange.exceptionDir aaa/bbb
173176
#===========================================================================================================
174177
# Performance and Operations Parameters affecting throughput, error handling, and similar concerns.
175178
#

0 commit comments

Comments
 (0)