
Commit 7407828
initial commit
1 parent 8140df0 commit 7407828

File tree: 5 files changed (+274, -26 lines)

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 0 additions & 26 deletions

@@ -154,15 +154,6 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         }
     }
 
-    public List<MigrateDataType> getTypes(String types) {
-        List<MigrateDataType> dataTypes = new ArrayList<MigrateDataType>();
-        for (String type : types.split(",")) {
-            dataTypes.add(new MigrateDataType(type));
-        }
-
-        return dataTypes;
-    }
-
     public int getLargestTTL(Row sourceRow) {
         return IntStream.range(0, ttlCols.size())
                 .map(i -> sourceRow.getInt(selectColTypes.size() + i)).max().getAsInt();

@@ -184,21 +175,4 @@ public BoundStatement selectFromAstra(PreparedStatement selectStatement, Row sou
         return boundSelectStatement;
     }
 
-    public Object getData(MigrateDataType dataType, int index, Row sourceRow) {
-        if (dataType.typeClass == Map.class) {
-            return sourceRow.getMap(index, dataType.subTypes.get(0), dataType.subTypes.get(1));
-        } else if (dataType.typeClass == List.class) {
-            return sourceRow.getList(index, dataType.subTypes.get(0));
-        } else if (dataType.typeClass == Set.class) {
-            return sourceRow.getSet(index, dataType.subTypes.get(0));
-        } else if (isCounterTable && dataType.typeClass == Long.class) {
-            Object data = sourceRow.get(index, dataType.typeClass);
-            if (data == null) {
-                return new Long(0);
-            }
-        }
-
-        return sourceRow.get(index, dataType.typeClass);
-    }
-
 }

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 28 additions & 0 deletions

@@ -2,10 +2,13 @@
 
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public abstract class BaseJobSession {
 
@@ -46,4 +49,29 @@ public abstract class BaseJobSession {
 
     protected Boolean hasRandomPartitioner;
 
+    public List<MigrateDataType> getTypes(String types) {
+        List<MigrateDataType> dataTypes = new ArrayList<MigrateDataType>();
+        for (String type : types.split(",")) {
+            dataTypes.add(new MigrateDataType(type));
+        }
+
+        return dataTypes;
+    }
+
+    public Object getData(MigrateDataType dataType, int index, Row sourceRow) {
+        if (dataType.typeClass == Map.class) {
+            return sourceRow.getMap(index, dataType.subTypes.get(0), dataType.subTypes.get(1));
+        } else if (dataType.typeClass == List.class) {
+            return sourceRow.getList(index, dataType.subTypes.get(0));
+        } else if (dataType.typeClass == Set.class) {
+            return sourceRow.getSet(index, dataType.subTypes.get(0));
+        } else if (isCounterTable && dataType.typeClass == Long.class) {
+            Object data = sourceRow.get(index, dataType.typeClass);
+            if (data == null) {
+                return new Long(0);
+            }
+        }
+
+        return sourceRow.get(index, dataType.typeClass);
+    }
 }
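
With the two helpers relocated into the shared base class, any job session that extends BaseJobSession can reuse them. The sketch below is a hypothetical subclass, not part of this commit: the ExampleJobSession name and the hard-coded "9,1" type-code string are illustrative assumptions, shown only to demonstrate how getTypes() and getData() are now called.

package datastax.astra.migrate;

import com.datastax.oss.driver.api.core.cql.Row;
import java.util.List;

// Hypothetical subclass, for illustration only: shows how the helpers moved into
// BaseJobSession can be reused by any concrete job session.
public class ExampleJobSession extends BaseJobSession {

    // Parse a comma-separated list of type codes (same format as the *.cols.types
    // properties) and read each column of a source row with its declared type.
    public void logRow(Row sourceRow) {
        List<MigrateDataType> types = getTypes("9,1"); // hypothetical type codes
        for (int index = 0; index < types.size(); index++) {
            Object colData = getData(types.get(index), index, sourceRow);
            System.out.println("Column " + index + ": " + colData);
        }
    }
}
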
Lines changed: 157 additions & 0 deletions

@@ -0,0 +1,157 @@
+package datastax.astra.migrate;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.*;
+import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
+import org.apache.commons.lang.SerializationUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.spark.SparkConf;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class OriginCountJobSession extends BaseJobSession {
+    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
+    private static OriginCountJobSession originCountJobSession;
+    protected AtomicLong readCounter = new AtomicLong(0);
+    protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
+    protected Boolean checkTableforColSize;
+    protected String checkTableforselectCols;
+    protected String filterColName;
+    protected String filterColType;
+    protected Integer filterColIndex;
+    protected List<MigrateDataType> checkTableforColSizeTypes = new ArrayList<MigrateDataType>();
+    public static OriginCountJobSession getInstance(CqlSession sourceSession, SparkConf sparkConf) {
+        if (originCountJobSession == null) {
+            synchronized (OriginCountJobSession.class) {
+                if (originCountJobSession == null) {
+                    originCountJobSession = new OriginCountJobSession(sourceSession, sparkConf);
+                }
+            }
+        }
+
+        return originCountJobSession;
+    }
+
+    protected OriginCountJobSession(CqlSession sourceSession, SparkConf sparkConf) {
+        this.sourceSession = sourceSession;
+        batchSize = new Integer(sparkConf.get("spark.batchSize", "1"));
+        printStatsAfter = new Integer(sparkConf.get("spark.printStatsAfter", "100000"));
+        if (printStatsAfter < 1) {
+            printStatsAfter = 100000;
+        }
+
+        readLimiter = RateLimiter.create(new Integer(sparkConf.get("spark.readRateLimit", "20000")));
+        sourceKeyspaceTable = sparkConf.get("spark.source.keyspaceTable");
+
+        hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.source.hasRandomPartitioner", "false"));
+        isCounterTable = Boolean.parseBoolean(sparkConf.get("spark.source.counterTable", "false"));
+
+        checkTableforColSize = Boolean.parseBoolean(sparkConf.get("spark.source.checkTableforColSize", "false"));
+        checkTableforselectCols = sparkConf.get("spark.source.checkTableforColSize.cols");
+        checkTableforColSizeTypes = getTypes(sparkConf.get("spark.source.checkTableforColSize.cols.types"));
+        filterColName = sparkConf.get("spark.source.FilterColumn");
+        filterColType = sparkConf.get("spark.source.FilterColumnType");
+        filterColIndex = Integer.parseInt(sparkConf.get("spark.source.FilterColumnIndex", "0"));
+
+        String partionKey = sparkConf.get("spark.query.cols.partitionKey");
+        idColTypes = getTypes(sparkConf.get("spark.query.cols.id.types"));
+
+        String selectCols = sparkConf.get("spark.query.cols.select");
+        String updateSelectMappingStr = sparkConf.get("spark.source.counterTable.update.select.index", "0");
+        for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
+            updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
+        }
+        String sourceSelectCondition = sparkConf.get("spark.query.cols.select.condition", "");
+        sourceSelectStatement = sourceSession.prepare(
+                "select " + selectCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
+                        + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING");
+
+    }
+
+    public void getData(BigInteger min, BigInteger max) {
+        logger.info("TreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max);
+        int maxAttempts = maxRetries;
+        for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
+
+            try {
+                ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact()));
+                Collection<CompletionStage<AsyncResultSet>> writeResults = new ArrayList<CompletionStage<AsyncResultSet>>();
+
+                // cannot do batching if the writeFilter is greater than 0 or
+                // maxWriteTimeStampFilter is less than max long
+                // do not batch for counters as it adds latency & increases chance of discrepancy
+                if (batchSize == 1 || writeTimeStampFilter || isCounterTable) {
+                    for (Row sourceRow : resultSet) {
+                        readLimiter.acquire(1);
+
+                        if (checkTableforColSize) {
+                            int rowColcnt = GetRowColumnLength(sourceRow, filterColType, filterColIndex);
+                            String result = "";
+                            if (rowColcnt > 1024 * 1024 * 10) {
+                                for (int index = 0; index < checkTableforColSizeTypes.size(); index++) {
+                                    MigrateDataType dataType = checkTableforColSizeTypes.get(index);
+                                    Object colData = getData(dataType, index, sourceRow);
+                                    String[] colName = checkTableforselectCols.split(",");
+                                    result = result + " - " + colName[index] + " : " + colData;
+                                }
+                                logger.error("ThreadID: " + Thread.currentThread().getId() + result + " - " + filterColName + " length: " + rowColcnt);
+                                continue;
+                            }
+                        }
+                    }
+
+                } else {
+                    BatchStatement batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
+                    for (Row sourceRow : resultSet) {
+                        readLimiter.acquire(1);
+                        writeLimiter.acquire(1);
+
+                        if (checkTableforColSize) {
+                            int rowColcnt = GetRowColumnLength(sourceRow, filterColType, filterColIndex);
+                            String result = "";
+                            if (rowColcnt > 1024 * 1024 * 10) {
+                                for (int index = 0; index < checkTableforColSizeTypes.size(); index++) {
+                                    MigrateDataType dataType = checkTableforColSizeTypes.get(index);
+                                    Object colData = getData(dataType, index, sourceRow);
+                                    String[] colName = checkTableforselectCols.split(",");
+                                    result = result + " - " + colName[index] + " : " + colData;
+                                }
+                                logger.error("ThreadID: " + Thread.currentThread().getId() + result + " - " + filterColName + " length: " + rowColcnt);
+                                continue;
+                            }
+                        }
+
+                        if (readCounter.incrementAndGet() % 1000 == 0) {
+                            logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: " + readCounter.get());
+                        }
+
+                    }
+                }
+
+
+                logger.info("TreadID: " + Thread.currentThread().getId() + " Final Read Record Count: " + readCounter.get());
+                retryCount = maxAttempts;
+            } catch (Exception e) {
+                logger.error("Error occurred retry#: " + retryCount, e);
+                logger.error("Error with PartitionRange -- TreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max + " -- Retry# " + retryCount);
+            }
+        }
+
+    }
+
+    private int GetRowColumnLength(Row sourceRow, String filterColType, Integer filterColIndex) {
+        int i = 0;
+        Object colData = getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
+        byte[] colBytes = SerializationUtils.serialize((Serializable) colData);
+        i = colBytes.length;
+        if (i > 1024 * 1024 * 10)
+            return i;
+        return i;
+    }
+
+}
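
GetRowColumnLength() above sizes a column by Java-serializing its value with commons-lang SerializationUtils and taking the byte-array length, and its callers flag any row whose filter column exceeds 1024 * 1024 * 10 bytes (10,485,760 bytes, i.e. 10 MB). The standalone sketch below mirrors that check; the ColumnSizeCheck class name and the MAX_COL_BYTES constant are illustrative and not part of this commit.

package datastax.astra.migrate;

import org.apache.commons.lang.SerializationUtils;
import java.io.Serializable;

// Illustrative helper only: mirrors the serialize-and-measure logic of
// GetRowColumnLength() and the 10 MB threshold used by its callers.
public final class ColumnSizeCheck {

    static final int MAX_COL_BYTES = 1024 * 1024 * 10; // 10 MB

    // Serialized size of a column value, in bytes.
    public static int sizeOf(Object colData) {
        byte[] colBytes = SerializationUtils.serialize((Serializable) colData);
        return colBytes.length;
    }

    // True when the value exceeds the threshold that triggers the error log above.
    public static boolean isOversized(Object colData) {
        return sizeOf(colData) > MAX_COL_BYTES;
    }
}
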
Lines changed: 83 additions & 0 deletions

@@ -0,0 +1,83 @@
+package datastax.astra.migrate
+
+import com.datastax.spark.connector.cql.CassandraConnector
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConversions._
+
+object OriginData extends BaseJob {
+
+  val logger = LoggerFactory.getLogger(this.getClass.getName)
+  logger.info("Started Migration App")
+  var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword, sourceReadConsistencyLevel,
+    sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);
+  analyzeSourceTable(sourceConnection)
+  exitSpark
+
+
+  private def getConnection(isSource: Boolean, isAstra: String, scbPath: String, host: String, username: String, password: String, readConsistencyLevel: String,
+                            trustStorePath: String, trustStorePassword: String, trustStoreType: String,
+                            keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String): CassandraConnector = {
+    var connType: String = "Source"
+
+    if ("true".equals(isAstra)) {
+      abstractLogger.info(connType + ": Connected to Astra!");
+
+      return CassandraConnector(sc
+        .set("spark.cassandra.auth.username", username)
+        .set("spark.cassandra.auth.password", password)
+        .set("spark.cassandra.input.consistency.level", readConsistencyLevel)
+        .set("spark.cassandra.connection.config.cloud.path", scbPath))
+    } else if (null != trustStorePath && !trustStorePath.trim.isEmpty) {
+      abstractLogger.info(connType + ": Connected to Cassandra (or DSE) with SSL!");
+
+      // Use defaults when not provided
+      var enabledAlgorithmsVar = enabledAlgorithms
+      if (enabledAlgorithms == null || enabledAlgorithms.trim.isEmpty) {
+        enabledAlgorithmsVar = "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA"
+      }
+
+      return CassandraConnector(sc
+        .set("spark.cassandra.auth.username", username)
+        .set("spark.cassandra.auth.password", password)
+        .set("spark.cassandra.input.consistency.level", readConsistencyLevel)
+        .set("spark.cassandra.connection.host", host)
+        .set("spark.cassandra.connection.ssl.enabled", "true")
+        .set("spark.cassandra.connection.ssl.enabledAlgorithms", enabledAlgorithmsVar)
+        .set("spark.cassandra.connection.ssl.trustStore.password", trustStorePassword)
+        .set("spark.cassandra.connection.ssl.trustStore.path", trustStorePath)
+        .set("spark.cassandra.connection.ssl.keyStore.password", keyStorePassword)
+        .set("spark.cassandra.connection.ssl.keyStore.path", keyStorePath)
+        .set("spark.cassandra.connection.ssl.trustStore.type", trustStoreType)
+        .set("spark.cassandra.connection.ssl.clientAuth.enabled", "true")
+      )
+    } else {
+      abstractLogger.info(connType + ": Connected to Cassandra (or DSE)!");
+
+      return CassandraConnector(sc.set("spark.cassandra.auth.username", username)
+        .set("spark.cassandra.auth.password", password)
+        .set("spark.cassandra.input.consistency.level", readConsistencyLevel)
+        .set("spark.cassandra.connection.host", host))
+    }
+
+  }
+
+  private def analyzeSourceTable(sourceConnection: CassandraConnector) = {
+    val partitions = SplitPartitions.getRandomSubPartitions(splitSize, minPartition, maxPartition, Integer.parseInt(coveragePercent))
+    logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
+    val parts = sContext.parallelize(partitions.toSeq, partitions.size);
+    logger.info("Spark parallelize created : " + parts.count() + " parts!");
+
+    parts.foreach(part => {
+      sourceConnection.withSessionDo(sourceSession =>
+        OriginCountJobSession.getInstance(sourceSession, sc)
+          .getData(part.getMin, part.getMax))
+    })
+
+  }
+
+}
+
+
+
+
src/resources/sparkConf.properties

Lines changed: 6 additions & 0 deletions

@@ -4,6 +4,12 @@ spark.origin.username some-username
 spark.origin.password some-secret-password
 spark.origin.read.consistency.level LOCAL_QUORUM
 spark.origin.keyspaceTable test.a1
+spark.source.checkTableforColSize false
+spark.source.checkTableforColSize.cols partition-key,clustering-key
+spark.source.checkTableforColSize.cols.types 9,1
+spark.source.FilterColumn test
+spark.source.FilterColumnIndex 2
+spark.source.FilterColumnType 6%16
 
 spark.target.isAstra true
 spark.target.scb file:///aaa/bbb/secure-connect-enterprise.zip
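
These six new keys are the ones the OriginCountJobSession constructor reads via SparkConf (they use the spark.source.* prefix that the constructor looks up, while the pre-existing keys in this file use spark.origin.*). A minimal sketch of that lookup follows; the SparkConfReadExample class and the hard-coded values are illustrative only, not part of the commit.

import org.apache.spark.SparkConf;

// Illustrative only: mirrors how OriginCountJobSession reads the new properties.
public class SparkConfReadExample {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .set("spark.source.checkTableforColSize", "false")
                .set("spark.source.checkTableforColSize.cols", "partition-key,clustering-key")
                .set("spark.source.checkTableforColSize.cols.types", "9,1")
                .set("spark.source.FilterColumn", "test")
                .set("spark.source.FilterColumnIndex", "2")
                .set("spark.source.FilterColumnType", "6%16");

        // Same parsing calls as the constructor shown earlier in this diff.
        boolean checkTableforColSize =
                Boolean.parseBoolean(sparkConf.get("spark.source.checkTableforColSize", "false"));
        int filterColIndex =
                Integer.parseInt(sparkConf.get("spark.source.FilterColumnIndex", "0"));
        String filterColType = sparkConf.get("spark.source.FilterColumnType");

        System.out.println(checkTableforColSize + " / " + filterColIndex + " / " + filterColType);
    }
}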
