Commit 872403c

Made TTL & Writetimes independent of each other
Bug-fix: will no longer insert a 0 writetime when the fields are left empty. Also simplified the config.
1 parent 9667f17 commit 872403c

5 files changed: +34 -37 lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
```diff
@@ -3,7 +3,7 @@
     <groupId>datastax.astra.migrate</groupId>
     <artifactId>cassandra-data-migrator</artifactId>
-    <version>1.6</version>
+    <version>1.7</version>
     <packaging>jar</packaging>

     <properties>
```

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 28 additions & 30 deletions
```diff
@@ -35,20 +35,17 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         sourceKeyspaceTable = sparkConf.get("spark.source.keyspaceTable");
         astraKeyspaceTable = sparkConf.get("spark.destination.keyspaceTable");

-        isPreserveTTLWritetime = Boolean.parseBoolean(sparkConf.get("spark.preserveTTLWriteTime", "false"));
-        if (isPreserveTTLWritetime) {
-            String ttlColsStr = sparkConf.get("spark.preserveTTLWriteTime.ttl.cols");
-            if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
-                for (String ttlCol : ttlColsStr.split(",")) {
-                    ttlCols.add(Integer.parseInt(ttlCol));
-                }
+        String ttlColsStr = sparkConf.get("spark.query.ttl.cols");
+        if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
+            for (String ttlCol : ttlColsStr.split(",")) {
+                ttlCols.add(Integer.parseInt(ttlCol));
             }
+        }

-            String writeTimestampColsStr = sparkConf.get("spark.preserveTTLWriteTime.writetime.cols");
-            if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
-                for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
-                    writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
-                }
+        String writeTimestampColsStr = sparkConf.get("spark.query.writetime.cols");
+        if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
+            for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
+                writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
             }
         }

@@ -75,26 +72,22 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         logger.info("PARAM -- Destination Keyspace Table: " + astraKeyspaceTable);
         logger.info("PARAM -- ReadRateLimit: " + readLimiter.getRate());
         logger.info("PARAM -- WriteRateLimit: " + writeLimiter.getRate());
-        logger.info("PARAM -- WriteTimestampFilter: " + writeTimeStampFilter);
-        logger.info("PARAM -- WriteTimestampFilterCols: " + writeTimeStampCols);
-        logger.info("PARAM -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
-        logger.info("PARAM -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
         logger.info("PARAM -- TTLCols: " + ttlCols);
+        logger.info("PARAM -- WriteTimestampFilterCols: " + writeTimeStampCols);
+        logger.info("PARAM -- WriteTimestampFilter: " + writeTimeStampFilter);

         String selectCols = sparkConf.get("spark.query.source");
         String partionKey = sparkConf.get("spark.query.source.partitionKey");
         String sourceSelectCondition = sparkConf.get("spark.query.condition", "");

         final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
-        if (isPreserveTTLWritetime) {
-            String[] allCols = selectCols.split(",");
-            ttlCols.forEach(col -> {
-                selectTTLWriteTimeCols.append(",ttl(" + allCols[col] + ")");
-            });
-            writeTimeStampCols.forEach(col -> {
-                selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")");
-            });
-        }
+        String[] allCols = selectCols.split(",");
+        ttlCols.forEach(col -> {
+            selectTTLWriteTimeCols.append(",ttl(" + allCols[col] + ")");
+        });
+        writeTimeStampCols.forEach(col -> {
+            selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")");
+        });
         String fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols.toString() + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
                 + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING";
         sourceSelectStatement = sourceSession.prepare(fullSelectQuery);

@@ -103,7 +96,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         selectColTypes = getTypes(sparkConf.get("spark.query.types"));
         String idCols = sparkConf.get("spark.query.destination.id", "");
         idColTypes = selectColTypes.subList(0, idCols.split(",").length);
-
+
         String insertCols = sparkConf.get("spark.query.destination", "");
         if (null == insertCols || insertCols.trim().isEmpty()) {
             insertCols = selectCols;

@@ -140,11 +133,16 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
             }
         }

-        if (isPreserveTTLWritetime) {
-            astraInsertStatement = astraSession.prepare("insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ") using TTL ? and TIMESTAMP ?");
-        } else {
-            astraInsertStatement = astraSession.prepare("insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")");
+        String fullInsertQuery = "insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")";
+        if (!ttlCols.isEmpty()) {
+            fullInsertQuery += " USING TTL ?";
+            if (!writeTimeStampCols.isEmpty()) {
+                fullInsertQuery += " AND TIMESTAMP ?";
+            }
+        } else if (!writeTimeStampCols.isEmpty()) {
+            fullInsertQuery += " USING TIMESTAMP ?";
         }
+        astraInsertStatement = astraSession.prepare(fullInsertQuery);
     }
 }
```
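With both prepare-time branches in place, the insert statement can take four shapes depending on which column lists are configured. A minimal self-contained sketch of the same branching, with placeholder table, column, and bind strings (not the migrator's real values):

```java
import java.util.List;

// Standalone sketch of the new conditional insert-query shapes.
public class InsertQuerySketch {

    static String buildInsertQuery(String table, String insertCols, String insertBinds,
                                   List<Integer> ttlCols, List<Integer> writeTimeStampCols) {
        String query = "insert into " + table + " (" + insertCols + ") VALUES (" + insertBinds + ")";
        if (!ttlCols.isEmpty()) {
            query += " USING TTL ?";                 // TTL is always the first trailing bind
            if (!writeTimeStampCols.isEmpty()) {
                query += " AND TIMESTAMP ?";         // writetime bind follows the TTL bind
            }
        } else if (!writeTimeStampCols.isEmpty()) {
            query += " USING TIMESTAMP ?";           // writetime alone, no TTL bind
        }
        return query;
    }

    public static void main(String[] args) {
        // Both lists set: "... USING TTL ? AND TIMESTAMP ?"
        System.out.println(buildInsertQuery("ks.tbl", "a,b", "?,?", List.of(2), List.of(3)));
        // Only writetime set: "... USING TIMESTAMP ?"
        System.out.println(buildInsertQuery("ks.tbl", "a,b", "?,?", List.of(), List.of(3)));
        // Neither set: plain insert, so no zero writetime can ever be bound
        System.out.println(buildInsertQuery("ks.tbl", "a,b", "?,?", List.of(), List.of()));
    }
}
```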

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 0 additions & 1 deletion
```diff
@@ -32,7 +32,6 @@ public abstract class BaseJobSession {
     protected Integer batchSize = 1;
     protected Integer printStatsAfter = 100000;

-    protected Boolean isPreserveTTLWritetime = Boolean.FALSE;
     protected Boolean writeTimeStampFilter = Boolean.FALSE;
     protected Long minWriteTimeStampFilter = 0l;
     protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
```

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 3 additions & 1 deletion
```diff
@@ -173,9 +173,11 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow,
             }
         }

-        if (isPreserveTTLWritetime) {
+        if (!ttlCols.isEmpty()) {
             boundInsertStatement = boundInsertStatement.set(index, getLargestTTL(sourceRow), Integer.class);
             index++;
+        }
+        if (!writeTimeStampCols.isEmpty()) {
             boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
         }
     }
```
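The bind side mirrors the prepared query: the TTL value is bound only when ttlCols is non-empty (and only then does the index advance), and the writetime only when writeTimeStampCols is non-empty, which is what fixes the phantom 0 writetime. For intuition only, a largest-writetime helper could read the writetime(...) columns that the select appends after the data and ttl(...) columns; this is a hypothetical sketch, and the project's real getLargestWriteTimeStamp may be implemented differently:

```java
import com.datastax.oss.driver.api.core.cql.Row;

// Hypothetical sketch: the select built in AbstractJobSession appends ttl(...)
// columns and then writetime(...) columns after the data columns, so the
// writetime values would sit at offset (dataColumnCount + ttlColumnCount).
final class WriteTimeSketch {
    static long largestWriteTimeStamp(Row row, int dataColumnCount,
                                      int ttlColumnCount, int writeTimeColumnCount) {
        long max = Long.MIN_VALUE;
        for (int i = 0; i < writeTimeColumnCount; i++) {
            // keep the maximum writetime across all configured columns
            max = Math.max(max, row.getLong(dataColumnCount + ttlColumnCount + i));
        }
        return max;
    }
}
```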

src/resources/sparkConf.properties

Lines changed: 2 additions & 4 deletions
```diff
@@ -27,15 +27,13 @@ spark.query.source.partitionKey partition-key
 spark.query.destination partition-key,clustering-key,order-date,amount
 spark.query.destination.id partition-key,clustering-key
 spark.query.types 9,1,4,3
+spark.query.ttl.cols 2,3
+spark.query.writetime.cols 2,3

 spark.counterTable false
 spark.counterTable.cql
 spark.counterTable.cql.index 0

-spark.preserveTTLWriteTime true
-spark.preserveTTLWriteTime.ttl.cols 2,3
-spark.preserveTTLWriteTime.writetime.cols 2,3
-
 spark.source.writeTimeStampFilter false
 spark.source.minWriteTimeStampFilter 0
 spark.source.maxWriteTimeStampFilter 9223372036854775807
```
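The two new keys take zero-based indexes into the spark.query.source column list: the job splits that list and wraps the chosen entries in ttl()/writetime() in the select. A hypothetical example, assuming the source select names the same four columns as spark.query.destination above:

```properties
spark.query.source partition-key,clustering-key,order-date,amount
# indexes 2 and 3 select order-date and amount, so the job adds
# ttl(order-date),ttl(amount) and writetime(order-date),writetime(amount)
spark.query.ttl.cols 2,3
spark.query.writetime.cols 2,3
```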
