Skip to content

Commit 82e8563

Browse files
committed
Fixed empty value in map issue
1 parent ed0d750 commit 82e8563

File tree

6 files changed

+102
-36
lines changed

6 files changed

+102
-36
lines changed

.classpath

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,17 @@
99
<classpathentry excluding="**" kind="src" output="target/classes" path="src/resources">
1010
<attributes>
1111
<attribute name="maven.pomderived" value="true"/>
12+
<attribute name="optional" value="true"/>
1213
</attributes>
1314
</classpathentry>
1415
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
1516
<attributes>
17+
<attribute name="test" value="true"/>
1618
<attribute name="optional" value="true"/>
1719
<attribute name="maven.pomderived" value="true"/>
18-
<attribute name="test" value="true"/>
1920
</attributes>
2021
</classpathentry>
21-
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
22+
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11">
2223
<attributes>
2324
<attribute name="maven.pomderived" value="true"/>
2425
</attributes>
@@ -28,5 +29,23 @@
2829
<attribute name="maven.pomderived" value="true"/>
2930
</attributes>
3031
</classpathentry>
32+
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
33+
<attributes>
34+
<attribute name="test" value="true"/>
35+
<attribute name="maven.pomderived" value="true"/>
36+
<attribute name="optional" value="true"/>
37+
</attributes>
38+
</classpathentry>
39+
<classpathentry kind="src" path="target/generated-sources/annotations">
40+
<attributes>
41+
<attribute name="optional" value="true"/>
42+
</attributes>
43+
</classpathentry>
44+
<classpathentry kind="src" output="target/test-classes" path="target/generated-test-sources/test-annotations">
45+
<attributes>
46+
<attribute name="optional" value="true"/>
47+
<attribute name="test" value="true"/>
48+
</attributes>
49+
</classpathentry>
3150
<classpathentry kind="output" path="target/classes"/>
3251
</classpath>

RELEASE.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
# Release Notes
2+
## [4.1.16] - 2024-05-31
3+
- Added property to manage null values in Map fields
4+
- Updated README
5+
26
## [4.1.15] - 2024-03-05
37
- Internal CI/CD release fix
48

src/main/java/com/datastax/cdm/feature/WritetimeTTL.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,12 @@ public boolean initializeAndValidate(CqlTable originTable, CqlTable targetTable)
147147
isValid = false;
148148
}
149149

150-
if (this.writetimeIncrement == 0L && (null!=writetimeNames && !writetimeNames.isEmpty()) && originTable.hasUnfrozenList()) {
151-
logger.warn("Writetime is configured, but the origin table at least one unfrozen List, and there is a zero-value increment configured at "+
152-
KnownProperties.TRANSFORM_CUSTOM_WRITETIME_INCREMENT+"; this may result in duplicate list entries when "+
153-
KnownProperties.AUTOCORRECT_MISMATCH+" is enabled.");
154-
}
150+
if (this.writetimeIncrement == 0L && (null != writetimeNames && !writetimeNames.isEmpty())
151+
&& originTable.hasUnfrozenList()) {
152+
logger.warn("Origin table has at least one unfrozen List, and "
153+
+ KnownProperties.TRANSFORM_CUSTOM_WRITETIME_INCREMENT
154+
+ " is set to zero; this may result in duplicate list entries on reruns or validation with autocorrect.");
155+
}
155156

156157
if (!isValid) isEnabled = false;
157158
logger.info("Feature {} is {}", this.getClass().getSimpleName(), isEnabled?"enabled":"disabled");

src/main/java/com/datastax/cdm/properties/KnownProperties.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,13 @@
1515
*/
1616
package com.datastax.cdm.properties;
1717

18-
import java.util.*;
18+
import java.util.ArrayList;
19+
import java.util.Arrays;
20+
import java.util.HashMap;
21+
import java.util.HashSet;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.Set;
1925

2026
public class KnownProperties {
2127

@@ -167,24 +173,27 @@ public enum PropertyType {
167173
public static final String TRANSFORM_CODECS = "spark.cdm.transform.codecs";
168174
public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT = "spark.cdm.transform.codecs.timestamp.string.format";
169175
public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE = "spark.cdm.transform.codecs.timestamp.string.zone";
176+
public static final String TRANSFORM_MAP_REMOVE_KEY_WITH_NO_VALUE = "spark.cdm.transform.map.remove.null.vaue";
170177

171178

172179

173180
// TODO: 3.3.0 refactored how guardrails are handled, this needs to be merged forward
174181
// public static final String GUARDRAIL_FIELD_LIMIT_MB = "spark.guardrail.colSizeInKB"; //10
175182

176-
static {
177-
types.put(TRANSFORM_REPLACE_MISSING_TS, PropertyType.NUMBER);
178-
types.put(TRANSFORM_CUSTOM_WRITETIME, PropertyType.NUMBER);
179-
defaults.put(TRANSFORM_CUSTOM_WRITETIME, "0");
180-
types.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, PropertyType.NUMBER);
181-
defaults.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, "0");
182-
types.put(TRANSFORM_CODECS, PropertyType.STRING_LIST);
183-
types.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT, PropertyType.STRING);
184-
defaults.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT, "yyyyMMddHHmmss");
185-
types.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE, PropertyType.STRING);
186-
defaults.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE, "UTC");
187-
}
183+
static {
    // Register the value type — and, where one exists, the default value —
    // for every spark.cdm.transform.* property.
    types.put(TRANSFORM_REPLACE_MISSING_TS, PropertyType.NUMBER);

    types.put(TRANSFORM_CUSTOM_WRITETIME, PropertyType.NUMBER);
    defaults.put(TRANSFORM_CUSTOM_WRITETIME, "0");

    types.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, PropertyType.NUMBER);
    defaults.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, "0");

    types.put(TRANSFORM_CODECS, PropertyType.STRING_LIST);

    types.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT, PropertyType.STRING);
    defaults.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT, "yyyyMMddHHmmss");

    types.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE, PropertyType.STRING);
    defaults.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE, "UTC");

    // New in 4.1.16: opt-in removal of null-valued Map entries.
    types.put(TRANSFORM_MAP_REMOVE_KEY_WITH_NO_VALUE, PropertyType.BOOLEAN);
    defaults.put(TRANSFORM_MAP_REMOVE_KEY_WITH_NO_VALUE, "false");
}
188197

189198
//==========================================================================
190199
// Cassandra-side Filters

src/main/java/com/datastax/cdm/schema/CqlTable.java

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,29 @@
1515
*/
1616
package com.datastax.cdm.schema;
1717

18+
import java.time.Instant;
19+
import java.util.ArrayList;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.Objects;
24+
import java.util.Optional;
25+
import java.util.Set;
26+
import java.util.stream.Collectors;
27+
import java.util.stream.IntStream;
28+
29+
import org.apache.commons.lang.StringUtils;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import com.datastax.cdm.data.CqlConversion;
34+
import com.datastax.cdm.data.CqlData;
35+
import com.datastax.cdm.data.DataUtility;
1836
import com.datastax.cdm.feature.Feature;
1937
import com.datastax.cdm.feature.Featureset;
2038
import com.datastax.cdm.feature.WritetimeTTL;
39+
import com.datastax.cdm.properties.KnownProperties;
40+
import com.datastax.cdm.properties.PropertyHelper;
2141
import com.datastax.oss.driver.api.core.ConsistencyLevel;
2242
import com.datastax.oss.driver.api.core.CqlIdentifier;
2343
import com.datastax.oss.driver.api.core.CqlSession;
@@ -26,22 +46,12 @@
2646
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
2747
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
2848
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
29-
import com.datastax.oss.driver.api.core.type.*;
49+
import com.datastax.oss.driver.api.core.type.DataType;
50+
import com.datastax.oss.driver.api.core.type.DataTypes;
51+
import com.datastax.oss.driver.api.core.type.ListType;
52+
import com.datastax.oss.driver.api.core.type.TupleType;
3053
import com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException;
3154
import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
32-
import com.datastax.cdm.data.CqlData;
33-
import com.datastax.cdm.data.CqlConversion;
34-
import com.datastax.cdm.data.DataUtility;
35-
import com.datastax.cdm.properties.KnownProperties;
36-
import com.datastax.cdm.properties.PropertyHelper;
37-
import org.apache.commons.lang.StringUtils;
38-
import org.slf4j.Logger;
39-
import org.slf4j.LoggerFactory;
40-
41-
import java.time.Instant;
42-
import java.util.*;
43-
import java.util.stream.Collectors;
44-
import java.util.stream.IntStream;
4555

4656
public class CqlTable extends BaseTable {
4757
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
@@ -163,6 +173,9 @@ public Integer getBatchSize() {
163173
return prop;
164174
}
165175

176+
private boolean removeMapWithNoValues = propertyHelper
177+
.getBoolean(KnownProperties.TRANSFORM_MAP_REMOVE_KEY_WITH_NO_VALUE);
178+
166179
// Adds to the current column list based on the name and type of columns already existing in the table
167180
// This is useful where a feature is adding a column by name of an existing column.
168181
// If the column is already present, the bind class is added to the return list.
@@ -277,6 +290,11 @@ public Object getAndConvertData(int index, Row row) {
277290
if (null==thisObject) {
278291
return convertNull(index);
279292
}
293+
294+
if (removeMapWithNoValues && thisObject instanceof Map) {
295+
removeNullValuesFrmMap(thisObject);
296+
}
297+
280298
CqlConversion cqlConversion = this.cqlConversions.get(index);
281299
if (null==cqlConversion) {
282300
if (logTrace) logger.trace("{} Index:{} not converting:{}",isOrigin?"origin":"target",index,thisObject);
@@ -288,6 +306,12 @@ public Object getAndConvertData(int index, Row row) {
288306
}
289307
}
290308

309+
private Object removeNullValuesFrmMap(Object thisObject) {
310+
Set<Map.Entry> ms = (((Map) thisObject).entrySet());
311+
return ms.stream().filter(e -> (e.getValue() != null))
312+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
313+
}
314+
291315
public Object convertNull(int thisIndex) {
292316
// We do not need to convert nulls for non-PK columns
293317
int otherIndex = this.getCorrespondingIndex(thisIndex);

src/resources/cdm-detailed.properties

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,16 @@ spark.cdm.autocorrect.mismatch false
170170
# spark.cdm.perfops
171171
# .numParts : Defaults is 10000. In standard operation, the full token range (-2^63..2^63-1)
172172
# is divided into a number of parts which will be parallel-processed. You should
173-
# aim for each part to comprise a total of ≈1-10GB of data to migrate. During
174-
# initial testing, you may want this to be a small number (even 1).
173+
# aim for each part to comprise ~10MB of data to migrate. The default assumes a
# table size of roughly 100GB (10MB * 10K); if the table size is significantly
# smaller or larger than that, adjust this value accordingly.
175176
# .batchSize : Defaults is 5. When writing to Target, this comprises the number of records that
176177
# will be put into an UNLOGGED batch. CDM will tend to work on the same partition
177178
# at a time so if your partition sizes are larger, this number may be increased.
178179
# If .batchSize would mean that more than 1 partition is often contained in a batch,
179180
# the figure should be reduced. Ideally < 1% of batches have more than 1 partition.
181+
# For tables where the primary key equals the partition key, OR where the
# average row size is larger than 20 KB, always set this value to 1.
180183
# .ratelimit
181184
# .origin : Defaults to 20000. Concurrent number of operations across all parallel threads
182185
# from Origin. This may be adjusted up (or down), depending on the amount of data
@@ -253,13 +256,19 @@ spark.cdm.perfops.ratelimit.target 40000
253256
# .codecs.timestamp Configuration for CQL_TIMESTAMP_TO_STRING_FORMAT codec.
254257
# .string.format Default is yyyyMMddHHmmss ; DateTimeFormatter.ofPattern(formatString)
255258
# .string.zone Default is UTC ; Must be in ZoneRulesProvider.getAvailableZoneIds()
259+
#
260+
# .map.remove.null.value Default is false. Setting this to true removes any entry in a Map
# field whose value is empty or null. Such values can cause a
# NullPointerException if the value type does not support empty or
# null values (like Timestamp); set this to true to avoid such errors.
256264
#-----------------------------------------------------------------------------------------------------------
257265
#spark.cdm.transform.missing.key.ts.replace.value 1685577600000
258266
#spark.cdm.transform.custom.writetime 0
259267
#spark.cdm.transform.custom.writetime.incrementBy 0
260268
#spark.cdm.transform.codecs
261269
#spark.cdm.transform.codecs.timestamp.string.format yyyyMMddHHmmss
262270
#spark.cdm.transform.codecs.timestamp.string.zone UTC
271+
#spark.cdm.transform.map.remove.null.vaue false
263272

264273
#===========================================================================================================
265274
# Cassandra Filters are applied on the coordinator node. Note that, depending on the filter, the coordinator

0 commit comments

Comments
 (0)