@@ -6,8 +6,8 @@ Spark jobs in this repo can be used for data migration and data validation.
 
## Prerequisite
 
- Install Java8 as requirement as spark binaries are compiled with it
- Install single instance of spark on a node where we want to run this job. Spark can be installed by running the following: -
+ Install Java8 as spark binaries are compiled with it.
+ Install a single instance of spark on the node where you want to run this job. Spark can be installed by running the following: -
 
```
wget https://downloads.apache.org/spark/spark-2.4.8/
@@ -17,74 +17,78 @@ tar -xvzf <spark downloaded file name>
 
# Steps:
 
1. The sparkConf.properties file needs to be configured as applicable for the environment
- > Sample Spark conf file configuration can be found here: `astra-spark-migration-ranges/src/resources/sparkConf.properties`
+ > A sample Spark conf file configuration can be found here: `astra-spark-migration-ranges/src/resources/sparkConf.properties`
 
```
Example of the conf file is below: -
- `spark.migrate.source.username <Source cluster user_id>`
- `spark.migrate.source.password <Source cluster password>`
- `spark.migrate.source.host <Source cluster host ip address or contact ip address>`
- `spark.migrate.astra.scb <path to scb for the target database on Astra>`
- `spark.migrate.astra.username <Astra client_id from the token file>`
- `spark.migrate.astra.password <Client_Secret from token file>`
- `spark.migrate.keyspaceName <keyspace name>`
- `spark.migrate.tableName <table name>`
- `spark.migrate.readRateLimit 200000 <can be configured as needed>`
- `spark.migrate.writeRateLimit 200000 <can be configured as needed>`
- `spark.migrate.batchSize 2 <batch size can be configured as needed>`
- `spark.migrate.source.writeTimeStampFilter 0`
+ spark.migrate.source.isAstra false
+ spark.migrate.source.host <host contact point>
+ spark.migrate.source.username <username>
+ spark.migrate.source.password <password>
+ spark.migrate.source.read.consistency.level LOCAL_QUORUM
+ spark.migrate.source.keyspaceTable test.a1
+
+ spark.migrate.destination.isAstra true
+ spark.migrate.destination.scb file:///aaa/bbb/secure-connect-enterprise.zip
+ spark.migrate.destination.username <astra-client-id>
+ spark.migrate.destination.password <astra-client-secret>
+ spark.migrate.destination.read.consistency.level LOCAL_QUORUM
+ spark.migrate.destination.keyspaceTable test.a2
+ spark.migrate.destination.autocorrect.missing false
+ spark.migrate.destination.autocorrect.mismatch false
 
```
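As a runnable sketch of this step (using only placeholder values copied from the sample above), the conf file can be created and sanity-checked from the shell:

```shell
# Write a minimal sparkConf.properties using placeholder values from the sample above.
cat > sparkConf.properties <<'EOF'
spark.migrate.source.isAstra false
spark.migrate.source.keyspaceTable test.a1
spark.migrate.destination.isAstra true
spark.migrate.destination.keyspaceTable test.a2
EOF

# Sanity check: both the source and destination tables should be configured.
grep -c keyspaceTable sparkConf.properties   # prints 2
```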
 
2. Place the conf file where it can be accessed while running the job via spark-submit.
- 3 . If the project is not already packaged, it needs to be packaged via mvn. It will be generate a fat jar called `migrate-0.x.jar`
- 4 . Find the Min and Max partition values for the table using dsbulk as shown below:
+ 3. Generate the fat jar (`migrate-0.x.jar`) using the command `mvn clean package`.
+ 4. Run the 'Data Migration' job using the `spark-submit` command as shown below:
 
```
- dsbulk unload -h <contact_points> -query "select token(<partition_keys>) from <keyspace>.<table>;" -verbosity 0 --connector.csv.header false | sort -un | { tee >(head -1 >&2; cat >/dev/null) | tail -1; }
- ```
-
- 5 . Once the jar file is ready, we can run the 'Data Migration' job via spark-submit command as shown below:
-
- ```
- ./spark-submit --properties-file /media/bulk/sparkConf.properties /
+ ./spark-submit --properties-file sparkConf.properties \
--master "local[*]" \
--conf spark.migrate.source.minPartition=-9223372036854775808 \
--conf spark.migrate.source.maxPartition=9223372036854775807 \
- --class datastax.astra.migrate.Migrate /media/bulk/migrate-0.x.jar
+ --class datastax.astra.migrate.Migrate migrate-0.x.jar &> logfile_name.txt
```
 
- 6 . Additionally you could write the output to a log file like “logfile_name.txt” to avoid getting the output on the console.
+ Note: The above command redirects all output to the log file `logfile_name.txt` so that it does not appear on the console.
 
- ```
- ./spark-submit --properties-file /media/bulk/sparkConf.properties /
- --master "local[*]" /
- --conf spark.migrate.source.minPartition=-9223372036854775808 /
- --conf spark.migrate.source.maxPartition=9223372036854775807 /
- --class datastax.astra.migrate.Migrate /media/bulk/migrate-0.x.jar &> logfile_name.txt
- ```
 
# Data-validation job
 
- - For Data validation same prerequisite applies as Data migration, however you will need to use the class option ` --class datastax.astra.migrate.DiffData `
+ - To run the job in Data validation mode, use the class option `--class datastax.astra.migrate.DiffData` as shown below:
 
```
- ./spark-submit --properties-file /media/bulk/sparkConf.properties /
+ ./spark-submit --properties-file sparkConf.properties \
--master "local[*]" \
--conf spark.migrate.source.minPartition=-9223372036854775808 \
--conf spark.migrate.source.maxPartition=9223372036854775807 \
- --class datastax.astra.migrate.DiffData /media/bulk/migrate-0.x.jar
+ --class datastax.astra.migrate.DiffData migrate-0.x.jar &> logfile_name.txt
```
 
- - On the output of the run, the job will report differences as “ERRORS” as shown below
+ - The validation job will report differences as “ERRORS” in the log file, as shown below:
 
```
- 22/02/16 12:41:15 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 794 bytes result sent to driver
- 22/02/16 12:41:15 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 751 bytes result sent to driver
- 22/02/16 12:41:15 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 2814 ms on localhost (executor driver) (3/6)
- 22/02/16 12:41:15 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 2814 ms on localhost (executor driver) (4/6)
22/02/16 12:41:15 ERROR DiffJobSession: Data is missing in Astra: e7cd5752-bc0d-4157-a80f-7523add8dbcd
22/02/16 12:41:15 ERROR DiffJobSession: Data difference found - Key: 1 Data: (Index: 3 Source: [val-A, val-B] Astra: [val-A, val-B, val-C] )
- 22/02/16 12:41:15 INFO DiffJobSession: TreadID: 57 Final Read Record Count: 1
```
 
- - Please grep for all ` ERROR ` from the output log files to get the list of differences, notice that its listing differences by partition key value in this case.
+ - Grep for `ERROR` in the output log files to get the list of missing and mismatched records.
+ - Note that it lists differences by partition key values.
+ - The Validation job can also be run in AutoCorrect mode. This mode can
+   - Add any missing records from source to target
+   - Fix any inconsistencies between source and target (makes target same as source).
+ - Enable/disable this feature using one or both of the below settings in the config file:
+
+ ```
+ spark.migrate.destination.autocorrect.missing true|false
+ spark.migrate.destination.autocorrect.mismatch true|false
+ ```
+
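As a concrete sketch of the grep step (the log lines below are sample output copied from the snippet above; a real `logfile_name.txt` comes from redirecting the spark-submit output):

```shell
# Build a small sample log file from the output format shown above.
cat > logfile_name.txt <<'EOF'
22/02/16 12:41:15 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 2814 ms on localhost (executor driver) (3/6)
22/02/16 12:41:15 ERROR DiffJobSession: Data is missing in Astra: e7cd5752-bc0d-4157-a80f-7523add8dbcd
22/02/16 12:41:15 ERROR DiffJobSession: Data difference found - Key: 1 Data: (Index: 3 Source: [val-A, val-B] Astra: [val-A, val-B, val-C] )
EOF

# Keep only the reported differences; each line identifies a missing or mismatched record.
grep ERROR logfile_name.txt > differences.txt

# Count the differences found by the validation run.
grep -c ERROR differences.txt   # prints 2
```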
+ # Additional features
+ - Counter tables
+ - Preserve writetimes and TTL
+ - Advanced DataTypes (Sets, Lists, Maps, UDTs)
+ - Filter records from source using writetime
+ - SSL Support (including custom cipher algorithms)
+ - Migrate from any Cassandra source (Cassandra/DSE/Astra) to any Cassandra target (Cassandra/DSE/Astra)
+ - Validate migration accuracy and performance using a smaller randomized data-set