
Commit d3b19ae

Implemented flex-schema: destination schema column names can now differ from the source.
Also simplified the config: there is no need to pass a destination/insert schema if it is the same as the source, and no need to pass id-types, as they are auto-computed.

1 parent 56ba843 · commit d3b19ae
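
The flex-schema change means the destination/insert column list may use different names than the source select list. As a hypothetical illustration (the renamed column names are invented, not from this commit), written in the same format as the sparkConf.properties file changed below, a destination table that renames order-date and amount would be configured as:

# hypothetical flex-schema config: destination renames two source columns
spark.query.source              partition-key,clustering-key,order-date,amount
spark.query.source.partitionKey partition-key
spark.query.destination         partition-key,clustering-key,purchase-date,total
spark.query.destination.id      partition-key,clustering-key
spark.query.types               9,1,4,3

When the destination schema matches the source, spark.query.destination can be omitted entirely, and the id-column types are no longer passed: they are taken as the leading entries of spark.query.types, one per column named in spark.query.destination.id.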

File tree

4 files changed: +75 −90 lines


pom.xml

Lines changed: 1 addition & 1 deletion

@@ -3,7 +3,7 @@
 
     <groupId>datastax.astra.migrate</groupId>
    <artifactId>cassandra-data-migrator</artifactId>
-    <version>1.3</version>
+    <version>1.4</version>
    <packaging>jar</packaging>
 
    <properties>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 61 additions & 38 deletions

@@ -16,13 +16,6 @@
 
 public abstract class AbstractJobSession {
 
-    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
-
-    protected PreparedStatement sourceSelectStatement;
-    protected String sourceSelectCondition;
-
-    protected PreparedStatement astraSelectStatement;
-
     // Read/Write Rate limiter
     // Determine the total throughput for the entire cluster in terms of wries/sec,
     // reads/sec
@@ -31,12 +24,18 @@ public abstract class AbstractJobSession {
     // Rate = Total Throughput (write/read per sec) / Total Executors
     protected final RateLimiter readLimiter;
     protected final RateLimiter writeLimiter;
+    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
+    protected PreparedStatement sourceSelectStatement;
+    protected String sourceSelectCondition;
+    protected PreparedStatement astraSelectStatement;
+    protected PreparedStatement astraInsertStatement;
     protected Integer maxRetries = 10;
 
     protected CqlSession sourceSession;
     protected CqlSession astraSession;
     protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
     protected List<MigrateDataType> idColTypes = new ArrayList<MigrateDataType>();
+    protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
 
     protected Integer batchSize = 1;
     protected Integer printStatsAfter = 100000;
@@ -106,45 +105,69 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
             maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
         }
 
-        logger.info(" DEFAULT -- Write Batch Size: " + batchSize);
-        logger.info(" DEFAULT -- Source Keyspace Table: " + sourceKeyspaceTable);
-        logger.info(" DEFAULT -- Destination Keyspace Table: " + astraKeyspaceTable);
-        logger.info(" DEFAULT -- ReadRateLimit: " + readLimiter.getRate());
-        logger.info(" DEFAULT -- WriteRateLimit: " + writeLimiter.getRate());
-        logger.info(" DEFAULT -- WriteTimestampFilter: " + writeTimeStampFilter);
-        logger.info(" DEFAULT -- WriteTimestampFilterCols: " + writeTimeStampCols);
-        logger.info(" DEFAULT -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
-        logger.info(" DEFAULT -- TTLCols: " + ttlCols);
+        logger.info("PARAM -- Write Batch Size: " + batchSize);
+        logger.info("PARAM -- Source Keyspace Table: " + sourceKeyspaceTable);
+        logger.info("PARAM -- Destination Keyspace Table: " + astraKeyspaceTable);
+        logger.info("PARAM -- ReadRateLimit: " + readLimiter.getRate());
+        logger.info("PARAM -- WriteRateLimit: " + writeLimiter.getRate());
+        logger.info("PARAM -- WriteTimestampFilter: " + writeTimeStampFilter);
+        logger.info("PARAM -- WriteTimestampFilterCols: " + writeTimeStampCols);
+        logger.info("PARAM -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
+        logger.info("PARAM -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
+        logger.info("PARAM -- TTLCols: " + ttlCols);
+
+        String selectCols = sparkConf.get("spark.query.source");
+        String partionKey = sparkConf.get("spark.query.source.partitionKey");
+        selectColTypes = getTypes(sparkConf.get("spark.query.types"));
+        String idCols = sparkConf.get("spark.query.destination.id", "");
+        idColTypes = selectColTypes.subList(0, idCols.split(",").length);
+        sourceSelectCondition = sparkConf.get("spark.query.condition", "");
+        sourceSelectStatement = sourceSession.prepare(
+                "select " + selectCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
+                        + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING");
 
-        hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.source.hasRandomPartitioner", "false"));
+        String insertCols = sparkConf.get("spark.query.destination", "");
+        if (null == insertCols || insertCols.trim().isEmpty()) {
+            insertCols = selectCols;
+        }
+        String insertBinds = "";
+        for (String str : idCols.split(",")) {
+            if (insertBinds.isEmpty()) {
+                insertBinds = str + "= ?";
+            } else {
+                insertBinds += " and " + str + "= ?";
+            }
+        }
+        astraSelectStatement = astraSession.prepare(
+                "select " + insertCols + " from " + astraKeyspaceTable
+                        + " where " + insertBinds);
 
+        hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.source.hasRandomPartitioner", "false"));
         isCounterTable = Boolean.parseBoolean(sparkConf.get("spark.counterTable", "false"));
-        selectColTypes = getTypes(sparkConf.get("spark.diff.select.types"));
-        String partionKey = sparkConf.get("spark.query.cols.partitionKey");
-        String idCols = sparkConf.get("spark.query.cols.id");
-        idColTypes = getTypes(sparkConf.get("spark.query.cols.id.types"));
+        if (isCounterTable) {
+            String updateSelectMappingStr = sparkConf.get("spark.counterTable.cql.index", "0");
+            for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
+                updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
+            }
 
-        String selectCols = sparkConf.get("spark.query.cols.select");
+            String counterTableUpdate = sparkConf.get("spark.counterTable.cql");
+            astraInsertStatement = astraSession.prepare(counterTableUpdate);
+        } else {
+            insertBinds = "";
+            for (String str : insertCols.split(",")) {
+                if (insertBinds.isEmpty()) {
+                    insertBinds += "?";
+                } else {
+                    insertBinds += ", ?";
+                }
+            }
 
-        String idBinds = "";
-        int count = 1;
-        for (String str : idCols.split(",")) {
-            if (count > 1) {
-                idBinds = idBinds + " and " + str + "= ?";
+            if (isPreserveTTLWritetime) {
+                astraInsertStatement = astraSession.prepare("insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ") using TTL ? and TIMESTAMP ?");
             } else {
-                idBinds = str + "= ?";
+                astraInsertStatement = astraSession.prepare("insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")");
             }
-            count++;
         }
-
-        sourceSelectCondition = sparkConf.get("spark.query.cols.select.condition", "");
-        sourceSelectStatement = sourceSession.prepare(
-                "select " + selectCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
-                        + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING");
-
-        astraSelectStatement = astraSession.prepare(
-                "select " + selectCols + " from " + astraKeyspaceTable
-                        + " where " + idBinds);
     }
 
     public List<MigrateDataType> getTypes(String types) {
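
The constructor now builds both bind strings inline: key-equality binds for the destination select and positional placeholders for the insert. A minimal standalone sketch of that string assembly (class and method names are mine for illustration, not part of the commit):

import java.util.Arrays;
import java.util.stream.Collectors;

public class BindStringSketch {

    // WHERE binds for the destination select, e.g. "pk= ? and ck= ?"
    static String whereBinds(String idCols) {
        return Arrays.stream(idCols.split(","))
                .map(col -> col + "= ?")
                .collect(Collectors.joining(" and "));
    }

    // VALUES placeholders for the insert, e.g. "?, ?, ?, ?"
    static String valueBinds(String insertCols) {
        return Arrays.stream(insertCols.split(","))
                .map(col -> "?")
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // prints: partition-key= ? and clustering-key= ?
        System.out.println(whereBinds("partition-key,clustering-key"));
        // prints: ?, ?, ?, ?
        System.out.println(valueBinds("partition-key,clustering-key,order-date,amount"));
    }
}

The stream form is equivalent to the diff's accumulate-in-a-loop style: one "col= ?" term per id column joined with " and ", and one "?" per insert column joined with ", ".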

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 8 additions & 43 deletions

@@ -9,22 +9,20 @@
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class CopyJobSession extends AbstractJobSession {
 
-    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
     private static CopyJobSession copyJobSession;
-
-    protected PreparedStatement astraInsertStatement;
+    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
     protected AtomicLong readCounter = new AtomicLong(0);
    protected AtomicLong writeCounter = new AtomicLong(0);
 
-    protected List<MigrateDataType> insertColTypes = new ArrayList<MigrateDataType>();
-    protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
+    protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
+        super(sourceSession, astraSession, sparkConf);
+    }
 
     public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
         if (copyJobSession == null) {
@@ -38,39 +36,6 @@ public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession as
         return copyJobSession;
     }
 
-    protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
-        super(sourceSession, astraSession, sparkConf);
-
-        String insertCols = sparkConf.get("spark.query.cols.insert");
-        insertColTypes = getTypes(sparkConf.get("spark.query.cols.insert.types"));
-        String insertBinds = "";
-        int count = 1;
-        for (String str : insertCols.split(",")) {
-            if (count > 1) {
-                insertBinds = insertBinds + ",?";
-            } else {
-                insertBinds = insertBinds + "?";
-            }
-            count++;
-        }
-
-        if (isCounterTable) {
-            String updateSelectMappingStr = sparkConf.get("spark.counterTable.cql.index", "0");
-            for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
-                updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
-            }
-
-            String counterTableUpdate = sparkConf.get("spark.counterTable.cql");
-            astraInsertStatement = astraSession.prepare(counterTableUpdate);
-        } else {
-            if (isPreserveTTLWritetime) {
-                astraInsertStatement = astraSession.prepare("insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ") using TTL ? and TIMESTAMP ?");
-            } else {
-                astraInsertStatement = astraSession.prepare("insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")");
-            }
-        }
-    }
-
     public void getDataAndInsert(BigInteger min, BigInteger max) {
         logger.info("TreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max);
         int maxAttempts = maxRetries;
@@ -179,8 +144,8 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
         BoundStatement boundInsertStatement = insertStatement.bind();
 
         if (isCounterTable) {
-            for (int index = 0; index < insertColTypes.size(); index++) {
-                MigrateDataType dataType = insertColTypes.get(index);
+            for (int index = 0; index < selectColTypes.size(); index++) {
+                MigrateDataType dataType = selectColTypes.get(updateSelectMapping.get(index));
                 // compute the counter delta if reading from astra for the difference
                 if (astraRow != null && index < (selectColTypes.size() - idColTypes.size())) {
                     boundInsertStatement = boundInsertStatement.set(index, (sourceRow.getLong(updateSelectMapping.get(index)) - astraRow.getLong(updateSelectMapping.get(index))), Long.class);
@@ -190,8 +155,8 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
             }
         } else {
             int index = 0;
-            for (index = 0; index < insertColTypes.size(); index++) {
-                MigrateDataType dataTypeObj = insertColTypes.get(index);
+            for (index = 0; index < selectColTypes.size(); index++) {
+                MigrateDataType dataTypeObj = selectColTypes.get(index);
                 Class dataType = dataTypeObj.typeClass;
 
                 try {
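
For counter tables, bindInsert now indexes selectColTypes through updateSelectMapping and, when a destination row already exists, binds the difference between source and destination counts, because Cassandra counter columns are incremented rather than overwritten. A toy sketch of the delta arithmetic with hardcoded values (the real code reads both longs from driver Row objects at the mapped positions):

public class CounterDeltaSketch {
    public static void main(String[] args) {
        long sourceCount = 120L; // counter value read from the source row
        long astraCount = 100L;  // counter value already in the destination row
        // binding this delta to an UPDATE ... SET c = c + ? statement
        // makes the destination converge on the source value
        long delta = sourceCount - astraCount;
        System.out.println("bind delta = " + delta); // prints: bind delta = 20
    }
}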

src/resources/sparkConf.properties

Lines changed: 5 additions & 8 deletions

@@ -22,14 +22,11 @@ spark.batchSize 5
 spark.coveragePercent 100
 spark.printStatsAfter 100000
 
-spark.query.cols.select partition-key,clustering-key,order-date,amount,writetime(order-date),writetime(amount),ttl(order-date),ttl(amount)
-spark.diff.select.types 9,1,4,3
-spark.query.cols.id partition-key,clustering-key
-spark.query.cols.id.types 9,1
-spark.query.cols.partitionKey partition-key
-
-spark.query.cols.insert partition-key,clustering-key,order-date,amount
-spark.query.cols.insert.types 9,1,4,3
+spark.query.source partition-key,clustering-key,order-date,amount,writetime(order-date),writetime(amount),ttl(order-date),ttl(amount)
+spark.query.source.partitionKey partition-key
+spark.query.destination partition-key,clustering-key,order-date,amount
+spark.query.destination.id partition-key,clustering-key
+spark.query.types 9,1,4,3
 
 spark.counterTable false
 spark.counterTable.cql