
Commit 96ec01b

mieslep authored and msmygit committed
CDM-53 writetime and ttl columns are now automatically detected and configured, or can be manually configured.
1 parent 2f41c11 commit 96ec01b

File tree: 10 files changed, +321 -38 lines changed


SIT/cdm.sh

Lines changed: 2 additions & 2 deletions
@@ -97,8 +97,8 @@ fi

 spark-submit --properties-file "${PROPERTIES}" \
     --master "local[*]" \
-    --conf "spark.driver.extraJavaOptions=-Dlog4j.configurationFile=file:///local/log4j.xml -Dcom.datastax.cdm.log.level=TRACE" \
-    --conf "spark.executor.extraJavaOptions=-Dlog4j.configurationFile=file:///local/log4j.xml -Dcom.datastax.cdm.log.level=TRACE" \
+    --conf "spark.driver.extraJavaOptions=-Dlog4j.configurationFile=file:///local/log4j.xml -Dcom.datastax.cdm.log.level=DEBUG" \
+    --conf "spark.executor.extraJavaOptions=-Dlog4j.configurationFile=file:///local/log4j.xml -Dcom.datastax.cdm.log.level=DEBUG" \
     --class ${CLASS} \
     /local/cassandra-data-migrator.jar

SIT/smoke/03_ttl_writetime/breakData.cql

Lines changed: 29 additions & 3 deletions
@@ -21,7 +21,33 @@ INSERT INTO origin.smoke_ttl_writetime(key, w_col3) VALUES ('record6','C---') U
 -- tw_col2 is TTL and Writetime, so both changes should be propatated by the diff
 INSERT INTO origin.smoke_ttl_writetime(key, tw_col2) VALUES ('record7','B---') USING TTL 700000 AND TIMESTAMP 1087384200000000; -- 11:10:00

-
-
-SELECT * FROM target.smoke_ttl_writetime;
+SELECT key
+      ,t_col1
+      ,WRITETIME(t_col1) AS t_col1_wt
+      ,TTL(t_col1) AS t_col1_ttl
+      ,tw_col2
+      ,WRITETIME(tw_col2) AS tw_col2_wt
+      ,TTL(tw_col2) AS tw_col2_ttl
+      ,w_col3
+      ,WRITETIME(w_col3) AS w_col3_wt
+      ,TTL(w_col3) AS w_col3_ttl
+      ,col4
+      ,WRITETIME(col4) AS col4_wt
+      ,TTL(col4) AS col4_ttl
+  FROM origin.smoke_ttl_writetime;
+
+SELECT key
+      ,t_col1
+      ,WRITETIME(t_col1) AS t_col1_wt
+      ,TTL(t_col1) AS t_col1_ttl
+      ,tw_col2
+      ,WRITETIME(tw_col2) AS tw_col2_wt
+      ,TTL(tw_col2) AS tw_col2_ttl
+      ,w_col3
+      ,WRITETIME(w_col3) AS w_col3_wt
+      ,TTL(w_col3) AS w_col3_ttl
+      ,col4
+      ,WRITETIME(col4) AS col4_wt
+      ,TTL(col4) AS col4_ttl
+  FROM target.smoke_ttl_writetime;

SIT/smoke/03_ttl_writetime/setup.cql

Lines changed: 13 additions & 13 deletions
@@ -51,19 +51,19 @@ INSERT INTO origin.smoke_ttl_writetime(key, w_col3) VALUES ('record8','CCCC') US
 INSERT INTO origin.smoke_ttl_writetime(key, col4) VALUES ('record8','DDDD') USING TTL 700000 AND TIMESTAMP 1087383780000000; -- 11:00:03


-SELECT key,
-       t_col1,
-       writetime(t_col1) as wt_t_col1,
-       ttl(t_col1) as ttl_t_col1,
-       tw_col2,
-       writetime(tw_col2) as wt_tw_col2,
-       ttl(tw_col2) as ttl_tw_col2,
-       w_col3,
-       writetime(w_col3) as wt_w_col3,
-       ttl(w_col3) as ttl_w_col3,
-       col4,
-       writetime(col4) as wt_col4,
-       ttl(col4) as ttl_col4
+SELECT key
+      ,t_col1
+      ,WRITETIME(t_col1) AS t_col1_wt
+      ,TTL(t_col1) AS t_col1_ttl
+      ,tw_col2
+      ,WRITETIME(tw_col2) AS tw_col2_wt
+      ,TTL(tw_col2) AS tw_col2_ttl
+      ,w_col3
+      ,WRITETIME(w_col3) AS w_col3_wt
+      ,TTL(w_col3) AS w_col3_ttl
+      ,col4
+      ,WRITETIME(col4) AS col4_wt
+      ,TTL(col4) AS col4_ttl
 FROM origin.smoke_ttl_writetime;

 DROP TABLE IF EXISTS target.smoke_ttl_writetime;

src/main/java/com/datastax/cdm/data/CqlData.java

Lines changed: 10 additions & 0 deletions
@@ -63,6 +63,16 @@ public static boolean isCollection(DataType dataType) {
         return false;
     }

+    public static boolean isFrozen(DataType dataType) {
+        if (isPrimitive(dataType)) return false;
+        if (dataType instanceof UserDefinedType) return ((UserDefinedType) dataType).isFrozen();
+        if (dataType instanceof ListType) return ((ListType) dataType).isFrozen();
+        if (dataType instanceof SetType) return ((SetType) dataType).isFrozen();
+        if (dataType instanceof MapType) return ((MapType) dataType).isFrozen();
+        if (dataType instanceof TupleType) return dataType.asCql(true, false).contains("frozen<");
+        return false;
+    }
+
     public static Class getBindClass(DataType dataType) {
         Class primitiveClass = primitiveDataTypeToJavaClassMap.get(dataType);
         if (primitiveClass != null) return primitiveClass;
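As a quick sanity check of the new helper, here is a minimal sketch (not part of this commit; the class name IsFrozenSketch is hypothetical) that exercises isFrozen() against types built with the DataStax Java driver 4.x DataTypes factory:

    import com.datastax.cdm.data.CqlData;
    import com.datastax.oss.driver.api.core.type.DataTypes;

    public class IsFrozenSketch {
        public static void main(String[] args) {
            // Primitive types short-circuit to false.
            System.out.println(CqlData.isFrozen(DataTypes.TEXT));                                 // false
            // listOf(elementType, frozen=true) builds a frozen<list<text>>.
            System.out.println(CqlData.isFrozen(DataTypes.listOf(DataTypes.TEXT, true)));         // true
            // Non-frozen collections report false.
            System.out.println(CqlData.isFrozen(DataTypes.mapOf(DataTypes.TEXT, DataTypes.INT))); // false
        }
    }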

src/main/java/com/datastax/cdm/feature/WritetimeTTL.java

Lines changed: 45 additions & 2 deletions
@@ -18,7 +18,9 @@ public class WritetimeTTL extends AbstractFeature {
     private final boolean logDebug = logger.isDebugEnabled();;

     private List<String> ttlNames;
+    private boolean autoTTLNames;
     private List<String> writetimeNames;
+    private boolean autoWritetimeNames;
     private Long customWritetime = 0L;
     private List<Integer> ttlSelectColumnIndexes = null;
     private List<Integer> writetimeSelectColumnIndexes = null;
@@ -33,14 +35,18 @@ public boolean loadProperties(IPropertyHelper propertyHelper) {
             logger.info("PARAM -- TTLCols: {}", ttlNames);
         }

+        this.autoTTLNames = propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO);
         this.writetimeNames = getWritetimeNames(propertyHelper);
         if (null!=this.writetimeNames && !this.writetimeNames.isEmpty()) {
             logger.info("PARAM -- WriteTimestampCols: {}", writetimeNames);
+            this.autoTTLNames = false;
         }

+        this.autoWritetimeNames = propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO);
         this.customWritetime = getCustomWritetime(propertyHelper);
         if (this.customWritetime > 0) {
             logger.info("PARAM -- {}: {} datetime is {} ", KnownProperties.TRANSFORM_CUSTOM_WRITETIME, customWritetime, Instant.ofEpochMilli(customWritetime / 1000));
+            this.autoWritetimeNames = false;
         }

         this.filterMin = getMinFilter(propertyHelper);
@@ -80,15 +86,40 @@ public boolean initializeAndValidate(CqlTable originTable, CqlTable targetTable)
             throw new IllegalArgumentException("Origin table is not an origin table");
         }

+        if (originTable.isCounterTable()) {
+            if (isEnabled) {
+                logger.error("Counter table cannot specify TTL or WriteTimestamp columns as they cannot set on write");
+                isValid = false;
+                isEnabled = false;
+                return false;
+            }
+
+            logger.info("Counter table does not support TTL or WriteTimestamp columns as they cannot set on write, so feature is disabled");
+            return true;
+        }
+
         isValid = true;
         if (!validateProperties()) {
             isEnabled = false;
             return false;
         }

+        if (autoTTLNames) {
+            this.ttlNames = originTable.getWritetimeTTLColumns();
+        }
+
+        if (autoWritetimeNames) {
+            this.writetimeNames = originTable.getWritetimeTTLColumns();
+        }
+
         validateTTLColumns(originTable);
         validateWritetimeColumns(originTable);

+        if (hasWriteTimestampFilter && (null==writetimeNames || writetimeNames.isEmpty())) {
+            logger.error("WriteTimestamp filter is configured but no WriteTimestamp columns are defined");
+            isValid = false;
+        }
+
         if (!isValid) isEnabled = false;
         logger.info("Feature {} is {}", this.getClass().getSimpleName(), isEnabled?"enabled":"disabled");
         return isValid;
@@ -117,8 +148,8 @@ public static Long getMaxFilter(IPropertyHelper propertyHelper) {

     public Long getCustomWritetime() { return customWritetime; }
     public boolean hasWriteTimestampFilter() { return isEnabled && hasWriteTimestampFilter; }
-    public Long getMinWriteTimeStampFilter() { return this.hasWriteTimestampFilter ? this.filterMin : Long.MIN_VALUE; }
-    public Long getMaxWriteTimeStampFilter() { return this.hasWriteTimestampFilter ? this.filterMax : Long.MAX_VALUE; }
+    public Long getMinWriteTimeStampFilter() { return (this.hasWriteTimestampFilter && null!=this.filterMin) ? this.filterMin : Long.MIN_VALUE; }
+    public Long getMaxWriteTimeStampFilter() { return (this.hasWriteTimestampFilter && null!=this.filterMax) ? this.filterMax : Long.MAX_VALUE; }

     public boolean hasTTLColumns() { return null!=this.ttlSelectColumnIndexes && !this.ttlSelectColumnIndexes.isEmpty(); }
     public boolean hasWritetimeColumns() { return customWritetime>0 || null!=this.writetimeSelectColumnIndexes && !this.writetimeSelectColumnIndexes.isEmpty(); }
@@ -157,6 +188,12 @@ private void validateTTLColumns(CqlTable originTable) {
                 logger.error("TTL column {} is not present on origin table {}", ttlName, originTable.getKeyspaceName());
                 isValid = false;
                 return;
+            } else {
+                if (!originTable.isWritetimeTTLColumn(ttlName)) {
+                    logger.error("TTL column {} is not a column which can provide a TTL on origin table {}", ttlName, originTable.getKeyspaceName());
+                    isValid = false;
+                    return;
+                }
             }

             newColumnNames.add("TTL(" + ttlName + ")");
@@ -183,6 +220,12 @@ private void validateWritetimeColumns(CqlTable originTable) {
                 logger.error("Writetime column {} is not configured for origin table {}", writetimeName, originTable.getKeyspaceName());
                 isValid = false;
                 return;
+            } else {
+                if (!originTable.isWritetimeTTLColumn(writetimeName)) {
+                    logger.error("Writetime column {} is not a column which can provide a WRITETIME on origin table {}", writetimeName, originTable.getKeyspaceName());
+                    isValid = false;
+                    return;
+                }
             }

             newColumnNames.add("WRITETIME(" + writetimeName + ")");

src/main/java/com/datastax/cdm/properties/KnownProperties.java

Lines changed: 6 additions & 1 deletion
@@ -60,17 +60,22 @@ public enum PropertyType {
     // Properties that describe the origin schema
     //==========================================================================
     public static final String ORIGIN_KEYSPACE_TABLE = "spark.cdm.schema.origin.keyspaceTable";
+    public static final String ORIGIN_TTL_AUTO = "spark.cdm.schema.origin.column.ttl.automatic";
     public static final String ORIGIN_TTL_NAMES = "spark.cdm.schema.origin.column.ttl.names";
+    public static final String ORIGIN_WRITETIME_AUTO = "spark.cdm.schema.origin.column.writetime.automatic";
     public static final String ORIGIN_WRITETIME_NAMES = "spark.cdm.schema.origin.column.writetime.names";

     public static final String ORIGIN_COLUMN_NAMES_TO_TARGET = "spark.cdm.schema.origin.column.names.to.target";

-
     static {
         types.put(ORIGIN_KEYSPACE_TABLE, PropertyType.STRING);
         required.add(ORIGIN_KEYSPACE_TABLE);
         types.put(ORIGIN_TTL_NAMES, PropertyType.STRING_LIST);
+        types.put(ORIGIN_TTL_AUTO, PropertyType.BOOLEAN);
+        defaults.put(ORIGIN_TTL_AUTO, "true");
         types.put(ORIGIN_WRITETIME_NAMES, PropertyType.STRING_LIST);
+        types.put(ORIGIN_WRITETIME_AUTO, PropertyType.BOOLEAN);
+        defaults.put(ORIGIN_WRITETIME_AUTO, "true");

         types.put(ORIGIN_COLUMN_NAMES_TO_TARGET, PropertyType.STRING_LIST);
     }

src/main/java/com/datastax/cdm/schema/CqlTable.java

Lines changed: 29 additions & 0 deletions
@@ -45,6 +45,7 @@ public class CqlTable extends BaseTable {
     private List<ColumnMetadata> cqlAllColumns;
     private Map<String,DataType> columnNameToCqlTypeMap;
     private final List<Class> bindClasses;
+    private List<String> writetimeTTLColumns;

     private CqlTable otherCqlTable;
     private List<Integer> correspondingIndexes;
@@ -306,13 +307,41 @@ private void setCqlMetadata(CqlSession cqlSession) {
                 .filter(md -> !this.cqlAllColumns.contains(md))
                 .collect(Collectors.toCollection(() -> this.cqlAllColumns));

+        this.writetimeTTLColumns = tableMetadata.getColumns().values().stream()
+                .filter(columnMetadata -> canColumnHaveTTLorWritetime(tableMetadata, columnMetadata))
+                .map(ColumnMetadata::getName)
+                .map(CqlIdentifier::asInternal)
+                .collect(Collectors.toList());
+
         this.columnNameToCqlTypeMap = this.cqlAllColumns.stream()
                 .collect(Collectors.toMap(
                         columnMetadata -> columnMetadata.getName().asInternal(),
                         ColumnMetadata::getType
                 ));
     }

+    private boolean canColumnHaveTTLorWritetime(TableMetadata tableMetadata, ColumnMetadata columnMetadata) {
+        DataType dataType = columnMetadata.getType();
+        boolean isKeyColumn = tableMetadata.getPartitionKey().contains(columnMetadata) ||
+                tableMetadata.getClusteringColumns().containsKey(columnMetadata);
+
+        if (isKeyColumn) return false;
+        if (CqlData.isPrimitive(dataType)) return true;
+        if (dataType instanceof TupleType) return true; // TODO: WRITETIME and TTL functions are very slow on Tuples in cqlsh...should they be supported here?
+        if (CqlData.isFrozen(dataType)) return true;
+        return false;
+    }
+
+    public List<String> getWritetimeTTLColumns() {
+        return this.writetimeTTLColumns.stream()
+                .filter(columnName -> this.columnNames.contains(columnName))
+                .collect(Collectors.toList());
+    }
+
+    public boolean isWritetimeTTLColumn(String columnName) {
+        return this.writetimeTTLColumns.contains(columnName);
+    }
+
     private static ConsistencyLevel mapToConsistencyLevel(String level) {
         ConsistencyLevel retVal = ConsistencyLevel.LOCAL_QUORUM;
         if (StringUtils.isNotEmpty(level)) {
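The eligibility rule encoded in canColumnHaveTTLorWritetime can be restated as a standalone predicate; the sketch below is a hypothetical rephrasing (not in the commit) with the key-column check supplied by the caller rather than derived from TableMetadata:

    import com.datastax.cdm.data.CqlData;
    import com.datastax.oss.driver.api.core.type.DataType;
    import com.datastax.oss.driver.api.core.type.TupleType;

    final class TtlWritetimeEligibilitySketch {
        static boolean eligible(boolean isKeyColumn, DataType type) {
            if (isKeyColumn) return false;              // primary-key columns never carry TTL/WRITETIME
            if (CqlData.isPrimitive(type)) return true; // e.g. text, int, timestamp
            if (type instanceof TupleType) return true; // tuples allowed, per the TODO above
            return CqlData.isFrozen(type);              // frozen collections/UDTs pass; non-frozen do not
        }
    }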

src/resources/sparkConf.properties

Lines changed: 29 additions & 12 deletions
@@ -53,14 +53,29 @@ spark.cdm.target.connect.password cassandra
 # Recommended Parameters:
 # spark.cdm.schema.origin
 #   .column
-#     .ttl.names      : Default is empty. Names from .column.names to be combined using the MAX
-#                       function to determine the TTL of the entire migrated record. Will use target
-#                       table default when not set. The names cannot include any columns listed in
-#                       partition-key,clustering-key.
-#     .writetime.names: Default is empty. Names from .column.names to be combined using the MAX
-#                       function to determine the TIMESTAMP of the entire migrated record. Will use
-#                       target table default when not set. The names cannot include any columns
-#                       listed in the primary key e.g. partition-key,clustering-key
+#     .ttl
+#       .automatic    : Default is true, unless .ttl.names is specified. When true, the TTL of the
+#                       target record will be determined by finding the maxiumum TTL of
+#                       all origin columns that can have TTL set (which excludes partition key,
+#                       clustering key, collections/UDT/tuple, and frozen columns). When false, and
+#                       .names is not set, the target record will have the TTL determined by the target
+#                       table configuration.
+#       .names        : Default is empty, meaning they will be determined automatically if that is set
+#                       (see above). Specify a subset of eligible columns that are used to calculate
+#                       the TTL of the target record.
+#     .writetime
+#       .automatic    : Default is true, unless .writetime.names is specified. When true, the WRITETIME of
+#                       the target record will be determined by finding the maxiumum WRITETIME of
+#                       all origin columns that can have WRITETIME set (which excludes partition key,
+#                       clustering key, collections/UDT/tuple, and frozen columns). When false, and
+#                       .names is not set, the target record will have the WRITETIME determined by the target
+#                       table configuration.
+#
+#                       *** Note spark.cdm.transform.custom.writetime overrides this setting ***
+#
+#       .names        : Default is empty, meaning they will be determined automatically if that is set
+#                       (see above). Specify a subset of eligible columns that are used to calculate
+#                       the WRITETIME of the target record.
 #
 # Other Parameters:
 # spark.cdm.schema.origin
@@ -70,10 +85,12 @@ spark.cdm.target.connect.password cassandra
 #                       origin_column_name:target_column_name. The list is comma-separated. Only renamed
 #                       columns need to be listed.
 #-----------------------------------------------------------------------------------------------------------
-spark.cdm.schema.origin.keyspaceTable              keyspace_name.table_name
-spark.cdm.schema.origin.column.ttl.names           data_col1,data_col2,...
-spark.cdm.schema.origin.column.writetime.names     data_col1,data_col2,...
-#spark.cdm.schema.origin.column.names.to.target    partition_col1:partition_col_1,partition_col2:partition_col_2,...
+spark.cdm.schema.origin.keyspaceTable                keyspace_name.table_name
+#spark.cdm.schema.origin.column.ttl.automatic        true
+#spark.cdm.schema.origin.column.ttl.names            data_col1,data_col2,...
+#spark.cdm.schema.origin.column.writetime.automatic  true
+#spark.cdm.schema.origin.column.writetime.names      data_col1,data_col2,...
+#spark.cdm.schema.origin.column.names.to.target      partition_col1:partition_col_1,partition_col2:partition_col_2,...

 #===========================================================================================================
 # Details about the Target Schema
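For reference, a hedged configuration example (not part of this commit; the column roles are inferred from the smoke-test naming in SIT/smoke/03_ttl_writetime above) showing auto-detection turned off with columns named explicitly:

    spark.cdm.schema.origin.keyspaceTable                origin.smoke_ttl_writetime
    spark.cdm.schema.origin.column.ttl.automatic         false
    spark.cdm.schema.origin.column.ttl.names             t_col1,tw_col2
    spark.cdm.schema.origin.column.writetime.automatic   false
    spark.cdm.schema.origin.column.writetime.names       tw_col2,w_col3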

src/test/java/com/datastax/cdm/cql/CommonMocks.java

Lines changed: 1 addition & 0 deletions
@@ -259,6 +259,7 @@ public void setOriginTableWhens() {
         });
         when(originTable.getColumnCqlTypes()).thenReturn(originColumnTypes);
         when(originTable.getDataType(anyString())).thenAnswer(invocation -> {String name = invocation.getArgument(0,String.class); return originColumnTypes.get(originColumnNames.indexOf(name));});
+        when(originTable.getWritetimeTTLColumns()).thenReturn(originValueColumns);

         when(originTable.getOtherCqlTable()).thenReturn(targetTable);
         when(originTable.getCorrespondingIndex(anyInt())).thenAnswer(invocation -> targetColumnNames.indexOf(originColumnNames.get(invocation.getArgument(0, Integer.class))));
