Skip to content

Commit 92aca37

Browse files
committed
Add support for varint. Change code to not require an update statement when there is no counter column, and to default null primary-key strings to blanks.
1 parent 05a15dd commit 92aca37

File tree

2 files changed

+23
-10
lines changed

2 files changed

+23
-10
lines changed

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,13 @@ protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, Spar
5555
}
5656
isPreserveTTLWritetime = Boolean.parseBoolean(sparkConf.get("spark.migrate.preserveTTLWriteTime", "false"));
5757

58-
String updateSelectMappingStr = sparkConf.get("spark.migrate.source.counterTable.update.select.index", "0");
59-
for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
60-
updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
61-
}
6258
if (isCounterTable) {
59+
String updateSelectMappingStr = sparkConf.get("spark.migrate.source.counterTable.update.select.index", "0");
60+
for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
61+
updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
62+
}
63+
64+
6365
String counterTableUpdate = sparkConf.get("spark.migrate.source.counterTable.update.cql");
6466
astraInsertStatement = astraSession.prepare(counterTableUpdate);
6567
} else {
@@ -89,11 +91,15 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
8991
if (batchSize == 1 || writeTimeStampFilter) {
9092
for (Row sourceRow : resultSet) {
9193
readLimiter.acquire(1);
92-
// only process rows greater than writeTimeStampFilter
93-
Long sourceWriteTimeStamp = getLargestWriteTimeStamp(sourceRow);
94-
if (writeTimeStampFilter && (sourceWriteTimeStamp < minWriteTimeStampFilter)
95-
|| sourceWriteTimeStamp > maxWriteTimeStampFilter) {
96-
continue;
94+
95+
if(writeTimeStampFilter) {
96+
// only process rows greater than writeTimeStampFilter
97+
Long sourceWriteTimeStamp = getLargestWriteTimeStamp(sourceRow);
98+
if (sourceWriteTimeStamp < minWriteTimeStampFilter
99+
|| sourceWriteTimeStamp > maxWriteTimeStampFilter) {
100+
continue;
101+
}
102+
97103
}
98104

99105
writeLimiter.acquire(1);
@@ -222,7 +228,11 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
222228
MigrateDataType dataType = insertColTypes.get(index);
223229

224230
try {
225-
boundInsertStatement = boundInsertStatement.set(index, getData(dataType, index, sourceRow), dataType.typeClass);
231+
Object colData = getData(dataType, index, sourceRow);
232+
if(index < idColTypes.size() && colData==null && dataType.typeClass==String.class){
233+
colData="";
234+
}
235+
boundInsertStatement = boundInsertStatement.set(index, colData, dataType.typeClass);
226236
} catch (NullPointerException e) {
227237
// ignore the exception for map values being null
228238
if (dataType.typeClass != Map.class) {

src/main/java/datastax/astra/migrate/MigrateDataType.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.datastax.oss.driver.api.core.data.UdtValue;
55

66
import java.math.BigDecimal;
7+
import java.math.BigInteger;
78
import java.nio.ByteBuffer;
89
import java.time.Instant;
910
import java.time.LocalDate;
@@ -78,6 +79,8 @@ private Class getType(int type) {
7879
return LocalDate.class;
7980
case 16:
8081
return UdtValue.class;
82+
case 17:
83+
return BigInteger.class;
8184
}
8285

8386
return Object.class;

0 commit comments

Comments (0)