Skip to content

Commit 3913fe4

Browse files
mieslepmsmygit
authored and committed
CDM-52 : implement writetimeIncrement feature which can avoid autocorrect creating duplicate list entries
1 parent 96ec01b commit 3913fe4

File tree

5 files changed

+70
-3
lines changed

5 files changed

+70
-3
lines changed

src/main/java/com/datastax/cdm/feature/WritetimeTTL.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class WritetimeTTL extends AbstractFeature {
2727
private Long filterMin;
2828
private Long filterMax;
2929
private boolean hasWriteTimestampFilter;
30+
private Long writetimeIncrement;
3031

3132
@Override
3233
public boolean loadProperties(IPropertyHelper propertyHelper) {
@@ -49,6 +50,8 @@ public boolean loadProperties(IPropertyHelper propertyHelper) {
4950
this.autoWritetimeNames = false;
5051
}
5152

53+
this.writetimeIncrement = propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME_INCREMENT);
54+
5255
this.filterMin = getMinFilter(propertyHelper);
5356
this.filterMax = getMaxFilter(propertyHelper);
5457
this.hasWriteTimestampFilter = (null != filterMin && null != filterMax && filterMin > 0 && filterMax > 0 && filterMax > filterMin);
@@ -74,6 +77,12 @@ protected boolean validateProperties() {
7477
validateFilterRangeProperties();
7578
validateTTLNames();
7679
validateWritetimeNames();
80+
81+
if (null==this.writetimeIncrement || this.writetimeIncrement < 0L) {
82+
logger.error(KnownProperties.TRANSFORM_CUSTOM_WRITETIME_INCREMENT + " must be set to a value greater than or equal to zero");
83+
isValid = false;
84+
}
85+
7786
return isValid;
7887
}
7988

@@ -120,6 +129,12 @@ public boolean initializeAndValidate(CqlTable originTable, CqlTable targetTable)
120129
isValid = false;
121130
}
122131

132+
if (this.writetimeIncrement == 0L && (null!=writetimeNames && !writetimeNames.isEmpty()) && originTable.hasUnfrozenList()) {
133+
logger.warn("Writetime is configured, but the origin table has at least one unfrozen List, and there is a zero-value increment configured at "+
134+
KnownProperties.TRANSFORM_CUSTOM_WRITETIME_INCREMENT+"; this may result in duplicate list entries when "+
135+
KnownProperties.AUTOCORRECT_MISMATCH+" is enabled.");
136+
}
137+
123138
if (!isValid) isEnabled = false;
124139
logger.info("Feature {} is {}", this.getClass().getSimpleName(), isEnabled?"enabled":"disabled");
125140
return isValid;
@@ -162,7 +177,7 @@ public Long getLargestWriteTimeStamp(Row row) {
162177
.mapToLong(row::getLong)
163178
.filter(Objects::nonNull)
164179
.max();
165-
return max.isPresent() ? max.getAsLong() : null;
180+
return max.isPresent() ? max.getAsLong() + this.writetimeIncrement : null;
166181
}
167182

168183
public Integer getLargestTTL(Row row) {

src/main/java/com/datastax/cdm/properties/KnownProperties.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ public enum PropertyType {
142142
//==========================================================================
143143
public static final String TRANSFORM_REPLACE_MISSING_TS = "spark.cdm.transform.missing.key.ts.replace.value";
144144
public static final String TRANSFORM_CUSTOM_WRITETIME = "spark.cdm.transform.custom.writetime";
145+
public static final String TRANSFORM_CUSTOM_WRITETIME_INCREMENT = "spark.cdm.transform.custom.writetime.incrementBy";
145146
public static final String TRANSFORM_CODECS = "spark.cdm.transform.codecs";
146147
public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT = "spark.cdm.transform.codecs.timestamp.string.format";
147148
public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE = "spark.cdm.transform.codecs.timestamp.string.zone";
@@ -155,6 +156,8 @@ public enum PropertyType {
155156
types.put(TRANSFORM_REPLACE_MISSING_TS, PropertyType.NUMBER);
156157
types.put(TRANSFORM_CUSTOM_WRITETIME, PropertyType.NUMBER);
157158
defaults.put(TRANSFORM_CUSTOM_WRITETIME, "0");
159+
types.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, PropertyType.NUMBER);
160+
defaults.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, "0");
158161
types.put(TRANSFORM_CODECS, PropertyType.STRING_LIST);
159162
types.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT, PropertyType.STRING);
160163
defaults.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT, "yyyyMMddHHmmss");

src/main/java/com/datastax/cdm/schema/CqlTable.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,14 @@ public boolean isWritetimeTTLColumn(String columnName) {
342342
return this.writetimeTTLColumns.contains(columnName);
343343
}
344344

345+
public boolean hasUnfrozenList() {
346+
return this.cqlAllColumns.stream()
347+
.filter(columnMetadata ->
348+
columnNames.contains(columnMetadata.getName().asInternal()) &&
349+
columnMetadata.getType() instanceof ListType)
350+
.anyMatch(columnMetadata -> !CqlData.isFrozen(columnMetadata.getType()));
351+
}
352+
345353
private static ConsistencyLevel mapToConsistencyLevel(String level) {
346354
ConsistencyLevel retVal = ConsistencyLevel.LOCAL_QUORUM;
347355
if (StringUtils.isNotEmpty(level)) {

src/resources/sparkConf.properties

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,12 +191,18 @@ spark.cdm.perfops.writeRateLimit 40000
191191
# MigrateData operation would fail. This parameter allows a crude
192192
# constant value to be used in its place, separate from the Constant
193193
# Values feature.
194-
# .custom.writetime Default is 0 (diabled). Timestamp value in microseconds to use as the
194+
# .custom
195+
# .writetime Default is 0 (disabled). Timestamp value in microseconds to use as the
195196
# WRITETIME for the target record. This is useful when the WRITETIME of
196197
# the record in Origin cannot be determined (such as the only non-key
197198
# columns are collections). This parameter allows a crude constant value
198199
# to be used in its place, and overrides
199200
# .schema.origin.column.writetime.indexes.
201+
# .writetime.incrementBy Default is 0. This is useful when you have a List that is not frozen,
202+
# and are updating this via the autocorrect feature. Lists are not idempotent,
203+
# and subsequent UPSERTs would add duplicates to the list. Future versions
204+
# of CDM may tombstone the previous list, but for now this solution is
205+
# viable and, crucially, more performant.
200206
# .codecs Default is empty. A comma-separated list of additional codecs to
201207
# enable. Current codecs are:
202208
# INT_STRING : int stored in a String
@@ -216,7 +222,8 @@ spark.cdm.perfops.writeRateLimit 40000
216222
# .string.zone Default is UTC ; Must be in ZoneRulesProvider.getAvailableZoneIds()
217223
#-----------------------------------------------------------------------------------------------------------
218224
#spark.cdm.transform.missing.key.ts.replace.value
219-
#spark.cdm.transform.custom.writetime 0
225+
#spark.cdm.transform.custom.writetime 0
226+
#spark.cdm.transform.custom.writetime.incrementBy 0
220227
#spark.cdm.transform.codecs
221228
#spark.cdm.transform.codecs.timestamp.string.format yyyyMMddHHmmss
222229
#spark.cdm.transform.codecs.timestamp.string.zone UTC

src/test/java/com/datastax/cdm/feature/TTLAndWritetimeTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,4 +428,38 @@ public void testOriginIsTarget() {
428428
feature.loadProperties(propertyHelper);
429429
assertThrows(IllegalArgumentException.class, () -> feature.initializeAndValidate(originTable, targetTable));
430430
}
431+
432+
@Test
433+
public void writetimeIncrementTest() {
434+
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L);
435+
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME_INCREMENT)).thenReturn(314L);
436+
when(originTable.indexOf("WRITETIME("+writetimeColumnName+")")).thenReturn(100);
437+
when(originRow.getLong(eq(100))).thenReturn(1000L);
438+
when(originTable.indexOf("WRITETIME("+writetimeTTLColumnName+")")).thenReturn(101);
439+
when(originRow.getLong(eq(101))).thenReturn(3000L);
440+
441+
feature.loadProperties(propertyHelper);
442+
feature.initializeAndValidate(originTable, targetTable);
443+
444+
Long largestWritetime = feature.getLargestWriteTimeStamp(originRow);
445+
assertEquals(3314L, largestWritetime);
446+
}
447+
448+
@Test
449+
public void invalidWritetimeIncrement() {
450+
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME_INCREMENT)).thenReturn(-1L);
451+
feature.loadProperties(propertyHelper);
452+
assertFalse(feature.isEnabled());
453+
}
454+
455+
@Test
456+
public void testZeroIncrementWithUnfrozenList() {
457+
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME_INCREMENT)).thenReturn(0L);
458+
when(originTable.hasUnfrozenList()).thenReturn(true);
459+
460+
feature.loadProperties(propertyHelper);
461+
feature.initializeAndValidate(originTable, targetTable);
462+
assertTrue(feature.isEnabled());
463+
}
464+
431465
}

0 commit comments

Comments
 (0)