Skip to content

Commit 9efb866

Browse files
authored
Merge branch 'main' into CDM-84
2 parents eb26917 + e279fae commit 9efb866

File tree

13 files changed

+88
-12
lines changed

13 files changed

+88
-12
lines changed

.github/workflows/snyk-cli-scan.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@ on:
1111
workflow_dispatch:
1212

1313
env:
14-
SNYK_SEVERITY_THRESHOLD_LEVEL: high
14+
SNYK_SEVERITY_THRESHOLD_LEVEL: critical

PERF/cdm-v4.properties

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,15 @@ spark.cdm.schema.target.keyspaceTable devices.sensor_data
127127
# 5323. The corresponding counter in Origin is also 5323. At some point, the Target
128128
# counter gets DELETEd. Should the .missing record be re-inserted before
129129
# the DELETE gets tombstoned, the counter will zombie back to life, and the
130-
# counter will become 5323+5323 = 10646.
130+
# counter will become 5323+5323 = 10646.
131+
# spark.tokenRange
132+
# .exceptionDir : When set, partition ranges corresponding to the exceptions will be logged in a
133+
# file with the name <keyspace>.<table_name> in this directory.
131134
#-----------------------------------------------------------------------------------------------------------
132135
spark.cdm.autocorrect.missing false
133136
spark.cdm.autocorrect.mismatch false
134137
#spark.cdm.autocorrect.missing.counter false
135-
138+
#spark.tokenRange.exceptionDir aaa/bbb
136139
#===========================================================================================================
137140
# Performance and Operations Parameters affecting throughput, error handling, and similar concerns.
138141
#

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ When running in above mode the tool assumes a `partitions.csv` file to be presen
142142
- Supports migration/validation from and to [Azure Cosmos Cassandra](https://learn.microsoft.com/en-us/azure/cosmos-db/cassandra)
143143
- Validate migration accuracy and performance using a smaller randomized data-set
144144
- Supports adding custom fixed `writetime`
145+
- Validation - Log partition range level exceptions; use the exceptions file as input for a rerun
145146

146147
# Known Limitations
147148
- This tool does not migrate `ttl` & `writetime` at the field-level (for optimization reasons). It instead finds the field with the highest `ttl` & the field with the highest `writetime` within an `origin` row and uses those values on the entire `target` row.

SIT/smoke/01_basic_kvp/cdm.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ EOF
1010
spark-submit \
1111
--properties-file /smoke/01_basic_kvp/migrate.properties \
1212
--master "local[*]" \
13-
--class datastax.astra.migrate.Migrate /local/cassandra-data-migrator.jar
13+
--class datastax.cdm.job.Migrate /local/cassandra-data-migrator.jar
1414

1515
cat <<EOF
1616
!!!!!!!!
@@ -21,4 +21,4 @@ EOF
2121
spark-submit \
2222
--properties-file /smoke/01_basic_kvp/migrate.properties \
2323
--master "local[*]" \
24-
--class datastax.astra.migrate.DiffData /local/cassandra-data-migrator.jar
24+
--class datastax.cdm.job.DiffData /local/cassandra-data-migrator.jar

src/main/java/com/datastax/cdm/job/AbstractJobSession.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,12 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
4646
rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));
4747
maxRetries = propertyHelper.getInteger(KnownProperties.MAX_RETRIES);
4848

49+
tokenRangeExceptionDir = propertyHelper.getString(KnownProperties.TOKEN_RANGE_EXCEPTION_DIR);
50+
exceptionFileName = propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE);
51+
4952
logger.info("PARAM -- Max Retries: {}", maxRetries);
53+
logger.info("PARAM -- Token range exception dir: {}", tokenRangeExceptionDir);
54+
logger.info("PARAM -- Token range exception file name: {}", exceptionFileName);
5055
logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
5156
logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());
5257

src/main/java/com/datastax/cdm/job/BaseJobSession.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ public abstract class BaseJobSession {
3030

3131
protected Integer printStatsAfter = 100000;
3232

33+
protected String tokenRangeExceptionDir;
34+
protected String exceptionFileName;
35+
3336
protected BaseJobSession(SparkConf sc) {
3437
propertyHelper.initializeSparkConf(sc);
3538
this.featureMap = calcFeatureMap(propertyHelper);

src/main/java/com/datastax/cdm/job/DiffJobSession.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.datastax.cdm.cql.statement.OriginSelectByPartitionRangeStatement;
1414
import com.datastax.cdm.cql.statement.TargetSelectByPKStatement;
1515
import com.datastax.cdm.properties.KnownProperties;
16+
import org.apache.commons.lang3.StringUtils;
1617
import org.apache.logging.log4j.ThreadContext;
1718
import org.apache.spark.SparkConf;
1819
import org.slf4j.Logger;
@@ -154,10 +155,20 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
154155
logger.error("Error occurred during Attempt#: {}", attempts, e);
155156
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
156157
Thread.currentThread().getId(), min, max, attempts);
158+
if (StringUtils.isNotBlank(tokenRangeExceptionDir) && attempts == maxAttempts) {
159+
logFailedPartitionsInFile(min, max);
160+
}
157161
}
158162
}
159163
}
160164

165+
private void logFailedPartitionsInFile(BigInteger min, BigInteger max) {
166+
try {
167+
ExceptionHandler.FileAppend(tokenRangeExceptionDir, exceptionFileName, min + "," + max);
168+
} catch (Exception ee) {
169+
logger.error("Error occurred while writing to token range file min: {} max: {}", min, max, ee);
170+
}
171+
}
161172
private void diffAndClear(List<Record> recordsToDiff) {
162173
for (Record record : recordsToDiff) {
163174
try {
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.datastax.cdm.job;
2+
3+
import java.io.IOException;
4+
import java.nio.charset.StandardCharsets;
5+
import java.nio.file.Files;
6+
import java.nio.file.Path;
7+
import java.nio.file.Paths;
8+
import java.nio.file.StandardOpenOption;
9+
10+
public class ExceptionHandler {
11+
12+
private static final String NEW_LINE = System.lineSeparator();
13+
14+
private static void appendToFile(Path path, String content)
15+
throws IOException {
16+
// if file not exists, create and write to it
17+
// otherwise append to the end of the file
18+
Files.write(path, content.getBytes(StandardCharsets.UTF_8),
19+
StandardOpenOption.CREATE,
20+
StandardOpenOption.APPEND);
21+
}
22+
23+
public static void FileAppend(String dir, String fileName, String content) throws IOException {
24+
25+
//create directory if not already existing
26+
Files.createDirectories(Paths.get(dir));
27+
Path path = Paths.get(dir + "/" + fileName);
28+
appendToFile(path, content + NEW_LINE);
29+
30+
}
31+
32+
}

src/main/java/com/datastax/cdm/job/SplitPartitions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
import java.io.*;
77
import java.math.BigInteger;
8+
import java.nio.file.Files;
9+
import java.nio.file.Paths;
810
import java.util.ArrayList;
911
import java.util.Collection;
1012
import java.util.Collections;

src/main/java/com/datastax/cdm/properties/KnownProperties.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,15 @@ public enum PropertyType {
137137
defaults.put(MAX_RETRIES, "0");
138138
}
139139

140+
//==========================================================================
141+
// Error handling
142+
//==========================================================================
143+
public static final String TOKEN_RANGE_EXCEPTION_DIR = "spark.tokenRange.exceptionDir"; // aaa/bbb
144+
public static final String PARTITIONS_TOKEN_RANGE_FILE = "spark.input.partitionFile"; // aaa/bbb/filename
145+
static {
146+
types.put(TOKEN_RANGE_EXCEPTION_DIR, PropertyType.STRING);
147+
types.put(PARTITIONS_TOKEN_RANGE_FILE, PropertyType.STRING);
148+
}
140149
//==========================================================================
141150
// Guardrails and Transformations
142151
//==========================================================================

0 commit comments

Comments
 (0)