
Commit b7c5a76

CDM-23 Big refactor to split out CQL statements into their own classes, allowing them to be manipulated and to differ between origin and target
1 parent 5f8aafb commit b7c5a76
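
In practice, the split means each CQL operation now lives in its own statement class hung off CqlHelper, so origin-side and target-side statements can be bound, filtered, and executed independently. A minimal sketch of the new calling pattern, assembled only from the call sites visible in the diffs below (the job-session context that supplies cqlHelper, min, and max is assumed):

    // Sketch only: names are taken from the diffed call sites below; the
    // surrounding job-session setup (cqlHelper, min, max) is assumed context.
    PKFactory pkFactory = cqlHelper.getPKFactory();

    // Origin side: one statement class per query shape.
    OriginSelectByPartitionRangeStatement originSelect = cqlHelper.getOriginSelectByPartitionRangeStatement();
    ResultSet resultSet = originSelect.execute(originSelect.bind(min, max));

    // Target side: insert, update, and select-by-PK each get their own class.
    TargetInsertStatement targetInsert = cqlHelper.getTargetInsertStatement();
    for (Row originRow : resultSet) {
        Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
        if (originSelect.shouldFilterRecord(record)) continue; // filtering now lives with the statement
        for (Record r : pkFactory.toValidRecordList(record)) {
            targetInsert.executeAsync(targetInsert.bindRecord(r)); // counter tables would go through TargetUpdateStatement instead
        }
    }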

25 files changed: +1736, -483 lines

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 0 additions & 1 deletion

@@ -24,7 +24,6 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
 
         cqlHelper.setOriginSession(originSession);
         cqlHelper.setTargetSession(targetSession);
-        cqlHelper.setJobMigrateRowsFromFile(isJobMigrateRowsFromFile);
 
         printStatsAfter = propertyHelper.getInteger(KnownProperties.SPARK_STATS_AFTER);
         if (!propertyHelper.meetsMinimum(KnownProperties.SPARK_STATS_AFTER, printStatsAfter, 1)) {

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 64 additions & 88 deletions

@@ -2,15 +2,18 @@
 
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.*;
-import datastax.astra.migrate.cql.CqlHelper;
+import datastax.astra.migrate.cql.PKFactory;
+import datastax.astra.migrate.cql.Record;
+import datastax.astra.migrate.cql.statements.OriginSelectByPartitionRangeStatement;
+import datastax.astra.migrate.cql.statements.TargetInsertStatement;
+import datastax.astra.migrate.cql.statements.TargetSelectByPKStatement;
+import datastax.astra.migrate.cql.statements.TargetUpdateStatement;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Collection;
+import java.util.*;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicLong;

@@ -42,127 +45,100 @@ public static CopyJobSession getInstance(CqlSession originSession, CqlSession ta
     public void getDataAndInsert(BigInteger min, BigInteger max) {
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
         boolean done = false;
+        boolean batching = false;
         int maxAttempts = maxRetries + 1;
         for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) {
             long readCnt = 0;
             long writeCnt = 0;
             long skipCnt = 0;
             long errCnt = 0;
             try {
-                ResultSet resultSet = cqlHelper.getOriginSession().execute(
-                        cqlHelper.getPreparedStatement(CqlHelper.CQL.ORIGIN_SELECT)
-                                .bind(cqlHelper.hasRandomPartitioner() ? min : min.longValueExact(),
-                                        cqlHelper.hasRandomPartitioner() ? max : max.longValueExact())
-                                .setConsistencyLevel(cqlHelper.getReadConsistencyLevel())
-                                .setPageSize(cqlHelper.getFetchSizeInRows()));
+                PKFactory pkFactory = cqlHelper.getPKFactory();
+                OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement = cqlHelper.getOriginSelectByPartitionRangeStatement();
+                ResultSet resultSet = originSelectByPartitionRangeStatement.execute(originSelectByPartitionRangeStatement.bind(min, max));
+
+                TargetInsertStatement targetInsertStatement = cqlHelper.getTargetInsertStatement();
+                TargetUpdateStatement targetUpdateStatement = cqlHelper.getTargetUpdateStatement();
+                TargetSelectByPKStatement targetSelectByPKStatement = cqlHelper.getTargetSelectByPKStatement();
 
                 Collection<CompletionStage<AsyncResultSet>> writeResults = new ArrayList<CompletionStage<AsyncResultSet>>();
 
                 // cannot do batching if the writeFilter is greater than 0 or
                 // maxWriteTimeStampFilter is less than max long
                 // do not batch for counters as it adds latency & increases chance of discrepancy
-                if (cqlHelper.getBatchSize() == 1 || cqlHelper.hasWriteTimestampFilter() || cqlHelper.isCounterTable()) {
-                    for (Row originRow : resultSet) {
-                        readLimiter.acquire(1);
-                        readCnt++;
-                        if (readCnt % printStatsAfter == 0) {
-                            printCounts(false);
-                        }
+                batching = !(cqlHelper.getBatchSize() == 1 || cqlHelper.hasWriteTimestampFilter() || cqlHelper.isCounterTable());
+                BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED); // this may not be used
 
-                        // exclusion filter below
-                        if (cqlHelper.hasFilterColumn()) {
-                            String col = (String) cqlHelper.getData(cqlHelper.getFilterColType(), cqlHelper.getFilterColIndex(), originRow);
-                            if (col.trim().equalsIgnoreCase(cqlHelper.getFilterColValue())) {
-                                logger.warn("Skipping row and filtering out: {}", cqlHelper.getKey(originRow));
-                                skipCnt++;
-                                continue;
-                            }
-                        }
-                        if (cqlHelper.hasWriteTimestampFilter()) {
-                            // only process rows greater than writeTimeStampFilter
-                            Long originWriteTimeStamp = cqlHelper.getLargestWriteTimeStamp(originRow);
-                            if (originWriteTimeStamp < cqlHelper.getMinWriteTimeStampFilter()
-                                    || originWriteTimeStamp > cqlHelper.getMaxWriteTimeStampFilter()) {
-                                skipCnt++;
-                                continue;
-                            }
-                        }
-                        writeLimiter.acquire(1);
+                boolean isCounterTable = cqlHelper.isCounterTable();
+                CompletionStage<AsyncResultSet> writeResultSet;
 
-                        Row targetRow = null;
-                        if (cqlHelper.isCounterTable()) {
-                            ResultSet targetResultSet = cqlHelper.getTargetSession()
-                                    .execute(cqlHelper.selectFromTargetByPK(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_SELECT_ORIGIN_BY_PK), originRow));
-                            targetRow = targetResultSet.one();
-                        }
+                for (Row originRow : resultSet) {
+                    readLimiter.acquire(1);
+                    readCnt++;
+                    if (readCnt % printStatsAfter == 0) {
+                        printCounts(false);
+                    }
 
-                        List<BoundStatement> bInsertList = cqlHelper.bindInsert(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, targetRow);
-                        if (null == bInsertList || bInsertList.isEmpty()) {
-                            skipCnt++;
-                            continue;
-                        }
-                        for (BoundStatement bInsert : bInsertList) {
-                            CompletionStage<AsyncResultSet> targetWriteResultSet = cqlHelper.getTargetSession().executeAsync(bInsert);
-                            writeResults.add(targetWriteResultSet);
-                            if (writeResults.size() > cqlHelper.getFetchSizeInRows()) {
-                                writeCnt += iterateAndClearWriteResults(writeResults, 1);
-                            }
-                        }
+                    Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
+                    if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) {
+                        skipCnt++;
+                        continue;
                     }
 
-                    // clear the write resultset
-                    writeCnt += iterateAndClearWriteResults(writeResults, 1);
-                } else {
-                    BatchStatement batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
-                    for (Row originRow : resultSet) {
-                        readLimiter.acquire(1);
-                        readCnt++;
-                        if (readCnt % printStatsAfter == 0) {
-                            printCounts(false);
-                        }
+                    for (Record r : pkFactory.toValidRecordList(record)) {
+                        writeLimiter.acquire(1);
 
-                        if (cqlHelper.hasFilterColumn()) {
-                            String colValue = (String) cqlHelper.getData(cqlHelper.getFilterColType(), cqlHelper.getFilterColIndex(), originRow);
-                            if (colValue.trim().equalsIgnoreCase(cqlHelper.getFilterColValue())) {
-                                logger.warn("Skipping row and filtering out: {}", cqlHelper.getKey(originRow));
-                                skipCnt++;
-                                continue;
+                        BoundStatement boundUpsert;
+                        if (isCounterTable) {
+                            Record targetRecord = targetSelectByPKStatement.getRecord(r.getPk());
+                            if (null != targetRecord) {
+                                r.setTargetRow(targetRecord.getTargetRow());
                             }
+                            boundUpsert = targetUpdateStatement.bindRecord(r);
+                        }
+                        else {
+                            boundUpsert = targetInsertStatement.bindRecord(r);
                         }
 
-                        writeLimiter.acquire(1);
-                        List<BoundStatement> bInsertList = cqlHelper.bindInsert(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, null);
-                        if (null == bInsertList || bInsertList.isEmpty()) {
-                            skipCnt++;
+                        if (null == boundUpsert) {
+                            skipCnt++; // TODO: this previously skipped, why not errCnt?
                             continue;
                         }
-                        for (BoundStatement bInsert : bInsertList) {
-                            batchStatement = batchStatement.add(bInsert);
 
-                            // if batch threshold is met, send the writes and clear the batch
-                            if (batchStatement.size() >= cqlHelper.getBatchSize()) {
-                                CompletionStage<AsyncResultSet> writeResultSet = cqlHelper.getTargetSession().executeAsync(batchStatement);
+                        if (batching) {
+                            batch = batch.add(boundUpsert);
+                            if (batch.size() >= cqlHelper.getBatchSize()) {
+                                writeResultSet = isCounterTable ? targetUpdateStatement.executeAsync(batch) : targetInsertStatement.executeAsync(batch);
                                 writeResults.add(writeResultSet);
-                                batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
+                                batch = BatchStatement.newInstance(BatchType.UNLOGGED);
                             }
 
                             if (writeResults.size() * cqlHelper.getBatchSize() > cqlHelper.getFetchSizeInRows()) {
                                 writeCnt += iterateAndClearWriteResults(writeResults, cqlHelper.getBatchSize());
                             }
                         }
+                        else {
+                            writeResultSet = isCounterTable ? targetUpdateStatement.executeAsync(boundUpsert) : targetInsertStatement.executeAsync(boundUpsert);
+                            writeResults.add(writeResultSet);
+                            if (writeResults.size() > cqlHelper.getFetchSizeInRows()) {
+                                writeCnt += iterateAndClearWriteResults(writeResults, 1);
+                            }
+                        }
                     }
+                }
 
-                    // clear the write resultset
-                    writeCnt += iterateAndClearWriteResults(writeResults, cqlHelper.getBatchSize());
-
-                    // if there are any pending writes because the batchSize threshold was not met, then write and clear them
-                    if (batchStatement.size() > 0) {
-                        CompletionStage<AsyncResultSet> writeResultSet = cqlHelper.getTargetSession().executeAsync(batchStatement);
+                // Flush pending writes
+                if (batching) {
+                    if (batch.size() > 0) {
+                        writeResultSet = isCounterTable ? targetUpdateStatement.executeAsync(batch) : targetInsertStatement.executeAsync(batch);
                         writeResults.add(writeResultSet);
-                        writeCnt += iterateAndClearWriteResults(writeResults, batchStatement.size());
-                        batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
+                        writeCnt += iterateAndClearWriteResults(writeResults, batch.size());
                     }
                 }
+                else {
+                    // clear the write resultset
+                    writeCnt += iterateAndClearWriteResults(writeResults, 1);
+                }
 
                 readCounter.addAndGet(readCnt);
                 writeCounter.addAndGet(writeCnt);
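
Both branches throttle in-flight async writes through iterateAndClearWriteResults(writeResults, n), whose body is not part of this commit's diff. A plausible sketch of its contract, inferred only from how it is called here (await every pending write, credit n rows per completed statement, then clear the list):

    // Hypothetical sketch -- the real iterateAndClearWriteResults lives elsewhere in this class.
    // Assumption: it drains the pending async writes, counting rowsPerStatement rows for each
    // (1 for individual writes, the batch size for batched writes), then empties the collection.
    private long iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults,
                                             int rowsPerStatement) {
        long written = 0;
        for (CompletionStage<AsyncResultSet> stage : writeResults) {
            stage.toCompletableFuture().join(); // block until the write completes
            written += rowsPerStatement;
        }
        writeResults.clear(); // the caller reuses the collection for subsequent writes
        return written;
    }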
src/main/java/datastax/astra/migrate/CopyPKJobSession.java

Lines changed: 49 additions & 25 deletions

@@ -1,16 +1,18 @@
 package datastax.astra.migrate;
 
 import com.datastax.oss.driver.api.core.CqlSession;
-import com.datastax.oss.driver.api.core.cql.BoundStatement;
-import com.datastax.oss.driver.api.core.cql.ResultSet;
 import com.datastax.oss.driver.api.core.cql.Row;
-import datastax.astra.migrate.cql.CqlHelper;
+import datastax.astra.migrate.cql.EnhancedPK;
+import datastax.astra.migrate.cql.PKFactory;
+import datastax.astra.migrate.cql.Record;
+import datastax.astra.migrate.cql.statements.OriginSelectByPKStatement;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.beans.PropertyEditor;
 import java.beans.PropertyEditorManager;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;

@@ -20,10 +22,20 @@ public class CopyPKJobSession extends AbstractJobSession {
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
     protected AtomicLong readCounter = new AtomicLong(0);
     protected AtomicLong missingCounter = new AtomicLong(0);
+    protected AtomicLong skipCounter = new AtomicLong(0);
     protected AtomicLong writeCounter = new AtomicLong(0);
 
+    private final PKFactory pkFactory;
+    private final List<MigrateDataType> originPKTypes;
+    private final boolean isCounterTable;
+    private final OriginSelectByPKStatement originSelectByPKStatement;
+
     protected CopyPKJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
         super(originSession, targetSession, sc, true);
+        pkFactory = cqlHelper.getPKFactory();
+        originPKTypes = pkFactory.getPKTypes(PKFactory.Side.ORIGIN);
+        isCounterTable = cqlHelper.isCounterTable();
+        originSelectByPKStatement = cqlHelper.getOriginSelectByPKStatement();
     }
 
     public static CopyPKJobSession getInstance(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
@@ -42,28 +54,34 @@ public void getRowAndInsert(List<SplitPartitions.PKRows> rowsList) {
         for (SplitPartitions.PKRows rows : rowsList) {
             rows.pkRows.parallelStream().forEach(row -> {
                 readCounter.incrementAndGet();
-                String[] pkFields = row.split(" %% ");
-                int idx = 0;
-                BoundStatement bspk = cqlHelper.getPreparedStatement(CqlHelper.CQL.ORIGIN_SELECT).bind().setConsistencyLevel(cqlHelper.getReadConsistencyLevel());
-                for (MigrateDataType tp : cqlHelper.getIdColTypes()) {
-                    bspk = bspk.set(idx, convert(tp.typeClass, pkFields[idx]), tp.typeClass);
-                    idx++;
+                EnhancedPK pk = toEnhancedPK(row);
+                if (null == pk || pk.isError()) {
+                    missingCounter.incrementAndGet();
+                    logger.error("Could not build PK object with value <{}>; error is: {}", row, (null == pk ? "null" : pk.getMessages()));
+                    return;
                 }
-                Row pkRow = cqlHelper.getOriginSession().execute(bspk).one();
-                if (null == pkRow) {
+
+                Record recordFromOrigin = originSelectByPKStatement.getRecord(pk);
+                if (null == recordFromOrigin) {
                     missingCounter.incrementAndGet();
-                    logger.error("Could not find row with primary-key: {}", row);
+                    logger.error("Could not find origin row with primary-key: {}", row);
+                    return;
+                }
+                Row originRow = recordFromOrigin.getOriginRow();
+
+                Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
+                if (originSelectByPKStatement.shouldFilterRecord(record)) {
+                    skipCounter.incrementAndGet();
                     return;
                 }
-                List<BoundStatement> boundInserts = cqlHelper.bindInsert(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), pkRow, null);
-                if (null != boundInserts) {
-                    for (BoundStatement bs : boundInserts) {
-                        ResultSet targetWriteResultSet = cqlHelper.getTargetSession().execute(bs);
-                        writeCounter.incrementAndGet();
-                        if (readCounter.get() % printStatsAfter == 0) {
-                            printCounts(false);
-                        }
-                    }
+
+                writeLimiter.acquire(1);
+                if (isCounterTable) cqlHelper.getTargetUpdateStatement().putRecord(record);
+                else cqlHelper.getTargetInsertStatement().putRecord(record);
+                writeCounter.incrementAndGet();
+
+                if (readCounter.get() % printStatsAfter == 0) {
+                    printCounts(false);
                 }
             });
         }
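
getRowAndInsert now runs each PK string through toEnhancedPK (next hunk) before the origin lookup, instead of binding the fields positionally into a prepared statement. For illustration, a PK string for a hypothetical two-column key (the values and types here are invented):

    // Hypothetical input: a text partition key and an int clustering column, joined by " %% ".
    String row = "user-123 %% 42";
    String[] pkFields = row.split(" %% "); // -> ["user-123", "42"]
    // toEnhancedPK then converts each field to its MigrateDataType's Java class via a
    // java.beans PropertyEditor and hands the values to pkFactory.toEnhancedPK(...).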
@@ -77,16 +95,22 @@ public void printCounts(boolean isFinal) {
         }
         logger.info("ThreadID: {} Read Record Count: {}", Thread.currentThread().getId(), readCounter.get());
         logger.info("ThreadID: {} Missing Record Count: {}", Thread.currentThread().getId(), missingCounter.get());
+        logger.info("ThreadID: {} Skipped Record Count: {}", Thread.currentThread().getId(), skipCounter.get());
         logger.info("ThreadID: {} Inserted Record Count: {}", Thread.currentThread().getId(), writeCounter.get());
         if (isFinal) {
             logger.info("################################################################################################");
         }
     }
 
-    private Object convert(Class<?> targetType, String text) {
-        PropertyEditor editor = PropertyEditorManager.findEditor(targetType);
-        editor.setAsText(text);
-        return editor.getValue();
+    private EnhancedPK toEnhancedPK(String rowString) {
+        String[] pkFields = rowString.split(" %% ");
+        List<Object> values = new ArrayList<>(originPKTypes.size());
+        for (int i=0; i<pkFields.length; i++) {
+            PropertyEditor editor = PropertyEditorManager.findEditor(originPKTypes.get(i).getTypeClass());
+            editor.setAsText(pkFields[i]);
+            values.add(editor.getValue());
+        }
+        return pkFactory.toEnhancedPK(values, pkFactory.getPKTypes(PKFactory.Side.ORIGIN));
     }
 
 }
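
The PropertyEditor-based string-to-object conversion that toEnhancedPK inherits from the removed convert() helper is plain java.beans machinery. A self-contained demonstration of that single step, with made-up values:

    import java.beans.PropertyEditor;
    import java.beans.PropertyEditorManager;

    public class PropertyEditorDemo {
        public static void main(String[] args) {
            // Same technique as toEnhancedPK: find an editor for the target class,
            // feed it the string form of the PK field, and read back the typed value.
            PropertyEditor editor = PropertyEditorManager.findEditor(Integer.class);
            editor.setAsText("42");
            Object value = editor.getValue();
            System.out.println(value.getClass().getName() + " = " + value); // java.lang.Integer = 42
        }
    }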
