
Commit b16d664

Allows using Collection and/or UDT fields for ttl & writetime calculations (#319)
* Implemented support for preserving writetimes & TTL on tables with only collection and/or UDT columns
* Additional tests, fixes and docs
* Fixed naming
1 parent efb1558 commit b16d664

17 files changed: +492 −42 lines

README.md

Lines changed: 3 additions & 3 deletions
```diff
@@ -146,9 +146,9 @@ spark-submit --properties-file cdm.properties \
 # Things to know
 - Each run (Migration or Validation) can be tracked (when enabled). You can find summary and details of the same in tables `cdm_run_info` and `cdm_run_details` in the target keyspace.
 - CDM does not migrate `ttl` & `writetime` at the field-level (for optimization reasons). It instead finds the field with the highest `ttl` & the field with the highest `writetime` within an `origin` row and uses those values on the entire `target` row.
-- CDM ignores `ttl` & `writetime` on collection and UDT fields while computing the highest value
-- If a table has only collection and/or UDT non-key columns and not table-level `ttl` configuration, the target will have no `ttl`, which can lead to inconsistencies between `origin` and `target` as rows expire on `origin` due to `ttl` expiry.
-- If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be time the job was run. Alternatively if needed, the param `spark.cdm.transform.custom.writetime` can be used to set a static custom value for `writetime`.
+- By default, CDM ignores collection and UDT fields for `ttl` & `writetime` calculations, for performance reasons. If you want to include such fields, set the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true`.
+- If a table has only collection and/or UDT non-key columns and no table-level `ttl` configuration, the target will have no `ttl`, which can lead to inconsistencies between `origin` and `target` as rows expire on `origin` due to `ttl` expiry. To avoid this, we recommend setting the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
+- If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be the time the job was run. To avoid this, we recommend setting the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
 - When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reasons), it could lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. This issue can be addressed by enabling and setting a positive value for the `spark.cdm.transform.custom.writetime.incrementBy` param, which was added specifically to address this issue.
 - When you rerun a job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in table `cdm_run_info` will be only for the current run. If the previous run was killed for some reason, its run metrics may not have been saved. If the previous run did complete (not killed) but with errors, then you will have all run metrics from the previous run as well.
```
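
For illustration, a `cdm.properties` fragment enabling the new flag might look like this (a minimal sketch of ours, not part of the commit; the flag name and its `false` default come from this diff, the commented-out param is the pre-existing alternative the README mentions, and the placeholder value is hypothetical):

```properties
# Include collection/UDT fields when computing the row-level ttl & writetime (default: false)
spark.cdm.schema.ttlwritetime.calc.useCollections    true

# Pre-existing alternative for writetime only: pin a static custom writetime instead
# spark.cdm.transform.custom.writetime    <microseconds-since-epoch>
```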

RELEASE.md

Lines changed: 3 additions & 0 deletions
```diff
@@ -1,4 +1,7 @@
 # Release Notes
+## [4.6.0] - 2024-10-18
+- Allow using Collections and/or UDTs for `ttl` & `writetime` calculations. This is specifically helpful in scenarios where the only non-key columns are Collections and/or UDTs.
+
 ## [4.5.1] - 2024-10-11
 - Made CDM-generated SCB unique & much shorter-lived when using the TLS option to connect to Astra more securely.
 
```

src/main/java/com/datastax/cdm/cql/EnhancedSession.java

Lines changed: 18 additions & 25 deletions
```diff
@@ -15,24 +15,27 @@
  */
 package com.datastax.cdm.cql;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datastax.cdm.cql.codec.CodecFactory;
 import com.datastax.cdm.cql.codec.Codecset;
-import com.datastax.cdm.cql.statement.*;
+import com.datastax.cdm.cql.statement.OriginSelectByPKStatement;
+import com.datastax.cdm.cql.statement.OriginSelectByPartitionRangeStatement;
+import com.datastax.cdm.cql.statement.TargetInsertStatement;
+import com.datastax.cdm.cql.statement.TargetSelectByPKStatement;
+import com.datastax.cdm.cql.statement.TargetUpdateStatement;
+import com.datastax.cdm.cql.statement.TargetUpsertStatement;
 import com.datastax.cdm.data.PKFactory;
 import com.datastax.cdm.properties.KnownProperties;
 import com.datastax.cdm.properties.PropertyHelper;
 import com.datastax.cdm.schema.CqlTable;
 import com.datastax.oss.driver.api.core.CqlSession;
-import com.datastax.oss.driver.api.core.type.DataType;
-import com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException;
-import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
 import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
-import com.datastax.oss.driver.api.core.type.reflect.GenericType;
 
 public class EnhancedSession {
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
@@ -96,26 +99,16 @@ public TargetUpsertStatement getTargetUpsertStatement() {
     }
 
     private CqlSession initSession(PropertyHelper propertyHelper, CqlSession session) {
-        List<String> codecList = propertyHelper.getStringList(KnownProperties.TRANSFORM_CODECS);
-        if (null != codecList && !codecList.isEmpty()) {
-            MutableCodecRegistry registry = (MutableCodecRegistry) session.getContext().getCodecRegistry();
-
-            for (String codecString : codecList) {
-                Codecset codecEnum = Codecset.valueOf(codecString);
-                for (TypeCodec<?> codec : CodecFactory.getCodecPair(propertyHelper, codecEnum)) {
-                    DataType dataType = codec.getCqlType();
-                    GenericType<?> javaType = codec.getJavaType();
-                    if (logDebug)
-                        logger.debug("Registering Codec {} for CQL type {} and Java type {}",
-                                codec.getClass().getSimpleName(), dataType, javaType);
-                    try {
-                        registry.codecFor(dataType, javaType);
-                    } catch (CodecNotFoundException e) {
-                        registry.register(codec);
-                    }
-                }
-            }
-        }
+        // BIGINT_BIGINTEGER codec is always needed to compare C* writetimes in collection columns
+        List<String> codecList = new ArrayList<>(Arrays.asList("BIGINT_BIGINTEGER"));
+
+        if (null != propertyHelper.getStringList(KnownProperties.TRANSFORM_CODECS))
+            codecList.addAll(propertyHelper.getStringList(KnownProperties.TRANSFORM_CODECS));
+        MutableCodecRegistry registry = (MutableCodecRegistry) session.getContext().getCodecRegistry();
+
+        codecList.stream().map(Codecset::valueOf).map(codec -> CodecFactory.getCodecPair(propertyHelper, codec))
+                .flatMap(List::stream).forEach(registry::register);
+
         return session;
     }
```
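
Unrolled into plain loops, the new registration pipeline reads roughly as follows (our sketch, not part of the commit; it assumes, as dropping the old `codecFor`/`CodecNotFoundException` probe implies, that re-registering an existing codec is harmless because the driver's mutable registry warns rather than throws):

```java
// Sketch: the stream pipeline from initSession(), unrolled for readability.
// Assumes propertyHelper and session are constructed as in EnhancedSession.
List<String> codecList = new ArrayList<>(Arrays.asList("BIGINT_BIGINTEGER")); // always registered now
List<String> configured = propertyHelper.getStringList(KnownProperties.TRANSFORM_CODECS);
if (configured != null) {
    codecList.addAll(configured); // user-requested codecs, e.g. "INT_STRING"
}
MutableCodecRegistry registry = (MutableCodecRegistry) session.getContext().getCodecRegistry();
for (String name : codecList) {
    // Each Codecset entry expands to a pair of codecs, one per direction.
    for (TypeCodec<?> codec : CodecFactory.getCodecPair(propertyHelper, Codecset.valueOf(name))) {
        registry.register(codec); // unconditional; the old code probed codecFor() first
    }
}
```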

src/main/java/com/datastax/cdm/cql/codec/BIGINT_BigIntegerCodec.java

Lines changed: 70 additions & 0 deletions
```diff
@@ -0,0 +1,70 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.cdm.cql.codec;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.datastax.cdm.properties.PropertyHelper;
+import com.datastax.oss.driver.api.core.ProtocolVersion;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
+import com.datastax.oss.driver.api.core.type.reflect.GenericType;
+
+public class BIGINT_BigIntegerCodec extends AbstractBaseCodec<BigInteger> {
+
+    public BIGINT_BigIntegerCodec(PropertyHelper propertyHelper) {
+        super(propertyHelper);
+    }
+
+    @Override
+    public @NotNull GenericType<BigInteger> getJavaType() {
+        return GenericType.BIG_INTEGER;
+    }
+
+    @Override
+    public @NotNull DataType getCqlType() {
+        return DataTypes.BIGINT;
+    }
+
+    @Override
+    public ByteBuffer encode(BigInteger value, @NotNull ProtocolVersion protocolVersion) {
+        if (value == null) {
+            return null;
+        } else {
+            return TypeCodecs.BIGINT.encode(value.longValue(), protocolVersion);
+        }
+    }
+
+    @Override
+    public BigInteger decode(ByteBuffer bytes, @NotNull ProtocolVersion protocolVersion) {
+        return BigInteger.valueOf(TypeCodecs.BIGINT.decode(bytes, protocolVersion));
+    }
+
+    @Override
+    public @NotNull String format(BigInteger value) {
+        return TypeCodecs.BIGINT.format(value.longValue());
+    }
+
+    @Override
+    public BigInteger parse(String value) {
+        return BigInteger.valueOf(TypeCodecs.BIGINT.parse(value));
+    }
+
+}
```
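
For a sense of what this new codec does, here is a standalone round-trip through the driver's built-in `TypeCodecs.BIGINT`, which the class above delegates to (a sketch of ours, not part of the commit; protocol version pinned to V4 just for the example):

```java
import java.math.BigInteger;
import java.nio.ByteBuffer;

import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;

public class BigIntegerBigintRoundTrip {
    public static void main(String[] args) {
        // A Cassandra writetime: microseconds since the epoch, a BIGINT on the wire.
        BigInteger writetime = BigInteger.valueOf(1729238400000000L);

        // encode(): BigInteger -> long -> serialized BIGINT. Note longValue() would
        // silently truncate values wider than 64 bits, which real writetimes never are.
        ByteBuffer onWire = TypeCodecs.BIGINT.encode(writetime.longValue(), ProtocolVersion.V4);

        // decode(): serialized BIGINT -> long -> BigInteger, mirroring decode() above.
        BigInteger back = BigInteger.valueOf(TypeCodecs.BIGINT.decode(onWire, ProtocolVersion.V4));

        System.out.println(back.equals(writetime)); // true
    }
}
```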
src/main/java/com/datastax/cdm/cql/codec/BigInteger_BIGINTCodec.java

Lines changed: 69 additions & 0 deletions
```diff
@@ -0,0 +1,69 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.cdm.cql.codec;
+
+import java.nio.ByteBuffer;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.datastax.cdm.properties.PropertyHelper;
+import com.datastax.oss.driver.api.core.ProtocolVersion;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
+import com.datastax.oss.driver.api.core.type.reflect.GenericType;
+
+public class BigInteger_BIGINTCodec extends AbstractBaseCodec<Integer> {
+
+    public BigInteger_BIGINTCodec(PropertyHelper propertyHelper) {
+        super(propertyHelper);
+    }
+
+    @Override
+    public @NotNull GenericType<Integer> getJavaType() {
+        return GenericType.INTEGER;
+    }
+
+    @Override
+    public @NotNull DataType getCqlType() {
+        return DataTypes.INT;
+    }
+
+    @Override
+    public ByteBuffer encode(Integer value, @NotNull ProtocolVersion protocolVersion) {
+        if (value == null) {
+            return null;
+        } else {
+            return TypeCodecs.INT.encode(value, protocolVersion);
+        }
+    }
+
+    @Override
+    public Integer decode(ByteBuffer bytes, @NotNull ProtocolVersion protocolVersion) {
+        return TypeCodecs.INT.decode(bytes, protocolVersion);
+    }
+
+    @Override
+    public @NotNull String format(Integer value) {
+        return TypeCodecs.INT.format(value);
+    }
+
+    @Override
+    public Integer parse(String value) {
+        return TypeCodecs.INT.parse(value);
+    }
+
+}
```

src/main/java/com/datastax/cdm/cql/codec/CodecFactory.java

Lines changed: 3 additions & 0 deletions
```diff
@@ -34,6 +34,9 @@ public static List<TypeCodec<?>> getCodecPair(PropertyHelper propertyHelper, Cod
             return Arrays.asList(new DOUBLE_StringCodec(propertyHelper), new TEXT_DoubleCodec(propertyHelper));
         case BIGINT_STRING:
             return Arrays.asList(new BIGINT_StringCodec(propertyHelper), new TEXT_LongCodec(propertyHelper));
+        case BIGINT_BIGINTEGER:
+            return Arrays.asList(new BIGINT_BigIntegerCodec(propertyHelper),
+                    new BigInteger_BIGINTCodec(propertyHelper));
         case STRING_BLOB:
             return Arrays.asList(new TEXT_BLOBCodec(propertyHelper), new BLOB_TEXTCodec(propertyHelper));
         case ASCII_BLOB:
```

src/main/java/com/datastax/cdm/cql/codec/Codecset.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -16,6 +16,6 @@
 package com.datastax.cdm.cql.codec;
 
 public enum Codecset {
-    INT_STRING, DOUBLE_STRING, BIGINT_STRING, DECIMAL_STRING, TIMESTAMP_STRING_MILLIS, TIMESTAMP_STRING_FORMAT,
-    POINT_TYPE, POLYGON_TYPE, DATE_RANGE, LINE_STRING, STRING_BLOB, ASCII_BLOB
+    INT_STRING, DOUBLE_STRING, BIGINT_STRING, BIGINT_BIGINTEGER, DECIMAL_STRING, TIMESTAMP_STRING_MILLIS,
+    TIMESTAMP_STRING_FORMAT, POINT_TYPE, POLYGON_TYPE, DATE_RANGE, LINE_STRING, STRING_BLOB, ASCII_BLOB
 }
```

src/main/java/com/datastax/cdm/feature/WritetimeTTL.java

Lines changed: 36 additions & 5 deletions
```diff
@@ -15,6 +15,7 @@
  */
 package com.datastax.cdm.feature;
 
+import java.math.BigInteger;
 import java.time.Instant;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -45,6 +46,7 @@ public class WritetimeTTL extends AbstractFeature {
     private Long filterMax;
     private boolean hasWriteTimestampFilter;
     private Long writetimeIncrement;
+    private boolean allowCollectionsForWritetimeTTL;
 
     @Override
     public boolean loadProperties(IPropertyHelper propertyHelper) {
@@ -61,7 +63,7 @@ public boolean loadProperties(IPropertyHelper propertyHelper) {
             logger.info("PARAM -- WriteTimestampCols: {}", writetimeNames);
             this.autoWritetimeNames = false;
         }
-
+        allowCollectionsForWritetimeTTL = propertyHelper.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_CALC);
         this.customWritetime = getCustomWritetime(propertyHelper);
         if (this.customWritetime > 0) {
             logger.info("PARAM -- {}: {} datetime is {} ", KnownProperties.TRANSFORM_CUSTOM_WRITETIME, customWritetime,
@@ -233,20 +235,49 @@ public Long getLargestWriteTimeStamp(Row row) {
             return this.customWritetime;
         if (null == this.writetimeSelectColumnIndexes || this.writetimeSelectColumnIndexes.isEmpty())
             return null;
-        OptionalLong max = this.writetimeSelectColumnIndexes.stream().mapToLong(row::getLong).filter(Objects::nonNull)
-                .max();
+
+        OptionalLong max = (allowCollectionsForWritetimeTTL) ? getMaxWriteTimeStampForCollections(row)
+                : getMaxWriteTimeStamp(row);
+
         return max.isPresent() ? max.getAsLong() + this.writetimeIncrement : null;
     }
 
+    private OptionalLong getMaxWriteTimeStampForCollections(Row row) {
+        return this.writetimeSelectColumnIndexes.stream().map(col -> {
+            if (row.getType(col).equals(DataTypes.BIGINT))
+                return Arrays.asList(row.getLong(col));
+            return row.getList(col, BigInteger.class).stream().filter(Objects::nonNull).map(BigInteger::longValue)
+                    .collect(Collectors.toList());
+        }).flatMap(List::stream).filter(Objects::nonNull).mapToLong(Long::longValue).max();
+    }
+
+    private OptionalLong getMaxWriteTimeStamp(Row row) {
+        return this.writetimeSelectColumnIndexes.stream().filter(Objects::nonNull).mapToLong(row::getLong).max();
+    }
+
     public Integer getLargestTTL(Row row) {
         if (logDebug)
             logger.debug("getLargestTTL: customTTL={}, ttlSelectColumnIndexes={}", customTTL, ttlSelectColumnIndexes);
         if (this.customTTL > 0)
            return this.customTTL.intValue();
         if (null == this.ttlSelectColumnIndexes || this.ttlSelectColumnIndexes.isEmpty())
             return null;
-        OptionalInt max = this.ttlSelectColumnIndexes.stream().mapToInt(row::getInt).filter(Objects::nonNull).max();
-        return max.isPresent() ? max.getAsInt() : null;
+
+        OptionalInt max = (allowCollectionsForWritetimeTTL) ? getMaxTTLForCollections(row) : getMaxTTL(row);
+
+        return max.isPresent() ? max.getAsInt() : 0;
+    }
+
+    private OptionalInt getMaxTTLForCollections(Row row) {
+        return this.ttlSelectColumnIndexes.stream().map(col -> {
+            if (row.getType(col).equals(DataTypes.INT))
+                return Arrays.asList(row.getInt(col));
+            return row.getList(col, Integer.class).stream().filter(Objects::nonNull).collect(Collectors.toList());
+        }).flatMap(List::stream).filter(Objects::nonNull).mapToInt(Integer::intValue).max();
+    }
+
+    private OptionalInt getMaxTTL(Row row) {
+        return this.ttlSelectColumnIndexes.stream().filter(Objects::nonNull).mapToInt(row::getInt).max();
     }
 
     private void validateTTLColumns(CqlTable originTable) {
```
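
To make the collection-aware path concrete: for a regular column, `WRITETIME(col)` comes back as a single bigint, while for a collection column it comes back as one writetime per element, which the code above reads as a `List<BigInteger>` via the always-registered BIGINT_BIGINTEGER codec. Below is a self-contained sketch of the same reduction over plain Java values (names are ours and hypothetical; the real methods pull values out of a driver `Row` by column index):

```java
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.stream.Stream;

public class MaxWritetimeSketch {

    // Each element is either a Long (scalar writetime column) or a
    // List<BigInteger> (per-element writetimes of a collection column).
    static OptionalLong largestWritetime(List<Object> writetimeColumnValues) {
        return writetimeColumnValues.stream()
                .flatMap(v -> v instanceof List
                        ? ((List<?>) v).stream().filter(Objects::nonNull)
                                .map(e -> ((BigInteger) e).longValue())
                        : Stream.of((Long) v))
                .filter(Objects::nonNull)
                .mapToLong(Long::longValue)
                .max();
    }

    public static void main(String[] args) {
        List<Object> row = Arrays.asList(
                1700000000000000L, // scalar column writetime (microseconds since epoch)
                Arrays.asList(BigInteger.valueOf(1710000000000000L), // collection column:
                        BigInteger.valueOf(1705000000000000L)));     // one writetime per element
        System.out.println(largestWritetime(row)); // prints OptionalLong[1710000000000000]
    }
}
```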

src/main/java/com/datastax/cdm/properties/KnownProperties.java

Lines changed: 3 additions & 0 deletions
```diff
@@ -78,6 +78,7 @@ public enum PropertyType {
     public static final String ORIGIN_TTL_NAMES = "spark.cdm.schema.origin.column.ttl.names";
     public static final String ORIGIN_WRITETIME_AUTO = "spark.cdm.schema.origin.column.writetime.automatic";
     public static final String ORIGIN_WRITETIME_NAMES = "spark.cdm.schema.origin.column.writetime.names";
+    public static final String ALLOW_COLL_FOR_WRITETIME_TTL_CALC = "spark.cdm.schema.ttlwritetime.calc.useCollections";
 
     public static final String ORIGIN_COLUMN_NAMES_TO_TARGET = "spark.cdm.schema.origin.column.names.to.target";
 
@@ -90,6 +91,8 @@ public enum PropertyType {
         types.put(ORIGIN_WRITETIME_NAMES, PropertyType.STRING_LIST);
         types.put(ORIGIN_WRITETIME_AUTO, PropertyType.BOOLEAN);
         defaults.put(ORIGIN_WRITETIME_AUTO, "true");
+        types.put(ALLOW_COLL_FOR_WRITETIME_TTL_CALC, PropertyType.BOOLEAN);
+        defaults.put(ALLOW_COLL_FOR_WRITETIME_TTL_CALC, "false");
         types.put(ORIGIN_COLUMN_NAMES_TO_TARGET, PropertyType.STRING_LIST);
     }
```

src/main/java/com/datastax/cdm/schema/CqlTable.java

Lines changed: 8 additions & 2 deletions
```diff
@@ -470,15 +470,19 @@ private void setCqlMetadata(CqlSession cqlSession) {
                 .filter(md -> !extractJsonExclusive || md.getName().asCql(true).endsWith(columnName))
                 .collect(Collectors.toCollection(() -> this.cqlAllColumns));
 
+        boolean allowCollectionsForWritetimeTTL = propertyHelper
+                .getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_CALC);
         this.writetimeTTLColumns = tableMetadata.getColumns().values().stream()
-                .filter(columnMetadata -> canColumnHaveTTLorWritetime(tableMetadata, columnMetadata))
+                .filter(columnMetadata -> canColumnHaveTTLorWritetime(tableMetadata, columnMetadata,
+                        allowCollectionsForWritetimeTTL))
                 .map(ColumnMetadata::getName).map(CqlIdentifier::asInternal).collect(Collectors.toList());
 
         this.columnNameToCqlTypeMap = this.cqlAllColumns.stream().collect(
                 Collectors.toMap(columnMetadata -> columnMetadata.getName().asInternal(), ColumnMetadata::getType));
     }
 
-    private boolean canColumnHaveTTLorWritetime(TableMetadata tableMetadata, ColumnMetadata columnMetadata) {
+    private boolean canColumnHaveTTLorWritetime(TableMetadata tableMetadata, ColumnMetadata columnMetadata,
+            boolean allowCollectionsForWritetimeTTL) {
         DataType dataType = columnMetadata.getType();
         boolean isKeyColumn = tableMetadata.getPartitionKey().contains(columnMetadata)
                 || tableMetadata.getClusteringColumns().containsKey(columnMetadata);
@@ -492,6 +496,8 @@ private boolean canColumnHaveTTLorWritetime(TableMetadata tableMetadata, ColumnM
         // supported here?
         if (CqlData.isFrozen(dataType))
             return true;
+        if (allowCollectionsForWritetimeTTL && CqlData.isCollection(dataType))
+            return true;
         return false;
     }
```
