Commit 1c9ed15

Merge pull request #9 from datastax/feature/flex_schema
Implemented flex-schema
2 parents 350e998 + 26895fe

5 files changed: +73 −88 lines

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 
     <groupId>datastax.astra.migrate</groupId>
     <artifactId>cassandra-data-migrator</artifactId>
-    <version>1.3</version>
+    <version>1.4</version>
     <packaging>jar</packaging>
 
     <properties>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 57 additions & 32 deletions
@@ -16,7 +16,8 @@
 
 public class AbstractJobSession extends BaseJobSession {
 
-
+    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
+
     protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
         this.sourceSession = sourceSession;
         this.astraSession = astraSession;
@@ -68,45 +69,69 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
             maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
         }
 
-        logger.info(" DEFAULT -- Write Batch Size: " + batchSize);
-        logger.info(" DEFAULT -- Source Keyspace Table: " + sourceKeyspaceTable);
-        logger.info(" DEFAULT -- Destination Keyspace Table: " + astraKeyspaceTable);
-        logger.info(" DEFAULT -- ReadRateLimit: " + readLimiter.getRate());
-        logger.info(" DEFAULT -- WriteRateLimit: " + writeLimiter.getRate());
-        logger.info(" DEFAULT -- WriteTimestampFilter: " + writeTimeStampFilter);
-        logger.info(" DEFAULT -- WriteTimestampFilterCols: " + writeTimeStampCols);
-        logger.info(" DEFAULT -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
-        logger.info(" DEFAULT -- TTLCols: " + ttlCols);
+        logger.info("PARAM -- Write Batch Size: " + batchSize);
+        logger.info("PARAM -- Source Keyspace Table: " + sourceKeyspaceTable);
+        logger.info("PARAM -- Destination Keyspace Table: " + astraKeyspaceTable);
+        logger.info("PARAM -- ReadRateLimit: " + readLimiter.getRate());
+        logger.info("PARAM -- WriteRateLimit: " + writeLimiter.getRate());
+        logger.info("PARAM -- WriteTimestampFilter: " + writeTimeStampFilter);
+        logger.info("PARAM -- WriteTimestampFilterCols: " + writeTimeStampCols);
+        logger.info("PARAM -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
+        logger.info("PARAM -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
+        logger.info("PARAM -- TTLCols: " + ttlCols);
+
+        String selectCols = sparkConf.get("spark.query.source");
+        String partionKey = sparkConf.get("spark.query.source.partitionKey");
+        selectColTypes = getTypes(sparkConf.get("spark.query.types"));
+        String idCols = sparkConf.get("spark.query.destination.id", "");
+        idColTypes = selectColTypes.subList(0, idCols.split(",").length);
+        String sourceSelectCondition = sparkConf.get("spark.query.condition", "");
+        sourceSelectStatement = sourceSession.prepare(
+                "select " + selectCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
+                        + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING");
 
-        hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.source.hasRandomPartitioner", "false"));
+        String insertCols = sparkConf.get("spark.query.destination", "");
+        if (null == insertCols || insertCols.trim().isEmpty()) {
+            insertCols = selectCols;
+        }
+        String insertBinds = "";
+        for (String str : idCols.split(",")) {
+            if (insertBinds.isEmpty()) {
+                insertBinds = str + "= ?";
+            } else {
+                insertBinds += " and " + str + "= ?";
+            }
+        }
+        astraSelectStatement = astraSession.prepare(
+                "select " + insertCols + " from " + astraKeyspaceTable
+                        + " where " + insertBinds);
 
+        hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.source.hasRandomPartitioner", "false"));
         isCounterTable = Boolean.parseBoolean(sparkConf.get("spark.counterTable", "false"));
-        selectColTypes = getTypes(sparkConf.get("spark.diff.select.types"));
-        String partionKey = sparkConf.get("spark.query.cols.partitionKey");
-        String idCols = sparkConf.get("spark.query.cols.id");
-        idColTypes = getTypes(sparkConf.get("spark.query.cols.id.types"));
+        if (isCounterTable) {
+            String updateSelectMappingStr = sparkConf.get("spark.counterTable.cql.index", "0");
+            for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
+                updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
+            }
 
-        String selectCols = sparkConf.get("spark.query.cols.select");
+            String counterTableUpdate = sparkConf.get("spark.counterTable.cql");
+            astraInsertStatement = astraSession.prepare(counterTableUpdate);
+        } else {
+            insertBinds = "";
+            for (String str : insertCols.split(",")) {
+                if (insertBinds.isEmpty()) {
+                    insertBinds += "?";
+                } else {
+                    insertBinds += ", ?";
+                }
+            }
 
-        String idBinds = "";
-        int count = 1;
-        for (String str : idCols.split(",")) {
-            if (count > 1) {
-                idBinds = idBinds + " and " + str + "= ?";
+            if (isPreserveTTLWritetime) {
+                astraInsertStatement = astraSession.prepare("insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ") using TTL ? and TIMESTAMP ?");
             } else {
-                idBinds = str + "= ?";
+                astraInsertStatement = astraSession.prepare("insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")");
            }
-            count++;
         }
-
-        sourceSelectCondition = sparkConf.get("spark.query.cols.select.condition", "");
-        sourceSelectStatement = sourceSession.prepare(
-                "select " + selectCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
-                        + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING");
-
-        astraSelectStatement = astraSession.prepare(
-                "select " + selectCols + " from " + astraKeyspaceTable
-                        + " where " + idBinds);
     }
 
     public List<MigrateDataType> getTypes(String types) {
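
To make the new wiring concrete, here is a minimal, self-contained sketch of the bind-string construction the constructor above performs. The column lists and table name are hypothetical stand-ins for the spark.query.* properties, and java.util.stream replaces the string-accumulation loops; it is an illustration, not the project's code.

import java.util.Arrays;
import java.util.stream.Collectors;

public class FlexSchemaQueryBuilder {
    public static void main(String[] args) {
        // Hypothetical values for spark.query.destination, spark.query.destination.id,
        // and the target keyspace.table.
        String insertCols = "partition-key,clustering-key,order-date,amount";
        String idCols = "partition-key,clustering-key";
        String table = "test.orders";

        // WHERE clause for the target-side select: one "col= ?" bind per key column.
        String idBinds = Arrays.stream(idCols.split(","))
                .map(col -> col + "= ?")
                .collect(Collectors.joining(" and "));

        // VALUES clause for the insert: one "?" bind per destination column.
        String insertBinds = Arrays.stream(insertCols.split(","))
                .map(col -> "?")
                .collect(Collectors.joining(", "));

        System.out.println("select " + insertCols + " from " + table + " where " + idBinds);
        System.out.println("insert into " + table + " (" + insertCols + ") VALUES (" + insertBinds + ")");
    }
}

The printed statements match the shapes prepared in the diff: a key-based select against the target, and a positional insert that gains a "using TTL ? and TIMESTAMP ?" suffix when isPreserveTTLWritetime is set.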

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 2 additions & 4 deletions
@@ -11,12 +11,9 @@
 
 public abstract class BaseJobSession {
 
-    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
-
     protected PreparedStatement sourceSelectStatement;
-    protected String sourceSelectCondition;
-
     protected PreparedStatement astraSelectStatement;
+    protected PreparedStatement astraInsertStatement;
 
     // Read/Write Rate limiter
     // Determine the total throughput for the entire cluster in terms of wries/sec,
@@ -32,6 +29,7 @@ public abstract class BaseJobSession {
     protected CqlSession astraSession;
     protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
     protected List<MigrateDataType> idColTypes = new ArrayList<MigrateDataType>();
+    protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
 
     protected Integer batchSize = 1;
     protected Integer printStatsAfter = 100000;

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 8 additions & 43 deletions
@@ -9,22 +9,20 @@
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class CopyJobSession extends AbstractJobSession {
 
-    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
     private static CopyJobSession copyJobSession;
-
-    protected PreparedStatement astraInsertStatement;
+    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
     protected AtomicLong readCounter = new AtomicLong(0);
     protected AtomicLong writeCounter = new AtomicLong(0);
 
-    protected List<MigrateDataType> insertColTypes = new ArrayList<MigrateDataType>();
-    protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
+    protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
+        super(sourceSession, astraSession, sparkConf);
+    }
 
     public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
         if (copyJobSession == null) {
@@ -38,39 +36,6 @@ public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession as
         return copyJobSession;
     }
 
-    protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
-        super(sourceSession, astraSession, sparkConf);
-
-        String insertCols = sparkConf.get("spark.query.cols.insert");
-        insertColTypes = getTypes(sparkConf.get("spark.query.cols.insert.types"));
-        String insertBinds = "";
-        int count = 1;
-        for (String str : insertCols.split(",")) {
-            if (count > 1) {
-                insertBinds = insertBinds + ",?";
-            } else {
-                insertBinds = insertBinds + "?";
-            }
-            count++;
-        }
-
-        if (isCounterTable) {
-            String updateSelectMappingStr = sparkConf.get("spark.counterTable.cql.index", "0");
-            for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
-                updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
-            }
-
-            String counterTableUpdate = sparkConf.get("spark.counterTable.cql");
-            astraInsertStatement = astraSession.prepare(counterTableUpdate);
-        } else {
-            if (isPreserveTTLWritetime) {
-                astraInsertStatement = astraSession.prepare("insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ") using TTL ? and TIMESTAMP ?");
-            } else {
-                astraInsertStatement = astraSession.prepare("insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")");
-            }
-        }
-    }
-
     public void getDataAndInsert(BigInteger min, BigInteger max) {
         logger.info("TreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max);
         int maxAttempts = maxRetries;
@@ -179,8 +144,8 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
         BoundStatement boundInsertStatement = insertStatement.bind();
 
         if (isCounterTable) {
-            for (int index = 0; index < insertColTypes.size(); index++) {
-                MigrateDataType dataType = insertColTypes.get(index);
+            for (int index = 0; index < selectColTypes.size(); index++) {
+                MigrateDataType dataType = selectColTypes.get(updateSelectMapping.get(index));
                 // compute the counter delta if reading from astra for the difference
                 if (astraRow != null && index < (selectColTypes.size() - idColTypes.size())) {
                     boundInsertStatement = boundInsertStatement.set(index, (sourceRow.getLong(updateSelectMapping.get(index)) - astraRow.getLong(updateSelectMapping.get(index))), Long.class);
@@ -190,8 +155,8 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
             }
         } else {
             int index = 0;
-            for (index = 0; index < insertColTypes.size(); index++) {
-                MigrateDataType dataTypeObj = insertColTypes.get(index);
+            for (index = 0; index < selectColTypes.size(); index++) {
+                MigrateDataType dataTypeObj = selectColTypes.get(index);
                 Class dataType = dataTypeObj.typeClass;
 
                 try {
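
The counter branch of bindInsert above binds the difference between the source and target values rather than the source value itself. A hedged sketch of that idea, with plain longs standing in for the driver's Row objects (the values, class name, and the "c = c + ?" update shape are illustrative assumptions):

public class CounterDeltaSketch {
    // Counter columns can only be incremented, so an update bound via
    // spark.counterTable.cql (e.g. "... set c = c + ? where ...") is given
    // (source - target) to converge the target onto the source value.
    static long counterDelta(long sourceValue, Long astraValue) {
        if (astraValue == null) {
            return sourceValue; // target row missing: the full source value is the delta
        }
        return sourceValue - astraValue;
    }

    public static void main(String[] args) {
        System.out.println(counterDelta(42L, 30L));  // 12 -> target moves from 30 to 42
        System.out.println(counterDelta(42L, null)); // 42 -> target created at 42
    }
}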

src/resources/sparkConf.properties

Lines changed: 5 additions & 8 deletions
@@ -22,14 +22,11 @@ spark.batchSize 5
 spark.coveragePercent 100
 spark.printStatsAfter 100000
 
-spark.query.cols.select partition-key,clustering-key,order-date,amount,writetime(order-date),writetime(amount),ttl(order-date),ttl(amount)
-spark.diff.select.types 9,1,4,3
-spark.query.cols.id partition-key,clustering-key
-spark.query.cols.id.types 9,1
-spark.query.cols.partitionKey partition-key
-
-spark.query.cols.insert partition-key,clustering-key,order-date,amount
-spark.query.cols.insert.types 9,1,4,3
+spark.query.source partition-key,clustering-key,order-date,amount,writetime(order-date),writetime(amount),ttl(order-date),ttl(amount)
+spark.query.source.partitionKey partition-key
+spark.query.destination partition-key,clustering-key,order-date,amount
+spark.query.destination.id partition-key,clustering-key
+spark.query.types 9,1,4,3
 
 spark.counterTable false
 spark.counterTable.cql
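
The flex-schema keys collapse the three old type lists into a single spark.query.types: the subList call in AbstractJobSession implies the id columns lead the select list, so their types are simply a prefix of the select types. A small sketch of that derivation (integer codes stand in for MigrateDataType; 9,1,4,3 are copied from the sample config above):

import java.util.Arrays;
import java.util.List;

public class TypePrefixSketch {
    public static void main(String[] args) {
        List<Integer> selectColTypes = Arrays.asList(9, 1, 4, 3); // spark.query.types
        String idCols = "partition-key,clustering-key";           // spark.query.destination.id

        // Same subList trick as the constructor: the first N types belong to the N id columns.
        List<Integer> idColTypes = selectColTypes.subList(0, idCols.split(",").length);
        System.out.println(idColTypes); // [9, 1] -- equals the old spark.query.cols.id.types value
    }
}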
