Commit 3aa1fd3

Modify schema of metadata for feed ranges and containers to use unified Schema (Azure#45018)
* Modify schema of metadata for feed ranges and containers to use unified Schema
* Update UnifiedMetadataSchemaConstants.java
* Fixing LINTing errors
* Adding unit tests
* Fixing LINTing issues
* Fixing unit test failure
* Adding changelog
* Update CosmosSourceConnectorITest.java
1 parent 0b7dcd1 commit 3aa1fd3

File tree

8 files changed, +365 -23 lines changed

sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -7,6 +7,7 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Prevented the usage of different schemas in the `cosmos.metadata.topic` topic, which matters when a customer selects an output Kafka record value format of JSON_SR, AVRO, or PROTOBUF (i.e. any Schema Registry format). With this change a single unified schema is used for the metadata topic instead. - See [PR 45018](https://github.com/Azure/azure-sdk-for-java/pull/45018)
 
 #### Other Changes
 * Added `authEndpointOverride` option for all AAD authentication types - See [PR 45016](https://github.com/Azure/azure-sdk-for-java/pull/45016)
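
To make the changelog entry above concrete: the diffs below introduce a single two-field struct schema for every record written to `cosmos.metadata.topic`. A minimal, self-contained sketch of that shape follows; the entity-type string and field names are copied from the commit, while the class name and sample payload here are hypothetical illustrations.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class UnifiedMetadataSchemaSketch {
    // Same shape as the UnifiedMetadataSchemaConstants.SCHEMA added in this commit:
    // one struct, one entity-type discriminator, one JSON payload carried as a string.
    static final Schema UNIFIED_SCHEMA = SchemaBuilder.struct()
        .name("CosmosUnifiedMetadataSchema")
        .field("cosmosMetadataEntityType", Schema.STRING_SCHEMA)
        .field("jsonValue", Schema.STRING_SCHEMA)
        .build();

    public static void main(String[] args) throws Exception {
        // Hypothetical containers-metadata payload; the real keys come from
        // ContainersMetadataTopicOffset.toMap(...) and are not shown in the diff.
        Map<String, Object> offsetMap = new LinkedHashMap<>();
        offsetMap.put("containerRids", List.of("container-rid-1", "container-rid-2"));

        Struct value = new Struct(UNIFIED_SCHEMA)
            .put("cosmosMetadataEntityType", "ContainersMetadata-1.0")
            .put("jsonValue", new ObjectMapper().writeValueAsString(offsetMap));

        // Because the schema is always the same two string fields, Schema Registry
        // formats (JSON_SR, AVRO, PROTOBUF) see one stable subject for the topic.
        System.out.println(value);
    }
}
```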

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java

Lines changed: 22 additions & 11 deletions

@@ -17,8 +17,8 @@
 import com.azure.cosmos.models.FeedRange;
 import com.azure.cosmos.models.FeedResponse;
 import com.azure.cosmos.models.ModelBridgeInternal;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -41,7 +41,7 @@ public class CosmosSourceTask extends SourceTask {
     private CosmosSourceTaskConfig taskConfig;
     private CosmosAsyncClient cosmosClient;
     private CosmosAsyncClient throughputControlCosmosClient;
-    private Queue<ITaskUnit> taskUnitsQueue = new LinkedList<>();
+    private final Queue<ITaskUnit> taskUnitsQueue = new LinkedList<>();
 
     @Override
     public String version() {
@@ -135,13 +135,19 @@ private List<SourceRecord> executeMetadataTask(MetadataTaskUnit taskUnit) {
         List<SourceRecord> sourceRecords = new ArrayList<>();
 
         // add the containers metadata record - it tracks the databaseName -> List[containerRid] mapping
-        Pair<ContainersMetadataTopicPartition, ContainersMetadataTopicOffset> containersMetadata = taskUnit.getContainersMetadata();
+        Pair<ContainersMetadataTopicPartition, ContainersMetadataTopicOffset> containersMetadata =
+            taskUnit.getContainersMetadata();
 
         // Convert JSON to Kafka Connect struct and JSON schema
-        SchemaAndValue containersMetadataSchemaAndValue = JsonToStruct.recordToSchemaAndValue(
-            Utils.getSimpleObjectMapper().convertValue(
-                ContainersMetadataTopicOffset.toMap(containersMetadata.getRight()),
-                ObjectNode.class));
+        SchemaAndValue containersMetadataSchemaAndValue = null;
+        try {
+            containersMetadataSchemaAndValue = JsonToStruct.recordToUnifiedSchema(
+                MetadataEntityTypes.CONTAINERS_METADATA_V1,
+                Utils.getSimpleObjectMapper().writeValueAsString(
+                    ContainersMetadataTopicOffset.toMap(containersMetadata.getRight())));
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
 
         sourceRecords.add(
             new SourceRecord(
@@ -155,10 +161,15 @@ private List<SourceRecord> executeMetadataTask(MetadataTaskUnit taskUnit) {
 
         // add the container feedRanges metadata record - it tracks the containerRid -> List[FeedRange] mapping
         for (Pair<FeedRangesMetadataTopicPartition, FeedRangesMetadataTopicOffset> feedRangesMetadata : taskUnit.getFeedRangesMetadataList()) {
-            SchemaAndValue feedRangeMetadataSchemaAndValue = JsonToStruct.recordToSchemaAndValue(
-                Utils.getSimpleObjectMapper().convertValue(
-                    FeedRangesMetadataTopicOffset.toMap(feedRangesMetadata.getRight()),
-                    ObjectNode.class));
+            SchemaAndValue feedRangeMetadataSchemaAndValue = null;
+            try {
+                feedRangeMetadataSchemaAndValue = JsonToStruct.recordToUnifiedSchema(
+                    MetadataEntityTypes.FEED_RANGES_METADATA_V1,
+                    Utils.getSimpleObjectMapper().writeValueAsString(
+                        FeedRangesMetadataTopicOffset.toMap(feedRangesMetadata.getRight())));
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException(e);
+            }
 
             sourceRecords.add(
                 new SourceRecord(
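
For orientation only: the exact `SourceRecord` arguments are truncated in the hunk above. A generic Kafka Connect wiring of the unified schema-and-value into a metadata-topic record might look like the sketch below; the helper class, partition/offset maps, and topic name are placeholders, not the connector's actual values.

```java
import java.util.Map;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.source.SourceRecord;

public final class MetadataSourceRecordSketch {
    // Wrap an already-built unified SchemaAndValue into a SourceRecord for the
    // metadata topic. sourcePartition/sourceOffset are what Kafka Connect persists
    // and later hands back through OffsetStorageReader.offset(...).
    static SourceRecord toMetadataRecord(
            Map<String, ?> sourcePartition,
            Map<String, ?> sourceOffset,
            String metadataTopicName,
            SchemaAndValue unifiedValue) {
        return new SourceRecord(
            sourcePartition,
            sourceOffset,
            metadataTopicName,
            unifiedValue.schema(),
            unifiedValue.value());
    }
}
```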

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/JsonToStruct.java

Lines changed: 19 additions & 0 deletions

@@ -3,7 +3,9 @@
 
 package com.azure.cosmos.kafka.connect.implementation.source;
 
+import com.azure.cosmos.implementation.Utils;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.type.MapType;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
@@ -13,10 +15,12 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
 import static org.apache.kafka.connect.data.Values.convertToByte;
 import static org.apache.kafka.connect.data.Values.convertToDouble;
@@ -26,9 +30,24 @@
 import static org.apache.kafka.connect.data.Values.convertToShort;
 
 public class JsonToStruct {
+    public static final MapType JACKSON_MAP_TYPE = Utils
+        .getSimpleObjectMapper()
+        .getTypeFactory()
+        .constructMapType(LinkedHashMap.class, String.class, Object.class);
+
     private static final Logger LOGGER = LoggerFactory.getLogger(JsonToStruct.class);
     private static final String SCHEMA_NAME_TEMPLATE = "inferred_name_%s";
 
+    public static SchemaAndValue recordToUnifiedSchema(final String entityType, final String jsonValue) {
+        checkNotNull(entityType, "Argument 'entityType' should not be null");
+
+        Struct struct = new Struct(UnifiedMetadataSchemaConstants.SCHEMA)
+            .put(UnifiedMetadataSchemaConstants.ENTITY_TYPE_NAME, entityType)
+            .put(UnifiedMetadataSchemaConstants.JSON_VALUE_NAME, jsonValue);
+
+        return new SchemaAndValue(UnifiedMetadataSchemaConstants.SCHEMA, struct);
+    }
+
     public static SchemaAndValue recordToSchemaAndValue(final JsonNode node) {
         Schema nodeSchema = inferSchema(node);
         Struct struct = new Struct(nodeSchema);
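
Usage note: `JACKSON_MAP_TYPE` above is a pre-built Jackson `MapType` so callers can deserialize the stored `jsonValue` string back into an insertion-ordered `Map<String, Object>`. A minimal sketch of that round trip with a plain `ObjectMapper` (standalone; it does not reuse the SDK-internal `Utils` mapper, and the sample payload is hypothetical):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;

import java.util.LinkedHashMap;
import java.util.Map;

public final class MapTypeSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Equivalent construction to JACKSON_MAP_TYPE: a LinkedHashMap keyed by String.
        MapType mapType = mapper.getTypeFactory()
            .constructMapType(LinkedHashMap.class, String.class, Object.class);

        String jsonValue = "{\"a\":1,\"b\":[\"x\",\"y\"]}"; // hypothetical payload
        Map<String, Object> map = mapper.readValue(jsonValue, mapType);
        System.out.println(map); // {a=1, b=[x, y]}
    }
}
```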

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataEntityTypes.java

Lines changed: 8 additions & 0 deletions

@@ -0,0 +1,8 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.cosmos.kafka.connect.implementation.source;
+
+public final class MetadataEntityTypes {
+    public static final String CONTAINERS_METADATA_V1 = "ContainersMetadata-1.0";
+    public static final String FEED_RANGES_METADATA_V1 = "FeedRangesMetadata-1.0";
+}

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/MetadataKafkaStorageManager.java

Lines changed: 101 additions & 4 deletions

@@ -5,18 +5,60 @@
 
 import com.azure.cosmos.implementation.Utils;
 import com.azure.cosmos.models.FeedRange;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import reactor.core.publisher.Mono;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.kafka.connect.errors.ConnectException;
 
 import java.util.Map;
 
 public class MetadataKafkaStorageManager implements IMetadataReader {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MetadataKafkaStorageManager.class);
     private final OffsetStorageReader offsetStorageReader;
 
     public MetadataKafkaStorageManager(OffsetStorageReader offsetStorageReader) {
         this.offsetStorageReader = offsetStorageReader;
     }
 
+    public static Utils.ValueHolder<FeedRangesMetadataTopicOffset> parseFeedRangesMetadata(
+        String databaseName,
+        String containerRid,
+        Map<String, Object> topicOffsetMap) {
+
+        // The data is stored in a Struct with our unified schema
+        if (topicOffsetMap.containsKey(UnifiedMetadataSchemaConstants.ENTITY_TYPE_NAME)) {
+            String entityType = (String) topicOffsetMap.get(UnifiedMetadataSchemaConstants.ENTITY_TYPE_NAME);
+            if (MetadataEntityTypes.FEED_RANGES_METADATA_V1.equals(entityType)) {
+                String jsonValue = (String) topicOffsetMap.get(UnifiedMetadataSchemaConstants.JSON_VALUE_NAME);
+
+                if (jsonValue != null) {
+                    Map<String, Object> feedRangesMap;
+                    try {
+                        feedRangesMap = Utils
+                            .getSimpleObjectMapper()
+                            .readValue(jsonValue, JsonToStruct.JACKSON_MAP_TYPE);
+                    } catch (JsonProcessingException e) {
+                        throw new RuntimeException(e);
+                    }
+
+                    return new Utils.ValueHolder<>(FeedRangesMetadataTopicOffset.fromMap(feedRangesMap));
+                }
+
+                LOGGER.warn(
+                    "No feed ranges found in unified schema for database: {}, containerRid: {}",
+                    databaseName,
+                    containerRid);
+                return new Utils.ValueHolder<>(null);
+            }
+
+            throw new IllegalStateException("Unknown EntityType '" + entityType + "'");
+        }
+
+        return new Utils.ValueHolder<>(FeedRangesMetadataTopicOffset.fromMap(topicOffsetMap));
+    }
+
     public Mono<Utils.ValueHolder<FeedRangesMetadataTopicOffset>> getFeedRangesMetadataOffset(
         String databaseName,
         String containerRid,
@@ -27,17 +69,72 @@ public Mono<Utils.ValueHolder<FeedRangesMetadataTopicOffset>> getFeedRangesMetad
                 FeedRangesMetadataTopicPartition.toMap(
                     new FeedRangesMetadataTopicPartition(databaseName, containerRid, connectorName)));
 
-        return Mono.just(new Utils.ValueHolder<>(FeedRangesMetadataTopicOffset.fromMap(topicOffsetMap)));
+        if (topicOffsetMap == null) {
+            return Mono.just(new Utils.ValueHolder<>(null));
+        }
+
+        try {
+            return Mono.just(parseFeedRangesMetadata(databaseName, containerRid, topicOffsetMap));
+        } catch (Exception e) {
+            LOGGER.error("Error processing feed ranges metadata from unified schema", e);
+            return Mono.error(new ConnectException("Failed to process feed ranges metadata", e));
+        }
+    }
+
+    public static Utils.ValueHolder<ContainersMetadataTopicOffset> parseContainersMetadata(
+        String databaseName,
+        Map<String, Object> topicOffsetMap) {
+
+        // The data is stored in a Struct with our unified schema
+        if (topicOffsetMap.containsKey(UnifiedMetadataSchemaConstants.ENTITY_TYPE_NAME)) {
+            String entityType = (String) topicOffsetMap.get(UnifiedMetadataSchemaConstants.ENTITY_TYPE_NAME);
+            if (MetadataEntityTypes.CONTAINERS_METADATA_V1.equals(entityType)) {
+                String jsonValue = (String) topicOffsetMap.get(UnifiedMetadataSchemaConstants.JSON_VALUE_NAME);
+
+                if (jsonValue != null) {
+                    Map<String, Object> containerRidsMap;
+                    try {
+                        containerRidsMap = Utils
+                            .getSimpleObjectMapper()
+                            .readValue(jsonValue, JsonToStruct.JACKSON_MAP_TYPE);
+                    } catch (JsonProcessingException e) {
+                        throw new RuntimeException(e);
+                    }
+
+                    return new Utils.ValueHolder<>(ContainersMetadataTopicOffset.fromMap(containerRidsMap));
+                }
+
+                LOGGER.warn("No container RIDs found in unified schema for database: {}", databaseName);
+                return new Utils.ValueHolder<>(null);
+            }
+
+            throw new IllegalStateException("Unknown EntityType '" + entityType + "'");
+        }
+
+        return new Utils.ValueHolder<>(ContainersMetadataTopicOffset.fromMap(topicOffsetMap));
     }
 
-    public Mono<Utils.ValueHolder<ContainersMetadataTopicOffset>> getContainersMetadataOffset(String databaseName, String connectorName) {
+    public Mono<Utils.ValueHolder<ContainersMetadataTopicOffset>> getContainersMetadataOffset(
+        String databaseName,
+        String connectorName) {
+
        Map<String, Object> topicOffsetMap =
            this.offsetStorageReader
                .offset(
                    ContainersMetadataTopicPartition.toMap(
                        new ContainersMetadataTopicPartition(databaseName, connectorName)));
 
-        return Mono.just(new Utils.ValueHolder<>(ContainersMetadataTopicOffset.fromMap(topicOffsetMap)));
+        if (topicOffsetMap == null) {
+            return Mono.just(new Utils.ValueHolder<>(null));
+        }
+
+        try {
+            // The data is stored in a Struct with our unified schema
+            return Mono.just(parseContainersMetadata(databaseName, topicOffsetMap));
+        } catch (Exception e) {
+            LOGGER.error("Error processing containers metadata from unified schema", e);
+            return Mono.error(new ConnectException("Failed to process containers metadata", e));
+        }
     }
 
     public FeedRangeContinuationTopicOffset getFeedRangeContinuationOffset(
@@ -55,6 +152,6 @@ public FeedRangeContinuationTopicOffset getFeedRangeContinuationOffset(
     }
 
     public OffsetStorageReader getOffsetStorageReader() {
-        return offsetStorageReader;
+        return this.offsetStorageReader;
    }
 }
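
The reader above has to handle both kinds of stored offsets: new ones written with the unified schema (where the entity-type discriminator is present and the payload is a JSON string) and legacy offsets written before this change (plain maps). A minimal sketch of that backward-compatible branch, with the SDK-internal types (`Utils.ValueHolder`, the `*TopicOffset` classes) simplified away to plain maps; field names are taken from the diff, everything else is hypothetical:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;

import java.util.LinkedHashMap;
import java.util.Map;

public final class BackwardCompatibleOffsetParse {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final MapType MAP_TYPE = MAPPER.getTypeFactory()
        .constructMapType(LinkedHashMap.class, String.class, Object.class);

    // If the stored offset carries the unified-schema discriminator, unwrap the
    // JSON payload; otherwise treat the map itself as a legacy (pre-upgrade) offset.
    static Map<String, Object> resolveOffset(Map<String, Object> stored, String expectedEntityType)
            throws Exception {
        Object entityType = stored.get("cosmosMetadataEntityType");
        if (entityType == null) {
            return stored; // legacy offset written before the unified schema existed
        }
        if (!expectedEntityType.equals(entityType)) {
            throw new IllegalStateException("Unknown EntityType '" + entityType + "'");
        }
        String jsonValue = (String) stored.get("jsonValue");
        return jsonValue == null ? null : MAPPER.readValue(jsonValue, MAP_TYPE);
    }
}
```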

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/UnifiedMetadataSchemaConstants.java

Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.cosmos.kafka.connect.implementation.source;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+
+public final class UnifiedMetadataSchemaConstants {
+    public static final String SCHEMA_NAME = "CosmosUnifiedMetadataSchema";
+    public static final String ENTITY_TYPE_NAME = "cosmosMetadataEntityType";
+    public static final String JSON_VALUE_NAME = "jsonValue";
+
+    public static final Schema SCHEMA = SchemaBuilder
+        .struct()
+        .name(SCHEMA_NAME)
+        .field(ENTITY_TYPE_NAME, Schema.STRING_SCHEMA)
+        .field(JSON_VALUE_NAME, Schema.STRING_SCHEMA)
+        .build();
+}

sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java

Lines changed: 40 additions & 8 deletions

@@ -6,13 +6,15 @@
 import com.azure.cosmos.CosmosAsyncClient;
 import com.azure.cosmos.CosmosAsyncContainer;
 import com.azure.cosmos.CosmosClientBuilder;
-import com.azure.cosmos.implementation.TestConfigurations;
 import com.azure.cosmos.implementation.Utils;
 import com.azure.cosmos.kafka.connect.implementation.CosmosAuthType;
 import com.azure.cosmos.kafka.connect.implementation.source.ContainersMetadataTopicOffset;
 import com.azure.cosmos.kafka.connect.implementation.source.CosmosMetadataStorageType;
 import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig;
 import com.azure.cosmos.kafka.connect.implementation.source.FeedRangesMetadataTopicOffset;
+import com.azure.cosmos.kafka.connect.implementation.source.MetadataEntityTypes;
+import com.azure.cosmos.kafka.connect.implementation.source.UnifiedMetadataSchemaConstants;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -159,7 +161,7 @@ public void readFromSingleContainer(boolean useMasterKey, CosmosMetadataStorageT
         int expectedMetadataRecordsCount = metadataStorageType == CosmosMetadataStorageType.COSMOS ? 0 : 2;
         int expectedItemRecords = createdItems.size();
 
-        Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> {;
+        Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> {
             kafkaConsumer.poll(Duration.ofMillis(1000))
                 .iterator()
                 .forEachRemaining(consumerRecord -> {
@@ -177,22 +179,52 @@ public void readFromSingleContainer(boolean useMasterKey, CosmosMetadataStorageT
         //validate containers metadata record
         ConsumerRecord<String, JsonNode> containerMetadataRecord = metadataRecords.get(0);
         assertThat(containerMetadataRecord.key()).isEqualTo(databaseName + "_" + connectorName);
-        ContainersMetadataTopicOffset containersMetadataTopicOffset =
-            ContainersMetadataTopicOffset.fromMap(
+
+        JsonNode rootJsonNode = containerMetadataRecord.value().get("payload");
+        assertThat(rootJsonNode).isNotNull();
+        assertThat(rootJsonNode.get(UnifiedMetadataSchemaConstants.ENTITY_TYPE_NAME)).isNotNull();
+        assertThat(rootJsonNode.get(UnifiedMetadataSchemaConstants.ENTITY_TYPE_NAME).textValue())
+            .isEqualTo(MetadataEntityTypes.CONTAINERS_METADATA_V1);
+        JsonNode jsonValueNode = rootJsonNode.get(UnifiedMetadataSchemaConstants.JSON_VALUE_NAME);
+        assertThat(jsonValueNode).isNotNull();
+        String jsonValue = jsonValueNode.textValue();
+        assertThat(jsonValue).isNotNull();
+
+        ContainersMetadataTopicOffset containersMetadataTopicOffset = null;
+        try {
+            containersMetadataTopicOffset = ContainersMetadataTopicOffset.fromMap(
                 Utils.getSimpleObjectMapper()
-                    .convertValue(containerMetadataRecord.value().get("payload"), new TypeReference<Map<String, Object>>(){})
+                    .readValue(jsonValue, new TypeReference<Map<String, Object>>() {})
             );
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
         assertThat(containersMetadataTopicOffset.getContainerRids().size()).isEqualTo(1);
         assertThat(containersMetadataTopicOffset.getContainerRids().contains(containerRid)).isTrue();
 
         // validate feed ranges metadata record
         ConsumerRecord<String, JsonNode> feedRangesMetadataRecord = metadataRecords.get(1);
         assertThat(feedRangesMetadataRecord.key()).isEqualTo(databaseName + "_" + containerRid + "_" + connectorName);
-        FeedRangesMetadataTopicOffset feedRangesMetadataTopicOffsetOffset =
-            FeedRangesMetadataTopicOffset.fromMap(
+
+        rootJsonNode = feedRangesMetadataRecord.value().get("payload");
+        assertThat(rootJsonNode).isNotNull();
+        assertThat(rootJsonNode.get(UnifiedMetadataSchemaConstants.ENTITY_TYPE_NAME)).isNotNull();
+        assertThat(rootJsonNode.get(UnifiedMetadataSchemaConstants.ENTITY_TYPE_NAME).textValue())
+            .isEqualTo(MetadataEntityTypes.FEED_RANGES_METADATA_V1);
+        jsonValueNode = rootJsonNode.get(UnifiedMetadataSchemaConstants.JSON_VALUE_NAME);
+        assertThat(jsonValueNode).isNotNull();
+        jsonValue = jsonValueNode.textValue();
+        assertThat(jsonValue).isNotNull();
+
+        FeedRangesMetadataTopicOffset feedRangesMetadataTopicOffsetOffset = null;
+        try {
+            feedRangesMetadataTopicOffsetOffset = FeedRangesMetadataTopicOffset.fromMap(
                 Utils.getSimpleObjectMapper()
-                    .convertValue(feedRangesMetadataRecord.value().get("payload"), new TypeReference<Map<String, Object>>(){})
+                    .readValue(jsonValue, new TypeReference<Map<String, Object>>() {})
             );
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
         assertThat(feedRangesMetadataTopicOffsetOffset.getFeedRanges().size()).isEqualTo(1);
     }
 

0 commit comments