
Commit 28d4861

CDM-35 enabling custom writetime feature
1 parent eba1718 commit 28d4861

4 files changed: +25 −5 lines

src/main/java/datastax/cdm/cql/statement/AbstractOriginSelectStatement.java

Lines changed: 12 additions & 0 deletions
@@ -23,6 +23,7 @@ public abstract class AbstractOriginSelectStatement extends BaseCdmStatement {
     private final Boolean writeTimestampFilterEnabled;
     private final Long minWriteTimeStampFilter;
     private final Long maxWriteTimeStampFilter;
+    private final Long customWritetime;

     private final Boolean filterColumnEnabled;
     private final Integer filterColumnIndex;
@@ -46,6 +47,11 @@ public AbstractOriginSelectStatement(PropertyHelper propertyHelper, CqlHelper cq
                     Instant.ofEpochMilli(getMaxWriteTimeStampFilter() / 1000));
         }

+        customWritetime = getCustomWritetime();
+        if (customWritetime > 0)
+            logger.info("PARAM -- {}: {} datetime is {} ", KnownProperties.TRANSFORM_CUSTOM_WRITETIME, customWritetime,
+                    Instant.ofEpochMilli(customWritetime / 1000));
+
         filterColumnString = getFilterColumnString();
         filterColumnIndex = getFilterColumnIndex();
         filterColumnType = getFilterColumnType();
@@ -107,6 +113,7 @@ public boolean shouldFilterRecord(Record record) {
     }

     public Long getLargestWriteTimeStamp(Row row) {
+        if (customWritetime > 0) return customWritetime;
         if (null==writeTimestampIndexes || writeTimestampIndexes.isEmpty()) return null;
         OptionalLong max = writeTimestampIndexes.stream()
                 .mapToLong(row::getLong)
@@ -172,6 +179,11 @@ public boolean hasWriteTimestampFilter() {
                 null != writetimeCols && !writetimeCols.isEmpty());
     }

+    private Long getCustomWritetime() {
+        Long rtn = propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME);
+        return (null==rtn || rtn < 0) ? 0L : rtn;
+    }
+
     private String getFilterColumnString() {
         String rtn = propertyHelper.getString(KnownProperties.FILTER_COLUMN_VALUE);
         if (null!=rtn) return rtn.trim();
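
Taken together, these hunks make a positive custom writetime short-circuit the per-row WRITETIME scan, while null or negative configuration values disable the feature. A minimal standalone sketch of that precedence, using illustrative names and values rather than the repo's PropertyHelper and Row types:

import java.time.Instant;
import java.util.List;
import java.util.OptionalLong;

public class CustomWritetimeSketch {
    // Mirrors getCustomWritetime(): null or negative config values disable the feature.
    static Long customWritetime(Long configured) {
        return (null == configured || configured < 0) ? 0L : configured;
    }

    // Mirrors getLargestWriteTimeStamp(): a positive custom value wins outright;
    // otherwise the largest WRITETIME among the configured columns is used.
    static Long largestWriteTimestamp(Long custom, List<Long> columnWritetimes) {
        if (custom > 0) return custom;
        if (null == columnWritetimes || columnWritetimes.isEmpty()) return null;
        OptionalLong max = columnWritetimes.stream().mapToLong(Long::longValue).max();
        return max.isPresent() ? max.getAsLong() : null;
    }

    public static void main(String[] args) {
        Long custom = customWritetime(1680307200000000L); // microseconds
        System.out.println(largestWriteTimestamp(custom, List.of(10L, 20L)));                // custom wins
        System.out.println(largestWriteTimestamp(customWritetime(null), List.of(10L, 20L))); // 20
        System.out.println(Instant.ofEpochMilli(custom / 1000)); // how the new log line renders it
    }
}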

src/main/java/datastax/cdm/cql/statement/AbstractTargetUpsertStatement.java

Lines changed: 3 additions & 1 deletion
@@ -116,7 +116,9 @@ private void setTTLAndWriteTimeNames() {
         List<Integer> ttlColumnNames = propertyHelper.getIntegerList(KnownProperties.ORIGIN_TTL_INDEXES);
         usingTTL = null!= ttlColumnNames && !ttlColumnNames.isEmpty();
         List<Integer> writeTimeColumnNames = propertyHelper.getIntegerList(KnownProperties.ORIGIN_WRITETIME_INDEXES);
-        usingWriteTime = null!= writeTimeColumnNames && !writeTimeColumnNames.isEmpty();
+        Long customWritetime = propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME);
+        usingWriteTime = (null!= writeTimeColumnNames && !writeTimeColumnNames.isEmpty()
+                || null != customWritetime && customWritetime > 0);
     }

     private void setCounterIndexes() {
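
The reworked usingWriteTime expression relies on && binding tighter than || in Java, so the feature also switches on when only the custom writetime is configured and no WRITETIME columns are. A quick standalone check with hypothetical values:

import java.util.Collections;
import java.util.List;

public class UsingWriteTimeCheck {
    public static void main(String[] args) {
        List<Integer> writeTimeColumnNames = Collections.emptyList(); // no WRITETIME columns configured
        Long customWritetime = 1680307200000000L;                     // custom writetime configured
        // Same shape as the new expression: && binds tighter than ||,
        // so no extra parentheses are needed around either operand.
        boolean usingWriteTime = (null != writeTimeColumnNames && !writeTimeColumnNames.isEmpty()
                || null != customWritetime && customWritetime > 0);
        System.out.println(usingWriteTime); // true
    }
}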

src/main/java/datastax/cdm/properties/KnownProperties.java

Lines changed: 3 additions & 4 deletions
@@ -173,16 +173,15 @@ public enum PropertyType {
     // Guardrails and Transformations
     //==========================================================================
     public static final String TRANSFORM_REPLACE_MISSING_TS = "spark.cdm.transform.missing.key.ts.replace.value";
+    public static final String TRANSFORM_CUSTOM_WRITETIME = "spark.cdm.transform.custom.writetime";

-    // TODO: this was previously in the configuration but not referenced in 3.3.0 other than in config
-    // public static final String TRANSFORM_WRITETIME_CUSTOM_VALUE = "spark.target.custom.writeTime"; // 0
     // TODO: 3.3.0 refactored how guardrails are handled, this needs to be merged forward
     // public static final String GUARDRAIL_FIELD_LIMIT_MB = "spark.guardrail.colSizeInKB"; //10

     static {
         types.put(TRANSFORM_REPLACE_MISSING_TS, PropertyType.NUMBER);
-        // types.put(TRANSFORM_WRITETIME_CUSTOM_VALUE, PropertyType.NUMBER);
-        // defaults.put(TRANSFORM_WRITETIME_CUSTOM_VALUE, "0");
+        types.put(TRANSFORM_CUSTOM_WRITETIME, PropertyType.NUMBER);
+        defaults.put(TRANSFORM_CUSTOM_WRITETIME, "0");
         // types.put(GUARDRAIL_FIELD_LIMIT_MB, PropertyType.NUMBER);
         // defaults.put(GUARDRAIL_FIELD_LIMIT_MB, "0");
     }
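
The registration pattern here pairs each property key with a type and a string default. A self-contained sketch of that pattern, with the maps declared locally (the real class keeps them as shared statics alongside many other properties):

import java.util.HashMap;
import java.util.Map;

public class PropertyRegistrySketch {
    enum PropertyType { STRING, NUMBER }

    static final String TRANSFORM_CUSTOM_WRITETIME = "spark.cdm.transform.custom.writetime";
    static final Map<String, PropertyType> types = new HashMap<>();
    static final Map<String, String> defaults = new HashMap<>();

    static {
        // Same registration the hunk adds: a NUMBER property defaulting to "0" (disabled).
        types.put(TRANSFORM_CUSTOM_WRITETIME, PropertyType.NUMBER);
        defaults.put(TRANSFORM_CUSTOM_WRITETIME, "0");
    }

    public static void main(String[] args) {
        System.out.println(types.get(TRANSFORM_CUSTOM_WRITETIME));    // NUMBER
        System.out.println(defaults.get(TRANSFORM_CUSTOM_WRITETIME)); // 0
    }
}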

src/resources/sparkConf.properties

Lines changed: 7 additions & 0 deletions
@@ -228,8 +228,15 @@ spark.cdm.perfops.writeRateLimit 40000
 #                               MigrateData operation would fail. This parameter allows a crude
 #                               constant value to be used in its place, separate from the Constant
 #                               Values feature.
+# .custom.writetime             Default is 0 (disabled). Timestamp value in microseconds to use as the
+#                               WRITETIME for the target record. This is useful when the WRITETIME of
+#                               the record in Origin cannot be determined (such as when the only non-key
+#                               columns are collections). This parameter allows a crude constant value
+#                               to be used in its place, and overrides
+#                               .schema.origin.column.writetime.indexes.
 #-----------------------------------------------------------------------------------------------------------
 #spark.cdm.transform.missing.key.ts.replace.value
+#spark.cdm.transform.custom.writetime 0

 #===========================================================================================================
 # Cassandra Filters are applied on the coordinator node. Note that, depending on the filter, the coordinator
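
Since the parameter is interpreted as microseconds since the epoch, one way to derive a value for it (a hedged example; the date and class name are arbitrary, not from the repo):

import java.time.Instant;

public class WritetimeMicros {
    public static void main(String[] args) {
        // WRITETIME values are microseconds; Instant gives milliseconds, so scale by 1000.
        long micros = Instant.parse("2023-04-01T00:00:00Z").toEpochMilli() * 1000L;
        System.out.println(micros); // 1680307200000000
        // In sparkConf.properties this would then read:
        // spark.cdm.transform.custom.writetime 1680307200000000
    }
}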
