Commit 32b1b3c

moving all CQL-related functions into the helper

1 parent 45733c4 commit 32b1b3c

9 files changed: +535 -509 lines changed
AbstractJobSession.java: 5 additions & 67 deletions

@@ -1,40 +1,30 @@
 package datastax.astra.migrate;
 
 import com.datastax.oss.driver.api.core.CqlSession;
-import com.datastax.oss.driver.api.core.cql.BoundStatement;
-import com.datastax.oss.driver.api.core.cql.PreparedStatement;
-import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
 import datastax.astra.migrate.properties.KnownProperties;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.Instant;
-import java.util.Optional;
-
 public class AbstractJobSession extends BaseJobSession {
 
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
-    protected CqlHelper cqlHelper;
 
     protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
         this(sourceSession, astraSession, sc, false);
     }
 
-    protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
+    protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
         super(sc);
 
-        if (sourceSession == null) {
+        if (originSession == null) {
             return;
         }
 
-        this.sourceSession = sourceSession;
-        this.astraSession = astraSession;
-        this.cqlHelper = new CqlHelper(this.propertyHelper, this.sourceSession, this.astraSession, isJobMigrateRowsFromFile, this);
-
-        batchSize = propertyHelper.getInteger(KnownProperties.SPARK_BATCH_SIZE);
-        fetchSizeInRows = propertyHelper.getInteger(KnownProperties.READ_FETCH_SIZE);
+        cqlHelper.setOriginSession(originSession);
+        cqlHelper.setTargetSession(targetSession);
+        cqlHelper.setJobMigrateRowsFromFile(isJobMigrateRowsFromFile);
 
         printStatsAfter = propertyHelper.getInteger(KnownProperties.SPARK_STATS_AFTER);
         if (!propertyHelper.meetsMinimum(KnownProperties.SPARK_STATS_AFTER, printStatsAfter, 1)) {
@@ -46,63 +36,11 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         writeLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.SPARK_LIMIT_WRITE));
         maxRetries = propertyHelper.getInteger(KnownProperties.SPARK_MAX_RETRIES);
 
-        ttlCols = propertyHelper.getIntegerList(KnownProperties.ORIGIN_TTL_COLS);
-        writeTimeStampCols = propertyHelper.getIntegerList(KnownProperties.ORIGIN_WRITETIME_COLS);
-
-        writeTimeStampFilter = propertyHelper.getBoolean(KnownProperties.ORIGIN_FILTER_WRITETS_ENABLED);
-        if (writeTimeStampFilter) {
-            batchSize = 1;
-            propertyHelper.setProperty(KnownProperties.SPARK_BATCH_SIZE, batchSize);
-        }
-        minWriteTimeStampFilter = propertyHelper.getLong(KnownProperties.ORIGIN_FILTER_WRITETS_MIN);
-        maxWriteTimeStampFilter = propertyHelper.getLong(KnownProperties.ORIGIN_FILTER_WRITETS_MAX);
-
-        customWritetime = propertyHelper.getLong(KnownProperties.TARGET_CUSTOM_WRITETIME);
-        isCounterTable = propertyHelper.getBoolean(KnownProperties.ORIGIN_IS_COUNTER);
-
-        logger.info("PARAM -- Read Consistency: {}", readConsistencyLevel);
-        logger.info("PARAM -- Write Consistency: {}", writeConsistencyLevel);
-        logger.info("PARAM -- Write Batch Size: {}", batchSize);
         logger.info("PARAM -- Max Retries: {}", maxRetries);
-        logger.info("PARAM -- Read Fetch Size: {}", fetchSizeInRows);
-        logger.info("PARAM -- Source Keyspace Table: {}", sourceKeyspaceTable);
-        logger.info("PARAM -- Destination Keyspace Table: {}", targetKeyspaceTable);
         logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());
         logger.info("PARAM -- WriteRateLimit: {}", writeLimiter.getRate());
-        logger.info("PARAM -- TTLCols: {}", ttlCols);
-        logger.info("PARAM -- WriteTimestampFilterCols: {}", writeTimeStampCols);
-        logger.info("PARAM -- WriteTimestampFilter: {}", writeTimeStampFilter);
-        if (writeTimeStampFilter) {
-            logger.info("PARAM -- minWriteTimeStampFilter: {} datetime is {}", minWriteTimeStampFilter,
-                    Instant.ofEpochMilli(minWriteTimeStampFilter / 1000));
-            logger.info("PARAM -- maxWriteTimeStampFilter: {} datetime is {}", maxWriteTimeStampFilter,
-                    Instant.ofEpochMilli(maxWriteTimeStampFilter / 1000));
-        }
-
 
         cqlHelper.initialize();
-        String fullSelectQuery = cqlHelper.getCqlString(CqlHelper.CqlStatementType.ORIGIN_SELECT);
-        logger.info("PARAM -- ORIGIN SELECT Query used: {}", fullSelectQuery);
-        sourceSelectStatement = sourceSession.prepare(fullSelectQuery);
-
-        astraSelectStatement = astraSession.prepare(cqlHelper.getCqlString(CqlHelper.CqlStatementType.TARGET_SELECT_BY_PK));
-
-        hasRandomPartitioner = propertyHelper.getBoolean(KnownProperties.ORIGIN_HAS_RANDOM_PARTITIONER);
-
-        astraInsertStatement = astraSession.prepare(cqlHelper.getCqlString(CqlHelper.CqlStatementType.TARGET_INSERT));
-
-        // Handle rows with blank values for 'timestamp' data-type in primary-key fields
-        if (null != propertyHelper.getLong(KnownProperties.TARGET_REPLACE_MISSING_TS))
-            tsReplaceVal = propertyHelper.getLong(KnownProperties.TARGET_REPLACE_MISSING_TS);
-    }
-
-    public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow, Row astraRow) {
-        return cqlHelper.bindInsert(insertStatement, sourceRow, astraRow);
-    }
-
-    public BoundStatement selectFromAstra(PreparedStatement selectStatement, Row sourceRow) {
-        return cqlHelper.selectFromTargetByPK(selectStatement, sourceRow);
     }
 
 }
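
From the calls that survive in AbstractJobSession (setOriginSession, setTargetSession, setJobMigrateRowsFromFile, initialize) and the delegations removed above (bindInsert, selectFromTargetByPK, getCqlString), the helper presumably exposes roughly the surface sketched below. This is a hypothetical reconstruction inferred from this diff alone; the real CqlHelper class and its CqlStatementType enum are defined elsewhere in the commit and will differ in detail.

package datastax.astra.migrate;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;

// Hypothetical sketch of the helper surface implied by this diff;
// method bodies are stubs, not the committed implementation.
public class CqlHelper {

    public enum CqlStatementType { ORIGIN_SELECT, TARGET_SELECT_BY_PK, TARGET_INSERT }

    private CqlSession originSession;
    private CqlSession targetSession;
    private boolean jobMigrateRowsFromFile;

    // Setters replace the old all-args constructor that AbstractJobSession used.
    public void setOriginSession(CqlSession originSession) { this.originSession = originSession; }
    public void setTargetSession(CqlSession targetSession) { this.targetSession = targetSession; }
    public void setJobMigrateRowsFromFile(boolean flag) { this.jobMigrateRowsFromFile = flag; }

    // Reads properties, builds the CQL strings, and (judging by the prepare()
    // calls deleted from AbstractJobSession) prepares the statements internally.
    public void initialize() { /* omitted */ }

    public String getCqlString(CqlStatementType type) { /* omitted */ return null; }

    public BoundStatement bindInsert(PreparedStatement insert, Row originRow, Row targetRow) { /* omitted */ return null; }

    public BoundStatement selectFromTargetByPK(PreparedStatement select, Row originRow) { /* omitted */ return null; }
}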
BaseJobSession.java: 2 additions & 87 deletions

@@ -1,27 +1,13 @@
 package datastax.astra.migrate;
 
-import com.datastax.oss.driver.api.core.ConsistencyLevel;
-import com.datastax.oss.driver.api.core.CqlSession;
-import com.datastax.oss.driver.api.core.cql.PreparedStatement;
-import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
-import datastax.astra.migrate.properties.KnownProperties;
 import datastax.astra.migrate.properties.PropertyHelper;
 import org.apache.spark.SparkConf;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 public abstract class BaseJobSession {
 
     protected PropertyHelper propertyHelper = PropertyHelper.getInstance();
-    protected PreparedStatement sourceSelectStatement;
-    protected PreparedStatement astraSelectStatement;
-    protected PreparedStatement astraInsertStatement;
-    protected ConsistencyLevel readConsistencyLevel;
-    protected ConsistencyLevel writeConsistencyLevel;
+    protected CqlHelper cqlHelper;
 
     // Read/Write Rate limiter
     // Determine the total throughput for the entire cluster in terms of wries/sec,
@@ -33,82 +19,11 @@ public abstract class BaseJobSession {
     protected RateLimiter writeLimiter;
     protected Integer maxRetries = 10;
 
-    protected CqlSession sourceSession;
-    protected CqlSession astraSession;
-    protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
-    protected List<MigrateDataType> idColTypes = new ArrayList<MigrateDataType>();
-    // protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
-
-    protected Integer batchSize = 1;
-    protected Integer fetchSizeInRows = 1000;
     protected Integer printStatsAfter = 100000;
 
-    protected Boolean writeTimeStampFilter = Boolean.FALSE;
-    protected Long minWriteTimeStampFilter = 0l;
-    protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
-    protected Long customWritetime = 0l;
-
-    protected List<Integer> writeTimeStampCols = new ArrayList<Integer>();
-    protected List<Integer> ttlCols = new ArrayList<Integer>();
-    protected Boolean isCounterTable;
-
-    protected String sourceKeyspaceTable;
-    protected String targetKeyspaceTable;
-
-    protected Boolean hasRandomPartitioner;
-    protected Boolean filterData;
-    protected String filterColName;
-    protected String filterColType;
-    protected Integer filterColIndex;
-    protected String filterColValue;
-
-    protected List<String> allCols;
-    protected String tsReplaceValStr;
-    protected long tsReplaceVal;
-
     protected BaseJobSession(SparkConf sc) {
         propertyHelper.initializeSparkConf(sc);
-        readConsistencyLevel = Util.mapToConsistencyLevel(propertyHelper.getString(KnownProperties.READ_CL));
-        writeConsistencyLevel = Util.mapToConsistencyLevel(propertyHelper.getString(KnownProperties.WRITE_CL));;
+        this.cqlHelper = new CqlHelper();
    }
 
-    public String getKey(Row sourceRow) {
-        StringBuffer key = new StringBuffer();
-        for (int index = 0; index < idColTypes.size(); index++) {
-            MigrateDataType dataType = idColTypes.get(index);
-            if (index == 0) {
-                key.append(getData(dataType, index, sourceRow));
-            } else {
-                key.append(" %% " + getData(dataType, index, sourceRow));
-            }
-        }
-
-        return key.toString();
-    }
-
-    public List<MigrateDataType> getTypes(String types) {
-        List<MigrateDataType> dataTypes = new ArrayList<MigrateDataType>();
-        for (String type : types.split(",")) {
-            dataTypes.add(new MigrateDataType(type));
-        }
-
-        return dataTypes;
-    }
-
-    public Object getData(MigrateDataType dataType, int index, Row sourceRow) {
-        if (dataType.typeClass == Map.class) {
-            return sourceRow.getMap(index, dataType.subTypes.get(0), dataType.subTypes.get(1));
-        } else if (dataType.typeClass == List.class) {
-            return sourceRow.getList(index, dataType.subTypes.get(0));
-        } else if (dataType.typeClass == Set.class) {
-            return sourceRow.getSet(index, dataType.subTypes.get(0));
-        } else if (isCounterTable && dataType.typeClass == Long.class) {
-            Object data = sourceRow.get(index, dataType.typeClass);
-            if (data == null) {
-                return new Long(0);
-            }
-        }
-
-        return sourceRow.get(index, dataType.typeClass);
-    }
 }
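
Taken together, the two diffs change how a job session is wired: BaseJobSession now owns a bare CqlHelper instead of holding sessions, prepared statements, and column metadata itself, and AbstractJobSession configures the helper rather than doing the CQL work inline. A minimal sketch of the resulting lifecycle, assuming it sits in the same package; the wire() method and its session parameters are illustrative names, not part of this commit:

package datastax.astra.migrate;

import com.datastax.oss.driver.api.core.CqlSession;

// Hypothetical wiring sketch mirroring the constructor order shown in the
// diffs above; 'originSession'/'targetSession' stand in for sessions created
// elsewhere in the migration job.
class WiringSketch {
    static void wire(CqlSession originSession, CqlSession targetSession) {
        CqlHelper cqlHelper = new CqlHelper();      // BaseJobSession(SparkConf) does this
        cqlHelper.setOriginSession(originSession);  // AbstractJobSession(...) does the rest
        cqlHelper.setTargetSession(targetSession);
        cqlHelper.setJobMigrateRowsFromFile(false); // true only for the migrate-rows-from-file job
        cqlHelper.initialize();                     // helper builds and prepares the CQL statements
    }
}

The setter-plus-initialize() split lets the base class construct the helper before any session exists, keeping session wiring in the subclass that actually has the sessions.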
