
Commit 4cfc859

CDM-14 : renaming most variables for clarity (#110)
1 parent 834c0b3 commit 4cfc859

14 files changed (+184, -184 lines)

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 45 additions & 45 deletions
@@ -20,19 +20,19 @@ public class AbstractJobSession extends BaseJobSession {
 
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
 
-    protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
-        this(sourceSession, astraSession, sc, false);
+    protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
+        this(originSession, targetSession, sc, false);
     }
 
-    protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
+    protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
         super(sc);
 
-        if (sourceSession == null) {
+        if (originSession == null) {
            return;
        }
 
-        this.sourceSession = sourceSession;
-        this.astraSession = astraSession;
+        this.originSessionSession = originSession;
+        this.targetSession = targetSession;
 
        batchSize = new Integer(Util.getSparkPropOr(sc, "spark.batchSize", "5"));
        fetchSizeInRows = new Integer(Util.getSparkPropOr(sc, "spark.read.fetch.sizeInRows", "1000"));
@@ -45,8 +45,8 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
        writeLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.writeRateLimit", "40000")));
        maxRetries = Integer.parseInt(sc.get("spark.maxRetries", "0"));
 
-       sourceKeyspaceTable = Util.getSparkProp(sc, "spark.origin.keyspaceTable");
-       astraKeyspaceTable = Util.getSparkProp(sc, "spark.target.keyspaceTable");
+       originKeyspaceTable = Util.getSparkProp(sc, "spark.origin.keyspaceTable");
+       targetKeyspaceTable = Util.getSparkProp(sc, "spark.target.keyspaceTable");
 
        String ttlColsStr = Util.getSparkPropOrEmpty(sc, "spark.query.ttl.cols");
        if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
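Note on the throttling set up above: the limiters are Guava RateLimiter instances, so the spark.readRateLimit and spark.writeRateLimit values are interpreted as permits per second. A minimal usage sketch (the rate shown is a hypothetical example, not a value from this commit):

    import com.google.common.util.concurrent.RateLimiter;

    // Guava token-bucket limiter; the argument is permits per second:
    RateLimiter readLimiter = RateLimiter.create(20000.0);
    readLimiter.acquire(1);  // blocks until one permit is available, then proceeds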
@@ -91,8 +91,8 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
        logger.info("PARAM -- Write Batch Size: {}", batchSize);
        logger.info("PARAM -- Max Retries: {}", maxRetries);
        logger.info("PARAM -- Read Fetch Size: {}", fetchSizeInRows);
-       logger.info("PARAM -- Source Keyspace Table: {}", sourceKeyspaceTable);
-       logger.info("PARAM -- Destination Keyspace Table: {}", astraKeyspaceTable);
+       logger.info("PARAM -- Source Keyspace Table: {}", originKeyspaceTable);
+       logger.info("PARAM -- Destination Keyspace Table: {}", targetKeyspaceTable);
        logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());
        logger.info("PARAM -- WriteRateLimit: {}", writeLimiter.getRate());
        logger.info("PARAM -- TTLCols: {}", ttlCols);
@@ -107,9 +107,9 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
 
        String selectCols = Util.getSparkProp(sc, "spark.query.origin");
        String partitionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
-       String sourceSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");
-       if (!sourceSelectCondition.isEmpty() && !sourceSelectCondition.trim().toUpperCase().startsWith("AND")) {
-           sourceSelectCondition = " AND " + sourceSelectCondition;
+       String originSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");
+       if (!originSelectCondition.isEmpty() && !originSelectCondition.trim().toUpperCase().startsWith("AND")) {
+           originSelectCondition = " AND " + originSelectCondition;
        }
 
        final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
@@ -139,17 +139,17 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
 
        String fullSelectQuery;
        if (!isJobMigrateRowsFromFile) {
-           fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable +
+           fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + originKeyspaceTable +
                    " where token(" + partitionKey.trim() + ") >= ? and token(" + partitionKey.trim() + ") <= ? " +
-                   sourceSelectCondition + " ALLOW FILTERING";
+                   originSelectCondition + " ALLOW FILTERING";
        } else {
-           fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable + " where " + insertBinds;
+           fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + originKeyspaceTable + " where " + insertBinds;
        }
-       sourceSelectStatement = sourceSession.prepare(fullSelectQuery);
+       originSelectStatement = originSession.prepare(fullSelectQuery);
        logger.info("PARAM -- Query used: {}", fullSelectQuery);
 
-       astraSelectStatement = astraSession.prepare(
-               "select " + insertCols + " from " + astraKeyspaceTable
+       targetSelectStatement = targetSession.prepare(
+               "select " + insertCols + " from " + targetKeyspaceTable
                + " where " + insertBinds);
 
        hasRandomPartitioner = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.origin.hasRandomPartitioner", "false"));
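For orientation, the origin read prepared above is a token-range scan. A standalone sketch of the string it assembles when isJobMigrateRowsFromFile is false, using hypothetical column and table values and ignoring the TTL/writetime columns:

    // Hypothetical inputs; real values come from the spark.query.* properties:
    String selectCols = "id, name";
    String originKeyspaceTable = "ks.tbl";
    String partitionKey = "id";
    String originSelectCondition = "";
    String fullSelectQuery = "select " + selectCols + " from " + originKeyspaceTable +
            " where token(" + partitionKey.trim() + ") >= ? and token(" + partitionKey.trim() + ") <= ? " +
            originSelectCondition + " ALLOW FILTERING";
    // => "select id, name from ks.tbl where token(id) >= ? and token(id) <= ?  ALLOW FILTERING"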
@@ -161,7 +161,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
            }
 
            String counterTableUpdate = Util.getSparkProp(sc, "spark.counterTable.cql");
-           astraInsertStatement = astraSession.prepare(counterTableUpdate);
+           targetInsertStatement = targetSession.prepare(counterTableUpdate);
        } else {
            insertBinds = "";
            for (String str : insertCols.split(",")) {
@@ -172,7 +172,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
                }
            }
 
-           String fullInsertQuery = "insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")";
+           String fullInsertQuery = "insert into " + targetKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")";
            if (!ttlCols.isEmpty()) {
                fullInsertQuery += " USING TTL ?";
                if (!writeTimeStampCols.isEmpty()) {
@@ -181,7 +181,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
            } else if (!writeTimeStampCols.isEmpty()) {
                fullInsertQuery += " USING TIMESTAMP ?";
            }
-           astraInsertStatement = astraSession.prepare(fullInsertQuery);
+           targetInsertStatement = targetSession.prepare(fullInsertQuery);
        }
 
        // Handle rows with blank values for 'timestamp' data-type in primary-key fields
@@ -191,35 +191,35 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
        }
    }
 
-   public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow, Row astraRow) {
+   public BoundStatement bindInsert(PreparedStatement insertStatement, Row originRow, Row targetRow) {
        BoundStatement boundInsertStatement = insertStatement.bind().setConsistencyLevel(writeConsistencyLevel);
 
        if (isCounterTable) {
            for (int index = 0; index < selectColTypes.size(); index++) {
                MigrateDataType dataType = selectColTypes.get(updateSelectMapping.get(index));
-               // compute the counter delta if reading from astra for the difference
-               if (astraRow != null && index < (selectColTypes.size() - idColTypes.size())) {
-                   boundInsertStatement = boundInsertStatement.set(index, (sourceRow.getLong(updateSelectMapping.get(index)) - astraRow.getLong(updateSelectMapping.get(index))), Long.class);
+               // compute the counter delta if reading from target for the difference
+               if (targetRow != null && index < (selectColTypes.size() - idColTypes.size())) {
+                   boundInsertStatement = boundInsertStatement.set(index, (originRow.getLong(updateSelectMapping.get(index)) - targetRow.getLong(updateSelectMapping.get(index))), Long.class);
                } else {
-                   boundInsertStatement = boundInsertStatement.set(index, getData(dataType, updateSelectMapping.get(index), sourceRow), dataType.typeClass);
+                   boundInsertStatement = boundInsertStatement.set(index, getData(dataType, updateSelectMapping.get(index), originRow), dataType.typeClass);
                }
            }
        } else {
            int index = 0;
            for (index = 0; index < selectColTypes.size(); index++) {
-               boundInsertStatement = getBoundStatement(sourceRow, boundInsertStatement, index, selectColTypes);
+               boundInsertStatement = getBoundStatement(originRow, boundInsertStatement, index, selectColTypes);
                if (boundInsertStatement == null) return null;
            }
 
            if (!ttlCols.isEmpty()) {
-               boundInsertStatement = boundInsertStatement.set(index, getLargestTTL(sourceRow), Integer.class);
+               boundInsertStatement = boundInsertStatement.set(index, getLargestTTL(originRow), Integer.class);
                index++;
            }
            if (!writeTimeStampCols.isEmpty()) {
                if (customWritetime > 0) {
                    boundInsertStatement = boundInsertStatement.set(index, customWritetime, Long.class);
                } else {
-                   boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
+                   boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(originRow), Long.class);
                }
            }
        }
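A note on the counter branch in bindInsert above: Cassandra counter columns can only be incremented or decremented, never set directly, so when a target row already exists the job binds the origin-minus-target difference rather than an absolute value. A sketch of that delta with hypothetical counts:

    // Counters only accept deltas, so compute origin - target (values hypothetical):
    long originCount = 42L;   // counter value read from the origin row
    long targetCount = 30L;   // counter value already present on the target
    long delta = originCount - targetCount;  // 12L, bound into the prepared counter update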
@@ -228,34 +228,34 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
        return boundInsertStatement.setTimeout(Duration.ofSeconds(10));
    }
 
-   public int getLargestTTL(Row sourceRow) {
+   public int getLargestTTL(Row row) {
        return IntStream.range(0, ttlCols.size())
-               .map(i -> sourceRow.getInt(selectColTypes.size() + i)).max().getAsInt();
+               .map(i -> row.getInt(selectColTypes.size() + i)).max().getAsInt();
    }
 
-   public long getLargestWriteTimeStamp(Row sourceRow) {
+   public long getLargestWriteTimeStamp(Row row) {
        return IntStream.range(0, writeTimeStampCols.size())
-               .mapToLong(i -> sourceRow.getLong(selectColTypes.size() + ttlCols.size() + i)).max().getAsLong();
+               .mapToLong(i -> row.getLong(selectColTypes.size() + ttlCols.size() + i)).max().getAsLong();
    }
 
-   public BoundStatement selectFromAstra(PreparedStatement selectStatement, Row sourceRow) {
+   public BoundStatement selectFromTarget(PreparedStatement selectStatement, Row originRow) {
        BoundStatement boundSelectStatement = selectStatement.bind().setConsistencyLevel(readConsistencyLevel);
        for (int index = 0; index < idColTypes.size(); index++) {
-           boundSelectStatement = getBoundStatement(sourceRow, boundSelectStatement, index, idColTypes);
+           boundSelectStatement = getBoundStatement(originRow, boundSelectStatement, index, idColTypes);
            if (boundSelectStatement == null) return null;
        }
 
        return boundSelectStatement;
    }
 
-   private BoundStatement getBoundStatement(Row sourceRow, BoundStatement boundSelectStatement, int index,
+   private BoundStatement getBoundStatement(Row row, BoundStatement boundSelectStatement, int index,
                                             List<MigrateDataType> cols) {
        MigrateDataType dataTypeObj = cols.get(index);
-       Object colData = getData(dataTypeObj, index, sourceRow);
+       Object colData = getData(dataTypeObj, index, row);
 
        // Handle rows with blank values in primary-key fields
        if (index < idColTypes.size()) {
-           Optional<Object> optionalVal = handleBlankInPrimaryKey(index, colData, dataTypeObj.typeClass, sourceRow);
+           Optional<Object> optionalVal = handleBlankInPrimaryKey(index, colData, dataTypeObj.typeClass, row);
            if (!optionalVal.isPresent()) {
                return null;
            }
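Since a CQL write carries only a single USING TTL / USING TIMESTAMP pair, getLargestTTL and getLargestWriteTimeStamp above reduce the per-column values to their maximum. The same reduction in isolation (values hypothetical):

    import java.util.stream.IntStream;

    // Collapse per-column TTLs into one value for the "USING TTL ?" bind:
    int[] colTTLs = {3600, 7200, 1800};  // hypothetical TTLs read per configured column
    int largestTTL = IntStream.range(0, colTTLs.length)
            .map(i -> colTTLs[i]).max().getAsInt();  // => 7200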
@@ -265,16 +265,16 @@ private BoundStatement getBoundStatement(Row sourceRow, BoundStatement boundSele
        return boundSelectStatement;
    }
 
-   protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row sourceRow) {
-       return handleBlankInPrimaryKey(index, colData, dataType, sourceRow, true);
+   protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row originRow) {
+       return handleBlankInPrimaryKey(index, colData, dataType, originRow, true);
    }
 
-   protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row sourceRow, boolean logWarn) {
+   protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row originRow, boolean logWarn) {
        // Handle rows with blank values for 'String' data-type in primary-key fields
        if (index < idColTypes.size() && colData == null && dataType == String.class) {
            if (logWarn) {
                logger.warn("For row with Key: {}, found String primary-key column {} with blank value",
-                       getKey(sourceRow), allCols[index]);
+                       getKey(originRow), allCols[index]);
            }
            return Optional.of("");
        }
@@ -284,12 +284,12 @@ protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Cl
            if (tsReplaceValStr.isEmpty()) {
                logger.error("Skipping row with Key: {} as Timestamp primary-key column {} has invalid blank value. " +
                        "Alternatively rerun the job with --conf spark.target.replace.blankTimestampKeyUsingEpoch=\"<fixed-epoch-value>\" " +
-                       "option to replace the blanks with a fixed timestamp value", getKey(sourceRow), allCols[index]);
+                       "option to replace the blanks with a fixed timestamp value", getKey(originRow), allCols[index]);
                return Optional.empty();
            }
            if (logWarn) {
                logger.warn("For row with Key: {}, found Timestamp primary-key column {} with invalid blank value. " +
-                       "Using value {} instead", getKey(sourceRow), allCols[index], Instant.ofEpochSecond(tsReplaceVal));
+                       "Using value {} instead", getKey(originRow), allCols[index], Instant.ofEpochSecond(tsReplaceVal));
            }
            return Optional.of(Instant.ofEpochSecond(tsReplaceVal));
        }
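The blank-key handling above substitutes an empty string for a null String key column and a fixed Instant for a blank Timestamp key when spark.target.replace.blankTimestampKeyUsingEpoch is configured; otherwise the row is skipped. A sketch of the timestamp substitution, assuming the option is set to "0":

    import java.time.Instant;
    import java.util.Optional;

    // Assuming --conf spark.target.replace.blankTimestampKeyUsingEpoch="0":
    long tsReplaceVal = Long.parseLong("0");
    Optional<Object> replacement = Optional.of(Instant.ofEpochSecond(tsReplaceVal));
    // => Optional[1970-01-01T00:00:00Z], written in place of the blank key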

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 16 additions & 16 deletions
@@ -14,9 +14,9 @@
 
 public abstract class BaseJobSession {
 
-   protected PreparedStatement sourceSelectStatement;
-   protected PreparedStatement astraSelectStatement;
-   protected PreparedStatement astraInsertStatement;
+   protected PreparedStatement originSelectStatement;
+   protected PreparedStatement targetSelectStatement;
+   protected PreparedStatement targetInsertStatement;
    protected ConsistencyLevel readConsistencyLevel;
    protected ConsistencyLevel writeConsistencyLevel;
 
@@ -30,8 +30,8 @@ public abstract class BaseJobSession {
    protected RateLimiter writeLimiter;
    protected Integer maxRetries = 10;
 
-   protected CqlSession sourceSession;
-   protected CqlSession astraSession;
+   protected CqlSession originSessionSession;
+   protected CqlSession targetSession;
    protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
    protected List<MigrateDataType> idColTypes = new ArrayList<MigrateDataType>();
    protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
@@ -49,8 +49,8 @@ public abstract class BaseJobSession {
    protected List<Integer> ttlCols = new ArrayList<Integer>();
    protected Boolean isCounterTable;
 
-   protected String sourceKeyspaceTable;
-   protected String astraKeyspaceTable;
+   protected String originKeyspaceTable;
+   protected String targetKeyspaceTable;
 
    protected Boolean hasRandomPartitioner;
    protected Boolean filterData;
@@ -68,14 +68,14 @@ protected BaseJobSession(SparkConf sc) {
        writeConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.write"));
    }
 
-   public String getKey(Row sourceRow) {
+   public String getKey(Row originRow) {
        StringBuffer key = new StringBuffer();
        for (int index = 0; index < idColTypes.size(); index++) {
            MigrateDataType dataType = idColTypes.get(index);
            if (index == 0) {
-               key.append(getData(dataType, index, sourceRow));
+               key.append(getData(dataType, index, originRow));
            } else {
-               key.append(" %% " + getData(dataType, index, sourceRow));
+               key.append(" %% " + getData(dataType, index, originRow));
            }
        }
 
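getKey above joins the id-column values with a " %% " separator to produce a printable composite row key for log messages. With hypothetical id values:

    // Hypothetical id columns ("1234", "alice"):
    StringBuffer key = new StringBuffer();
    key.append("1234");             // first id column appended bare
    key.append(" %% " + "alice");   // later columns prefixed with the separator
    // key.toString() => "1234 %% alice"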
@@ -91,20 +91,20 @@ public List<MigrateDataType> getTypes(String types) {
        return dataTypes;
    }
 
-   public Object getData(MigrateDataType dataType, int index, Row sourceRow) {
+   public Object getData(MigrateDataType dataType, int index, Row originRow) {
        if (dataType.typeClass == Map.class) {
-           return sourceRow.getMap(index, dataType.subTypes.get(0), dataType.subTypes.get(1));
+           return originRow.getMap(index, dataType.subTypes.get(0), dataType.subTypes.get(1));
        } else if (dataType.typeClass == List.class) {
-           return sourceRow.getList(index, dataType.subTypes.get(0));
+           return originRow.getList(index, dataType.subTypes.get(0));
        } else if (dataType.typeClass == Set.class) {
-           return sourceRow.getSet(index, dataType.subTypes.get(0));
+           return originRow.getSet(index, dataType.subTypes.get(0));
        } else if (isCounterTable && dataType.typeClass == Long.class) {
-           Object data = sourceRow.get(index, dataType.typeClass);
+           Object data = originRow.get(index, dataType.typeClass);
            if (data == null) {
                return new Long(0);
            }
        }
 
-       return sourceRow.get(index, dataType.typeClass);
+       return originRow.get(index, dataType.typeClass);
    }
 }
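getData dispatches on MigrateDataType.typeClass so collection columns are read with their element types intact and null counter values are coerced to zero. A standalone mimic of that branching (illustrative only; TypeDispatchSketch is not the project's class):

    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Standalone mimic of getData's type dispatch:
    class TypeDispatchSketch {
        static String accessorFor(Class<?> typeClass) {
            if (typeClass == Map.class) return "getMap(index, keyType, valueType)";
            if (typeClass == List.class) return "getList(index, elementType)";
            if (typeClass == Set.class) return "getSet(index, elementType)";
            return "get(index, typeClass)";  // scalars; null counters become 0L upstream
        }

        public static void main(String[] args) {
            System.out.println(accessorFor(Map.class));   // collection-aware getter
            System.out.println(accessorFor(Long.class));  // generic getter
        }
    }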
