Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
* A message mapper implementation for normalized changes.
* Create-, modify-, merged- and deleted-events are mapped to nested sparse JSON.
* For complete thing deletions (ThingDeleted), a special `_deleted` field is included with the deletion timestamp.
* Partial deletions (AttributeDeleted, FeatureDeleted, etc.) are not mapped and will be dropped.
* Partial deletions (AttributeDeleted, FeatureDeleted, etc.) can be mapped to `_deletedFields` when enabled.
* All other signals and incoming messages are dropped.
*/
public final class NormalizedMessageMapper extends AbstractMessageMapper {
Expand All @@ -61,6 +61,7 @@ public final class NormalizedMessageMapper extends AbstractMessageMapper {
* Config property to project parts from the mapping result.
*/
public static final String FIELDS = "fields";
static final String INCLUDE_DELETED_FIELDS = "includeDeletedFields";

private static final JsonFieldDefinition<String> THING_ID = Thing.JsonFields.ID;
private static final JsonFieldDefinition<String> MODIFIED = Thing.JsonFields.MODIFIED;
Expand All @@ -74,9 +75,15 @@ public final class NormalizedMessageMapper extends AbstractMessageMapper {
FieldType.SPECIAL,
FieldType.HIDDEN,
JsonSchemaVersion.V_2);
private static final JsonFieldDefinition<JsonObject> DELETED_FIELDS =
JsonFactory.newJsonObjectFieldDefinition("_deletedFields",
FieldType.SPECIAL,
FieldType.HIDDEN,
JsonSchemaVersion.V_2);

@Nullable
private JsonFieldSelector jsonFieldSelector;
private boolean includeDeletedFields;

/**
* Constructs a new instance of NormalizedMessageMapper extension.
Expand All @@ -91,6 +98,7 @@ public final class NormalizedMessageMapper extends AbstractMessageMapper {
private NormalizedMessageMapper(final NormalizedMessageMapper copyFromMapper) {
super(copyFromMapper);
this.jsonFieldSelector = copyFromMapper.jsonFieldSelector;
this.includeDeletedFields = copyFromMapper.includeDeletedFields;
}

@Override
Expand All @@ -114,6 +122,9 @@ public void doConfigure(final Connection connection, final MappingConfig mapping
fields.ifPresent(s ->
jsonFieldSelector =
JsonFactory.newFieldSelector(s, JsonParseOptions.newBuilder().withoutUrlDecoding().build()));
includeDeletedFields = configuration.findProperty(INCLUDE_DELETED_FIELDS)
.map(Boolean::parseBoolean)
.orElse(false);
}

@Override
Expand All @@ -129,7 +140,7 @@ public DittoHeaders getAdditionalInboundHeaders(final ExternalMessage message) {
@Override
public List<ExternalMessage> map(final Adaptable adaptable) {
final TopicPath topicPath = adaptable.getTopicPath();
return isThingChangeEvent(topicPath, adaptable.getPayload())
return isThingChangeEvent(topicPath, adaptable.getPayload(), includeDeletedFields)
? Collections.singletonList(flattenAsThingChange(adaptable))
: Collections.emptyList();
}
Expand Down Expand Up @@ -157,15 +168,21 @@ private ExternalMessage flattenAsThingChange(final Adaptable adaptable) {

payload.getTimestamp().ifPresent(timestamp -> builder.set(MODIFIED, timestamp.toString()));
payload.getRevision().ifPresent(revision -> builder.set(REVISION, revision));

// Add _deleted field only for complete thing deletions (ThingDeleted events)
// Partial deletions (AttributeDeleted, FeatureDeleted, etc.) are not mapped
if (isThingDeleted(topicPath, payload)) {
payload.getTimestamp().ifPresent(timestamp ->
builder.set(DELETED, timestamp.toString())
);
}


if (includeDeletedFields) {
final JsonObject deletedFields = extractDeletedFields(topicPath, payload, path, payloadValue.orElse(null));
if (!deletedFields.isEmpty()) {
builder.set(DELETED_FIELDS, deletedFields);
}
}

builder.set(ABRIDGED_ORIGINAL_MESSAGE, abridgeMessage(adaptable));

final var json = builder.build();
Expand Down Expand Up @@ -212,13 +229,16 @@ private static JsonObject dittoHeadersToJson(final DittoHeaders dittoHeaders) {
.collect(JsonCollectors.fieldsToObject());
}

private static boolean isThingChangeEvent(final TopicPath topicPath, final Payload payload) {
private static boolean isThingChangeEvent(final TopicPath topicPath,
final Payload payload,
final boolean includeDeletedFields) {
final var isThingEvent =
topicPath.isGroup(TopicPath.Group.THINGS) && topicPath.isCriterion(TopicPath.Criterion.EVENTS);

final var isChange = topicPath.isAction(TopicPath.Action.CREATED)
|| topicPath.isAction(TopicPath.Action.MODIFIED)
|| topicPath.isAction(TopicPath.Action.MERGED)
|| (includeDeletedFields && topicPath.isAction(TopicPath.Action.DELETED))
|| isThingDeleted(topicPath, payload);

return isThingEvent && isChange;
Expand All @@ -229,6 +249,60 @@ private static boolean isThingDeleted(final TopicPath topicPath, final Payload p
JsonPointer.of(payload.getPath()).isEmpty();
}

private static JsonObject extractDeletedFields(final TopicPath topicPath,
final Payload payload,
final JsonPointer path,
@Nullable final JsonValue payloadValue)
{
if (payload.getTimestamp().isEmpty()) {
return JsonObject.empty();
}
final JsonObjectBuilder deletedFieldsBuilder = JsonFactory.newObjectBuilder();
final String timestamp = payload.getTimestamp().orElseThrow().toString();

if (topicPath.isAction(TopicPath.Action.DELETED) && !path.isEmpty()) {
deletedFieldsBuilder.set(path.toString(), timestamp);
return deletedFieldsBuilder.build();
}

if (topicPath.isAction(TopicPath.Action.MERGED) && payloadValue != null) {
if (payloadValue.isNull() && !path.isEmpty()) {
deletedFieldsBuilder.set(path.toString(), timestamp);
} else if (payloadValue.isObject()) {
extractNullsFromMergePatch(payloadValue.asObject(), path, deletedFieldsBuilder, timestamp);
}
}

return deletedFieldsBuilder.build();
}

private static void extractNullsFromMergePatch(final JsonObject mergeObject,
final JsonPointer basePath,
final JsonObjectBuilder deletedFieldsBuilder,
final String timestamp) {
mergeObject.forEach(jsonField -> {
final JsonKey key = jsonField.getKey();
if (isRegexDeletionKey(key)) {
return;
}
final JsonPointer currentPath = basePath.isEmpty()
? JsonPointer.empty().addLeaf(key)
: basePath.addLeaf(key);
final JsonValue value = jsonField.getValue();
if (value.isNull()) {
deletedFieldsBuilder.set(currentPath.toString(), timestamp);
} else if (value.isObject()) {
extractNullsFromMergePatch(value.asObject(), currentPath, deletedFieldsBuilder, timestamp);
}
});
}

private static boolean isRegexDeletionKey(final JsonKey key) {
final String keyString = key.toString();
return keyString.startsWith("{{") && keyString.endsWith("}}") &&
(keyString.contains("~") || keyString.contains("/"));
}

private static JsonObject filterNullValuesAndEmptyObjects(final JsonObject jsonObject) {
final JsonObjectBuilder builder = JsonFactory.newObjectBuilder();

Expand Down Expand Up @@ -258,6 +332,7 @@ public String toString() {
return getClass().getSimpleName() + " [" +
super.toString() +
", jsonFieldSelector=" + jsonFieldSelector +
", includeDeletedFields=" + includeDeletedFields +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,68 @@ public void thingMergedWithOnlyNullValues() {
"}"));
}

@Test
public void thingMergedTracksDeletedFieldsFromNullValues() {
enableDeletedFields();
final JsonObject mergedObject = JsonObject.of("{\n" +
" \"attributes\": {\n" +
" \"location\": null,\n" +
" \"status\": \"active\"\n" +
" },\n" +
" \"features\": {\n" +
" \"sensor\": {\n" +
" \"properties\": {\n" +
" \"temp\": null,\n" +
" \"humidity\": 50\n" +
" }\n" +
" }\n" +
" }\n" +
"}");
final ThingMerged event = ThingMerged.of(ThingId.of("thing:merged"), JsonPointer.empty(), mergedObject,
1L, Instant.ofEpochSecond(1L), DittoHeaders.empty(), null);

final Adaptable adaptable = ADAPTER.toAdaptable(event);

Assertions.assertThat(mapToJson(adaptable))
.isEqualTo(JsonObject.of("{\n" +
" \"thingId\": \"thing:merged\",\n" +
" \"attributes\": {\n" +
" \"status\": \"active\"\n" +
" },\n" +
" \"features\": {\n" +
" \"sensor\": {\n" +
" \"properties\": {\n" +
" \"humidity\": 50\n" +
" }\n" +
" }\n" +
" },\n" +
" \"_modified\": \"1970-01-01T00:00:01Z\",\n" +
" \"_revision\": 1,\n" +
" \"_deletedFields\": {\n" +
" \"attributes\": {\n" +
" \"location\": \"1970-01-01T00:00:01Z\"\n" +
" },\n" +
" \"features\": {\n" +
" \"sensor\": {\n" +
" \"properties\": {\n" +
" \"temp\": \"1970-01-01T00:00:01Z\"\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
" \"_context\": {\n" +
" \"topic\": \"thing/merged/things/twin/events/merged\",\n" +
" \"path\": \"/\",\n" +
" \"value\":{\"attributes\":{\"status\":\"active\"},\"features\":{\"sensor\":{\"properties\":{\"humidity\":50}}}},\n" +
" \"headers\": {\n" +
" \"entity-revision\": \"1\",\n" +
" \"response-required\": \"false\",\n" +
" \"content-type\": \"application/merge-patch+json\"\n" +
" }\n" +
" }\n" +
"}"));
}

@Test
public void thingMergedWithExtraFields() {
final var thingId = ThingId.of("the.namespace:the-thing-id");
Expand Down Expand Up @@ -523,15 +585,125 @@ public void thingDeletedIsMappedWithDeletedTimestamp() {
}

@Test
public void deletedEventsAreNotMapped() {
assertNotMapped(AttributeDeleted.of(ThingId.of("thing:id"), JsonPointer.of("/the/quick/brown/fox/"), 3L,
public void deletedEventsAreNotMappedByDefault() {
assertNotMapped(AttributeDeleted.of(ThingId.of("thing:id"), JsonPointer.of("/the/quick/brown/fox"), 3L,
Instant.ofEpochSecond(3L), DittoHeaders.empty(), null));
assertNotMapped(FeaturePropertyDeleted.of(ThingId.of("thing:id"), "featureId",
JsonPointer.of("jumps/over/the/lazy/dog"), 4L, Instant.ofEpochSecond(4L), DittoHeaders.empty(), null));
assertNotMapped(FeatureDeleted.of(ThingId.of("thing:id"), "featureId", 5L, Instant.EPOCH,
DittoHeaders.empty(), null));
}

@Test
public void deletedEventsAreMappedWithDeletedFieldsWhenEnabled() {
enableDeletedFields();
final AttributeDeleted attributeDeleted = AttributeDeleted.of(
ThingId.of("thing:id"),
JsonPointer.of("/the/quick/brown/fox"),
3L,
Instant.ofEpochSecond(3L),
DittoHeaders.empty(),
null);

Assertions.assertThat(mapToJson(ADAPTER.toAdaptable(attributeDeleted)))
.isEqualTo(JsonObject.of("{\n" +
" \"thingId\": \"thing:id\",\n" +
" \"_modified\": \"1970-01-01T00:00:03Z\",\n" +
" \"_revision\": 3,\n" +
" \"_deletedFields\": {\n" +
" \"attributes\": {\n" +
" \"the\": {\n" +
" \"quick\": {\n" +
" \"brown\": {\n" +
" \"fox\": \"1970-01-01T00:00:03Z\"\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
" \"_context\": {\n" +
" \"topic\": \"thing/id/things/twin/events/deleted\",\n" +
" \"path\": \"/attributes/the/quick/brown/fox\",\n" +
" \"value\": null,\n" +
" \"headers\": {\n" +
" \"entity-revision\": \"3\",\n" +
" \"response-required\": \"false\"\n" +
" }\n" +
" }\n" +
"}"));

final FeaturePropertyDeleted propertyDeleted = FeaturePropertyDeleted.of(
ThingId.of("thing:id"),
"featureId",
JsonPointer.of("jumps/over/the/lazy/dog"),
4L,
Instant.ofEpochSecond(4L),
DittoHeaders.empty(),
null);

Assertions.assertThat(mapToJson(ADAPTER.toAdaptable(propertyDeleted)))
.isEqualTo(JsonObject.of("{\n" +
" \"thingId\": \"thing:id\",\n" +
" \"_modified\": \"1970-01-01T00:00:04Z\",\n" +
" \"_revision\": 4,\n" +
" \"_deletedFields\": {\n" +
" \"features\": {\n" +
" \"featureId\": {\n" +
" \"properties\": {\n" +
" \"jumps\": {\n" +
" \"over\": {\n" +
" \"the\": {\n" +
" \"lazy\": {\n" +
" \"dog\": \"1970-01-01T00:00:04Z\"\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
" \"_context\": {\n" +
" \"topic\": \"thing/id/things/twin/events/deleted\",\n" +
" \"path\": \"/features/featureId/properties/jumps/over/the/lazy/dog\",\n" +
" \"value\": null,\n" +
" \"headers\": {\n" +
" \"entity-revision\": \"4\",\n" +
" \"response-required\": \"false\"\n" +
" }\n" +
" }\n" +
"}"));

final FeatureDeleted featureDeleted = FeatureDeleted.of(
ThingId.of("thing:id"),
"featureId",
5L,
Instant.EPOCH,
DittoHeaders.empty(),
null);

Assertions.assertThat(mapToJson(ADAPTER.toAdaptable(featureDeleted)))
.isEqualTo(JsonObject.of("{\n" +
" \"thingId\": \"thing:id\",\n" +
" \"_modified\": \"1970-01-01T00:00:00Z\",\n" +
" \"_revision\": 5,\n" +
" \"_deletedFields\": {\n" +
" \"features\": {\n" +
" \"featureId\": \"1970-01-01T00:00:00Z\"\n" +
" }\n" +
" },\n" +
" \"_context\": {\n" +
" \"topic\": \"thing/id/things/twin/events/deleted\",\n" +
" \"path\": \"/features/featureId\",\n" +
" \"value\": null,\n" +
" \"headers\": {\n" +
" \"entity-revision\": \"5\",\n" +
" \"response-required\": \"false\"\n" +
" }\n" +
" }\n" +
"}"));
}

@Test
public void nonThingEventsAreNotMapped() {
// command
Expand All @@ -553,6 +725,14 @@ private void assertNotMapped(final Signal signal) {
assertThat(underTest.map(ADAPTER.toAdaptable(signal))).isEmpty();
}

private void enableDeletedFields() {
final Map<String, JsonValue> options = Map.of(
NormalizedMessageMapper.INCLUDE_DELETED_FIELDS, JsonValue.of("true"));
underTest.configure(connection, connectivityConfig,
DefaultMessageMapperConfiguration.of("normalizer", options, Map.of(), Map.of()),
actorSystem);
}

private JsonObject mapToJson(final Adaptable message) {
return underTest.map(message)
.stream()
Expand Down
Loading
Loading