
Commit 86f938d

Merge branch 'main' into feature/exception_handler_refactor

# Conflicts:
#	src/main/scala/com/datastax/cdm/job/BaseJob.scala

2 parents: 2313986 + ed7ba7c

117 files changed, +3259 −1054 lines
(Large commit: some content is hidden by default, so only a subset of the changed files appears below.)
Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+# GitHub Action CI
+# Snyk clean-up when PR is merged/closed
+
+on:
+  pull_request:
+    types:
+      - closed
+    branches:
+      - main
+  workflow_dispatch:
+

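The hunk above adds only the workflow's triggers. Since types: [closed] fires both for merged and for closed-but-unmerged pull requests, a job can check the merged flag if it should only run after a merge. The sketch below is hypothetical (the job name, runner, and placeholder step are assumptions, not part of this commit); it only illustrates how such a trigger is commonly consumed:

jobs:
  snyk-cleanup:                       # hypothetical job name
    # run on manual dispatch, or when the closed PR was actually merged
    if: github.event_name == 'workflow_dispatch' || github.event.pull_request.merged == true
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      # placeholder only: the actual Snyk clean-up command is not shown in this excerpt
      - run: echo "Snyk project clean-up would run here"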

.github/workflows/snyk-cli-scan.yml

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+# GitHub action CI
+# Triggered by:
+#   any push on any protected branch: main, v6.8, releases/**
+#   any PR created against any protected branch: main, v6.8, releases/**
+
+on:
+  push:
+    branches: [ main ]
+  pull_request:
+    branches: [ main ]
+  workflow_dispatch:
+
+env:
+  SNYK_SEVERITY_THRESHOLD_LEVEL: high

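Only the triggers and the environment block are shown above. As a rough, hypothetical sketch of how a job could consume the SNYK_SEVERITY_THRESHOLD_LEVEL variable (the job layout, the Snyk CLI install step, and the SNYK_TOKEN secret name are assumptions, not part of this commit; snyk test --severity-threshold is the standard Snyk CLI flag):

jobs:
  snyk-test:                          # hypothetical job name
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Install Snyk CLI
        run: npm install -g snyk
      - name: Fail on issues at or above the configured severity
        run: snyk test --severity-threshold="$SNYK_SEVERITY_THRESHOLD_LEVEL"
        env:
          SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}   # assumed repository secret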

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -6,3 +6,4 @@ dependency-reduced-pom.xml
 .idea/*
 cassandra-data-migrator.iml
 SIT/local
+*.DS_Store

.snyk

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+# .snyk
+# Snyk (https://snyk.io) policy file, patches or ignores known vulnerabilities.
+# See https://docs.snyk.io/scan-cloud-deployment/snyk-infrastructure-as-code/snyk-cli-for-infrastructure-as-code/iac-ignores-using-the-.snyk-policy-file for details.
+version: v1.22.2
+python: '3.7'
+patch: {}
+# ignores vulnerabilities until expiry date; change duration by modifying expiry date
+ignore:
+

.snyk.ignore.example

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+# .snyk.ignore.example
+# Snyk (https://snyk.io) policy file, patches or ignores known vulnerabilities.
+version: v1.22.2
+python: '3.7'
+patch: {}
+# ignores vulnerabilities until expiry date; change duration by modifying expiry date
+ignore:
+  SNYK-PYTHON-URLLIB3-1533435:
+    - '*':
+        reason: state your ignore reason here
+        expires: 2030-01-01T00:00:00.000Z
+        created: 2022-03-21T13:19:22.196Z

Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ ENV MAVEN_HOME /usr/share/maven
 ENV MAVEN_CONFIG "$USER_HOME_DIR/.m2"
 COPY ./src /assets/src
 COPY ./pom.xml /assets/pom.xml
-COPY ./src/resources/sparkConf.properties /assets/
+COPY ./src/resources/cdm.properties /assets/
 COPY ./src/resources/partitions.csv /assets/
 COPY ./src/resources/primary_key_rows.csv /assets/
 COPY ./src/resources/runCommands.txt /assets/

PERF/cdm-v3.properties

Lines changed: 130 additions & 0 deletions
@@ -0,0 +1,130 @@
+# Origin cluster credentials (use "host + port" OR "secure-connect-bundle" but not both)
+spark.origin.host cass-origin
+spark.origin.port 9042
+#spark.origin.scb file:///aaa/bbb/secure-connect-enterprise.zip
+spark.origin.username cassandra
+spark.origin.password cassandra
+
+# Target cluster credentials (use "host + port" OR "secure-connect-bundle" but not both)
+spark.target.host cass-target
+#spark.target.port 9042
+#spark.target.scb file:///aaa/bbb/secure-connect-enterprise.zip
+spark.target.username cassandra
+spark.target.password cassandra
+
+# Add 'missing' rows (during 'Validation') in 'Target' from 'Origin'. N/A for 'Migration'
+spark.target.autocorrect.missing false
+# Update 'mismatched' rows (during 'Validation') in 'Target' to match 'Origin'. N/A for 'Migration'
+spark.target.autocorrect.mismatch false
+
+# Read & Write rate-limits(rows/second). Higher value will improve performance and put more load on cluster
+spark.readRateLimit 5000
+spark.writeRateLimit 5000
+
+# Used to split Cassandra token-range into slices and migrate random slices one at a time
+# 10K splits usually works for tables up to 100GB (uncompressed) with balanced token distribution
+# For larger tables, test on 1% volume (using param coveragePercent) and increase the number-of-splits as needed
+spark.numSplits 10000
+
+# Use a value of 1 (disable batching) when primary-key and partition-key are same
+# For tables with high avg count of rows/partition, use higher value to improve performance
+spark.batchSize 10
+
+# ENABLE ONLY IF YOU WANT SOME COLUMNS FROM ORIGIN TO MIGRATE (default auto-detects schema & migrates all columns)
+# COMMA SEPARATED LIST OF COLUMN NAMES (MUST INCLUDE ALL PRIMARY-KEY FIELDS)
+#spark.query.origin comma-separated-partition-key,comma-separated-clustering-key,comma-separated-other-columns
+
+# ENABLE ONLY IF COLUMN NAMES ON TARGET ARE DIFFERENT FROM ORIGIN (default assumes target schema to be same as origin)
+#spark.query.target comma-separated-partition-key,comma-separated-clustering-key,comma-separated-other-columns
+
+############################### EXAMPLE MAPPING USING A DEMO TABLE ##########################################
+# If the origin table schema is as below
+#   CREATE TABLE cycling.cyclist_name (
+#     pk1 uuid,
+#     pk2 date,
+#     cc1 boolean,
+#     firstname text,
+#     middlename text,   // You do not want to migrate this column
+#     lastname text,
+#     phones list<text>,
+#     PRIMARY KEY((pk1,pk2),cc1)
+#   );
+# then, our origin mapping would look like below
+#   spark.query.origin pk1,pk2,cc1,firstname,lastname,phones
+#
+# And target table schema is as below
+#   CREATE TABLE cycling.cyclist_name (
+#     pk1 uuid,
+#     pk2 date,
+#     cc1 boolean,
+#     fn text,           // Column has different name than origin
+#     ln text,           // Column has different name than origin
+#     phones list<text>,
+#     PRIMARY KEY((pk1,pk2),cc1)
+#   );
+# then, our target mapping would look like below
+#   spark.query.target pk1,pk2,cc1,fn,ln,phones
+#############################################################################################################
+
+# ENABLE ONLY IF YOU WANT TO MIGRATE/VALIDATE ROWS BASED ON A VALID CQL FILTER
+#spark.query.condition
+
+# ENABLE ONLY IF YOU WANT TO FILTER BASED ON WRITE-TIME (values must be in microseconds)
+#spark.origin.writeTimeStampFilter false
+#spark.origin.minWriteTimeStampFilter 0
+#spark.origin.maxWriteTimeStampFilter 4102444800000000
+
+# ENABLE ONLY IF retries needed (Retry a slice of token-range if an exception occurs)
+#spark.maxRetries 0
+
+# ENABLE ONLY IF YOU WANT TO MIGRATE/VALIDATE SOME % OF ROWS (NOT 100%)
+#spark.coveragePercent 100
+
+# ENABLE ONLY IF WANT LOG STATS MORE OR LESS FREQUENTLY THAN DEFAULT
+#spark.printStatsAfter 100000
+
+# ENABLE ONLY IF YOU WANT TO USE READ AND/OR WRITE CONSISTENCY OTHER THAN LOCAL_QUORUM
+#spark.consistency.read LOCAL_QUORUM
+#spark.consistency.write LOCAL_QUORUM
+
+# ENABLE ONLY IF YOU WANT TO REDUCE FETCH-SIZE TO AVOID FrameTooLongException
+#spark.read.fetch.sizeInRows 1000
+
+# ENABLE ONLY IF YOU WANT TO USE CUSTOM FIXED WRITETIME VALUE ON TARGET
+#spark.target.writeTime.fixedValue 0
+
+# ENABLE ONLY IF YOU WANT TO INCREMENT SOURCE WRITETIME VALUE
+# DUPLICATES IN LIST FIELDS: USE THIS WORKAROUND FOR CASSANDRA BUG https://issues.apache.org/jira/browse/CASSANDRA-11368
+#spark.target.writeTime.incrementBy 0
+
+# ONLY USE when running in Guardrail mode to identify large fields
+#spark.guardrail.colSizeInKB 1024
+
+# ENABLE ONLY TO filter data from Origin
+#spark.origin.FilterData false
+#spark.origin.FilterColumn test
+#spark.origin.FilterColumnIndex 2
+#spark.origin.FilterColumnType 6%16
+#spark.origin.FilterColumnValue test
+
+# ONLY USE if SSL is enabled on origin Cassandra/DSE (e.g. Azure Cosmos Cassandra DB)
+#spark.origin.ssl.enabled true
+
+# ONLY USE if SSL clientAuth is enabled on origin Cassandra/DSE
+#spark.origin.trustStore.path
+#spark.origin.trustStore.password
+#spark.origin.trustStore.type JKS
+#spark.origin.keyStore.path
+#spark.origin.keyStore.password
+#spark.origin.enabledAlgorithms TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA
+
+# ONLY USE if SSL is enabled on target Cassandra/DSE
+#spark.target.ssl.enabled true
+
+# ONLY USE if SSL clientAuth is enabled on target Cassandra/DSE
+#spark.target.trustStore.path
+#spark.target.trustStore.password
+#spark.target.trustStore.type JKS
+#spark.target.keyStore.path
+#spark.target.keyStore.password
+#spark.target.enabledAlgorithms TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA

src/resources/sparkConf.properties renamed to PERF/cdm-v4.properties

Lines changed: 55 additions & 20 deletions
@@ -31,13 +31,13 @@
 #
 # You must set either .host or .scb.
 #-----------------------------------------------------------------------------------------------------------
-spark.cdm.origin.connect.host localhost
+spark.cdm.origin.connect.host cass-origin
 spark.cdm.origin.connect.port 9042
 #spark.cdm.origin.connect.scb file:///aaa/bbb/secure-connect-enterprise.zip
 spark.cdm.origin.connect.username cassandra
 spark.cdm.origin.connect.password cassandra

-spark.cdm.target.connect.host localhost
+spark.cdm.target.connect.host cass-target
 spark.cdm.target.connect.port 9042
 #spark.cdm.target.connect.scb file:///aaa/bbb/secure-connect-enterprise.zip
 spark.cdm.target.connect.username cassandra
@@ -53,14 +53,29 @@ spark.cdm.target.connect.password cassandra
 # Recommended Parameters:
 # spark.cdm.schema.origin
 #   .column
-#     .ttl.names      : Default is empty. Names from .column.names to be combined using the MAX
-#                       function to determine the TTL of the entire migrated record. Will use target
-#                       table default when not set. The names cannot include any columns listed in
-#                       partition-key,clustering-key.
-#     .writetime.names: Default is empty. Names from .column.names to be combined using the MAX
-#                       function to determine the TIMESTAMP of the entire migrated record. Will use
-#                       target table default when not set. The names cannot include any columns
-#                       listed in the primary key e.g. partition-key,clustering-key
+#     .ttl
+#       .automatic    : Default is true, unless .ttl.names is specified. When true, the TTL of the
+#                       target record will be determined by finding the maximum TTL of
+#                       all origin columns that can have TTL set (which excludes partition key,
+#                       clustering key, collections/UDT/tuple, and frozen columns). When false, and
+#                       .names is not set, the target record will have the TTL determined by the target
+#                       table configuration.
+#       .names        : Default is empty, meaning they will be determined automatically if that is set
+#                       (see above). Specify a subset of eligible columns that are used to calculate
+#                       the TTL of the target record.
+#     .writetime
+#       .automatic    : Default is true, unless .writetime.names is specified. When true, the WRITETIME of
+#                       the target record will be determined by finding the maximum WRITETIME of
+#                       all origin columns that can have WRITETIME set (which excludes partition key,
+#                       clustering key, collections/UDT/tuple, and frozen columns). When false, and
+#                       .names is not set, the target record will have the WRITETIME determined by the target
+#                       table configuration.
+#
+#                       *** Note spark.cdm.transform.custom.writetime overrides this setting ***
+#
+#       .names        : Default is empty, meaning they will be determined automatically if that is set
+#                       (see above). Specify a subset of eligible columns that are used to calculate
+#                       the WRITETIME of the target record.
 #
 # Other Parameters:
 # spark.cdm.schema.origin
@@ -70,10 +85,12 @@ spark.cdm.target.connect.password cassandra
 #                       origin_column_name:target_column_name. The list is comma-separated. Only renamed
 #                       columns need to be listed.
 #-----------------------------------------------------------------------------------------------------------
-spark.cdm.schema.origin.keyspaceTable keyspace_name.table_name
-spark.cdm.schema.origin.column.ttl.names data_col1,data_col2,...
-spark.cdm.schema.origin.column.writetime.names data_col1,data_col2,...
-#spark.cdm.schema.origin.column.names.to.target partition_col1:partition_col_1,partition_col2:partition_col_2,...
+spark.cdm.schema.origin.keyspaceTable devices.sensor_data
+#spark.cdm.schema.origin.column.ttl.automatic true
+#spark.cdm.schema.origin.column.ttl.names data_col1,data_col2,...
+#spark.cdm.schema.origin.column.writetime.automatic true
+#spark.cdm.schema.origin.column.writetime.names data_col1,data_col2,...
+#spark.cdm.schema.origin.column.names.to.target partition_col1:partition_col_1,partition_col2:partition_col_2,...

 #===========================================================================================================
 # Details about the Target Schema
@@ -82,7 +99,7 @@ spark.cdm.schema.origin.column.writetime.names data_col1,data_col2,...
 # spark.cdm.schema.target
 #   .keyspaceTable  : <keyspace>.<table_name> of the table to be migrated. Table must exist in Target.
 #-----------------------------------------------------------------------------------------------------------
-spark.cdm.schema.target.keyspaceTable keyspace_name.table_name
+spark.cdm.schema.target.keyspaceTable devices.sensor_data

 #===========================================================================================================
 # Autocorrection parameters allow CDM to correct data differences found between Origin and Target when
@@ -126,7 +143,7 @@ spark.cdm.autocorrect.mismatch false
 # spark.cdm.perfops
 #   .numParts       : Defaults is 10000. In standard operation, the full token range (-2^63..2^63-1)
 #                     is divided into a number of parts which will be parallel-processed. You should
-#                     aim for each part to comprise a total of ≈1-10GB of data to migrate. During
+#                     aim for each part to comprise a total of ≈1-10GB of data to migrate. During
 #                     intial testing, you may want this to be a small number (even 1).
 #   .batchSize      : Defaults is 5. When writing to Target, this comprises the number of records that
 #                     will be put into an UNLOGGED batch. CDM will tend to work on the same partition
@@ -158,8 +175,8 @@ spark.cdm.autocorrect.mismatch false
 #-----------------------------------------------------------------------------------------------------------
 spark.cdm.perfops.numParts 10000
 spark.cdm.perfops.batchSize 5
-spark.cdm.perfops.readRateLimit 20000
-spark.cdm.perfops.writeRateLimit 40000
+spark.cdm.perfops.readRateLimit 5000
+spark.cdm.perfops.writeRateLimit 5000
 #spark.cdm.perfops.consistency.read LOCAL_QUORUM
 #spark.cdm.perfops.consistency.write LOCAL_QUORUM
 #spark.cdm.perfops.printStatsAfter 100000
@@ -177,12 +194,18 @@ spark.cdm.perfops.writeRateLimit 40000
 #                     MigrateData operation would fail. This parameter allows a crude
 #                     constant value to be used in its place, separate from the Constant
 #                     Values feature.
-#   .custom.writetime Default is 0 (diabled). Timestamp value in microseconds to use as the
+#   .custom
+#     .writetime      Default is 0 (disabled). Timestamp value in microseconds to use as the
 #                     WRITETIME for the target record. This is useful when the WRITETIME of
 #                     the record in Origin cannot be determined (such as the only non-key
 #                     columns are collections). This parameter allows a crude constant value
 #                     to be used in its place, and overrides
 #                     .schema.origin.column.writetime.indexes.
+#     .writetime.incrementBy Default is 0. This is useful when you have a List that is not frozen,
+#                     and are updating this via the autocorrect feature. Lists are not idempotent,
+#                     and subsequent UPSERTs would add duplicates to the list. Future versions
+#                     of CDM may tombstone the previous list, but for now this solution is
+#                     viable and, crucially, more performant.
 #   .codecs           Default is empty. A comma-separated list of additional codecs to
 #                     enable. Current codecs are:
 #                       INT_STRING    : int stored in a String
@@ -202,7 +225,8 @@ spark.cdm.perfops.writeRateLimit 40000
 #     .string.zone    Default is UTC ; Must be in ZoneRulesProvider.getAvailableZoneIds()
 #-----------------------------------------------------------------------------------------------------------
 #spark.cdm.transform.missing.key.ts.replace.value
-#spark.cdm.transform.custom.writetime 0
+#spark.cdm.transform.custom.writetime 0
+#spark.cdm.transform.custom.writetime.incrementBy 0
 #spark.cdm.transform.codecs
 #spark.cdm.transform.codecs.timestamp.string.format yyyyMMddHHmmss
 #spark.cdm.transform.codecs.timestamp.string.zone UTC
@@ -287,6 +311,16 @@ spark.cdm.perfops.writeRateLimit 40000
 #spark.cdm.feature.explodeMap.target.name.key my_map_key
 #spark.cdm.feature.explodeMap.target.name.value my_map_value

+#===========================================================================================================
+# Guardrail feature manages records that exceed guardrail checks. The Guardrail job will generate
+# a report; other jobs will skip records that exceed the guardrail.
+#
+# spark.cdm.feature.guardrail
+#   .colSizeInKB    Default 0, meaning the check is not done. Records with one or more fields that
+#                   exceed this size will be flagged. Note this is kB (base 10), not kiB (base 2).
+#
+#===========================================================================================================
+#spark.cdm.feature.guardrail.colSizeInKB 1000

 #===========================================================================================================
 # TLS (SSL) connection parameters, if so configured. Note that Secure Bundles embed these details.
@@ -320,3 +354,4 @@ spark.cdm.perfops.writeRateLimit 40000
 #spark.cdm.target.connect.tls.keyStore.password
 #spark.cdm.target.connect.tls.enabledAlgorithms TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA

+
0 commit comments
