Commit 264e5ed

Merge pull request #126 from datastax/feature/CDM-3-auto-discover-schema
Implemented auto-discovery of schema (partition-key, clustering-key, …
2 parents 0102e4e + 4051431 commit 264e5ed

16 files changed: +425, −305 lines

README.md

Lines changed: 9 additions & 0 deletions
@@ -31,6 +31,7 @@ tar -xvzf spark-3.3.1-bin-hadoop3.tgz
 
 ```
 ./spark-submit --properties-file cdm.properties /
+--conf spark.origin.keyspaceTable="<keyspace-name>.<table-name>" /
 --master "local[*]" /
 --class datastax.astra.migrate.Migrate cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
 ```
@@ -40,6 +41,7 @@ Note:
 - Add option `--driver-memory 25G --executor-memory 25G` as shown below if the table migrated is large (over 100GB)
 ```
 ./spark-submit --properties-file cdm.properties /
+--conf spark.origin.keyspaceTable="<keyspace-name>.<table-name>" /
 --master "local[*]" --driver-memory 25G --executor-memory 25G /
 --class datastax.astra.migrate.Migrate cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
 ```
@@ -50,6 +52,7 @@ Note:
 
 ```
 ./spark-submit --properties-file cdm.properties /
+--conf spark.origin.keyspaceTable="<keyspace-name>.<table-name>" /
 --master "local[*]" /
 --class datastax.astra.migrate.DiffData cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
 ```
@@ -80,6 +83,7 @@ Note:
 - You can also use the tool to migrate specific partition ranges using class option `--class datastax.astra.migrate.MigratePartitionsFromFile` as shown below
 ```
 ./spark-submit --properties-file cdm.properties /
+--conf spark.origin.keyspaceTable="<keyspace-name>.<table-name>" /
 --master "local[*]" /
 --class datastax.astra.migrate.MigratePartitionsFromFile cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
 ```
@@ -97,12 +101,14 @@ This mode is specifically useful to process a subset of partition-ranges that
 - The tool can be used to identify large fields from a table that may break your cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field) `--class datastax.astra.migrate.Guardrail` as shown below
 ```
 ./spark-submit --properties-file cdmGuardrail.properties /
+--conf spark.origin.keyspaceTable="<keyspace-name>.<table-name>" /
 --master "local[*]" /
 --class datastax.astra.migrate.Guardrail cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
 ```
 > A sample Guardrail properties file can be [found here](./src/resources/cdmGuardrail.properties)
 
 # Features
+- Auto-detects table schema (column names, types, id fields, collections, UDTs, etc.)
 - Supports migration/validation of [Counter tables](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_using/useCountersConcept.html)
 - Preserves [writetimes](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__retrieving-the-datetime-a-write-occurred-p) and [TTLs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__ref-select-ttl-p)
 - Supports migration/validation of advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt))
@@ -116,6 +122,9 @@ This mode is specifically useful to process a subset of partition-ranges that
 - Validate migration accuracy and performance using a smaller randomized data-set
 - Supports adding custom fixed `writetime`
 
+# Known Limitations
+- This tool does not migrate `ttl` & `writetime` at the field-level (for optimization reasons). It instead finds the field with the highest `ttl` & the field with the highest `writetime` within an `origin` row and uses those values on the entire `target` row.
+
 # Building Jar for local development
 1. Clone this repo
 2. Move to the repo folder `cd cassandra-data-migrator`
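To make the new Known Limitations entry concrete, here is a minimal sketch of the row-level rule it describes, assuming the origin SELECT appends `writetime(col)` and `ttl(col)` columns for the non-key fields; the helper class, method names, and column positions are hypothetical, not CDM's actual code.

```java
import com.datastax.oss.driver.api.core.cql.Row;
import java.util.List;

// Hypothetical helper illustrating the row-level writetime/TTL rule:
// take the max across the per-column values and apply it to the whole target row.
public class RowTimestamps {

    // writetimeCols holds the positions of the writetime(...) columns
    // appended to the origin SELECT; writetime is microseconds since epoch.
    static long maxWritetime(Row originRow, List<Integer> writetimeCols) {
        long max = 0L;
        for (int idx : writetimeCols) {
            max = Math.max(max, originRow.getLong(idx));
        }
        return max;
    }

    // Same idea for TTL: ttl(...) columns are INTs (seconds remaining).
    static int maxTtl(Row originRow, List<Integer> ttlCols) {
        int max = 0;
        for (int idx : ttlCols) {
            max = Math.max(max, originRow.getInt(idx));
        }
        return max;
    }
}
```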

pom.xml

Lines changed: 7 additions & 1 deletion
@@ -8,7 +8,7 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <revision>3.3.1</revision>
+        <revision>3.4.0</revision>
         <scala.version>2.12.17</scala.version>
         <scala.main.version>2.12</scala.main.version>
         <spark.version>3.3.1</spark.version>
@@ -89,6 +89,12 @@
             <artifactId>log4j-to-slf4j</artifactId>
             <version>2.19.0</version>
         </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.26</version>
+            <scope>provided</scope>
+        </dependency>
 
         <!-- Test Dependencies -->
         <dependency>
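The `provided` scope is appropriate here because Lombok only generates boilerplate (getters, `equals`, `toString`) at compile time and is not needed on the runtime classpath. A minimal sketch of the pattern the new schema classes appear to rely on, given the `getTypeInfo()`/`getTypeClass()` accessors seen in the diffs below; the class and field names are illustrative, not the PR's actual code.

```java
import lombok.Data;

// Illustrative Lombok-annotated POJO: @Data generates getName()/setName(),
// getCqlType()/setCqlType(), equals(), hashCode(), and toString().
@Data
public class ColumnInfoSketch {
    private String name;
    private String cqlType;
}
```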

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 76 additions & 74 deletions
Large diff not shown.

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 26 additions & 49 deletions
@@ -4,6 +4,8 @@
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
+import datastax.astra.migrate.schema.TableInfo;
+import datastax.astra.migrate.schema.TypeInfo;
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.spark.SparkConf;
 
@@ -33,8 +35,6 @@ public abstract class BaseJobSession {
     protected Integer maxRetries = 10;
     protected AtomicLong readCounter = new AtomicLong(0);
 
-    protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
-    protected List<MigrateDataType> idColTypes = new ArrayList<MigrateDataType>();
     protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
 
     protected Integer batchSize = 1;
@@ -46,8 +46,6 @@ public abstract class BaseJobSession {
     protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
     protected Long customWritetime = 0l;
 
-    protected List<Integer> writeTimeStampCols = new ArrayList<Integer>();
-    protected List<Integer> ttlCols = new ArrayList<Integer>();
     protected Boolean isCounterTable = false;
 
     protected String sourceKeyspaceTable;
@@ -59,79 +57,58 @@ public abstract class BaseJobSession {
     protected String filterColType;
     protected Integer filterColIndex;
     protected String filterColValue;
-
-    protected String selectCols;
-    protected String partitionKey;
     protected String sourceSelectCondition;
-    protected String[] allCols;
-    protected String idCols;
-    protected String tsReplaceValStr;
-    protected long tsReplaceVal;
 
     protected BaseJobSession(SparkConf sc) {
         readConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.read"));
         writeConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.write"));
         readLimiter = RateLimiter.create(Integer.parseInt(Util.getSparkPropOr(sc, "spark.readRateLimit", "20000")));
         sourceKeyspaceTable = sc.get("spark.origin.keyspaceTable");
         hasRandomPartitioner = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.origin.hasRandomPartitioner", "false"));
-
-        selectCols = Util.getSparkProp(sc, "spark.query.origin");
-        allCols = selectCols.split(",");
-        partitionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
         sourceSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");
         if (!sourceSelectCondition.isEmpty() && !sourceSelectCondition.trim().toUpperCase().startsWith("AND")) {
             sourceSelectCondition = " AND " + sourceSelectCondition;
         }
-        selectColTypes = getTypes(Util.getSparkProp(sc, "spark.query.types"));
-        idCols = Util.getSparkPropOrEmpty(sc, "spark.query.target.id");
-        idColTypes = selectColTypes.subList(0, idCols.split(",").length);
+
        printStatsAfter = Integer.parseInt(Util.getSparkPropOr(sc, "spark.printStatsAfter", "100000"));
         if (printStatsAfter < 1) {
             printStatsAfter = 100000;
         }
     }
 
-    public String getKey(Row sourceRow) {
-        StringBuffer key = new StringBuffer();
-        for (int index = 0; index < idColTypes.size(); index++) {
-            MigrateDataType dataType = idColTypes.get(index);
-            if (index == 0) {
-                key.append(getData(dataType, index, sourceRow));
-            } else {
-                key.append(" %% " + getData(dataType, index, sourceRow));
+    public Object getData(TypeInfo typeInfo, int index, Row row) {
+        if (typeInfo.getTypeClass() == Map.class) {
+            return row.getMap(index, typeInfo.getSubTypes().get(0), typeInfo.getSubTypes().get(1));
+        } else if (typeInfo.getTypeClass() == List.class) {
+            return row.getList(index, typeInfo.getSubTypes().get(0));
+        } else if (typeInfo.getTypeClass() == Set.class) {
+            return row.getSet(index, typeInfo.getSubTypes().get(0));
+        } else if (isCounterTable && typeInfo.getTypeClass() == Long.class) {
+            Object data = row.get(index, typeInfo.getTypeClass());
+            if (data == null) {
+                return Long.valueOf(0);
             }
         }
 
-        return key.toString();
+        return row.get(index, typeInfo.getTypeClass());
     }
 
-    public List<MigrateDataType> getTypes(String types) {
-        List<MigrateDataType> dataTypes = new ArrayList<MigrateDataType>();
-        for (String type : types.split(",")) {
-            dataTypes.add(new MigrateDataType(type));
-        }
-
-        return dataTypes;
+    public int getFieldSize(TypeInfo typeInfo, int index, Row row) {
+        return SerializationUtils.serialize((Serializable) getData(typeInfo, index, row)).length;
     }
 
-    public Object getData(MigrateDataType dataType, int index, Row row) {
-        if (dataType.typeClass == Map.class) {
-            return row.getMap(index, dataType.subTypes.get(0), dataType.subTypes.get(1));
-        } else if (dataType.typeClass == List.class) {
-            return row.getList(index, dataType.subTypes.get(0));
-        } else if (dataType.typeClass == Set.class) {
-            return row.getSet(index, dataType.subTypes.get(0));
-        } else if (isCounterTable && dataType.typeClass == Long.class) {
-            Object data = row.get(index, dataType.typeClass);
-            if (data == null) {
-                return Long.valueOf(0);
+    public String getKey(Row sourceRow, TableInfo tableInfo) {
+        StringBuffer key = new StringBuffer();
+        for (int index = 0; index < tableInfo.getKeyColumns().size(); index++) {
+            TypeInfo typeInfo = tableInfo.getIdColumns().get(index).getTypeInfo();
+            if (index == 0) {
+                key.append(getData(typeInfo, index, sourceRow));
+            } else {
+                key.append(" %% " + getData(typeInfo, index, sourceRow));
             }
         }
 
-        return row.get(index, dataType.typeClass);
+        return key.toString();
     }
 
-    public int getFieldSize(MigrateDataType dataType, int index, Row row) {
-        return SerializationUtils.serialize((Serializable) getData(dataType, index, row)).length;
-    }
 }
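The removed `spark.query.*` parsing is what the new `TableInfo`/`TypeInfo` classes replace: instead of the user listing columns and types by hand, the schema is read from cluster metadata. A sketch of how such discovery can be done with the Java driver's metadata API; this illustrates the idea only and is not the PR's actual `TableInfo` implementation.

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;

// Illustrative schema discovery via driver metadata: partition key,
// clustering columns, and all columns with their CQL types.
public class SchemaDiscovery {
    public static void describe(CqlSession session, String keyspace, String table) {
        TableMetadata tm = session.getMetadata()
                .getKeyspace(keyspace).orElseThrow(IllegalArgumentException::new)
                .getTable(table).orElseThrow(IllegalArgumentException::new);

        // Partition key plus clustering columns form the "id" columns.
        tm.getPartitionKey().forEach(c ->
                System.out.println("partition-key: " + c.getName() + " " + c.getType()));
        tm.getClusteringColumns().keySet().forEach(c ->
                System.out.println("clustering-key: " + c.getName() + " " + c.getType()));

        // All columns, including collections and UDTs, with their types.
        for (ColumnMetadata c : tm.getColumns().values()) {
            System.out.println("column: " + c.getName() + " " + c.getType());
        }
    }
}
```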

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 5 additions & 4 deletions
@@ -2,6 +2,7 @@
 
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.*;
+import datastax.astra.migrate.schema.TypeInfo;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,9 +69,9 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                 }
 
                 if (filterData) {
-                    String col = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
+                    String col = (String) getData(new TypeInfo(filterColType), filterColIndex, sourceRow);
                     if (col.trim().equalsIgnoreCase(filterColValue)) {
-                        logger.warn("Skipping row and filtering out: {}", getKey(sourceRow));
+                        logger.warn("Skipping row and filtering out: {}", getKey(sourceRow, tableInfo));
                         skipCnt++;
                         continue;
                     }
@@ -117,9 +118,9 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                 }
 
                 if (filterData) {
-                    String colValue = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
+                    String colValue = (String) getData(new TypeInfo(filterColType), filterColIndex, sourceRow);
                     if (colValue.trim().equalsIgnoreCase(filterColValue)) {
-                        logger.warn("Skipping row and filtering out: {}", getKey(sourceRow));
+                        logger.warn("Skipping row and filtering out: {}", getKey(sourceRow, tableInfo));
                         skipCnt++;
                         continue;
                     }
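The `filterData` branch above skips any row whose filter column matches a configured value, comparing after trimming and case-insensitively. A self-contained sketch of that compare-trim-skip pattern; the sample values are made up for illustration.

```java
// Standalone sketch of the filter-and-skip pattern in getDataAndInsert().
public class FilterDemo {
    public static void main(String[] args) {
        String filterColValue = "tombstoned"; // e.g. sourced from Spark config
        String[] colValues = {"active", "Tombstoned ", "archived"};

        int skipCnt = 0;
        for (String col : colValues) {
            if (col.trim().equalsIgnoreCase(filterColValue)) {
                skipCnt++; // mirrors "skipCnt++; continue;" in the job
                continue;
            }
            System.out.println("migrating row with value: " + col);
        }
        System.out.println("skipped: " + skipCnt);
    }
}
```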

src/main/java/datastax/astra/migrate/CopyPKJobSession.java

Lines changed: 3 additions & 2 deletions
@@ -4,6 +4,7 @@
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.ResultSet;
 import com.datastax.oss.driver.api.core.cql.Row;
+import datastax.astra.migrate.schema.ColumnInfo;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,8 +45,8 @@ public void getRowAndInsert(List<SplitPartitions.PKRows> rowsList) {
             String[] pkFields = row.split(" %% ");
             int idx = 0;
             BoundStatement bspk = sourceSelectStatement.bind().setConsistencyLevel(readConsistencyLevel);
-            for (MigrateDataType tp : idColTypes) {
-                bspk = bspk.set(idx, convert(tp.typeClass, pkFields[idx]), tp.typeClass);
+            for (ColumnInfo ci : tableInfo.getIdColumns()) {
+                bspk = bspk.set(idx, convert(ci.getTypeInfo().getTypeClass(), pkFields[idx]), ci.getTypeInfo().getTypeClass());
                 idx++;
             }
             Row pkRow = sourceSession.execute(bspk).one();
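`getRowAndInsert` consumes the same `" %% "`-delimited key strings that `getKey` produces, splitting each line and binding the fields by position. A tiny round-trip sketch of that format; the key values are invented for illustration.

```java
// Round-trip sketch of the " %% "-delimited key format: getKey() joins
// id-column values with " %% ", and getRowAndInsert() splits them back.
public class KeyFormatDemo {
    public static void main(String[] args) {
        String key = "user-42 %% 2023-03-01"; // e.g. (partition key, clustering key)
        String[] pkFields = key.split(" %% ");

        // Each field is then converted to the column's Java type and bound
        // by position, mirroring the loop over tableInfo.getIdColumns().
        for (int idx = 0; idx < pkFields.length; idx++) {
            System.out.println("bind position " + idx + " -> " + pkFields[idx]);
        }
    }
}
```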

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 14 additions & 13 deletions
@@ -6,6 +6,7 @@
 import com.datastax.oss.driver.api.core.cql.ResultSet;
 import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.api.core.data.UdtValue;
+import datastax.astra.migrate.schema.TypeInfo;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,7 +108,7 @@ private void diffAndClear(Map<Row, CompletionStage<AsyncResultSet>> srcToTargetRowMap) {
                 Row targetRow = srcToTargetRowMap.get(srcRow).toCompletableFuture().get().one();
                 diff(srcRow, targetRow);
             } catch (Exception e) {
-                logger.error("Could not perform diff for Key: {}", getKey(srcRow), e);
+                logger.error("Could not perform diff for Key: {}", getKey(srcRow, tableInfo), e);
             }
         }
         srcToTargetRowMap.clear();
@@ -134,13 +135,13 @@ public synchronized void printCounts(boolean isFinal) {
     private void diff(Row sourceRow, Row astraRow) {
         if (astraRow == null) {
             missingCounter.incrementAndGet();
-            logger.error("Missing target row found for key: {}", getKey(sourceRow));
+            logger.error("Missing target row found for key: {}", getKey(sourceRow, tableInfo));
             //correct data
 
             if (autoCorrectMissing) {
                 astraSession.execute(bindInsert(astraInsertStatement, sourceRow, null));
                 correctedMissingCounter.incrementAndGet();
-                logger.error("Inserted missing row in target: {}", getKey(sourceRow));
+                logger.error("Inserted missing row in target: {}", getKey(sourceRow, tableInfo));
             }
 
             return;
@@ -149,7 +150,7 @@ private void diff(Row sourceRow, Row astraRow) {
         String diffData = isDifferent(sourceRow, astraRow);
         if (!diffData.isEmpty()) {
             mismatchCounter.incrementAndGet();
-            logger.error("Mismatch row found for key: {} Mismatch: {}", getKey(sourceRow), diffData);
+            logger.error("Mismatch row found for key: {} Mismatch: {}", getKey(sourceRow, tableInfo), diffData);
 
             if (autoCorrectMismatch) {
                 if (isCounterTable) {
@@ -158,7 +159,7 @@ private void diff(Row sourceRow, Row astraRow) {
                     astraSession.execute(bindInsert(astraInsertStatement, sourceRow, null));
                 }
                 correctedMismatchCounter.incrementAndGet();
-                logger.error("Updated mismatch row in target: {}", getKey(sourceRow));
+                logger.error("Updated mismatch row in target: {}", getKey(sourceRow, tableInfo));
             }
 
             return;
@@ -169,21 +170,21 @@
 
     private String isDifferent(Row sourceRow, Row astraRow) {
         StringBuffer diffData = new StringBuffer();
-        IntStream.range(0, selectColTypes.size()).parallel().forEach(index -> {
-            MigrateDataType dataTypeObj = selectColTypes.get(index);
-            Object source = getData(dataTypeObj, index, sourceRow);
-            if (index < idColTypes.size()) {
-                Optional<Object> optionalVal = handleBlankInPrimaryKey(index, source, dataTypeObj.typeClass, sourceRow, false);
+        IntStream.range(0, tableInfo.getAllColumns().size()).parallel().forEach(index -> {
+            TypeInfo typeInfo = tableInfo.getColumns().get(index).getTypeInfo();
+            Object source = getData(typeInfo, index, sourceRow);
+            if (index < tableInfo.getKeyColumns().size()) {
+                Optional<Object> optionalVal = handleBlankInPrimaryKey(index, source, typeInfo.getTypeClass(), sourceRow, false);
                 if (optionalVal.isPresent()) {
                     source = optionalVal.get();
                 }
             }
 
-            Object astra = getData(dataTypeObj, index, astraRow);
+            Object astra = getData(typeInfo, index, astraRow);
 
-            boolean isDiff = dataTypeObj.diff(source, astra);
+            boolean isDiff = typeInfo.diff(source, astra);
             if (isDiff) {
-                if (dataTypeObj.typeClass.equals(UdtValue.class)) {
+                if (typeInfo.getTypeClass().equals(UdtValue.class)) {
                     String sourceUdtContent = ((UdtValue) source).getFormattedContents();
                     String astraUdtContent = ((UdtValue) astra).getFormattedContents();
                     if (!sourceUdtContent.equals(astraUdtContent)) {
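`isDifferent` fans the column comparison out with a parallel `IntStream` and tallies results through thread-safe counters. A standalone sketch of that pattern, using plain lists in place of driver `Row`s and `Objects.equals` in place of `TypeInfo.diff`.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Every column index is checked concurrently; mismatches are counted
// with an AtomicInteger so the parallel lambda stays thread-safe.
public class ParallelDiffDemo {
    public static void main(String[] args) {
        List<Object> source = List.of("a", 1, 9L);
        List<Object> target = List.of("a", 2, 9L);

        AtomicInteger mismatches = new AtomicInteger();
        IntStream.range(0, source.size()).parallel().forEach(index -> {
            if (!Objects.equals(source.get(index), target.get(index))) {
                mismatches.incrementAndGet();
                System.out.println("Mismatch at column index " + index);
            }
        });
        System.out.println("Total mismatches: " + mismatches.get());
    }
}
```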
