
Commit 0c9a2bc

pravinbhat and msmygit authored
Make trackRun feature work on all versions of C* (#320)
* Make trackRun feature work on all versions of C*, improved docs & optimized imports.

* Update README.md (fix typos)

Co-authored-by: Madhavan <[email protected]>
1 parent f207e33 commit 0c9a2bc

13 files changed: +136 −29 lines changed


README.md

Lines changed: 16 additions & 2 deletions
```diff
@@ -130,7 +130,7 @@ spark-submit --properties-file cdm.properties \
 - If you rerun a `validation` job, it will include any token-ranges that had differences in the previous run
 - Preserve [writetimes](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__retrieving-the-datetime-a-write-occurred-p) and [TTLs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__ref-select-ttl-p)
 - Supports migration/validation of advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt))
-- Filter records from `Origin` using `writetimes` and/or CQL conditions and/or a list of token-ranges
+- Filter records from `Origin` using `writetime` and/or CQL conditions and/or a list of token-ranges
 - Perform guardrail checks (identify large fields)
 - Supports adding `constants` as new columns on `Target`
 - Supports expanding `Map` columns on `Origin` into multiple records on `Target`
@@ -140,7 +140,7 @@ spark-submit --properties-file cdm.properties \
 - Migrate from any Cassandra `Origin` ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise&trade;](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB&trade;](https://www.datastax.com/products/datastax-astra)) to any Cassandra `Target` ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise&trade;](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB&trade;](https://www.datastax.com/products/datastax-astra))
 - Supports migration/validation from and to [Azure Cosmos Cassandra](https://learn.microsoft.com/en-us/azure/cosmos-db/cassandra)
 - Validate migration accuracy and performance using a smaller randomized data-set
-- Supports adding custom fixed `writetime`
+- Supports adding custom fixed `writetime` and/or `ttl`
 - Track run information (start-time, end-time, status, etc.) in tables (`cdm_run_info` and `cdm_run_details`) on the target keyspace
 
 # Things to know
@@ -152,6 +152,20 @@ spark-submit --properties-file cdm.properties \
 - When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reason), it could lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. It can be addressed by enabling and setting a positive value for the `spark.cdm.transform.custom.writetime.incrementBy` param, which was added specifically to address this issue.
 - When you rerun a job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in table `cdm_run_info` will be only for the current run. If the previous run was killed for some reason, its run metrics may not have been saved. If the previous run did complete (was not killed) but had errors, you will have all run metrics from the previous run as well.
 
+# Performance FAQ
+- The below recommendations are usually only needed when migrating large tables for which the default performance is not good enough.
+- Performance bottlenecks are usually the result of
+  - Low resource availability on the `Origin` OR `Target` cluster
+  - Low resource availability on CDM VMs, [see recommendations here](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines)
+  - Bad schema design, which could be caused by an out-of-balance `Origin` cluster, large partitions (> 100 MB), large rows (> 10 MB) and/or a high column count
+  - Incorrect configuration of the below properties
+    - `numParts`: Default is 5K, but the ideal value is usually around table-size/10MB.
+    - `batchSize`: Default is 5, but it should be set to 1 for tables where primary-key = partition-key OR where the average row-size is > 20 KB. Similarly, it should be set to a value > 5 if the row-size is small (< 1 KB) and most partitions have several rows (100+).
+    - `fetchSizeInRows`: Default is 1K and this is usually fine. However, you can reduce it if your table has many large rows (over 100 KB).
+    - `ratelimit`: Default is 20K. Once you have set all the other properties appropriately, set this to the highest value that your clusters (`Origin` and `Target`) can handle.
+- Using schema manipulation features (like `constantColumns`, `explodeMap`, `extractJson`), transformation functions and/or where-filter-conditions (except partition min/max) may negatively impact performance
+- We typically recommend [this infrastructure](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines) for CDM VMs and [this starter conf](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm.properties). You can then optimize the job further based on the CDM params info provided above and the observed load and throughput on the `Origin` and `Target` clusters.
+- Note: For additional performance tuning, refer to the details in the [cdm-detailed.properties file here](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm-detailed.properties)
 
 # Building Jar for local development
 1. Clone this repo
```
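The Performance FAQ added above names four tuning knobs, plus the `incrementBy` workaround from "Things to know". As a rough, non-authoritative sketch of where those settings live, a starter `cdm.properties` might set them as below; the fully-qualified key names under `spark.cdm.perfops.*` are assumptions here and should be verified against the linked cdm-detailed.properties file.

```properties
# Illustrative sketch only -- verify exact key names against cdm-detailed.properties.
# numParts: ideal value is roughly table-size / 10 MB (default 5K)
spark.cdm.perfops.numParts                        10000
# batchSize: 1 when primary-key = partition-key or average row-size > 20 KB (default 5)
spark.cdm.perfops.batchSize                       1
# fetchSizeInRows: lower the 1K default only for tables with many rows over 100 KB
spark.cdm.perfops.fetchSizeInRows                 1000
# ratelimit: raise from the 20K default to the highest rate both clusters sustain
spark.cdm.perfops.ratelimit.origin                20000
spark.cdm.perfops.ratelimit.target                20000
# Positive value mitigates duplicate list entries on re-runs (CASSANDRA-11368)
spark.cdm.transform.custom.writetime.incrementBy  1
```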

RELEASE.md

Lines changed: 4 additions & 0 deletions
```diff
@@ -1,4 +1,8 @@
 # Release Notes
+## [4.6.1] - 2024-10-22
+- Make `trackRun` feature work on all versions of Cassandra/DSE by replacing the `IN` clause on `cdm_run_details` table.
+- Updated `README` docs.
+
 ## [4.6.0] - 2024-10-18
 - Allow using Collections and/or UDTs for `ttl` & `writetime` calculations. This is specifically helpful in scenarios where the only non-key columns are Collections and/or UDTs.
 
```
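For context on the 4.6.1 entry: older Cassandra/DSE versions reject `IN` restrictions on a regular (non-primary-key) column such as `status`, even with `ALLOW FILTERING`, which is presumably why the clause was replaced. A sketch of the query shape before and after (the `cdm` keyspace name is illustrative; table and columns are taken from the diff below):

```sql
-- Before (4.6.0): fails on older Cassandra/DSE versions
SELECT token_min, token_max FROM cdm.cdm_run_details
WHERE table_name = ? AND run_id = ?
  AND status IN ('NOT_STARTED', 'STARTED', 'FAIL', 'DIFF') ALLOW FILTERING;

-- After (4.6.1): bind one status per execution and merge results client-side
SELECT token_min, token_max FROM cdm.cdm_run_details
WHERE table_name = ? AND run_id = ? AND status = ? ALLOW FILTERING;
```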

src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java

Lines changed: 18 additions & 7 deletions
```diff
@@ -19,6 +19,7 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,35 +85,45 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
         boundSelectInfoStatement = bindStatement(
                 "SELECT status FROM " + cdmKsTabInfo + " WHERE table_name = ? AND run_id = ?");
         boundSelectStatement = bindStatement("SELECT token_min, token_max FROM " + cdmKsTabDetails
-                + " WHERE table_name = ? AND run_id = ? and status in ('NOT_STARTED', 'STARTED', 'FAIL', 'DIFF') ALLOW FILTERING");
+                + " WHERE table_name = ? AND run_id = ? AND status = ? ALLOW FILTERING");
     }
 
     public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId) throws RunNotStartedException {
-        final Collection<SplitPartitions.Partition> pendingParts = new ArrayList<SplitPartitions.Partition>();
         if (prevRunId == 0) {
-            return pendingParts;
+            return Collections.emptyList();
         }
 
         ResultSet rsInfo = session
                 .execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", prevRunId));
         Row cdmRunStatus = rsInfo.one();
         if (cdmRunStatus == null) {
-            return pendingParts;
+            return Collections.emptyList();
         } else {
             String status = cdmRunStatus.getString("status");
             if (TrackRun.RUN_STATUS.NOT_STARTED.toString().equals(status)) {
                 throw new RunNotStartedException("Run not started for run_id: " + prevRunId);
             }
         }
 
-        ResultSet rs = session
-                .execute(boundSelectStatement.setString("table_name", tableName).setLong("run_id", prevRunId));
+        final Collection<SplitPartitions.Partition> pendingParts = new ArrayList<SplitPartitions.Partition>();
+        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.NOT_STARTED.toString()));
+        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.STARTED.toString()));
+        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.FAIL.toString()));
+        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.DIFF.toString()));
+
+        return pendingParts;
+    }
+
+    protected Collection<SplitPartitions.Partition> getPartitionsByStatus(long prevRunId, String status) {
+        ResultSet rs = session.execute(boundSelectStatement.setString("table_name", tableName)
+                .setLong("run_id", prevRunId).setString("status", status));
+
+        final Collection<SplitPartitions.Partition> pendingParts = new ArrayList<SplitPartitions.Partition>();
         rs.forEach(row -> {
             Partition part = new Partition(BigInteger.valueOf(row.getLong("token_min")),
                     BigInteger.valueOf(row.getLong("token_max")));
             pendingParts.add(part);
         });
-
         return pendingParts;
     }
 
```
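The change above is an instance of a general fallback pattern: when a predicate is not supported by every server version you target, issue one equality-bound query per value and merge the results client-side. A minimal self-contained sketch of the pattern, with hypothetical names (this is not CDM's API):

```java
import java.util.ArrayList;
import java.util.List;

public class PerValueQueryFallback {
    // Hypothetical stand-in for executing the bound "... AND status = ?" query.
    interface StatusQuery {
        List<long[]> tokenRangesFor(String status);
    }

    // Mirrors getPendingPartitions(): run the same equality query once per
    // status value and concatenate the results.
    static List<long[]> pendingTokenRanges(StatusQuery query) {
        List<long[]> merged = new ArrayList<>();
        for (String status : new String[] { "NOT_STARTED", "STARTED", "FAIL", "DIFF" }) {
            merged.addAll(query.tokenRangesFor(status));
        }
        return merged;
    }
}
```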

src/main/java/com/datastax/cdm/cql/statement/TargetUpsertStatement.java

Lines changed: 5 additions & 1 deletion
```diff
@@ -29,7 +29,11 @@
 import com.datastax.cdm.feature.WritetimeTTL;
 import com.datastax.cdm.properties.IPropertyHelper;
 import com.datastax.cdm.properties.KnownProperties;
-import com.datastax.oss.driver.api.core.cql.*;
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.Statement;
 import com.datastax.oss.driver.api.core.type.DataType;
 
 public abstract class TargetUpsertStatement extends BaseCdmStatement {
```

src/main/java/com/datastax/cdm/data/CqlConversion.java

Lines changed: 7 additions & 2 deletions
```diff
@@ -16,7 +16,11 @@
 package com.datastax.cdm.data;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -25,7 +29,8 @@
 import com.datastax.cdm.schema.CqlTable;
 import com.datastax.oss.driver.api.core.ProtocolVersion;
 import com.datastax.oss.driver.api.core.data.UdtValue;
-import com.datastax.oss.driver.api.core.type.*;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.UserDefinedType;
 import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
 import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
 
```

src/main/java/com/datastax/cdm/data/CqlData.java

Lines changed: 13 additions & 2 deletions
```diff
@@ -15,11 +15,22 @@
  */
 package com.datastax.cdm.data;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import com.datastax.dse.driver.api.core.type.DseDataTypes;
 import com.datastax.oss.driver.api.core.data.UdtValue;
-import com.datastax.oss.driver.api.core.type.*;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.core.type.ListType;
+import com.datastax.oss.driver.api.core.type.MapType;
+import com.datastax.oss.driver.api.core.type.SetType;
+import com.datastax.oss.driver.api.core.type.TupleType;
+import com.datastax.oss.driver.api.core.type.UserDefinedType;
+import com.datastax.oss.driver.api.core.type.VectorType;
 
 public class CqlData {
     public enum Type {
```

src/main/java/com/datastax/cdm/data/DataUtility.java

Lines changed: 6 additions & 1 deletion
```diff
@@ -19,7 +19,12 @@
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
```

src/main/java/com/datastax/cdm/data/EnhancedPK.java

Lines changed: 5 additions & 1 deletion
```diff
@@ -15,7 +15,11 @@
  */
 package com.datastax.cdm.data;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
```

src/main/java/com/datastax/cdm/data/PKFactory.java

Lines changed: 9 additions & 2 deletions
```diff
@@ -15,13 +15,20 @@
  */
 package com.datastax.cdm.data;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datastax.cdm.feature.*;
+import com.datastax.cdm.feature.ConstantColumns;
+import com.datastax.cdm.feature.ExplodeMap;
+import com.datastax.cdm.feature.FeatureFactory;
+import com.datastax.cdm.feature.Featureset;
+import com.datastax.cdm.feature.WritetimeTTL;
 import com.datastax.cdm.properties.PropertyHelper;
 import com.datastax.cdm.schema.CqlTable;
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
```

src/main/java/com/datastax/cdm/feature/ExplodeMap.java

Lines changed: 6 additions & 1 deletion
```diff
@@ -15,7 +15,12 @@
  */
 package com.datastax.cdm.feature;
 
-import java.util.*;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
```
