Commit 42e2553

Merge remote-tracking branch 'origin/issue/CDM-17' into issue/97

2 parents: 2d8e0ed + ed79962

14 files changed: +114 -102 lines
Lines changed: 2 additions & 7 deletions

```diff
@@ -1,8 +1,7 @@
 name: Build and test jar with integration tests
-run-name: ${{ github.actor }} is testing out GitHub Actions 🚀
 on: [push]
 jobs:
-  Explore-GitHub-Actions:
+  CDM-Integration-Test:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
@@ -12,12 +11,8 @@ jobs:
           java-version: '8'
           distribution: 'temurin'
           cache: maven
-      - name: Build with Maven
-        run: mvn -B package --file pom.xml
-
       - name: Test SIT with cdm
         working-directory: SIT
         run: |
           echo "Testing SIT with cdm"
-          chmod o+rx ./environment.sh
-          make -f Makefile
+          make -f Makefile
```
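Net effect: the experimental `run-name` and the `Explore-GitHub-Actions` job are replaced by a `CDM-Integration-Test` job, and the standalone Maven build step is dropped — since the SIT Makefile's `setup` target depends on `build` (see the Makefile diff below), the single `make` invocation now builds the jar before running the smoke tests. For reference, the resulting job reconstructed from the hunks above, with YAML indentation approximated (the diff view flattens it) and the unchanged setup-java lines summarized in a comment:

```yaml
name: Build and test jar with integration tests
on: [push]
jobs:
  CDM-Integration-Test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      # ... setup-java with java-version '8', distribution 'temurin', cache: maven ...
      - name: Test SIT with cdm
        working-directory: SIT
        run: |
          echo "Testing SIT with cdm"
          make -f Makefile
```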

SIT/Makefile

Lines changed: 2 additions & 3 deletions

```diff
@@ -1,8 +1,6 @@
 
-all: set_permissions setup test_smoke env_teardown
+all: setup test_smoke env_teardown
 
-set_permissions:
-	chmod -R 777 ./*.sh
 setup: build env_setup
 
 reset: build env_reset
@@ -16,6 +14,7 @@ test_smoke_cmd:
 	./test.sh -p smoke
 
 env_setup:
+	chmod -R 777 ./*.sh
 	./environment.sh -m setup -j ../target/cassandra-data-migrator*.jar
 env_reset:
 	./environment.sh -m reset -j ../target/cassandra-data-migrator*.jar
```
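The permissions handling the workflow used to do (`chmod o+rx ./environment.sh`) is consolidated here: the dedicated `set_permissions` target disappears and a broader `chmod -R 777` runs as the first recipe line of `env_setup`, just before the scripts are invoked. The affected targets after the change (recipe lines assume tabs, as Make requires):

```makefile
all: setup test_smoke env_teardown

setup: build env_setup

env_setup:
	chmod -R 777 ./*.sh
	./environment.sh -m setup -j ../target/cassandra-data-migrator*.jar
```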

pom.xml

Lines changed: 4 additions & 1 deletion

```diff
@@ -8,7 +8,7 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <revision>3.4.0</revision>
+        <revision>4.0.0-SNAPSHOT</revision>
         <scala.version>2.12.17</scala.version>
         <scala.main.version>2.12</scala.main.version>
         <spark.version>3.3.1</spark.version>
@@ -23,6 +23,9 @@
             <id>github</id>
             <name>GitHub Packages</name>
             <url>https://maven.pkg.github.com/datastax/cassandra-data-migrator</url>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
         </repository>
     </distributionManagement>
 
```
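Two separate changes land in the pom: the project revision moves from 3.4.0 to 4.0.0-SNAPSHOT, and the GitHub Packages repository entry gains a snapshots policy, presumably intended to keep SNAPSHOT artifacts out of this repository. The resulting distributionManagement block, assembled from the hunk above:

```xml
<distributionManagement>
    <repository>
        <id>github</id>
        <name>GitHub Packages</name>
        <url>https://maven.pkg.github.com/datastax/cassandra-data-migrator</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</distributionManagement>
```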
src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 1 addition & 0 deletions

```diff
@@ -2,6 +2,7 @@
 
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.*;
+import datastax.astra.migrate.properties.KnownProperties;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
```

src/main/java/datastax/astra/migrate/MigrateDataType.java

Lines changed: 18 additions & 5 deletions

```diff
@@ -53,6 +53,19 @@ public MigrateDataType(String dataType) {
         }
         this.typeClass = getType(this.type);
 
+        if (this.type >= minType && this.type <= maxType) {
+            isValid = true;
+            for (Object o : subTypes) {
+                if (null == o || Object.class == o) {
+                    isValid = false;
+                }
+            }
+        }
+        else {
+            isValid = false;
+        }
+        this.typeClass = getType(this.type);
+
         if (this.type >= minType && this.type <= maxType) {
             isValid = true;
             for (Object o : subTypes) {
@@ -76,16 +89,16 @@ private int typeAsInt(String dataType) {
         return rtn;
     }
 
-    public boolean diff(Object source, Object astra) {
-        if (source == null && astra == null) {
+    public boolean diff(Object obj1, Object obj2) {
+        if (obj1 == null && obj2 == null) {
             return false;
-        } else if (source == null && astra != null) {
+        } else if (obj1 == null && obj2 != null) {
             return true;
-        } else if (source != null && astra == null) {
+        } else if (obj1 != null && obj2 == null) {
             return true;
         }
 
-        return !source.equals(astra);
+        return !obj1.equals(obj2);
     }
 
     private Class getType(int type) {
```
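Two things are visible here. First, the merge leaves the type-validation block and the `this.typeClass = getType(this.type)` assignment duplicated inside the constructor, which looks like an unresolved merge artifact (redundant, though harmless). Second, `diff()` drops the source/astra naming for neutral obj1/obj2. A minimal runnable sketch of its null semantics, mirroring the hunk above (`DiffSemantics` is a hypothetical wrapper class for illustration; `diff` returns true when the two values differ):

```java
// Mirrors the null-handling of MigrateDataType.diff(obj1, obj2) from the diff.
public class DiffSemantics {
    static boolean diff(Object obj1, Object obj2) {
        if (obj1 == null && obj2 == null) {
            return false;                  // both null: values match
        } else if (obj1 == null && obj2 != null) {
            return true;                   // only one side null: mismatch
        } else if (obj1 != null && obj2 == null) {
            return true;
        }
        return !obj1.equals(obj2);         // otherwise defer to equals()
    }

    public static void main(String[] args) {
        System.out.println(diff(null, null)); // false
        System.out.println(diff("a", null));  // true
        System.out.println(diff(null, "b"));  // true
        System.out.println(diff("a", "a"));   // false
        System.out.println(diff("a", "b"));   // true
    }
}
```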

src/main/java/datastax/astra/migrate/OriginCountJobSession.java

Lines changed: 35 additions & 34 deletions

```diff
@@ -45,48 +45,49 @@ public class OriginCountJobSession extends BaseJobSession {
     // protected Integer fieldGuardraillimitMB;
     // protected List<MigrateDataType> checkTableforColSizeTypes = new ArrayList<MigrateDataType>();
     //
-    protected OriginCountJobSession(CqlSession sourceSession, SparkConf sc) {
+    protected OriginCountJobSession(CqlSession originSession, SparkConf sc) {
         super(sc);
-        // this.sourceSession = sourceSession;
-        // batchSize = new Integer(sc.get("spark.batchSize", "1"));
-        // printStatsAfter = new Integer(sc.get("spark.printStatsAfter", "100000"));
+        // this.originSessionSession = originSession;
+        // batchSize = new Integer(sc.get(KnownProperties.SPARK_BATCH_SIZE, "1"));
+        // printStatsAfter = new Integer(sc.get(KnownProperties.SPARK_STATS_AFTER, "100000"));
         // if (printStatsAfter < 1) {
         // printStatsAfter = 100000;
         // }
         //
-        // readLimiter = RateLimiter.create(new Integer(sc.get("spark.readRateLimit", "20000")));
-        // sourceKeyspaceTable = sc.get("spark.origin.keyspaceTable");
+        // readLimiter = RateLimiter.create(new Integer(sc.get(KnownProperties.SPARK_LIMIT_READ, "20000")));
+        // originKeyspaceTable = sc.get(KnownProperties.ORIGIN_KEYSPACE_TABLE);
         //
-        // hasRandomPartitioner = Boolean.parseBoolean(sc.get("spark.origin.hasRandomPartitioner", "false"));
-        // isCounterTable = Boolean.parseBoolean(sc.get("spark.counterTable", "false"));
+        // hasRandomPartitioner = Boolean.parseBoolean(sc.get(KnownProperties.ORIGIN_HAS_RANDOM_PARTITIONER, "false"));
+        // isCounterTable = Boolean.parseBoolean(sc.get(KnownProperties.ORIGIN_IS_COUNTER, "false"));
         //
-        // checkTableforColSize = Boolean.parseBoolean(sc.get("spark.origin.checkTableforColSize", "false"));
-        // checkTableforselectCols = sc.get("spark.origin.checkTableforColSize.cols");
-        // checkTableforColSizeTypes = getTypes(sc.get("spark.origin.checkTableforColSize.cols.types"));
-        // filterColName = propertyHelper.getString(KnownProperties.ORIGIN_FILTER_COLUMN_NAME);
-        // filterColType = propertyHelper.getString(KnownProperties.ORIGIN_FILTER_COLUMN_TYPE); // TODO: this is a string, but should be MigrationDataType?
-        // filterColIndex = Integer.parseInt(sc.get("spark.origin.FilterColumnIndex", "0"));
-        // fieldGuardraillimitMB = Integer.parseInt(sc.get("spark.fieldGuardraillimitMB", "0"));
+        // checkTableforColSize = Boolean.parseBoolean(sc.get(KnownProperties.ORIGIN_CHECK_COLSIZE_ENABLED, "false"));
+        // checkTableforselectCols = sc.get(KnownProperties.ORIGIN_CHECK_COLSIZE_COLUMN_NAMES);
+        // checkTableforColSizeTypes = getTypes(sc.get(KnownProperties.ORIGIN_CHECK_COLSIZE_COLUMN_TYPES));
+        // filterColName = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_FILTER_COLUMN_NAME);
+        // filterColType = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_FILTER_COLUMN_TYPE);
+        // filterColIndex = Integer.parseInt(sc.get(KnownProperties.ORIGIN_FILTER_COLUMN_INDEX, "0"));
+        // fieldGuardraillimitMB = Integer.parseInt(sc.get(KnownProperties.FIELD_GUARDRAIL_MB, "0"));
         //
-        // String partionKey = sc.get("spark.query.cols.partitionKey");
-        // idColTypes = getTypes(sc.get("spark.query.cols.id.types"));
+        // String partionKey = sc.get(KnownProperties.ORIGIN_PARTITION_KEY);
+        // idColTypes = getTypes(sc.get(KnownProperties.TARGET_PRIMARY_KEY_TYPES));
         //
-        // String selectCols = sc.get("spark.query.cols.select");
-        // String updateSelectMappingStr = sc.get("spark.counterTable.cql.index", "0");
+        // String selectCols = sc.get(KnownProperties.ORIGIN_COLUMN_NAMES);
+        // String updateSelectMappingStr = sc.get(KnownProperties.ORIGIN_COUNTER_INDEXES, "0");
         // for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
         // updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
         // }
-        // String sourceSelectCondition = sc.get("spark.query.cols.select.condition", "");
-        // sourceSelectStatement = sourceSession.prepare(
-        // "select " + selectCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
-        // + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING");
+        // String originSelectCondition = sc.get(KnownProperties.ORIGIN_FILTER_CONDITION, "");
+        // // TODO: AbstractJobSession has some checks to ensure AND is added to the condition
+        // originSelectStatement = originSession.prepare(
+        // "select " + selectCols + " from " + originKeyspaceTable + " where token(" + partionKey.trim()
+        // + ") >= ? and token(" + partionKey.trim() + ") <= ? " + originSelectCondition + " ALLOW FILTERING");
     }
     //
-    // public static OriginCountJobSession getInstance(CqlSession sourceSession, SparkConf sparkConf) {
+    // public static OriginCountJobSession getInstance(CqlSession originSession, SparkConf sparkConf) {
     // if (originCountJobSession == null) {
     // synchronized (OriginCountJobSession.class) {
     // if (originCountJobSession == null) {
-    // originCountJobSession = new OriginCountJobSession(sourceSession, sparkConf);
+    // originCountJobSession = new OriginCountJobSession(originSession, sparkConf);
     // }
     // }
     // }
@@ -100,7 +101,7 @@ protected OriginCountJobSession(CqlSession sourceSession, SparkConf sc) {
     // int maxAttempts = maxRetries + 1;
     // for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) {
     // try {
-    // ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ?
+    // ResultSet resultSet = originSessionSession.execute(originSelectStatement.bind(hasRandomPartitioner ?
     // min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact())
    // .setConsistencyLevel(readConsistencyLevel).setPageSize(fetchSizeInRows));
     //
@@ -110,16 +111,16 @@ protected OriginCountJobSession(CqlSession sourceSession, SparkConf sc) {
     // // maxWriteTimeStampFilter is less than max long
     // // do not batch for counters as it adds latency & increases chance of discrepancy
     // if (batchSize == 1 || writeTimeStampFilter || isCounterTable) {
-    // for (Row sourceRow : resultSet) {
+    // for (Row originRow : resultSet) {
     // readLimiter.acquire(1);
     //
     // if (checkTableforColSize) {
-    // int rowColcnt = GetRowColumnLength(sourceRow, filterColType, filterColIndex);
+    // int rowColcnt = GetRowColumnLength(originRow, filterColType, filterColIndex);
     // String result = "";
     // if (rowColcnt > fieldGuardraillimitMB * 1048576) {
     // for (int index = 0; index < checkTableforColSizeTypes.size(); index++) {
     // MigrateDataType dataType = checkTableforColSizeTypes.get(index);
-    // Object colData = getData(dataType, index, sourceRow);
+    // Object colData = getData(dataType, index, originRow);
     // String[] colName = checkTableforselectCols.split(",");
     // result = result + " - " + colName[index] + " : " + colData;
     // }
@@ -130,17 +131,17 @@ protected OriginCountJobSession(CqlSession sourceSession, SparkConf sc) {
     // }
     // } else {
     // BatchStatement batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
-    // for (Row sourceRow : resultSet) {
+    // for (Row originRow : resultSet) {
     // readLimiter.acquire(1);
     // writeLimiter.acquire(1);
     //
     // if (checkTableforColSize) {
-    // int rowColcnt = GetRowColumnLength(sourceRow, filterColType, filterColIndex);
+    // int rowColcnt = GetRowColumnLength(originRow, filterColType, filterColIndex);
     // String result = "";
     // if (rowColcnt > fieldGuardraillimitMB * 1048576) {
     // for (int index = 0; index < checkTableforColSizeTypes.size(); index++) {
     // MigrateDataType dataType = checkTableforColSizeTypes.get(index);
-    // Object colData = getData(dataType, index, sourceRow);
+    // Object colData = getData(dataType, index, originRow);
     // String[] colName = checkTableforselectCols.split(",");
     // result = result + " - " + colName[index] + " : " + colData;
     // }
@@ -166,9 +167,9 @@ protected OriginCountJobSession(CqlSession sourceSession, SparkConf sc) {
     // }
     // }
     //
-    // private int GetRowColumnLength(Row sourceRow, String filterColType, Integer filterColIndex) {
+    // private int GetRowColumnLength(Row originRow, String filterColType, Integer filterColIndex) {
     // int sizeInMB = 0;
-    // Object colData = getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
+    // Object colData = getData(new MigrateDataType(filterColType), filterColIndex, originRow);
     // byte[] colBytes = SerializationUtils.serialize((Serializable) colData);
     // sizeInMB = colBytes.length;
     // if (sizeInMB > fieldGuardraillimitMB)
```
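Although this class body remains commented out, the rename (source → origin, raw property strings → KnownProperties constants) clarifies the guardrail logic it sketches: serialize a column value and compare the byte length against a limit configured in MB (1 MB = 1048576 bytes, as in the rowColcnt check above). A self-contained sketch of that check, assuming commons-lang3's SerializationUtils as used in the commented code; the class and method names here are illustrative only:

```java
import java.io.Serializable;
import org.apache.commons.lang3.SerializationUtils;

// Illustrative sketch of the guardrail check in the commented-out code above:
// flag any column whose serialized size exceeds fieldGuardraillimitMB megabytes.
public class GuardrailSketch {
    static final int fieldGuardraillimitMB = 10; // default from KnownProperties

    static boolean exceedsGuardrail(Serializable colData) {
        byte[] colBytes = SerializationUtils.serialize(colData);
        return colBytes.length > fieldGuardraillimitMB * 1048576; // MB -> bytes
    }

    public static void main(String[] args) {
        System.out.println(exceedsGuardrail("a short string")); // false
    }
}
```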

src/main/java/datastax/astra/migrate/properties/KnownProperties.java

Lines changed: 3 additions & 3 deletions

```diff
@@ -174,7 +174,7 @@ public enum PropertyType {
     public static final String SPARK_MAX_RETRIES = "spark.maxRetries"; // 0
     public static final String READ_FETCH_SIZE = "spark.read.fetch.sizeInRows"; //1000
     public static final String SPARK_STATS_AFTER = "spark.printStatsAfter"; //100000
-    public static final String FIELD_GUARDRAIL = "spark.fieldGuardraillimitMB"; //10
+    public static final String FIELD_GUARDRAIL_MB = "spark.fieldGuardraillimitMB"; //10
     public static final String PARTITION_MIN = "spark.origin.minPartition"; // -9223372036854775808
     public static final String PARTITION_MAX = "spark.origin.maxPartition"; // 9223372036854775807
 
@@ -195,8 +195,8 @@
         defaults.put(READ_FETCH_SIZE, "1000");
         types.put(SPARK_STATS_AFTER, PropertyType.NUMBER);
         defaults.put(SPARK_STATS_AFTER, "100000");
-        types.put(FIELD_GUARDRAIL, PropertyType.NUMBER);
-        defaults.put(FIELD_GUARDRAIL, "10");
+        types.put(FIELD_GUARDRAIL_MB, PropertyType.NUMBER);
+        defaults.put(FIELD_GUARDRAIL_MB, "10");
         types.put(PARTITION_MIN, PropertyType.NUMBER);
         defaults.put(PARTITION_MIN, "-9223372036854775808");
         types.put(PARTITION_MAX, PropertyType.NUMBER);
```
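The rename is constant-only: the property key string `spark.fieldGuardraillimitMB` and its default of "10" are untouched, so existing job configurations keep working. A small usage sketch, assuming the project classes on the classpath (`GuardrailProperty` is a hypothetical illustration class):

```java
import datastax.astra.migrate.properties.KnownProperties;
import org.apache.spark.SparkConf;

// Hypothetical illustration: read the guardrail limit through the renamed
// constant rather than a raw string, as the commented-out session code now does.
public class GuardrailProperty {
    public static void main(String[] args) {
        SparkConf sc = new SparkConf();
        // KnownProperties.FIELD_GUARDRAIL_MB still maps to "spark.fieldGuardraillimitMB"
        int limitMB = Integer.parseInt(sc.get(KnownProperties.FIELD_GUARDRAIL_MB, "10"));
        System.out.println("fieldGuardraillimitMB = " + limitMB);
    }
}
```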

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 6 additions & 6 deletions

```diff
@@ -9,14 +9,14 @@ class AbstractJob extends BaseJob {
   abstractLogger.info("PARAM -- Max Partition: " + maxPartition)
   abstractLogger.info("PARAM -- Number of Splits : " + numSplits)
   abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent)
-  abstractLogger.info("PARAM -- Origin SSL Enabled: {}", sourceSSLEnabled);
-  abstractLogger.info("PARAM -- Target SSL Enabled: {}", destinationSSLEnabled);
+  abstractLogger.info("PARAM -- Origin SSL Enabled: {}", originSSLEnabled);
+  abstractLogger.info("PARAM -- Target SSL Enabled: {}", targetSSLEnabled);
 
-  var sourceConnection = getConnection(true, sourceScbPath, sourceHost, sourcePort, sourceUsername, sourcePassword, sourceSSLEnabled,
-    sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);
+  var originConnection = getConnection(true, originScbPath, originHost, originPort, originUsername, originPassword, originSSLEnabled,
+    originTrustStorePath, originTrustStorePassword, originTrustStoreType, originKeyStorePath, originKeyStorePassword, originEnabledAlgorithms);
 
-  var destinationConnection = getConnection(false, destinationScbPath, destinationHost, destinationPort, destinationUsername, destinationPassword, destinationSSLEnabled,
-    destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword, destinationEnabledAlgorithms);
+  var targetConnection = getConnection(false, targetScbPath, targetHost, targetPort, targetUsername, targetPassword, targetSSLEnabled,
+    targetTrustStorePath, targetTrustStorePassword, targetTrustStoreType, targetKeyStorePath, targetKeyStorePassword, targetEnabledAlgorithms);
 
   private def getConnection(isSource: Boolean, scbPath: String, host: String, port: String, username: String, password: String,
                             sslEnabled: String, trustStorePath: String, trustStorePassword: String, trustStoreType: String,
```
