
Commit f8d3eb9

pravinbhat authored and msmygit committed
Improved docs, logging & set default max-retry to 0
1 parent 0154c50 commit f8d3eb9


6 files changed, +91 / -79 lines changed


pom.xml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 
     <groupId>datastax.astra.migrate</groupId>
     <artifactId>cassandra-data-migrator</artifactId>
-    <version>3.0.4</version>
+    <version>3.0.5</version>
     <packaging>jar</packaging>
 
     <properties>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 3 additions & 2 deletions
@@ -33,7 +33,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         this.sourceSession = sourceSession;
         this.astraSession = astraSession;
 
-        batchSize = new Integer(Util.getSparkPropOr(sc, "spark.batchSize", "1"));
+        batchSize = new Integer(Util.getSparkPropOr(sc, "spark.batchSize", "5"));
         fetchSizeInRows = new Integer(Util.getSparkPropOr(sc, "spark.read.fetch.sizeInRows", "1000"));
         printStatsAfter = new Integer(Util.getSparkPropOr(sc, "spark.printStatsAfter", "100000"));
         if (printStatsAfter < 1) {
@@ -42,7 +42,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
 
         readLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.readRateLimit", "20000")));
         writeLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.writeRateLimit", "40000")));
-        maxRetries = Integer.parseInt(sc.get("spark.maxRetries", "10"));
+        maxRetries = Integer.parseInt(sc.get("spark.maxRetries", "0"));
 
         sourceKeyspaceTable = Util.getSparkProp(sc, "spark.origin.keyspaceTable");
         astraKeyspaceTable = Util.getSparkProp(sc, "spark.target.keyspaceTable");
@@ -88,6 +88,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         logger.info("PARAM -- Read Consistency: {}", readConsistencyLevel);
         logger.info("PARAM -- Write Consistency: {}", writeConsistencyLevel);
         logger.info("PARAM -- Write Batch Size: {}", batchSize);
+        logger.info("PARAM -- Max Retries: {}", maxRetries);
         logger.info("PARAM -- Read Fetch Size: {}", fetchSizeInRows);
         logger.info("PARAM -- Source Keyspace Table: {}", sourceKeyspaceTable);
         logger.info("PARAM -- Destination Keyspace Table: {}", astraKeyspaceTable);

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 7 additions & 8 deletions
@@ -44,10 +44,9 @@ public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession as
 
     public void getDataAndInsert(BigInteger min, BigInteger max) {
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
-        int maxAttempts = maxRetries;
         boolean done = false;
-
-        for (int retryCount = 1; retryCount <= maxAttempts && !done; retryCount++) {
+        int maxAttempts = maxRetries + 1;
+        for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) {
             long readCnt = 0;
             long writeCnt = 0;
             long skipCnt = 0;
@@ -156,15 +155,15 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                 skippedCounter.addAndGet(skipCnt);
                 done = true;
             } catch (Exception e) {
-                if (retryCount == maxAttempts) {
+                if (attempts == maxAttempts) {
                     readCounter.addAndGet(readCnt);
                     writeCounter.addAndGet(writeCnt);
                     skippedCounter.addAndGet(skipCnt);
                     errorCounter.addAndGet(readCnt - writeCnt - skipCnt);
                 }
-                logger.error("Error occurred retry#: {}", retryCount, e);
-                logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Retry# {}",
-                        Thread.currentThread().getId(), min, max, retryCount);
+                logger.error("Error occurred during Attempt#: {}", attempts, e);
+                logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
+                        Thread.currentThread().getId(), min, max, attempts);
                 logger.error("Error stats Read#: {}, Wrote#: {}, Skipped#: {}, Error#: {}", readCnt, writeCnt, skipCnt, (readCnt - writeCnt - skipCnt));
             }
         }
@@ -188,7 +187,7 @@ public synchronized void printCounts(boolean isFinal) {
     private int iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults, int incrementBy) throws Exception {
         int cnt = 0;
         for (CompletionStage<AsyncResultSet> writeResult : writeResults) {
-            //wait for the writes to complete for the batch. The Retry policy, if defined, should retry the write on timeouts.
+            //wait for the writes to complete for the batch. The Retry policy, if defined, should retry the write on timeouts.
             writeResult.toCompletableFuture().get().one();
             cnt += incrementBy;
         }
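
The net effect of the reworked loop, shown as a minimal standalone sketch (processSlice is a hypothetical placeholder for the body of getDataAndInsert): with the new default spark.maxRetries=0, maxAttempts is 1, so each token-range slice is processed exactly once and only re-attempted when retries are explicitly configured:

public class AttemptLoopSketch {
    public static void main(String[] args) {
        int maxRetries = 0;                // new default from this commit
        int maxAttempts = maxRetries + 1;  // retries are counted on top of the first attempt
        boolean done = false;

        for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) {
            try {
                processSlice();            // placeholder for the real read/write work on a slice
                done = true;
            } catch (Exception e) {
                System.err.println("Error occurred during Attempt#: " + attempts);
            }
        }
    }

    // Hypothetical stand-in for the per-slice migration work
    static void processSlice() {
    }
}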

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 7 additions & 7 deletions
@@ -55,9 +55,9 @@ public static DiffJobSession getInstance(CqlSession sourceSession, CqlSession as
 
     public void getDataAndDiff(BigInteger min, BigInteger max) {
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
-        int maxAttempts = maxRetries;
-        for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
-
+        boolean done = false;
+        int maxAttempts = maxRetries + 1;
+        for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) {
             try {
                 // cannot do batching if the writeFilter is greater than 0
                 ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ?
@@ -86,11 +86,11 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
                     }
                 });
                 diffAndClear(srcToTargetRowMap);
-                retryCount = maxAttempts;
+                done = true;
             } catch (Exception e) {
-                logger.error("Error occurred retry#: {}", retryCount, e);
-                logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Retry# {}",
-                        Thread.currentThread().getId(), min, max, retryCount);
+                logger.error("Error occurred during Attempt#: {}", attempts, e);
+                logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
+                        Thread.currentThread().getId(), min, max, attempts);
             }
         }
 

src/main/java/datastax/astra/migrate/OriginCountJobSession.java

Lines changed: 7 additions & 7 deletions
@@ -77,9 +77,9 @@ public static OriginCountJobSession getInstance(CqlSession sourceSession, SparkC
 
     public void getData(BigInteger min, BigInteger max) {
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
-        int maxAttempts = maxRetries;
-        for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
-
+        boolean done = false;
+        int maxAttempts = maxRetries + 1;
+        for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) {
             try {
                 ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ?
                         min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact())
@@ -138,11 +138,11 @@ public void getData(BigInteger min, BigInteger max) {
                 }
 
                 logger.info("ThreadID: {} Final Read Record Count: {}", Thread.currentThread().getId(), readCounter.get());
-                retryCount = maxAttempts;
+                done = true;
             } catch (Exception e) {
-                logger.error("Error occurred retry#: {}", retryCount, e);
-                logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Retry# {}",
-                        Thread.currentThread().getId(), min, max, retryCount);
+                logger.error("Error occurred during Attempt#: {}", attempts, e);
+                logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
+                        Thread.currentThread().getId(), min, max, attempts);
             }
         }
     }

src/resources/sparkConf.properties

Lines changed: 66 additions & 54 deletions
@@ -1,119 +1,131 @@
+# Origin cluster credentials
 spark.origin.host localhost
 spark.origin.username some-username
 spark.origin.password some-secret-password
 spark.origin.keyspaceTable test.a1
 
+# Target cluster credentials
 spark.target.scb file:///aaa/bbb/secure-connect-enterprise.zip
 spark.target.username client-id
 spark.target.password client-secret
 spark.target.keyspaceTable test.a2
+
+# Add 'missing' rows (during 'Validation') in 'Target' from 'Origin'. N/A for 'Migration'
 spark.target.autocorrect.missing false
+# Update 'mismatched' rows (during 'Validation') in 'Target' to match 'Origin'. N/A for 'Migration'
 spark.target.autocorrect.mismatch false
 
-spark.maxRetries 3
+# Read & Write rate-limits(rows/second). Higher value will improve performance and put more load on cluster
 spark.readRateLimit 20000
 spark.writeRateLimit 20000
+
+# Used to split Cassandra token-range into slices and migrate random slices one at a time
+# 10K splits usually works for tables up to 100GB (uncompressed) with balanced token distribution
+# For larger tables, increase the splits relatively i.e. use 100K for a 1TB table
 spark.splitSize 10000
-spark.batchSize 5
 
+# Use a value of 1 (disable batching) when primary-key and partition-key are same
+# For tables with high avg count of rows/partition, use higher value to improve performance
+spark.batchSize 10
+
+# Below 'query' properties are set based on table schema
 spark.query.origin partition-key,clustering-key,order-date,amount
 spark.query.origin.partitionKey partition-key
 spark.query.target.id partition-key,clustering-key
 spark.query.types 9,1,4,3
-spark.query.ttl.cols 2,3
-spark.query.writetime.cols 2,3
+#############################################################################################################
+# Following are the supported data types and their corresponding [Cassandra data-types]
+# 0: ascii, text, varchar
+# 1: int
+# 2: bigint, counter
+# 3: double
+# 4: timestamp
+# 5: map (separate type by %) - Example: 5%1%0 for map<int, text>
+# 6: list (separate type by %) - Example: 6%0 for list<text>
+# 7: blob
+# 8: set (separate type by %) - Example: 8%0 for set<text>
+# 9: uuid, timeuuid
+# 10: boolean
+# 11: tuple
+# 12: float
+# 13: tinyint
+# 14: decimal
+# 15: date
+# 16: UDT [any user-defined-type created using 'CREATE TYPE']
+# 17: varint
+# 18: time
+# 19: smallint
+# Note: Ignore "Frozen" while mapping Collections (Map/List/Set) - Example: 5%1%0 for frozen<map<int, text>>
+#############################################################################################################
 
-##### ENABLE ONLY IF COLUMN NAMES ON TARGET IS DIFFERENT FROM ORIGIN (SCHEMA & DATA-TYPES MUST BE SAME) #####
+# ENABLE ONLY IF COLUMN NAMES ON TARGET IS DIFFERENT FROM ORIGIN (SCHEMA & DATA-TYPES MUST BE SAME)
 #spark.query.target partition-key,clustering-key,order-date,amount
 
-################# ENABLE ONLY IF YOU WANT TO MIGRATE/VALIDATE SOME DATA BASED ON CQL FILTER #################
-#spark.query.condition
-
-################# ENABLE ONLY IF YOU WANT TO MIGRATE/VALIDATE SOME % (NOT 100%) DATA ######################
-#spark.coveragePercent 10
+# The tool adds TTL & Writetime at row-level (not field-level).
+# The largest TTL & Writetime values are used if multiple indexes are listed (comma separated)
+# Comma separated column indexes from "spark.query.origin" used to find largest TTL or Writetime
+spark.query.ttl.cols 2,3
+spark.query.writetime.cols 2,3
 
-#################### ENABLE ONLY IF WANT LOG STATS MORE OR LESS FREQUENTLY THAN DEFAULT #####################
-#spark.printStatsAfter 100000
+# ENABLE ONLY IF YOU WANT TO MIGRATE/VALIDATE ROWS BASED ON CQL FILTER
+#spark.query.condition
 
-################################# ENABLE ONLY IF IT IS A COUNTER TABLE ######################################
+# ENABLE ONLY IF IT IS A COUNTER TABLE
 #spark.counterTable false
 #spark.counterTable.cql
 #spark.counterTable.cql.index 0
 
-######## ENABLE ONLY IF YOU WANT TO FILTER BASED ON WRITE-TIME (values must be in microseconds) #############
+# ENABLE ONLY IF YOU WANT TO FILTER BASED ON WRITE-TIME (values must be in microseconds)
#spark.origin.writeTimeStampFilter false
 #spark.origin.minWriteTimeStampFilter 0
 #spark.origin.maxWriteTimeStampFilter 4102444800000000
 
-######## ENABLE ONLY IF YOU WANT TO USE READ AND/OR WRITE CONSISTENCY OTHER THAN LOCAL_QUORUM ##############
+# ENABLE ONLY IF retries needed (Retry a slice of token-range if an exception occurs)
+#spark.maxRetries 0
+
+# ENABLE ONLY IF YOU WANT TO MIGRATE/VALIDATE SOME % OF ROWS (NOT 100%)
+#spark.coveragePercent 100
+
+# ENABLE ONLY IF WANT LOG STATS MORE OR LESS FREQUENTLY THAN DEFAULT
+#spark.printStatsAfter 100000
+
+# ENABLE ONLY IF YOU WANT TO USE READ AND/OR WRITE CONSISTENCY OTHER THAN LOCAL_QUORUM
 #spark.consistency.read LOCAL_QUORUM
 #spark.consistency.write LOCAL_QUORUM
 
-############# ENABLE ONLY IF YOU WANT TO REDUCE FETCH-SIZE TO AVOID FrameTooLongException ##################
+# ENABLE ONLY IF YOU WANT TO REDUCE FETCH-SIZE TO AVOID FrameTooLongException
 #spark.read.fetch.sizeInRows 1000
 
-############### ENABLE ONLY IF YOU WANT TO USE CUSTOM FIXED WRITETIME VALUE ON TARGET ######################
+# ENABLE ONLY IF YOU WANT TO USE CUSTOM FIXED WRITETIME VALUE ON TARGET
 #spark.target.custom.writeTime 0
 
-#################### ONLY USE if SKIPPING recs greater than 10MB from Origin needed #########################
+# ENABLE ONLY TO SKIP recs greater than 10MB from Origin (to avoid Astra Guardrail error)
 #spark.fieldGuardraillimitMB 10
 
-#################### ONLY USE if count of recs greater than 10MB from Origin needed #########################
+# ENABLE ONLY TO count of recs greater than 10MB from Origin needed
 #spark.origin.checkTableforColSize false
 #spark.origin.checkTableforColSize.cols partition-key,clustering-key
 #spark.origin.checkTableforColSize.cols.types 9,1
 
-############################ ONLY USE if needing to filter data from Origin #################################
+# ENABLE ONLY TO filter data from Origin
 #spark.origin.FilterData false
 #spark.origin.FilterColumn test
 #spark.origin.FilterColumnIndex 2
 #spark.origin.FilterColumnType 6%16
 #spark.origin.FilterColumnValue test
 
-########################## ONLY USE if SSL clientAuth is enabled on origin Cassandra/DSE ####################
+# ONLY USE if SSL clientAuth is enabled on origin Cassandra/DSE
 #spark.origin.trustStore.path
 #spark.origin.trustStore.password
 #spark.origin.trustStore.type JKS
 #spark.origin.keyStore.path
 #spark.origin.keyStore.password
 #spark.origin.enabledAlgorithms TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA
 
-####################### ONLY USE if SSL clientAuth is enabled on target Cassandra/DSE #######################
+# ONLY USE if SSL clientAuth is enabled on target Cassandra/DSE
 #spark.target.trustStore.path
 #spark.target.trustStore.password
 #spark.target.trustStore.type JKS
 #spark.target.keyStore.path
 #spark.target.keyStore.password
 #spark.target.enabledAlgorithms TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA
-
-#############################################################################################################
-# Following are the supported data types and their corresponding [Cassandra data-types]
-# 0: ascii, text, varchar
-# 1: int
-# 2: bigint, counter
-# 3: double
-# 4: timestamp
-# 5: map (separate type by %) - Example: 5%1%0 for map<int, text>
-# 6: list (separate type by %) - Example: 6%0 for list<text>
-# 7: blob
-# 8: set (separate type by %) - Example: 8%0 for set<text>
-# 9: uuid, timeuuid
-# 10: boolean
-# 11: tuple
-# 12: float
-# 13: tinyint
-# 14: decimal
-# 15: date
-# 16: UDT [any user-defined-type created using 'CREATE TYPE']
-# 17: varint
-# 18: time
-# 19: smallint
-
-# Note: Ignore "Frozen" while mapping Collections (Map/List/Set) - Example: 5%1%0 for frozen<map<int, text>>
-#
-# "spark.query.ttl.cols" - Comma separated column indexes from "spark.query.origin" used to find largest TTL.
-# "spark.query.writetime.cols" - Comma separated column indexes from "spark.query.origin" used to find largest writetime.
-# Note: The tool migrates TTL & Writetimes at row-level and not field-level.
-# Migration will use the largest TTL & Writetimes value per row.
-#
-#############################################################################################################
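
To illustrate how the positional conventions documented above fit together for the example table, here is a minimal sketch. It is not the tool's parser, and writetimeOf is a hypothetical stand-in for reading WRITETIME(column) of an origin row: spark.query.types describes each column listed in spark.query.origin, and spark.query.ttl.cols / spark.query.writetime.cols are indexes back into that same list, with the largest value applied per row:

import java.util.Arrays;
import java.util.List;

public class ConfConventionSketch {
    public static void main(String[] args) {
        List<String> originCols = Arrays.asList("partition-key,clustering-key,order-date,amount".split(","));
        List<String> typeCodes = Arrays.asList("9,1,4,3".split(","));  // uuid, int, timestamp, double
        int[] writetimeCols = {2, 3};                                  // spark.query.writetime.cols 2,3

        // Each type code positionally describes the matching column in spark.query.origin
        for (int i = 0; i < originCols.size(); i++) {
            System.out.println(originCols.get(i) + " -> type code " + typeCodes.get(i));
        }

        // TTL/Writetime indexes point back into spark.query.origin; per the comments above,
        // the largest value across the listed columns is applied at row-level.
        long rowWritetime = Long.MIN_VALUE;
        for (int idx : writetimeCols) {
            rowWritetime = Math.max(rowWritetime, writetimeOf(idx));
        }
        System.out.println("row-level writetime used: " + rowWritetime);
    }

    // Hypothetical stand-in for reading WRITETIME(<column>) of the origin row
    static long writetimeOf(int colIndex) {
        return 1_600_000_000_000_000L + colIndex;
    }
}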
