Skip to content

Commit 7d69c6b

Browse files
authored
Feature/transform fields (#294)
* Implement overwrite function in extract-json feature. * Added test to verify mapped properties * Added copyright
1 parent 0b1f50e commit 7d69c6b

File tree

6 files changed

+72
-3
lines changed

6 files changed

+72
-3
lines changed

src/main/java/com/datastax/cdm/feature/ExtractJson.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class ExtractJson extends AbstractFeature {
4040

4141
private String targetColumnName = "";
4242
private Integer targetColumnIndex = -1;
43+
private boolean overwriteTarget = false;
4344

4445
@Override
4546
public boolean loadProperties(IPropertyHelper helper) {
@@ -49,6 +50,8 @@ public boolean loadProperties(IPropertyHelper helper) {
4950

5051
originColumnName = getColumnName(helper, KnownProperties.EXTRACT_JSON_ORIGIN_COLUMN_NAME);
5152
targetColumnName = getColumnName(helper, KnownProperties.EXTRACT_JSON_TARGET_COLUMN_MAPPING);
53+
overwriteTarget = helper.getBoolean(KnownProperties.EXTRACT_JSON_TARGET_OVERWRITE);
54+
5255
// Convert columnToFieldMapping to targetColumnName and originJsonFieldName
5356
if (!targetColumnName.isBlank()) {
5457
String[] parts = targetColumnName.split("\\:");
@@ -146,6 +149,10 @@ public String getTargetColumnName() {
146149
return isEnabled ? targetColumnName : "";
147150
}
148151

152+
public boolean overwriteTarget() {
153+
return overwriteTarget;
154+
}
155+
149156
private String getColumnName(IPropertyHelper helper, String colName) {
150157
String columnName = CqlTable.unFormatName(helper.getString(colName));
151158
return (null == columnName) ? "" : columnName;

src/main/java/com/datastax/cdm/job/DiffJobSession.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public class DiffJobSession extends CopyJobSession {
6666
boolean logDebug = logger.isDebugEnabled();
6767
boolean logTrace = logger.isTraceEnabled();
6868
private ExtractJson extractJsonFeature;
69+
private boolean overwriteTarget;
6970

7071
public DiffJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
7172
super(originSession, targetSession, sc);
@@ -111,6 +112,7 @@ public DiffJobSession(CqlSession originSession, CqlSession targetSession, SparkC
111112
}
112113

113114
extractJsonFeature = (ExtractJson) this.targetSession.getCqlTable().getFeature(Featureset.EXTRACT_JSON);
115+
overwriteTarget = extractJsonFeature.isEnabled() && extractJsonFeature.overwriteTarget();
114116

115117
logger.info("CQL -- origin select: {}", this.originSession.getOriginSelectByPartitionRangeStatement().getCQL());
116118
logger.info("CQL -- target select: {}", this.targetSession.getTargetSelectByPKStatement().getCQL());
@@ -270,7 +272,13 @@ private String isDifferent(Record record) {
270272
logger.trace("PK {}, targetIndex {} skipping constant column {}", pk, targetIndex,
271273
targetColumnNames.get(targetIndex));
272274
return; // nothing to compare in origin
273-
} else if (targetIndex == extractJsonFeature.getTargetColumnIndex()) {
275+
}
276+
277+
targetAsOriginType = targetSession.getCqlTable().getAndConvertData(targetIndex, targetRow);
278+
if (targetIndex == extractJsonFeature.getTargetColumnIndex()) {
279+
if (!overwriteTarget && null != targetAsOriginType) {
280+
return; // skip validation when target has data
281+
}
274282
originIndex = extractJsonFeature.getOriginColumnIndex();
275283
origin = extractJsonFeature.extract(originRow.getString(originIndex));
276284
} else {
@@ -301,7 +309,6 @@ private String isDifferent(Record record) {
301309
+ explodeMapKeyIndex + ", valueIndex:" + explodeMapValueIndex + ")");
302310
}
303311
}
304-
targetAsOriginType = targetSession.getCqlTable().getAndConvertData(targetIndex, targetRow);
305312

306313
if (logDebug)
307314
logger.debug(

src/main/java/com/datastax/cdm/properties/KnownProperties.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,13 +243,15 @@ public enum PropertyType {
243243
public static final String EXTRACT_JSON_EXCLUSIVE = "spark.cdm.feature.extractJson.exclusive";
244244
public static final String EXTRACT_JSON_ORIGIN_COLUMN_NAME = "spark.cdm.feature.extractJson.originColumn";
245245
public static final String EXTRACT_JSON_TARGET_COLUMN_MAPPING = "spark.cdm.feature.extractJson.propertyMapping";
246+
public static final String EXTRACT_JSON_TARGET_OVERWRITE = "spark.cdm.feature.extractJson.overwrite";
246247

247248
static {
248249
types.put(EXTRACT_JSON_EXCLUSIVE, PropertyType.BOOLEAN);
249250
defaults.put(EXTRACT_JSON_EXCLUSIVE, "false");
250251
types.put(EXTRACT_JSON_ORIGIN_COLUMN_NAME, PropertyType.STRING);
251252
types.put(EXTRACT_JSON_TARGET_COLUMN_MAPPING, PropertyType.STRING);
252-
}
253+
types.put(EXTRACT_JSON_TARGET_OVERWRITE, PropertyType.BOOLEAN);
254+
defaults.put(EXTRACT_JSON_TARGET_OVERWRITE, "false"); }
253255

254256
// ==========================================================================
255257
// Guardrail Feature

src/resources/cdm-detailed.properties

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,10 +388,16 @@ spark.cdm.perfops.ratelimit.target 20000
388388
# - If the specified JSON property does not exist in the JSON content, the Target column
389389
# will be set to null.
390390
# Note: This feature currently supports extraction of only one JSON property.
391+
#
392+
# .overwrite Default is false. This property applies only to Validation runs (not applicable to Migration).
393+
# When set to true, the extracted JSON value will overwrite any existing value in the
394+
# Target column during Validation. When set to false, validation is skipped if the Target column has
395+
# any non-null value.
391396
#-----------------------------------------------------------------------------------------------------------
392397
#spark.cdm.feature.extractJson.exclusive false
393398
#spark.cdm.feature.extractJson.originColumn origin_columnname_with_json_content
394399
#spark.cdm.feature.extractJson.propertyMapping origin_json_propertyname:target_columnname
400+
#spark.cdm.feature.extractJson.overwrite false
395401

396402
#===========================================================================================================
397403
# Guardrail feature manages records that exceed guardrail checks. The Guardrail job will generate a

src/test/java/com/datastax/cdm/feature/ExtractJsonTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public class ExtractJsonTest {
5959

6060
String standardOriginName = "content";
6161
String standardTargetName = "age";
62+
String mappedTargetName = "personAge:person_age";
6263

6364
@BeforeEach
6465
public void setup() {
@@ -98,9 +99,22 @@ public void loadProperties() {
9899
assertAll(
99100
() -> assertTrue(loaded, "properties are loaded and valid"),
100101
() -> assertTrue(feature.isEnabled()),
102+
() -> assertFalse(feature.overwriteTarget()),
101103
() -> assertEquals(standardTargetName, feature.getTargetColumnName())
102104
);
103105
}
106+
107+
@Test
108+
public void loadPropertiesWithMapping() {
109+
when(propertyHelper.getString(KnownProperties.EXTRACT_JSON_TARGET_COLUMN_MAPPING)).thenReturn(mappedTargetName);
110+
boolean loaded = feature.loadProperties(propertyHelper);
111+
112+
assertAll(
113+
() -> assertTrue(loaded, "properties are loaded and valid"),
114+
() -> assertTrue(feature.isEnabled()),
115+
() -> assertEquals("person_age", feature.getTargetColumnName())
116+
);
117+
}
104118

105119
@Test
106120
public void loadPropertiesException() {
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.cdm.feature;
17+
18+
import static org.junit.jupiter.api.Assertions.assertEquals;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
class TrackRunTest {
23+
24+
@Test
25+
void test() {
26+
assertEquals("MIGRATE", TrackRun.RUN_TYPE.MIGRATE.name());
27+
assertEquals("DIFF_DATA", TrackRun.RUN_TYPE.DIFF_DATA.name());
28+
29+
assertEquals(2, TrackRun.RUN_TYPE.values().length);
30+
assertEquals(5, TrackRun.RUN_STATUS.values().length);
31+
}
32+
33+
}

0 commit comments

Comments
 (0)