Commit 557a9eb

Merge pull request #22 from datastax/feature/migrate-rows-from-file
Migrate by primary-key rows in a file
2 parents 504baf9 + c3da026 commit 557a9eb

10 files changed: +248 -65 lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 
     <groupId>datastax.astra.migrate</groupId>
     <artifactId>cassandra-data-migrator</artifactId>
-    <version>2.2</version>
+    <version>2.3</version>
     <packaging>jar</packaging>
 
     <properties>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 64 additions & 8 deletions
@@ -10,17 +10,18 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.IntStream;
 
 public class AbstractJobSession extends BaseJobSession {
 
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
 
     protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
+        this(sourceSession, astraSession, sc, false);
+    }
+
+    protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
         this.sourceSession = sourceSession;
         this.astraSession = astraSession;
 
@@ -96,11 +97,6 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         writeTimeStampCols.forEach(col -> {
             selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")");
         });
-        String fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
-                + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING";
-        sourceSelectStatement = sourceSession.prepare(fullSelectQuery);
-        logger.info("PARAM -- Query used: " + fullSelectQuery);
-
         selectColTypes = getTypes(Util.getSparkProp(sc, "spark.query.types"));
         String idCols = Util.getSparkPropOrEmpty(sc, "spark.query.target.id");
         idColTypes = selectColTypes.subList(0, idCols.split(",").length);
@@ -117,6 +113,17 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
                 insertBinds += " and " + str + "= ?";
             }
         }
+
+        String fullSelectQuery;
+        if (!isJobMigrateRowsFromFile) {
+            fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
+                    + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING";
+        } else {
+            fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable + " where " + insertBinds;
+        }
+        sourceSelectStatement = sourceSession.prepare(fullSelectQuery);
+        logger.info("PARAM -- Query used: " + fullSelectQuery);
+
         astraSelectStatement = astraSession.prepare(
                 "select " + insertCols + " from " + astraKeyspaceTable
                         + " where " + insertBinds);
@@ -154,6 +161,55 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         }
     }
 
+    public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow, Row astraRow) {
+        BoundStatement boundInsertStatement = insertStatement.bind();
+
+        if (isCounterTable) {
+            for (int index = 0; index < selectColTypes.size(); index++) {
+                MigrateDataType dataType = selectColTypes.get(updateSelectMapping.get(index));
+                // compute the counter delta if reading from astra for the difference
+                if (astraRow != null && index < (selectColTypes.size() - idColTypes.size())) {
+                    boundInsertStatement = boundInsertStatement.set(index, (sourceRow.getLong(updateSelectMapping.get(index)) - astraRow.getLong(updateSelectMapping.get(index))), Long.class);
+                } else {
+                    boundInsertStatement = boundInsertStatement.set(index, getData(dataType, updateSelectMapping.get(index), sourceRow), dataType.typeClass);
+                }
+            }
+        } else {
+            int index = 0;
+            for (index = 0; index < selectColTypes.size(); index++) {
+                MigrateDataType dataTypeObj = selectColTypes.get(index);
+                Class dataType = dataTypeObj.typeClass;
+
+                try {
+                    Object colData = getData(dataTypeObj, index, sourceRow);
+                    if (index < idColTypes.size() && colData == null && dataType == String.class) {
+                        colData = "";
+                    }
+                    boundInsertStatement = boundInsertStatement.set(index, colData, dataType);
+                } catch (NullPointerException e) {
+                    // ignore the exception for map values being null
+                    if (dataType != Map.class) {
+                        throw e;
+                    }
+                }
+            }
+
+            if (!ttlCols.isEmpty()) {
+                boundInsertStatement = boundInsertStatement.set(index, getLargestTTL(sourceRow), Integer.class);
+                index++;
+            }
+            if (!writeTimeStampCols.isEmpty()) {
+                if (customWritetime > 0) {
+                    boundInsertStatement = boundInsertStatement.set(index, customWritetime, Long.class);
+                } else {
+                    boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
+                }
+            }
+        }
+
+        return boundInsertStatement;
+    }
+
     public int getLargestTTL(Row sourceRow) {
         return IntStream.range(0, ttlCols.size())
                 .map(i -> sourceRow.getInt(selectColTypes.size() + i)).max().getAsInt();
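
A quick worked example of the counter branch in the bindInsert() method moved into this class, using made-up values. The assumption (not shown in this diff) is that counter rows are written with an increment-style statement, so binding the difference converges the target on the source value.

public class CounterDeltaDemo {
    public static void main(String[] args) {
        long sourceCounter = 10L;   // value read from the source cluster (sourceRow.getLong)
        Long astraCounter = 7L;     // value already on Astra (astraRow.getLong); a null astraRow binds the source value as-is
        long bound = (astraCounter == null) ? sourceCounter : sourceCounter - astraCounter;
        System.out.println("bound counter value = " + bound);   // prints 3
    }
}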

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 0 additions & 52 deletions
@@ -9,7 +9,6 @@
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Map;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -117,7 +116,6 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                 }
             }
 
-
             logger.info("TreadID: " + Thread.currentThread().getId() + " Final Read Record Count: " + readCounter.get());
             logger.info("TreadID: " + Thread.currentThread().getId() + " Final Write Record Count: " + writeCounter.get());
             retryCount = maxAttempts;
@@ -126,7 +124,6 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                 logger.error("Error with PartitionRange -- TreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max + " -- Retry# " + retryCount);
             }
         }
-
     }
 
     private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults, int incrementBy) throws Exception {
@@ -140,53 +137,4 @@ private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultS
         writeResults.clear();
     }
 
-    public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow, Row astraRow) {
-        BoundStatement boundInsertStatement = insertStatement.bind();
-
-        if (isCounterTable) {
-            for (int index = 0; index < selectColTypes.size(); index++) {
-                MigrateDataType dataType = selectColTypes.get(updateSelectMapping.get(index));
-                // compute the counter delta if reading from astra for the difference
-                if (astraRow != null && index < (selectColTypes.size() - idColTypes.size())) {
-                    boundInsertStatement = boundInsertStatement.set(index, (sourceRow.getLong(updateSelectMapping.get(index)) - astraRow.getLong(updateSelectMapping.get(index))), Long.class);
-                } else {
-                    boundInsertStatement = boundInsertStatement.set(index, getData(dataType, updateSelectMapping.get(index), sourceRow), dataType.typeClass);
-                }
-            }
-        } else {
-            int index = 0;
-            for (index = 0; index < selectColTypes.size(); index++) {
-                MigrateDataType dataTypeObj = selectColTypes.get(index);
-                Class dataType = dataTypeObj.typeClass;
-
-                try {
-                    Object colData = getData(dataTypeObj, index, sourceRow);
-                    if (index < idColTypes.size() && colData == null && dataType == String.class) {
-                        colData = "";
-                    }
-                    boundInsertStatement = boundInsertStatement.set(index, colData, dataType);
-                } catch (NullPointerException e) {
-                    // ignore the exception for map values being null
-                    if (dataType != Map.class) {
-                        throw e;
-                    }
-                }
-            }
-
-            if (!ttlCols.isEmpty()) {
-                boundInsertStatement = boundInsertStatement.set(index, getLargestTTL(sourceRow), Integer.class);
-                index++;
-            }
-            if (!writeTimeStampCols.isEmpty()) {
-                if (customWritetime > 0) {
-                    boundInsertStatement = boundInsertStatement.set(index, customWritetime, Long.class);
-                } else {
-                    boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
-                }
-            }
-        }
-
-        return boundInsertStatement;
-    }
-
 }

src/main/java/datastax/astra/migrate/CopyPKJobSession.java

Lines changed: 87 additions & 0 deletions

@@ -0,0 +1,87 @@
+package datastax.astra.migrate;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import org.apache.spark.SparkConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.beans.PropertyEditor;
+import java.beans.PropertyEditorManager;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class CopyPKJobSession extends AbstractJobSession {
+
+    private static CopyPKJobSession copyJobSession;
+    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
+    protected AtomicLong readCounter = new AtomicLong(0);
+    protected AtomicLong missingCounter = new AtomicLong(0);
+    protected AtomicLong writeCounter = new AtomicLong(0);
+
+    protected CopyPKJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
+        super(sourceSession, astraSession, sc, true);
+    }
+
+    public static CopyPKJobSession getInstance(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
+        if (copyJobSession == null) {
+            synchronized (CopyPKJobSession.class) {
+                if (copyJobSession == null) {
+                    copyJobSession = new CopyPKJobSession(sourceSession, astraSession, sc);
+                }
+            }
+        }
+
+        return copyJobSession;
+    }
+
+    public void getRowAndInsert(List<SplitPartitions.PKRows> rowsList) {
+        for (SplitPartitions.PKRows rows : rowsList) {
+            rows.pkRows.parallelStream().forEach(row -> {
+                readCounter.incrementAndGet();
+                String[] pkFields = row.split(" %% ");
+                int idx = 0;
+                BoundStatement bspk = sourceSelectStatement.bind();
+                for (MigrateDataType tp : idColTypes) {
+                    bspk = bspk.set(idx, convert(tp.typeClass, pkFields[idx]), tp.typeClass);
+                    idx++;
+                }
+                Row pkRow = sourceSession.execute(bspk).one();
+                if (null == pkRow) {
+                    missingCounter.incrementAndGet();
+                    logger.error("Could not find row with primary-key: " + row);
+                    return;
+                }
+                ResultSet astraWriteResultSet = astraSession
+                        .execute(bindInsert(astraInsertStatement, pkRow, null));
+                writeCounter.incrementAndGet();
+                if (readCounter.get() % printStatsAfter == 0) {
+                    printCounts(false);
+                }
+            });
+        }
+
+        printCounts(true);
+    }
+
+    public void printCounts(boolean isFinal) {
+        if (isFinal) {
+            logger.info("################################################################################################");
+        }
+        logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: " + readCounter.get());
+        logger.info("TreadID: " + Thread.currentThread().getId() + " Read Missing Count: " + missingCounter.get());
+        logger.info("TreadID: " + Thread.currentThread().getId() + " Inserted Record Count: " + writeCounter.get());
+        if (isFinal) {
+            logger.info("################################################################################################");
+        }
+    }
+
+    private Object convert(Class<?> targetType, String text) {
+        PropertyEditor editor = PropertyEditorManager.findEditor(targetType);
+        editor.setAsText(text);
+        return editor.getValue();
+    }
+
+}
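
To make the input concrete, here is a minimal sketch (not from this commit) of how getRowAndInsert() interprets one line of ./primary_key_rows.csv. The key values "acct-1001" and "42" and the two-field key shape are invented; real lines carry the table's id columns in order, joined by " %% ", and lines beginning with '#' are filtered out earlier, when SplitPartitions reads the file.

import java.util.Arrays;

public class PKRowLineDemo {
    public static void main(String[] args) {
        // One made-up line from ./primary_key_rows.csv for a table keyed by two columns
        String row = "acct-1001 %% 42";
        String[] pkFields = row.split(" %% ");          // ["acct-1001", "42"]
        System.out.println(Arrays.toString(pkFields));  // each field is converted to the matching
                                                        // idColTypes entry and bound in order to the select
    }
}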

src/main/java/datastax/astra/migrate/OriginCountJobSession.java

Lines changed: 4 additions & 2 deletions
@@ -4,13 +4,15 @@
 import com.datastax.oss.driver.api.core.cql.*;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
 import org.apache.commons.lang.SerializationUtils;
+import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.spark.SparkConf;
 
 import java.io.Serializable;
 import java.math.BigInteger;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicLong;
 

src/main/java/datastax/astra/migrate/SplitPartitions.java

Lines changed: 45 additions & 2 deletions
@@ -4,14 +4,16 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
-import java.io.FileReader;
 import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 public class SplitPartitions {
 
@@ -42,9 +44,12 @@ public static List<Partition> getSubPartitionsFromFile(int splitSize) throws IOE
         logger.info("TreadID: " + Thread.currentThread().getId() +
                 " Splitting partitions in file: ./partitions.csv using a split-size of " + splitSize);
         List<Partition> partitions = new ArrayList<Partition>();
-        BufferedReader reader = new BufferedReader(new FileReader("./partitions.csv"));
+        BufferedReader reader = Util.getfileReader("./partitions.csv");
         String line = null;
         while ((line = reader.readLine()) != null) {
+            if (line.startsWith("#")) {
+                continue;
+            }
             String[] minMax = line.split(",");
             try {
                 partitions.addAll(getSubPartitions(splitSize, new BigInteger(minMax[0]), new BigInteger(minMax[1]), 100));
@@ -56,6 +61,36 @@ public static List<Partition> getSubPartitionsFromFile(int splitSize) throws IOE
         return partitions;
     }
 
+    public static List<PKRows> getRowPartsFromFile(int splitSize) throws IOException {
+        logger.info("TreadID: " + Thread.currentThread().getId() +
+                " Splitting rows in file: ./primary_key_rows.csv using a split-size of " + splitSize);
+        List<String> pkRows = new ArrayList<String>();
+        BufferedReader reader = Util.getfileReader("./primary_key_rows.csv");
+        String pkRow = null;
+        while ((pkRow = reader.readLine()) != null) {
+            if (pkRow.startsWith("#")) {
+                continue;
+            }
+            pkRows.add(pkRow);
+        }
+        int partSize = pkRows.size() / splitSize;
+        if (partSize == 0) {
+            partSize = pkRows.size();
+        }
+        return batches(pkRows, partSize).map(l -> (new PKRows(l))).collect(Collectors.toList());
+    }
+
+    public static <T> Stream<List<T>> batches(List<T> source, int length) {
+        if (length <= 0)
+            throw new IllegalArgumentException("length = " + length);
+        int size = source.size();
+        if (size <= 0)
+            return Stream.empty();
+        int fullChunks = (size - 1) / length;
+        return IntStream.range(0, fullChunks + 1).mapToObj(
+                n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
+    }
+
     private static List<Partition> getSubPartitions(int splitSize, BigInteger min, BigInteger max, int coveragePercent) {
         if (coveragePercent < 1 || coveragePercent > 100) {
             coveragePercent = 100;
@@ -92,6 +127,14 @@ private static List<Partition> getSubPartitions(int splitSize, BigInteger min, B
         return partitions;
     }
 
+    public static class PKRows implements Serializable {
+        List<String> pkRows;
+
+        public PKRows(List<String> rows) {
+            pkRows = rows;
+        }
+    }
+
     public static class Partition implements Serializable {
         private static final long serialVersionUID = 1L;
 
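A small usage sketch (not part of the commit) of the chunking that getRowPartsFromFile() performs via the new batches() helper. The ten made-up keys and the split-size of 4 are arbitrary illustration values.

import datastax.astra.migrate.SplitPartitions;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatchesDemo {
    public static void main(String[] args) {
        // 10 hypothetical primary-key rows, as if read from ./primary_key_rows.csv
        List<String> pkRows = IntStream.range(0, 10)
                .mapToObj(i -> "key-" + i)
                .collect(Collectors.toList());
        int splitSize = 4;
        int partSize = pkRows.size() / splitSize;     // 10 / 4 = 2 rows per chunk
        if (partSize == 0) {
            partSize = pkRows.size();                 // guard for files smaller than splitSize
        }
        // Each printed chunk would become one SplitPartitions.PKRows unit of work
        SplitPartitions.batches(pkRows, partSize)
                .forEach(System.out::println);        // [key-0, key-1] ... [key-8, key-9]
    }
}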