Skip to content

Commit df071b8

Browse files
authored
Merge pull request #2307 from beyonnex-io/extend-mapper-to-include-delete-fields
add deleteField mapping
2 parents edf0464 + 0e68887 commit df071b8

File tree

3 files changed

+290
-10
lines changed

3 files changed

+290
-10
lines changed

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/NormalizedMessageMapper.java

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
* A message mapper implementation for normalized changes.
5151
* Create-, modify-, merged- and deleted-events are mapped to nested sparse JSON.
5252
* For complete thing deletions (ThingDeleted), a special `_deleted` field is included with the deletion timestamp.
53-
* Partial deletions (AttributeDeleted, FeatureDeleted, etc.) are not mapped and will be dropped.
53+
* Partial deletions (AttributeDeleted, FeatureDeleted, etc.) can be mapped to `_deletedFields` when enabled.
5454
* All other signals and incoming messages are dropped.
5555
*/
5656
public final class NormalizedMessageMapper extends AbstractMessageMapper {
@@ -61,6 +61,7 @@ public final class NormalizedMessageMapper extends AbstractMessageMapper {
6161
* Config property to project parts from the mapping result.
6262
*/
6363
public static final String FIELDS = "fields";
64+
static final String INCLUDE_DELETED_FIELDS = "includeDeletedFields";
6465

6566
private static final JsonFieldDefinition<String> THING_ID = Thing.JsonFields.ID;
6667
private static final JsonFieldDefinition<String> MODIFIED = Thing.JsonFields.MODIFIED;
@@ -74,9 +75,15 @@ public final class NormalizedMessageMapper extends AbstractMessageMapper {
7475
FieldType.SPECIAL,
7576
FieldType.HIDDEN,
7677
JsonSchemaVersion.V_2);
78+
private static final JsonFieldDefinition<JsonObject> DELETED_FIELDS =
79+
JsonFactory.newJsonObjectFieldDefinition("_deletedFields",
80+
FieldType.SPECIAL,
81+
FieldType.HIDDEN,
82+
JsonSchemaVersion.V_2);
7783

7884
@Nullable
7985
private JsonFieldSelector jsonFieldSelector;
86+
private boolean includeDeletedFields;
8087

8188
/**
8289
* Constructs a new instance of NormalizedMessageMapper extension.
@@ -91,6 +98,7 @@ public final class NormalizedMessageMapper extends AbstractMessageMapper {
9198
private NormalizedMessageMapper(final NormalizedMessageMapper copyFromMapper) {
9299
super(copyFromMapper);
93100
this.jsonFieldSelector = copyFromMapper.jsonFieldSelector;
101+
this.includeDeletedFields = copyFromMapper.includeDeletedFields;
94102
}
95103

96104
@Override
@@ -114,6 +122,9 @@ public void doConfigure(final Connection connection, final MappingConfig mapping
114122
fields.ifPresent(s ->
115123
jsonFieldSelector =
116124
JsonFactory.newFieldSelector(s, JsonParseOptions.newBuilder().withoutUrlDecoding().build()));
125+
includeDeletedFields = configuration.findProperty(INCLUDE_DELETED_FIELDS)
126+
.map(Boolean::parseBoolean)
127+
.orElse(false);
117128
}
118129

119130
@Override
@@ -129,7 +140,7 @@ public DittoHeaders getAdditionalInboundHeaders(final ExternalMessage message) {
129140
@Override
130141
public List<ExternalMessage> map(final Adaptable adaptable) {
131142
final TopicPath topicPath = adaptable.getTopicPath();
132-
return isThingChangeEvent(topicPath, adaptable.getPayload())
143+
return isThingChangeEvent(topicPath, adaptable.getPayload(), includeDeletedFields)
133144
? Collections.singletonList(flattenAsThingChange(adaptable))
134145
: Collections.emptyList();
135146
}
@@ -157,15 +168,21 @@ private ExternalMessage flattenAsThingChange(final Adaptable adaptable) {
157168

158169
payload.getTimestamp().ifPresent(timestamp -> builder.set(MODIFIED, timestamp.toString()));
159170
payload.getRevision().ifPresent(revision -> builder.set(REVISION, revision));
160-
171+
161172
// Add _deleted field only for complete thing deletions (ThingDeleted events)
162-
// Partial deletions (AttributeDeleted, FeatureDeleted, etc.) are not mapped
163173
if (isThingDeleted(topicPath, payload)) {
164174
payload.getTimestamp().ifPresent(timestamp ->
165175
builder.set(DELETED, timestamp.toString())
166176
);
167177
}
168-
178+
179+
if (includeDeletedFields) {
180+
final JsonObject deletedFields = extractDeletedFields(topicPath, payload, path, payloadValue.orElse(null));
181+
if (!deletedFields.isEmpty()) {
182+
builder.set(DELETED_FIELDS, deletedFields);
183+
}
184+
}
185+
169186
builder.set(ABRIDGED_ORIGINAL_MESSAGE, abridgeMessage(adaptable));
170187

171188
final var json = builder.build();
@@ -212,13 +229,16 @@ private static JsonObject dittoHeadersToJson(final DittoHeaders dittoHeaders) {
212229
.collect(JsonCollectors.fieldsToObject());
213230
}
214231

215-
private static boolean isThingChangeEvent(final TopicPath topicPath, final Payload payload) {
232+
private static boolean isThingChangeEvent(final TopicPath topicPath,
233+
final Payload payload,
234+
final boolean includeDeletedFields) {
216235
final var isThingEvent =
217236
topicPath.isGroup(TopicPath.Group.THINGS) && topicPath.isCriterion(TopicPath.Criterion.EVENTS);
218237

219238
final var isChange = topicPath.isAction(TopicPath.Action.CREATED)
220239
|| topicPath.isAction(TopicPath.Action.MODIFIED)
221240
|| topicPath.isAction(TopicPath.Action.MERGED)
241+
|| (includeDeletedFields && topicPath.isAction(TopicPath.Action.DELETED))
222242
|| isThingDeleted(topicPath, payload);
223243

224244
return isThingEvent && isChange;
@@ -229,6 +249,60 @@ private static boolean isThingDeleted(final TopicPath topicPath, final Payload p
229249
JsonPointer.of(payload.getPath()).isEmpty();
230250
}
231251

252+
private static JsonObject extractDeletedFields(final TopicPath topicPath,
253+
final Payload payload,
254+
final JsonPointer path,
255+
@Nullable final JsonValue payloadValue)
256+
{
257+
if (payload.getTimestamp().isEmpty()) {
258+
return JsonObject.empty();
259+
}
260+
final JsonObjectBuilder deletedFieldsBuilder = JsonFactory.newObjectBuilder();
261+
final String timestamp = payload.getTimestamp().orElseThrow().toString();
262+
263+
if (topicPath.isAction(TopicPath.Action.DELETED) && !path.isEmpty()) {
264+
deletedFieldsBuilder.set(path.toString(), timestamp);
265+
return deletedFieldsBuilder.build();
266+
}
267+
268+
if (topicPath.isAction(TopicPath.Action.MERGED) && payloadValue != null) {
269+
if (payloadValue.isNull() && !path.isEmpty()) {
270+
deletedFieldsBuilder.set(path.toString(), timestamp);
271+
} else if (payloadValue.isObject()) {
272+
extractNullsFromMergePatch(payloadValue.asObject(), path, deletedFieldsBuilder, timestamp);
273+
}
274+
}
275+
276+
return deletedFieldsBuilder.build();
277+
}
278+
279+
private static void extractNullsFromMergePatch(final JsonObject mergeObject,
280+
final JsonPointer basePath,
281+
final JsonObjectBuilder deletedFieldsBuilder,
282+
final String timestamp) {
283+
mergeObject.forEach(jsonField -> {
284+
final JsonKey key = jsonField.getKey();
285+
if (isRegexDeletionKey(key)) {
286+
return;
287+
}
288+
final JsonPointer currentPath = basePath.isEmpty()
289+
? JsonPointer.empty().addLeaf(key)
290+
: basePath.addLeaf(key);
291+
final JsonValue value = jsonField.getValue();
292+
if (value.isNull()) {
293+
deletedFieldsBuilder.set(currentPath.toString(), timestamp);
294+
} else if (value.isObject()) {
295+
extractNullsFromMergePatch(value.asObject(), currentPath, deletedFieldsBuilder, timestamp);
296+
}
297+
});
298+
}
299+
300+
private static boolean isRegexDeletionKey(final JsonKey key) {
301+
final String keyString = key.toString();
302+
return keyString.startsWith("{{") && keyString.endsWith("}}") &&
303+
(keyString.contains("~") || keyString.contains("/"));
304+
}
305+
232306
private static JsonObject filterNullValuesAndEmptyObjects(final JsonObject jsonObject) {
233307
final JsonObjectBuilder builder = JsonFactory.newObjectBuilder();
234308

@@ -258,6 +332,7 @@ public String toString() {
258332
return getClass().getSimpleName() + " [" +
259333
super.toString() +
260334
", jsonFieldSelector=" + jsonFieldSelector +
335+
", includeDeletedFields=" + includeDeletedFields +
261336
"]";
262337
}
263338
}

connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/NormalizedMessageMapperTest.java

Lines changed: 182 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,68 @@ public void thingMergedWithOnlyNullValues() {
265265
"}"));
266266
}
267267

268+
@Test
269+
public void thingMergedTracksDeletedFieldsFromNullValues() {
270+
enableDeletedFields();
271+
final JsonObject mergedObject = JsonObject.of("{\n" +
272+
" \"attributes\": {\n" +
273+
" \"location\": null,\n" +
274+
" \"status\": \"active\"\n" +
275+
" },\n" +
276+
" \"features\": {\n" +
277+
" \"sensor\": {\n" +
278+
" \"properties\": {\n" +
279+
" \"temp\": null,\n" +
280+
" \"humidity\": 50\n" +
281+
" }\n" +
282+
" }\n" +
283+
" }\n" +
284+
"}");
285+
final ThingMerged event = ThingMerged.of(ThingId.of("thing:merged"), JsonPointer.empty(), mergedObject,
286+
1L, Instant.ofEpochSecond(1L), DittoHeaders.empty(), null);
287+
288+
final Adaptable adaptable = ADAPTER.toAdaptable(event);
289+
290+
Assertions.assertThat(mapToJson(adaptable))
291+
.isEqualTo(JsonObject.of("{\n" +
292+
" \"thingId\": \"thing:merged\",\n" +
293+
" \"attributes\": {\n" +
294+
" \"status\": \"active\"\n" +
295+
" },\n" +
296+
" \"features\": {\n" +
297+
" \"sensor\": {\n" +
298+
" \"properties\": {\n" +
299+
" \"humidity\": 50\n" +
300+
" }\n" +
301+
" }\n" +
302+
" },\n" +
303+
" \"_modified\": \"1970-01-01T00:00:01Z\",\n" +
304+
" \"_revision\": 1,\n" +
305+
" \"_deletedFields\": {\n" +
306+
" \"attributes\": {\n" +
307+
" \"location\": \"1970-01-01T00:00:01Z\"\n" +
308+
" },\n" +
309+
" \"features\": {\n" +
310+
" \"sensor\": {\n" +
311+
" \"properties\": {\n" +
312+
" \"temp\": \"1970-01-01T00:00:01Z\"\n" +
313+
" }\n" +
314+
" }\n" +
315+
" }\n" +
316+
" },\n" +
317+
" \"_context\": {\n" +
318+
" \"topic\": \"thing/merged/things/twin/events/merged\",\n" +
319+
" \"path\": \"/\",\n" +
320+
" \"value\":{\"attributes\":{\"status\":\"active\"},\"features\":{\"sensor\":{\"properties\":{\"humidity\":50}}}},\n" +
321+
" \"headers\": {\n" +
322+
" \"entity-revision\": \"1\",\n" +
323+
" \"response-required\": \"false\",\n" +
324+
" \"content-type\": \"application/merge-patch+json\"\n" +
325+
" }\n" +
326+
" }\n" +
327+
"}"));
328+
}
329+
268330
@Test
269331
public void thingMergedWithExtraFields() {
270332
final var thingId = ThingId.of("the.namespace:the-thing-id");
@@ -523,15 +585,125 @@ public void thingDeletedIsMappedWithDeletedTimestamp() {
523585
}
524586

525587
@Test
526-
public void deletedEventsAreNotMapped() {
527-
assertNotMapped(AttributeDeleted.of(ThingId.of("thing:id"), JsonPointer.of("/the/quick/brown/fox/"), 3L,
588+
public void deletedEventsAreNotMappedByDefault() {
589+
assertNotMapped(AttributeDeleted.of(ThingId.of("thing:id"), JsonPointer.of("/the/quick/brown/fox"), 3L,
528590
Instant.ofEpochSecond(3L), DittoHeaders.empty(), null));
529591
assertNotMapped(FeaturePropertyDeleted.of(ThingId.of("thing:id"), "featureId",
530592
JsonPointer.of("jumps/over/the/lazy/dog"), 4L, Instant.ofEpochSecond(4L), DittoHeaders.empty(), null));
531593
assertNotMapped(FeatureDeleted.of(ThingId.of("thing:id"), "featureId", 5L, Instant.EPOCH,
532594
DittoHeaders.empty(), null));
533595
}
534596

597+
@Test
598+
public void deletedEventsAreMappedWithDeletedFieldsWhenEnabled() {
599+
enableDeletedFields();
600+
final AttributeDeleted attributeDeleted = AttributeDeleted.of(
601+
ThingId.of("thing:id"),
602+
JsonPointer.of("/the/quick/brown/fox"),
603+
3L,
604+
Instant.ofEpochSecond(3L),
605+
DittoHeaders.empty(),
606+
null);
607+
608+
Assertions.assertThat(mapToJson(ADAPTER.toAdaptable(attributeDeleted)))
609+
.isEqualTo(JsonObject.of("{\n" +
610+
" \"thingId\": \"thing:id\",\n" +
611+
" \"_modified\": \"1970-01-01T00:00:03Z\",\n" +
612+
" \"_revision\": 3,\n" +
613+
" \"_deletedFields\": {\n" +
614+
" \"attributes\": {\n" +
615+
" \"the\": {\n" +
616+
" \"quick\": {\n" +
617+
" \"brown\": {\n" +
618+
" \"fox\": \"1970-01-01T00:00:03Z\"\n" +
619+
" }\n" +
620+
" }\n" +
621+
" }\n" +
622+
" }\n" +
623+
" },\n" +
624+
" \"_context\": {\n" +
625+
" \"topic\": \"thing/id/things/twin/events/deleted\",\n" +
626+
" \"path\": \"/attributes/the/quick/brown/fox\",\n" +
627+
" \"value\": null,\n" +
628+
" \"headers\": {\n" +
629+
" \"entity-revision\": \"3\",\n" +
630+
" \"response-required\": \"false\"\n" +
631+
" }\n" +
632+
" }\n" +
633+
"}"));
634+
635+
final FeaturePropertyDeleted propertyDeleted = FeaturePropertyDeleted.of(
636+
ThingId.of("thing:id"),
637+
"featureId",
638+
JsonPointer.of("jumps/over/the/lazy/dog"),
639+
4L,
640+
Instant.ofEpochSecond(4L),
641+
DittoHeaders.empty(),
642+
null);
643+
644+
Assertions.assertThat(mapToJson(ADAPTER.toAdaptable(propertyDeleted)))
645+
.isEqualTo(JsonObject.of("{\n" +
646+
" \"thingId\": \"thing:id\",\n" +
647+
" \"_modified\": \"1970-01-01T00:00:04Z\",\n" +
648+
" \"_revision\": 4,\n" +
649+
" \"_deletedFields\": {\n" +
650+
" \"features\": {\n" +
651+
" \"featureId\": {\n" +
652+
" \"properties\": {\n" +
653+
" \"jumps\": {\n" +
654+
" \"over\": {\n" +
655+
" \"the\": {\n" +
656+
" \"lazy\": {\n" +
657+
" \"dog\": \"1970-01-01T00:00:04Z\"\n" +
658+
" }\n" +
659+
" }\n" +
660+
" }\n" +
661+
" }\n" +
662+
" }\n" +
663+
" }\n" +
664+
" }\n" +
665+
" },\n" +
666+
" \"_context\": {\n" +
667+
" \"topic\": \"thing/id/things/twin/events/deleted\",\n" +
668+
" \"path\": \"/features/featureId/properties/jumps/over/the/lazy/dog\",\n" +
669+
" \"value\": null,\n" +
670+
" \"headers\": {\n" +
671+
" \"entity-revision\": \"4\",\n" +
672+
" \"response-required\": \"false\"\n" +
673+
" }\n" +
674+
" }\n" +
675+
"}"));
676+
677+
final FeatureDeleted featureDeleted = FeatureDeleted.of(
678+
ThingId.of("thing:id"),
679+
"featureId",
680+
5L,
681+
Instant.EPOCH,
682+
DittoHeaders.empty(),
683+
null);
684+
685+
Assertions.assertThat(mapToJson(ADAPTER.toAdaptable(featureDeleted)))
686+
.isEqualTo(JsonObject.of("{\n" +
687+
" \"thingId\": \"thing:id\",\n" +
688+
" \"_modified\": \"1970-01-01T00:00:00Z\",\n" +
689+
" \"_revision\": 5,\n" +
690+
" \"_deletedFields\": {\n" +
691+
" \"features\": {\n" +
692+
" \"featureId\": \"1970-01-01T00:00:00Z\"\n" +
693+
" }\n" +
694+
" },\n" +
695+
" \"_context\": {\n" +
696+
" \"topic\": \"thing/id/things/twin/events/deleted\",\n" +
697+
" \"path\": \"/features/featureId\",\n" +
698+
" \"value\": null,\n" +
699+
" \"headers\": {\n" +
700+
" \"entity-revision\": \"5\",\n" +
701+
" \"response-required\": \"false\"\n" +
702+
" }\n" +
703+
" }\n" +
704+
"}"));
705+
}
706+
535707
@Test
536708
public void nonThingEventsAreNotMapped() {
537709
// command
@@ -553,6 +725,14 @@ private void assertNotMapped(final Signal signal) {
553725
assertThat(underTest.map(ADAPTER.toAdaptable(signal))).isEmpty();
554726
}
555727

728+
private void enableDeletedFields() {
729+
final Map<String, JsonValue> options = Map.of(
730+
NormalizedMessageMapper.INCLUDE_DELETED_FIELDS, JsonValue.of("true"));
731+
underTest.configure(connection, connectivityConfig,
732+
DefaultMessageMapperConfiguration.of("normalizer", options, Map.of(), Map.of()),
733+
actorSystem);
734+
}
735+
556736
private JsonObject mapToJson(final Adaptable message) {
557737
return underTest.map(message)
558738
.stream()

0 commit comments

Comments
 (0)