
Commit 39ab797

Merge pull request #88 from datastax/feature/blank-timestamp-in-key
Handle blank timestamp values in primary-key columns gracefully
2 parents: c7e6f3c + 620a193
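
The change is driven by a single optional property read in AbstractJobSession below. As an illustrative usage sketch (the property name and --conf form come from the error message added in this commit; the value 0, i.e. epoch 1970-01-01T00:00:00Z, is an arbitrary example choice):

    --conf spark.target.replace.blankTimestampKeyUsingEpoch="0"

When the property is unset, rows whose timestamp primary-key column is blank are logged and skipped; when it is set, the blank is replaced with Instant.ofEpochSecond(<value>).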

File tree: 6 files changed (+109, -40 lines)

  pom.xml
  src/main/java/datastax/astra/migrate/AbstractJobSession.java
  src/main/java/datastax/astra/migrate/BaseJobSession.java
  src/main/java/datastax/astra/migrate/CopyJobSession.java
  src/main/java/datastax/astra/migrate/DiffJobSession.java
  src/resources/sparkConf.properties

pom.xml (1 addition, 1 deletion)

@@ -8,7 +8,7 @@
 
     <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-       <revision>3.1.0</revision>
+       <revision>3.2.1</revision>
        <scala.version>2.12.17</scala.version>
        <scala.main.version>2.12</scala.main.version>
        <spark.version>3.3.1</spark.version>

src/main/java/datastax/astra/migrate/AbstractJobSession.java (68 additions, 25 deletions)

@@ -12,7 +12,8 @@
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.Map;
+import java.util.List;
+import java.util.Optional;
 import java.util.stream.IntStream;
 
 public class AbstractJobSession extends BaseJobSession {
@@ -25,11 +26,11 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
 
     protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
         super(sc);
-
+
        if (sourceSession == null) {
            return;
        }
-
+
        this.sourceSession = sourceSession;
        this.astraSession = astraSession;
 
@@ -105,14 +106,14 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
        }
 
        String selectCols = Util.getSparkProp(sc, "spark.query.origin");
-       String partionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
+       String partitionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
        String sourceSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");
        if (!sourceSelectCondition.isEmpty() && !sourceSelectCondition.trim().toUpperCase().startsWith("AND")) {
            sourceSelectCondition = " AND " + sourceSelectCondition;
        }
 
        final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
-       String[] allCols = selectCols.split(",");
+       allCols = selectCols.split(",");
        ttlCols.forEach(col -> {
            selectTTLWriteTimeCols.append(",ttl(" + allCols[col] + ")");
        });
@@ -138,8 +139,9 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
 
        String fullSelectQuery;
        if (!isJobMigrateRowsFromFile) {
-           fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
-                   + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING";
+           fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable +
+                   " where token(" + partitionKey.trim() + ") >= ? and token(" + partitionKey.trim() + ") <= ? " +
+                   sourceSelectCondition + " ALLOW FILTERING";
        } else {
            fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable + " where " + insertBinds;
        }
@@ -181,6 +183,12 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
            }
            astraInsertStatement = astraSession.prepare(fullInsertQuery);
        }
+
+       // Handle rows with blank values for 'timestamp' data-type in primary-key fields
+       tsReplaceValStr = Util.getSparkPropOr(sc, "spark.target.replace.blankTimestampKeyUsingEpoch", "");
+       if (!tsReplaceValStr.isEmpty()) {
+           tsReplaceVal = Long.parseLong(tsReplaceValStr);
+       }
    }
 
    public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow, Row astraRow) {
@@ -199,21 +207,8 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
        } else {
            int index = 0;
            for (index = 0; index < selectColTypes.size(); index++) {
-               MigrateDataType dataTypeObj = selectColTypes.get(index);
-               Class dataType = dataTypeObj.typeClass;
-
-               try {
-                   Object colData = getData(dataTypeObj, index, sourceRow);
-                   if (index < idColTypes.size() && colData == null && dataType == String.class) {
-                       colData = "";
-                   }
-                   boundInsertStatement = boundInsertStatement.set(index, colData, dataType);
-               } catch (NullPointerException e) {
-                   // ignore the exception for map values being null
-                   if (dataType != Map.class) {
-                       throw e;
-                   }
-               }
+               boundInsertStatement = getBoundStatement(sourceRow, boundInsertStatement, index, selectColTypes);
+               if (boundInsertStatement == null) return null;
            }
 
            if (!ttlCols.isEmpty()) {
@@ -246,12 +241,60 @@ public long getLargestWriteTimeStamp(Row sourceRow) {
    public BoundStatement selectFromAstra(PreparedStatement selectStatement, Row sourceRow) {
        BoundStatement boundSelectStatement = selectStatement.bind().setConsistencyLevel(readConsistencyLevel);
        for (int index = 0; index < idColTypes.size(); index++) {
-           MigrateDataType dataType = idColTypes.get(index);
-           boundSelectStatement = boundSelectStatement.set(index, getData(dataType, index, sourceRow),
-                   dataType.typeClass);
+           boundSelectStatement = getBoundStatement(sourceRow, boundSelectStatement, index, idColTypes);
+           if (boundSelectStatement == null) return null;
        }
 
        return boundSelectStatement;
    }
 
+   private BoundStatement getBoundStatement(Row sourceRow, BoundStatement boundSelectStatement, int index,
+                                            List<MigrateDataType> cols) {
+       MigrateDataType dataTypeObj = cols.get(index);
+       Object colData = getData(dataTypeObj, index, sourceRow);
+
+       // Handle rows with blank values in primary-key fields
+       if (index < idColTypes.size()) {
+           Optional<Object> optionalVal = handleBlankInPrimaryKey(index, colData, dataTypeObj.typeClass, sourceRow);
+           if (!optionalVal.isPresent()) {
+               return null;
+           }
+           colData = optionalVal.get();
+       }
+       boundSelectStatement = boundSelectStatement.set(index, colData, dataTypeObj.typeClass);
+       return boundSelectStatement;
+   }
+
+   protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row sourceRow) {
+       return handleBlankInPrimaryKey(index, colData, dataType, sourceRow, true);
+   }
+
+   protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row sourceRow, boolean logWarn) {
+       // Handle rows with blank values for 'String' data-type in primary-key fields
+       if (index < idColTypes.size() && colData == null && dataType == String.class) {
+           if (logWarn) {
+               logger.warn("For row with Key: {}, found String primary-key column {} with blank value",
+                       getKey(sourceRow), allCols[index]);
+           }
+           return Optional.of("");
+       }
+
+       // Handle rows with blank values for 'timestamp' data-type in primary-key fields
+       if (index < idColTypes.size() && colData == null && dataType == Instant.class) {
+           if (tsReplaceValStr.isEmpty()) {
+               logger.error("Skipping row with Key: {} as Timestamp primary-key column {} has invalid blank value. " +
+                       "Alternatively rerun the job with --conf spark.target.replace.blankTimestampKeyUsingEpoch=\"<fixed-epoch-value>\" " +
+                       "option to replace the blanks with a fixed timestamp value", getKey(sourceRow), allCols[index]);
+               return Optional.empty();
+           }
+           if (logWarn) {
+               logger.warn("For row with Key: {}, found Timestamp primary-key column {} with invalid blank value. " +
+                       "Using value {} instead", getKey(sourceRow), allCols[index], Instant.ofEpochSecond(tsReplaceVal));
+           }
+           return Optional.of(Instant.ofEpochSecond(tsReplaceVal));
+       }
+
+       return Optional.of(colData);
+   }
+
 }
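
To make the control flow above easier to follow, here is a minimal, self-contained sketch of the substitution rules that handleBlankInPrimaryKey implements. The class and method names below are illustrative, not part of the tool; the real method also checks the column index against idColTypes and logs via the job's logger.

import java.time.Instant;
import java.util.Optional;

// Illustrative stand-in for the blank-primary-key rules added in this commit.
public class BlankKeySketch {

    // Stand-in for tsReplaceValStr: an empty string means
    // spark.target.replace.blankTimestampKeyUsingEpoch was not set.
    private final String tsReplaceValStr;

    BlankKeySketch(String tsReplaceValStr) {
        this.tsReplaceValStr = tsReplaceValStr;
    }

    // An empty Optional signals "skip this row"; otherwise the Optional
    // holds the (possibly substituted) primary-key value to bind.
    Optional<Object> handleBlank(Object colData, Class<?> dataType) {
        if (colData == null && dataType == String.class) {
            return Optional.of("");  // blank String key becomes ""
        }
        if (colData == null && dataType == Instant.class) {
            if (tsReplaceValStr.isEmpty()) {
                return Optional.empty();  // no replacement configured: skip the row
            }
            // blank timestamp key becomes the configured fixed epoch-second value
            return Optional.of(Instant.ofEpochSecond(Long.parseLong(tsReplaceValStr)));
        }
        // value present: keep it (the real code uses Optional.of here)
        return Optional.ofNullable(colData);
    }

    public static void main(String[] args) {
        BlankKeySketch unset = new BlankKeySketch("");
        BlankKeySketch fixed = new BlankKeySketch("0");

        System.out.println(unset.handleBlank(null, Instant.class));  // Optional.empty -> row skipped
        System.out.println(fixed.handleBlank(null, Instant.class));  // Optional[1970-01-01T00:00:00Z]
        System.out.println(fixed.handleBlank(null, String.class));   // Optional holding ""
        System.out.println(fixed.handleBlank("abc", String.class));  // Optional[abc]
    }
}

The empty-Optional convention is what lets bindInsert and selectFromAstra return null, which the callers in CopyJobSession and DiffJobSession translate into skip counts instead of failed writes.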

src/main/java/datastax/astra/migrate/BaseJobSession.java (4 additions, 0 deletions)

@@ -59,6 +59,10 @@ public abstract class BaseJobSession {
    protected Integer filterColIndex;
    protected String filterColValue;
 
+   protected String[] allCols;
+   protected String tsReplaceValStr;
+   protected long tsReplaceVal;
+
    protected BaseJobSession(SparkConf sc) {
        readConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.read"));
        writeConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.write"));

src/main/java/datastax/astra/migrate/CopyJobSession.java (12 additions, 3 deletions)

@@ -95,8 +95,12 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                astraRow = astraReadResultSet.one();
            }
 
-           CompletionStage<AsyncResultSet> astraWriteResultSet = astraSession
-                   .executeAsync(bindInsert(astraInsertStatement, sourceRow, astraRow));
+           BoundStatement bInsert = bindInsert(astraInsertStatement, sourceRow, astraRow);
+           if (null == bInsert) {
+               skipCnt++;
+               continue;
+           }
+           CompletionStage<AsyncResultSet> astraWriteResultSet = astraSession.executeAsync(bInsert);
            writeResults.add(astraWriteResultSet);
            if (writeResults.size() > fetchSizeInRows) {
                writeCnt += iterateAndClearWriteResults(writeResults, 1);
@@ -124,7 +128,12 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
            }
 
            writeLimiter.acquire(1);
-           batchStatement = batchStatement.add(bindInsert(astraInsertStatement, sourceRow, null));
+           BoundStatement bInsert = bindInsert(astraInsertStatement, sourceRow, null);
+           if (null == bInsert) {
+               skipCnt++;
+               continue;
+           }
+           batchStatement = batchStatement.add(bInsert);
 
            // if batch threshold is met, send the writes and clear the batch
            if (batchStatement.size() >= batchSize) {

src/main/java/datastax/astra/migrate/DiffJobSession.java (23 additions, 10 deletions)

@@ -2,6 +2,7 @@
 
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.ResultSet;
 import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.api.core.data.UdtValue;
@@ -12,6 +13,7 @@
 import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
@@ -74,11 +76,15 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
                    printCounts(false);
                }
 
-               CompletionStage<AsyncResultSet> targetRowFuture = astraSession
-                       .executeAsync(selectFromAstra(astraSelectStatement, srcRow));
-               srcToTargetRowMap.put(srcRow, targetRowFuture);
-               if (srcToTargetRowMap.size() > fetchSizeInRows) {
-                   diffAndClear(srcToTargetRowMap);
+               BoundStatement bSelect = selectFromAstra(astraSelectStatement, srcRow);
+               if (null == bSelect) {
+                   skippedCounter.incrementAndGet();
+               } else {
+                   CompletionStage<AsyncResultSet> targetRowFuture = astraSession.executeAsync(bSelect);
+                   srcToTargetRowMap.put(srcRow, targetRowFuture);
+                   if (srcToTargetRowMap.size() > fetchSizeInRows) {
+                       diffAndClear(srcToTargetRowMap);
+                   }
                }
            } else {
                readCounter.incrementAndGet();
@@ -165,13 +171,20 @@ private void diff(Row sourceRow, Row astraRow) {
    private String isDifferent(Row sourceRow, Row astraRow) {
        StringBuffer diffData = new StringBuffer();
        IntStream.range(0, selectColTypes.size()).parallel().forEach(index -> {
-           MigrateDataType dataType = selectColTypes.get(index);
-           Object source = getData(dataType, index, sourceRow);
-           Object astra = getData(dataType, index, astraRow);
+           MigrateDataType dataTypeObj = selectColTypes.get(index);
+           Object source = getData(dataTypeObj, index, sourceRow);
+           if (index < idColTypes.size()) {
+               Optional<Object> optionalVal = handleBlankInPrimaryKey(index, source, dataTypeObj.typeClass, sourceRow, false);
+               if (optionalVal.isPresent()) {
+                   source = optionalVal.get();
+               }
+           }
+
+           Object astra = getData(dataTypeObj, index, astraRow);
 
-           boolean isDiff = dataType.diff(source, astra);
+           boolean isDiff = dataTypeObj.diff(source, astra);
            if (isDiff) {
-               if (dataType.typeClass.equals(UdtValue.class)) {
+               if (dataTypeObj.typeClass.equals(UdtValue.class)) {
                    String sourceUdtContent = ((UdtValue) source).getFormattedContents();
                    String astraUdtContent = ((UdtValue) astra).getFormattedContents();
                    if (!sourceUdtContent.equals(astraUdtContent)) {

src/resources/sparkConf.properties (1 addition, 1 deletion)

@@ -64,7 +64,7 @@ spark.query.types 9,1,4,3
 #############################################################################################################
 
 # ENABLE ONLY IF COLUMN NAMES ON TARGET IS DIFFERENT FROM ORIGIN (SCHEMA & DATA-TYPES MUST BE SAME)
-#spark.query.target partition-key,clustering-key,order-date,amount
+#spark.query.target comma-separated-partition-key,comma-separated-clustering-key,comma-separated-other-columns
 
 # The tool adds TTL & Writetime at row-level (not field-level).
 # The largest TTL & Writetime values are used if multiple indexes are listed (comma separated)
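
For illustration, a hypothetical value matching the format the updated comment describes, for a target table with two partition-key columns, one clustering column, and two other columns (all column names invented for this example; enable it only if the target column names differ from the origin):

    spark.query.target store_id,region,order_date,amount,currency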
