Commit 18c0b6b

Merge pull request #113 from datastax/issue/CDM-19
Issue/cdm 19
2 parents: b76cc2e + 64ec36b

10 files changed: +815 -605 lines changed

AbstractJobSession.java: 4 additions & 245 deletions

@@ -1,21 +1,12 @@
 package datastax.astra.migrate;
 
 import com.datastax.oss.driver.api.core.CqlSession;
-import com.datastax.oss.driver.api.core.cql.BoundStatement;
-import com.datastax.oss.driver.api.core.cql.PreparedStatement;
-import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
 import datastax.astra.migrate.properties.KnownProperties;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.Duration;
-import java.time.Instant;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.IntStream;
-
 public class AbstractJobSession extends BaseJobSession {
 
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
@@ -31,11 +22,10 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
             return;
         }
 
-        this.originSessionSession = originSession;
-        this.targetSession = targetSession;
+        cqlHelper.setOriginSession(originSession);
+        cqlHelper.setTargetSession(targetSession);
+        cqlHelper.setJobMigrateRowsFromFile(isJobMigrateRowsFromFile);
 
-        batchSize = propertyHelper.getInteger(KnownProperties.SPARK_BATCH_SIZE);
-        fetchSizeInRows = propertyHelper.getInteger(KnownProperties.READ_FETCH_SIZE);
         printStatsAfter = propertyHelper.getInteger(KnownProperties.SPARK_STATS_AFTER);
         if (!propertyHelper.meetsMinimum(KnownProperties.SPARK_STATS_AFTER, printStatsAfter, 1)) {
             logger.warn(KnownProperties.SPARK_STATS_AFTER +" must be greater than 0. Setting to default value of " + KnownProperties.getDefaultAsString(KnownProperties.SPARK_STATS_AFTER));
@@ -47,242 +37,11 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         writeLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.SPARK_LIMIT_WRITE));
         maxRetries = propertyHelper.getInteger(KnownProperties.SPARK_MAX_RETRIES);
 
-        originKeyspaceTable = propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE);
-        targetKeyspaceTable = propertyHelper.getString(KnownProperties.TARGET_KEYSPACE_TABLE);
-
-        String ttlColsStr = propertyHelper.getAsString(KnownProperties.ORIGIN_TTL_COLS);
-        if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
-            for (String ttlCol : ttlColsStr.split(",")) {
-                ttlCols.add(Integer.parseInt(ttlCol));
-            }
-        }
-
-        String writeTimestampColsStr = propertyHelper.getAsString(KnownProperties.ORIGIN_WRITETIME_COLS);
-        if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
-            for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
-                writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
-            }
-        }
-
-        writeTimeStampFilter = propertyHelper.getBoolean(KnownProperties.ORIGIN_FILTER_WRITETS_ENABLED);
-        // batchsize set to 1 if there is a writeFilter
-        if (writeTimeStampFilter) {
-            batchSize = 1;
-        }
-
-        minWriteTimeStampFilter = propertyHelper.getLong(KnownProperties.ORIGIN_FILTER_WRITETS_MIN);
-        maxWriteTimeStampFilter = propertyHelper.getLong(KnownProperties.ORIGIN_FILTER_WRITETS_MAX);
-        customWritetime = propertyHelper.getLong(KnownProperties.TARGET_CUSTOM_WRITETIME);
-
-        logger.info("PARAM -- Read Consistency: {}", readConsistencyLevel);
-        logger.info("PARAM -- Write Consistency: {}", writeConsistencyLevel);
-        logger.info("PARAM -- Write Batch Size: {}", batchSize);
         logger.info("PARAM -- Max Retries: {}", maxRetries);
-        logger.info("PARAM -- Read Fetch Size: {}", fetchSizeInRows);
-        logger.info("PARAM -- Source Keyspace Table: {}", originKeyspaceTable);
-        logger.info("PARAM -- Destination Keyspace Table: {}", targetKeyspaceTable);
         logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());
         logger.info("PARAM -- WriteRateLimit: {}", writeLimiter.getRate());
-        logger.info("PARAM -- TTLCols: {}", ttlCols);
-        logger.info("PARAM -- WriteTimestampFilterCols: {}", writeTimeStampCols);
-        logger.info("PARAM -- WriteTimestampFilter: {}", writeTimeStampFilter);
-        if (writeTimeStampFilter) {
-            logger.info("PARAM -- minWriteTimeStampFilter: {} datetime is {}", minWriteTimeStampFilter,
-                    Instant.ofEpochMilli(minWriteTimeStampFilter / 1000));
-            logger.info("PARAM -- maxWriteTimeStampFilter: {} datetime is {}", maxWriteTimeStampFilter,
-                    Instant.ofEpochMilli(maxWriteTimeStampFilter / 1000));
-        }
-
-        String selectCols = propertyHelper.getAsString(KnownProperties.ORIGIN_COLUMN_NAMES);
-        String partitionKey = propertyHelper.getAsString(KnownProperties.ORIGIN_PARTITION_KEY);
-        String originSelectCondition = propertyHelper.getAsString(KnownProperties.ORIGIN_FILTER_CONDITION);
-        if (!originSelectCondition.isEmpty() && !originSelectCondition.trim().toUpperCase().startsWith("AND")) {
-            originSelectCondition = " AND " + originSelectCondition;
-        }
-
-        final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
-        allCols = selectCols.split(",");
-        ttlCols.forEach(col -> {
-            selectTTLWriteTimeCols.append(",ttl(" + allCols[col] + ")");
-        });
-        writeTimeStampCols.forEach(col -> {
-            selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")");
-        });
-        selectColTypes = getTypes(propertyHelper.getAsString(KnownProperties.ORIGIN_COLUMN_TYPES));
-        String idCols = propertyHelper.getAsString(KnownProperties.TARGET_PRIMARY_KEY);
-        idColTypes = selectColTypes.subList(0, idCols.split(",").length);
-
-        String insertCols = propertyHelper.getAsString(KnownProperties.TARGET_COLUMN_NAMES);
-        if (null == insertCols || insertCols.trim().isEmpty()) {
-            insertCols = selectCols;
-        }
-        String insertBinds = "";
-        for (String str : idCols.split(",")) {
-            if (insertBinds.isEmpty()) {
-                insertBinds = str + "= ?";
-            } else {
-                insertBinds += " and " + str + "= ?";
-            }
-        }
-
-        String fullSelectQuery;
-        if (!isJobMigrateRowsFromFile) {
-            fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + originKeyspaceTable +
-                    " where token(" + partitionKey.trim() + ") >= ? and token(" + partitionKey.trim() + ") <= ? " +
-                    originSelectCondition + " ALLOW FILTERING";
-        } else {
-            fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + originKeyspaceTable + " where " + insertBinds;
-        }
-        originSelectStatement = originSession.prepare(fullSelectQuery);
-        logger.info("PARAM -- Query used: {}", fullSelectQuery);
-
-        targetSelectStatement = targetSession.prepare(
-                "select " + insertCols + " from " + targetKeyspaceTable
-                        + " where " + insertBinds);
-
-        hasRandomPartitioner = propertyHelper.getBoolean(KnownProperties.ORIGIN_HAS_RANDOM_PARTITIONER);
-        isCounterTable = propertyHelper.getBoolean(KnownProperties.ORIGIN_IS_COUNTER);
-        if (isCounterTable) {
-            String updateSelectMappingStr = propertyHelper.getString(KnownProperties.ORIGIN_COUNTER_INDEXES);
-            for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
-                updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
-            }
-
-            String counterTableUpdate = propertyHelper.getString(KnownProperties.ORIGIN_COUNTER_CQL);
-            targetInsertStatement = targetSession.prepare(counterTableUpdate);
-        } else {
-            insertBinds = "";
-            for (String str : insertCols.split(",")) {
-                if (insertBinds.isEmpty()) {
-                    insertBinds += "?";
-                } else {
-                    insertBinds += ", ?";
-                }
-            }
-
-            String fullInsertQuery = "insert into " + targetKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")";
-            if (!ttlCols.isEmpty()) {
-                fullInsertQuery += " USING TTL ?";
-                if (!writeTimeStampCols.isEmpty()) {
-                    fullInsertQuery += " AND TIMESTAMP ?";
-                }
-            } else if (!writeTimeStampCols.isEmpty()) {
-                fullInsertQuery += " USING TIMESTAMP ?";
-            }
-            targetInsertStatement = targetSession.prepare(fullInsertQuery);
-        }
-
-        // Handle rows with blank values for 'timestamp' data-type in primary-key fields
-        tsReplaceValStr = propertyHelper.getAsString(KnownProperties.TARGET_REPLACE_MISSING_TS);
-        if (!tsReplaceValStr.isEmpty()) {
-            tsReplaceVal = Long.parseLong(tsReplaceValStr);
-        }
-    }
-
-    public BoundStatement bindInsert(PreparedStatement insertStatement, Row originRow, Row targetRow) {
-        BoundStatement boundInsertStatement = insertStatement.bind().setConsistencyLevel(writeConsistencyLevel);
-
-        if (isCounterTable) {
-            for (int index = 0; index < selectColTypes.size(); index++) {
-                MigrateDataType dataType = selectColTypes.get(updateSelectMapping.get(index));
-                // compute the counter delta if reading from target for the difference
-                if (targetRow != null && index < (selectColTypes.size() - idColTypes.size())) {
-                    boundInsertStatement = boundInsertStatement.set(index, (originRow.getLong(updateSelectMapping.get(index)) - targetRow.getLong(updateSelectMapping.get(index))), Long.class);
-                } else {
-                    boundInsertStatement = boundInsertStatement.set(index, getData(dataType, updateSelectMapping.get(index), originRow), dataType.typeClass);
-                }
-            }
-        } else {
-            int index = 0;
-            for (index = 0; index < selectColTypes.size(); index++) {
-                boundInsertStatement = getBoundStatement(originRow, boundInsertStatement, index, selectColTypes);
-                if (boundInsertStatement == null) return null;
-            }
-
-            if (!ttlCols.isEmpty()) {
-                boundInsertStatement = boundInsertStatement.set(index, getLargestTTL(originRow), Integer.class);
-                index++;
-            }
-            if (!writeTimeStampCols.isEmpty()) {
-                if (customWritetime > 0) {
-                    boundInsertStatement = boundInsertStatement.set(index, customWritetime, Long.class);
-                } else {
-                    boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(originRow), Long.class);
-                }
-            }
-        }
-
-        // Batch insert for large records may take longer, hence 10 secs to avoid timeout errors
-        return boundInsertStatement.setTimeout(Duration.ofSeconds(10));
-    }
-
-    public int getLargestTTL(Row row) {
-        return IntStream.range(0, ttlCols.size())
-                .map(i -> row.getInt(selectColTypes.size() + i)).max().getAsInt();
-    }
-
-    public long getLargestWriteTimeStamp(Row row) {
-        return IntStream.range(0, writeTimeStampCols.size())
-                .mapToLong(i -> row.getLong(selectColTypes.size() + ttlCols.size() + i)).max().getAsLong();
-    }
-
-    public BoundStatement selectFromTarget(PreparedStatement selectStatement, Row originRow) {
-        BoundStatement boundSelectStatement = selectStatement.bind().setConsistencyLevel(readConsistencyLevel);
-        for (int index = 0; index < idColTypes.size(); index++) {
-            boundSelectStatement = getBoundStatement(originRow, boundSelectStatement, index, idColTypes);
-            if (boundSelectStatement == null) return null;
-        }
-
-        return boundSelectStatement;
-    }
-
-    private BoundStatement getBoundStatement(Row row, BoundStatement boundSelectStatement, int index,
-                                             List<MigrateDataType> cols) {
-        MigrateDataType dataTypeObj = cols.get(index);
-        Object colData = getData(dataTypeObj, index, row);
-
-        // Handle rows with blank values in primary-key fields
-        if (index < idColTypes.size()) {
-            Optional<Object> optionalVal = handleBlankInPrimaryKey(index, colData, dataTypeObj.typeClass, row);
-            if (!optionalVal.isPresent()) {
-                return null;
-            }
-            colData = optionalVal.get();
-        }
-        boundSelectStatement = boundSelectStatement.set(index, colData, dataTypeObj.typeClass);
-        return boundSelectStatement;
-    }
-
-    protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row originRow) {
-        return handleBlankInPrimaryKey(index, colData, dataType, originRow, true);
-    }
-
-    protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row originRow, boolean logWarn) {
-        // Handle rows with blank values for 'String' data-type in primary-key fields
-        if (index < idColTypes.size() && colData == null && dataType == String.class) {
-            if (logWarn) {
-                logger.warn("For row with Key: {}, found String primary-key column {} with blank value",
-                        getKey(originRow), allCols[index]);
-            }
-            return Optional.of("");
-        }
-
-        // Handle rows with blank values for 'timestamp' data-type in primary-key fields
-        if (index < idColTypes.size() && colData == null && dataType == Instant.class) {
-            if (tsReplaceValStr.isEmpty()) {
-                logger.error("Skipping row with Key: {} as Timestamp primary-key column {} has invalid blank value. " +
-                        "Alternatively rerun the job with --conf "+KnownProperties.TARGET_REPLACE_MISSING_TS+"\"<fixed-epoch-value>\" " +
-                        "option to replace the blanks with a fixed timestamp value", getKey(originRow), allCols[index]);
-                return Optional.empty();
-            }
-            if (logWarn) {
-                logger.warn("For row with Key: {}, found Timestamp primary-key column {} with invalid blank value. " +
-                        "Using value {} instead", getKey(originRow), allCols[index], Instant.ofEpochSecond(tsReplaceVal));
-            }
-            return Optional.of(Instant.ofEpochSecond(tsReplaceVal));
-        }
 
-        return Optional.of(colData);
+        cqlHelper.initialize();
     }
 
 }
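
The four added lines above hand both driver sessions to a CqlHelper and delegate all of the removed statement-building logic to cqlHelper.initialize(). The CqlHelper class itself lives in the new datastax.astra.migrate.cql package (imported in the next file) and is not part of this diff; the sketch below only illustrates the interface implied by the call sites, with the field names and the empty initialize() body assumed rather than taken from the commit.

package datastax.astra.migrate.cql;

import com.datastax.oss.driver.api.core.CqlSession;

// Illustrative sketch only: the shape of CqlHelper implied by the call sites in
// AbstractJobSession. Field names and the initialize() body are assumptions; the
// real class added by this PR carries the statement-building logic removed above.
public class CqlHelper {
    private CqlSession originSession;
    private CqlSession targetSession;
    private boolean jobMigrateRowsFromFile;

    public void setOriginSession(CqlSession originSession) {
        this.originSession = originSession;
    }

    public void setTargetSession(CqlSession targetSession) {
        this.targetSession = targetSession;
    }

    public void setJobMigrateRowsFromFile(boolean jobMigrateRowsFromFile) {
        this.jobMigrateRowsFromFile = jobMigrateRowsFromFile;
    }

    public void initialize() {
        // In the real helper: read KnownProperties, build the origin SELECT and the
        // target SELECT/INSERT statements, and prepare them on the two sessions.
    }
}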

BaseJobSession.java: 3 additions & 88 deletions

@@ -1,28 +1,14 @@
 package datastax.astra.migrate;
 
-import com.datastax.oss.driver.api.core.ConsistencyLevel;
-import com.datastax.oss.driver.api.core.CqlSession;
-import com.datastax.oss.driver.api.core.cql.PreparedStatement;
-import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
-import datastax.astra.migrate.properties.KnownProperties;
+import datastax.astra.migrate.cql.CqlHelper;
 import datastax.astra.migrate.properties.PropertyHelper;
 import org.apache.spark.SparkConf;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 public abstract class BaseJobSession {
 
     protected PropertyHelper propertyHelper = PropertyHelper.getInstance();
-
-    protected PreparedStatement originSelectStatement;
-    protected PreparedStatement targetSelectStatement;
-    protected PreparedStatement targetInsertStatement;
-    protected ConsistencyLevel readConsistencyLevel;
-    protected ConsistencyLevel writeConsistencyLevel;
+    protected CqlHelper cqlHelper;
 
     // Read/Write Rate limiter
     // Determine the total throughput for the entire cluster in terms of wries/sec,
@@ -34,82 +20,11 @@ public abstract class BaseJobSession {
     protected RateLimiter writeLimiter;
     protected Integer maxRetries = 10;
 
-    protected CqlSession originSessionSession;
-    protected CqlSession targetSession;
-    protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
-    protected List<MigrateDataType> idColTypes = new ArrayList<MigrateDataType>();
-    protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
-
-    protected Integer batchSize = 1;
-    protected Integer fetchSizeInRows = 1000;
     protected Integer printStatsAfter = 100000;
 
-    protected Boolean writeTimeStampFilter = Boolean.FALSE;
-    protected Long minWriteTimeStampFilter = 0l;
-    protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
-    protected Long customWritetime = 0l;
-
-    protected List<Integer> writeTimeStampCols = new ArrayList<Integer>();
-    protected List<Integer> ttlCols = new ArrayList<Integer>();
-    protected Boolean isCounterTable;
-
-    protected String originKeyspaceTable;
-    protected String targetKeyspaceTable;
-
-    protected Boolean hasRandomPartitioner;
-    protected Boolean filterData;
-    protected String filterColName;
-    protected String filterColType;
-    protected Integer filterColIndex;
-    protected String filterColValue;
-
-    protected String[] allCols;
-    protected String tsReplaceValStr;
-    protected long tsReplaceVal;
-
     protected BaseJobSession(SparkConf sc) {
         propertyHelper.initializeSparkConf(sc);
-        readConsistencyLevel = Util.mapToConsistencyLevel(propertyHelper.getString(KnownProperties.READ_CL));
-        writeConsistencyLevel = Util.mapToConsistencyLevel(propertyHelper.getString(KnownProperties.WRITE_CL));
+        this.cqlHelper = new CqlHelper();
     }
 
-    public String getKey(Row originRow) {
-        StringBuffer key = new StringBuffer();
-        for (int index = 0; index < idColTypes.size(); index++) {
-            MigrateDataType dataType = idColTypes.get(index);
-            if (index == 0) {
-                key.append(getData(dataType, index, originRow));
-            } else {
-                key.append(" %% " + getData(dataType, index, originRow));
-            }
-        }
-
-        return key.toString();
-    }
-
-    public List<MigrateDataType> getTypes(String types) {
-        List<MigrateDataType> dataTypes = new ArrayList<MigrateDataType>();
-        for (String type : types.split(",")) {
-            dataTypes.add(new MigrateDataType(type));
-        }
-
-        return dataTypes;
-    }
-
-    public Object getData(MigrateDataType dataType, int index, Row originRow) {
-        if (dataType.typeClass == Map.class) {
-            return originRow.getMap(index, dataType.subTypes.get(0), dataType.subTypes.get(1));
-        } else if (dataType.typeClass == List.class) {
-            return originRow.getList(index, dataType.subTypes.get(0));
-        } else if (dataType.typeClass == Set.class) {
-            return originRow.getSet(index, dataType.subTypes.get(0));
-        } else if (isCounterTable && dataType.typeClass == Long.class) {
-            Object data = originRow.get(index, dataType.typeClass);
-            if (data == null) {
-                return new Long(0);
-            }
-        }
-
-        return originRow.get(index, dataType.typeClass);
-    }
 }
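
Taken together, the two files now split responsibilities: BaseJobSession only creates the CqlHelper, and AbstractJobSession pushes the sessions into it and calls initialize(). The minimal, hypothetical subclass below repeats the AbstractJobSession wiring shown above to make the new call sequence explicit; the class name, constructor signature, and the boolean literal are illustrative and not part of this commit.

package datastax.astra.migrate;

import com.datastax.oss.driver.api.core.CqlSession;
import org.apache.spark.SparkConf;

// Hypothetical subclass, for illustration only: shows how a job session wires the
// refactored pieces together after this change.
public class ExampleJobSession extends BaseJobSession {

    protected ExampleJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
        super(sc);                                   // initializes propertyHelper and creates the CqlHelper
        cqlHelper.setOriginSession(originSession);   // sessions now live on the helper, not on the job session
        cqlHelper.setTargetSession(targetSession);
        cqlHelper.setJobMigrateRowsFromFile(false);  // illustrative value
        cqlHelper.initialize();                      // helper prepares the origin/target statements
    }
}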
