Commit 9c69dec

Added a static async SCB delete delay to address intermittent connection issues on Spark worker nodes. (#332)

1 parent 0729d97
5 files changed (+58, -30 lines)

5 files changed

+58
-30
lines changed

README.md

Lines changed: 21 additions & 18 deletions
@@ -31,7 +31,7 @@ tar -xvzf spark-3.5.3-bin-hadoop3-scala2.13.tgz
 ```
 
 > [!CAUTION]
-> If the above Spark and Scala version does not match, you may see an exception similar like below when running the CDM jobs,
+> If the above Spark and Scala versions do not match, you may see an exception like the one below when running the CDM jobs:
 ```
 Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.Statics.releaseFence()V
 ```
@@ -41,24 +41,24 @@ Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.Statics.re
 
 # Steps for Data-Migration:
 
-1. `cdm.properties` file needs to be configured as applicable for the environment. Parameter descriptions and defaults are described in the file. The file can have any name, it does not need to be `cdm.properties`.
-> * A simplified sample properties file configuration can be found here as [cdm.properties](./src/resources/cdm.properties)
-> * A complete sample properties file configuration can be found here as [cdm-detailed.properties](./src/resources/cdm-detailed.properties)
+1. The `cdm.properties` file needs to be configured as applicable for the environment. The file can have any name; it does not need to be `cdm.properties`.
+> * A sample properties file with default values can be found here as [cdm.properties](./src/resources/cdm.properties)
+> * A complete reference properties file with default values can be found here as [cdm-detailed.properties](./src/resources/cdm-detailed.properties)
 2. Place the properties file where it can be accessed while running the job via spark-submit.
-3. Run the below job using `spark-submit` command as shown below:
+3. Run the job using the `spark-submit` command as shown below:
 
 ```
 spark-submit --properties-file cdm.properties \
 --conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
 --master "local[*]" --driver-memory 25G --executor-memory 25G \
---class com.datastax.cdm.job.Migrate cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
+--class com.datastax.cdm.job.Migrate cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
 ```
 
 **Note:**
 - Above command generates a log file `logfile_name_*.txt` to avoid log output on the console.
 - Update the memory options (driver & executor memory) based on your use-case
-- To track details of a run in the `target` keyspace, pass param `--conf spark.cdm.trackRun=true`
-- To filter and migrate data only in a specific token range, you can pass the below two additional params to the `Migration` or `Validation` jobs
+- To track details of a run (recorded on the `target` keyspace), pass param `--conf spark.cdm.trackRun=true`
+- To filter records only for a specific token range, pass the below two additional params to the `Migration` OR `Validation` job
 
 ```
 --conf spark.cdm.filter.cassandra.partition.min=<token-range-min>
@@ -73,7 +73,7 @@ spark-submit --properties-file cdm.properties \
 spark-submit --properties-file cdm.properties \
 --conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
 --master "local[*]" --driver-memory 25G --executor-memory 25G \
---class com.datastax.cdm.job.DiffData cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
+--class com.datastax.cdm.job.DiffData cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
 ```
 
 - Validation job will report differences as “ERRORS” in the log file as shown below.
@@ -95,13 +95,13 @@ spark-submit --properties-file cdm.properties \
 --conf spark.executor.extraJavaOptions='-Dlog4j.configurationFile=log4j2.properties' \
 --conf spark.driver.extraJavaOptions='-Dlog4j.configurationFile=log4j2.properties' \
 --master "local[*]" --driver-memory 25G --executor-memory 25G \
---class com.datastax.cdm.job.DiffData cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
+--class com.datastax.cdm.job.DiffData cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
 ```
 
 - The Validation job can also be run in an AutoCorrect mode. This mode can
-  - Add any missing records from origin to target
-  - Update any mismatched records between origin and target (makes target same as origin).
-  - Enable/disable this feature using one or both of the below setting in the config file
+  - Add any missing records from `origin` to `target`
+  - Update any mismatched records between `origin` and `target`
+  - Enable/disable this feature using one or both of the below params in the properties file
 ```
 spark.cdm.autocorrect.missing false|true
 spark.cdm.autocorrect.mismatch false|true
@@ -117,24 +117,27 @@ spark-submit --properties-file cdm.properties \
 --conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
 --conf spark.cdm.trackRun.previousRunId=<prev_run_id> \
 --master "local[*]" --driver-memory 25G --executor-memory 25G \
---class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
+--class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
 ```
 
 # Perform large-field Guardrail violation checks
-- The tool can be used to identify large fields from a table that may break you cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field), use class option `--class com.datastax.cdm.job.GuardrailCheck` as shown below
+- This mode can help identify large fields on an `origin` table that may break your cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field); use class option `--class com.datastax.cdm.job.GuardrailCheck` as shown below
 
 ```
 spark-submit --properties-file cdm.properties \
 --conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
 --conf spark.cdm.feature.guardrail.colSizeInKB=10000 \
 --master "local[*]" --driver-memory 25G --executor-memory 25G \
---class com.datastax.cdm.job.GuardrailCheck cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
+--class com.datastax.cdm.job.GuardrailCheck cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
 ```
 
+> [!NOTE]
+> This mode only operates on one database, i.e. `origin`; there is no `target` in this mode.
+
 # Features
 - Auto-detects table schema (column names, types, keys, collections, UDTs, etc.)
   - Including [Counter tables](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_using/useCountersConcept.html)
-- Rerun job from where the previous job had stopped for any reason (killed, had exceptions, etc.)
+- Rerun/Resume a previous job that may have stopped for any reason (killed, had exceptions, etc.)
   - If you rerun a `validation` job, it will include any token-ranges that had differences in the previous run
 - Preserve [writetimes](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__retrieving-the-datetime-a-write-occurred-p) and [TTLs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__ref-select-ttl-p)
 - Supports migration/validation of advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt))
@@ -184,7 +187,7 @@ Below recommendations may only be useful when migrating large tables where the d
 1. Clone this repo
 2. Move to the repo folder `cd cassandra-data-migrator`
 3. Run the build `mvn clean package` (Needs Maven 3.9.x)
-4. The fat jar (`cassandra-data-migrator-4.x.x.jar`) file should now be present in the `target` folder
+4. The fat jar (`cassandra-data-migrator-5.x.x.jar`) file should now be present in the `target` folder
 
 # Contributors
 Checkout all our wonderful contributors [here](./CONTRIBUTING.md#contributors).

RELEASE.md

Lines changed: 3 additions & 0 deletions
@@ -1,4 +1,7 @@
 # Release Notes
+## [5.1.2] - 2024-11-26
+- Bug fix: The SCB file on some Spark worker nodes may get deleted before the connection is established, which may cause a connection exception on that worker node. Added a static async SCB delete delay to address such issues.
+
 ## [5.1.1] - 2024-11-22
 - Bug fix: Writetime filter does not work as expected when custom writetimestamp is also used (issue #327).
 - Removed deprecated properties `printStatsAfter` and `printStatsPerPart`. Run metrics should now be tracked using the `trackRun` feature instead.
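The essence of the fix (shown in full in `DataUtility.java` below) is a fire-and-forget background task that parks for a fixed delay before removing the file. A condensed sketch of the pattern — `deleteLater` is an illustrative name, not the actual method:

```java
import java.io.File;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class DelayedDeleteSketch {
    // Schedule the delete on a background thread and park that thread for
    // the delay, so a connection that is still reading the file on this
    // worker can finish before the file disappears.
    static void deleteLater(File file, int waitSeconds) {
        CompletableFuture.runAsync(() -> {
            if (file.exists()) {
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(waitSeconds));
                file.delete();
            }
        });
    }
}
```

The caller returns immediately; only the background thread pays the delay.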

src/main/java/com/datastax/cdm/cql/statement/OriginSelectStatement.java

Lines changed: 2 additions & 2 deletions
@@ -122,8 +122,8 @@ public boolean shouldFilterRecord(Record record) {
         }
         if (originWriteTimeStamp < minWriteTimeStampFilter || originWriteTimeStamp > maxWriteTimeStampFilter) {
             if (logger.isInfoEnabled())
-                logger.info("Timestamp filter removing record with primary key: {} with write timestamp: {}", record.getPk(),
-                        originWriteTimeStamp);
+                logger.info("Timestamp filter removing record with primary key: {} with write timestamp: {}",
+                        record.getPk(), originWriteTimeStamp);
             return true;
         }
     }

src/main/java/com/datastax/cdm/data/DataUtility.java

Lines changed: 24 additions & 8 deletions
@@ -25,6 +25,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;

@@ -39,6 +42,7 @@ public class DataUtility {
     public static final Logger logger = LoggerFactory.getLogger(DataUtility.class.getName());

     protected static final String SCB_FILE_NAME = "_temp_cdm_scb_do_not_touch.zip";
+    protected static final int SCB_DELETE_DELAY = 5;

     public static boolean diff(Object obj1, Object obj2) {
         if (obj1 == null && obj2 == null) {

@@ -157,15 +161,27 @@ public static String getMyClassMethodLine(Exception e) {
         return "Unknown";
     }

+    public static void deleteGeneratedSCB(long runId, int waitSeconds) {
+        CompletableFuture.runAsync(() -> {
+            try {
+                File originFile = new File(PKFactory.Side.ORIGIN + "_" + Long.toString(runId) + SCB_FILE_NAME);
+                File targetFile = new File(PKFactory.Side.TARGET + "_" + Long.toString(runId) + SCB_FILE_NAME);
+
+                if (originFile.exists() || targetFile.exists()) {
+                    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(waitSeconds));
+                    if (originFile.exists())
+                        originFile.delete();
+                    if (targetFile.exists())
+                        targetFile.delete();
+                }
+            } catch (Exception e) {
+                logger.error("Unable to delete generated SCB files: {}", e.getMessage());
+            }
+        });
+    }
+
     public static void deleteGeneratedSCB(long runId) {
-        File file = new File(PKFactory.Side.ORIGIN + "_" + Long.toString(runId) + SCB_FILE_NAME);
-        if (file.exists()) {
-            file.delete();
-        }
-        file = new File(PKFactory.Side.TARGET + "_" + Long.toString(runId) + SCB_FILE_NAME);
-        if (file.exists()) {
-            file.delete();
-        }
+        deleteGeneratedSCB(runId, SCB_DELETE_DELAY);
     }

     public static File generateSCB(String host, String port, String trustStorePassword, String trustStorePath,
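To see the race this guards against, consider a hypothetical worker-side caller — illustrative only: `connectAndWork` is not a CDM method, and the literal `ORIGIN_` file prefix assumes `PKFactory.Side.ORIGIN` renders as `ORIGIN`. The Java driver reads the generated SCB zip while the session is being built, so deleting the file immediately could pull it out from under another task on the same worker that is still connecting; the no-arg overload now defers deletion by `SCB_DELETE_DELAY` (5) seconds instead:

```java
import java.nio.file.Paths;
import com.datastax.oss.driver.api.core.CqlSession;

class WorkerSketch {
    void connectAndWork(long runId) {
        // The driver reads the SCB zip while building the session.
        try (CqlSession session = CqlSession.builder()
                .withCloudSecureConnectBundle(
                        Paths.get("ORIGIN_" + runId + "_temp_cdm_scb_do_not_touch.zip"))
                .build()) {
            // ... run this worker's share of the migration ...
        }
        // Fire-and-forget cleanup: the files are removed only after the
        // delay, so concurrent tasks on this worker still have a readable
        // bundle while they establish their own connections.
        com.datastax.cdm.data.DataUtility.deleteGeneratedSCB(runId);
    }
}
```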

src/test/java/com/datastax/cdm/data/DataUtilityTest.java

Lines changed: 8 additions & 2 deletions
@@ -31,6 +31,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;

 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

@@ -171,7 +173,9 @@ public void generateSCBOrigin() throws IOException {
         File file = new File(PKFactory.Side.ORIGIN + "_" + Long.toString(0) + DataUtility.SCB_FILE_NAME);
         assertTrue(file.exists());

-        DataUtility.deleteGeneratedSCB(0);
+        DataUtility.deleteGeneratedSCB(0, 0);
+        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
+
         assertFalse(file.exists());
     }

@@ -183,7 +187,9 @@ public void generateSCBTarget() throws IOException {
         File file = new File(PKFactory.Side.TARGET + "_" + Long.toString(0) + DataUtility.SCB_FILE_NAME);
         assertTrue(file.exists());

-        DataUtility.deleteGeneratedSCB(0);
+        DataUtility.deleteGeneratedSCB(0, 0);
+        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
+
         assertFalse(file.exists());
     }
 }
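The tests pass a zero-second delay and then park the test thread for one second so the background delete can finish before the assertion. A fixed sleep can still be flaky on a slow machine; a bounded polling wait, sketched below as a hypothetical alternative (not part of this commit), waits only as long as needed:

```java
// Hypothetical alternative to the fixed one-second park: poll until the
// background delete completes or a timeout expires.
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
while (file.exists() && System.nanoTime() - deadline < 0) {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
}
assertFalse(file.exists());
```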
