Skip to content

Commit 75bb046

Browse files
xinlian12annie-mac
andauthored
fixBadRequestForIncludingAllContainersInKafkaSourceConnector (#46389)
* fix badRequestException when include all containers --------- Co-authored-by: annie-mac <[email protected]>
1 parent b1d7a02 commit 75bb046

File tree

8 files changed

+417
-74
lines changed

8 files changed

+417
-74
lines changed

sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#### Bugs Fixed
1010
* Fixed an issue where `CosmosSourceConnector` got stuck when restart - See [PR 46378](https://github.com/Azure/azure-sdk-for-java/pull/46378)
11+
* Fixed `BadRequestException` in `CosmosSourceConnector` when using `azure.cosmos.source.containers.includeAll=true` - See [PR 46389](https://github.com/Azure/azure-sdk-for-java/pull/46389)
1112

1213
#### Other Changes
1314

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,14 @@ public void start(Map<String, String> props) {
9898
this.connectorName = props.containsKey(CONNECTOR_NAME) ? props.get(CONNECTOR_NAME).toString() : "EMPTY";
9999
this.cosmosClientItem = CosmosClientCache.getCosmosClient(this.config.getAccountConfig(), connectorName);
100100
CosmosSourceContainersConfig containersConfig = this.config.getContainersConfig();
101-
validateDatabaseAndContainers(
102-
containersConfig.getIncludedContainers(),
101+
102+
List<String> containersExistedInDatabase = validateDatabaseAndContainers(
103+
containersConfig.isIncludeAllContainers() ? new ArrayList<>() : containersConfig.getIncludedContainers(),
103104
this.cosmosClientItem.getClient(),
104105
containersConfig.getDatabaseName());
105106

107+
validateContainersInTopicMap(containersExistedInDatabase, new ArrayList<>(containersConfig.getContainerToTopicMap().keySet()));
108+
106109
// IMPORTANT: sequence matters
107110
this.kafkaOffsetStorageReader = new MetadataKafkaStorageManager(this.context().offsetStorageReader());
108111
this.metadataReader = this.getMetadataReader();
@@ -126,6 +129,16 @@ public void start(Map<String, String> props) {
126129

127130
}
128131

132+
private void validateContainersInTopicMap(
133+
List<String> containersExistedInDatabase,
134+
List<String> containersInTopicMap) {
135+
if (containersInTopicMap != null
136+
&& !containersInTopicMap.isEmpty()
137+
&& !containersExistedInDatabase.containsAll(containersInTopicMap)) {
138+
throw new IllegalStateException("Containers specified in the topic map do not exist in the CosmosDB account.");
139+
}
140+
}
141+
129142
@Override
130143
public Class<? extends Task> taskClass() {
131144
return CosmosSourceTask.class;
@@ -534,13 +547,7 @@ private List<FeedRange> getFeedRanges(CosmosContainerProperties containerPropert
534547

535548
private Map<String, String> getContainersTopicMap(List<CosmosContainerProperties> allContainers) {
536549
Map<String, String> topicMapFromConfig =
537-
this.config.getContainersConfig().getContainersTopicMap()
538-
.stream()
539-
.map(containerTopicMapString -> containerTopicMapString.split("#"))
540-
.collect(
541-
Collectors.toMap(
542-
containerTopicMapArray -> containerTopicMapArray[1],
543-
containerTopicMapArray -> containerTopicMapArray[0]));
550+
this.config.getContainersConfig().getContainerToTopicMap();
544551

545552
Map<String, String> effectiveContainersTopicMap = new HashMap<>();
546553
allContainers.forEach(containerProperties -> {

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosContainerUtils.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,35 @@
1010

1111
import java.util.ArrayList;
1212
import java.util.List;
13+
import java.util.stream.Collectors;
1314

1415
public class CosmosContainerUtils {
16+
public static List<String> validateDatabaseAndContainers(
17+
List<String> includedContainers,
18+
CosmosAsyncClient cosmosAsyncClient,
19+
String databaseName) {
1520

16-
public static void validateDatabaseAndContainers(List<String> containerNames, CosmosAsyncClient cosmosAsyncClient, String databaseName) {
1721
StringBuilder queryStringBuilder = new StringBuilder();
1822
List<SqlParameter> parameters = new ArrayList<>();
1923

20-
queryStringBuilder.append("SELECT * FROM c WHERE c.id IN ( ");
21-
for (int i = 0; i < containerNames.size(); i++) {
22-
String idValue = containerNames.get(i);
23-
String idParamName = "@param" + i;
24+
if (includedContainers == null || includedContainers.isEmpty()) {
25+
queryStringBuilder.append("SELECT * FROM c");
26+
} else {
27+
queryStringBuilder.append("SELECT * FROM c WHERE c.id IN ( ");
28+
for (int i = 0; i < includedContainers.size(); i++) {
29+
String idValue = includedContainers.get(i);
30+
String idParamName = "@param" + i;
2431

25-
parameters.add(new SqlParameter(idParamName, idValue));
26-
queryStringBuilder.append(idParamName);
32+
parameters.add(new SqlParameter(idParamName, idValue));
33+
queryStringBuilder.append(idParamName);
2734

28-
if (i < containerNames.size() - 1) {
29-
queryStringBuilder.append(", ");
35+
if (i < includedContainers.size() - 1) {
36+
queryStringBuilder.append(", ");
37+
}
3038
}
39+
queryStringBuilder.append(" )");
3140
}
32-
queryStringBuilder.append(" )");
41+
3342
List<CosmosContainerProperties> cosmosContainerProperties = cosmosAsyncClient.getDatabase(databaseName)
3443
.queryContainers(new SqlQuerySpec(queryStringBuilder.toString(), parameters))
3544
.byPage()
@@ -45,8 +54,17 @@ public static void validateDatabaseAndContainers(List<String> containerNames, Co
4554
return KafkaCosmosExceptionsHelper.convertToConnectException(throwable, "validateDatabaseAndContainers failed.");
4655
})
4756
.block();
48-
if (cosmosContainerProperties.isEmpty() || cosmosContainerProperties.size() != containerNames.size()) {
57+
58+
List<String> containersFromDatabase = cosmosContainerProperties
59+
.stream()
60+
.map(CosmosContainerProperties::getId)
61+
.collect(Collectors.toList());
62+
63+
if (containersFromDatabase.isEmpty()
64+
|| (includedContainers != null && !containersFromDatabase.containsAll(includedContainers))) {
4965
throw new IllegalStateException("Containers specified in the config do not exist in the CosmosDB account.");
5066
}
67+
68+
return containersFromDatabase;
5169
}
5270
}

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceConfig.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.time.format.DateTimeParseException;
1515
import java.util.List;
1616
import java.util.Map;
17+
import java.util.stream.Collectors;
1718

1819
/**
1920
* Common Configuration for Cosmos DB Kafka source connector.
@@ -295,22 +296,29 @@ private CosmosSourceContainersConfig parseContainersConfig() {
295296
String databaseName = this.getString(DATABASE_NAME_CONF);
296297
boolean includeAllContainers = this.getBoolean(CONTAINERS_INCLUDE_ALL_CONFIG);
297298
List<String> containersIncludedList = this.getContainersIncludedList();
298-
List<String> containersTopicMap = this.getContainersTopicMap();
299+
Map<String, String> containerToTopicMap = this.getContainersTopicMap();
299300

300301
return new CosmosSourceContainersConfig(
301302
databaseName,
302303
includeAllContainers,
303304
containersIncludedList,
304-
containersTopicMap
305+
containerToTopicMap
305306
);
306307
}
307308

308309
private List<String> getContainersIncludedList() {
309310
return convertToList(this.getString(CONTAINERS_INCLUDED_LIST_CONFIG));
310311
}
311312

312-
private List<String> getContainersTopicMap() {
313-
return convertToList(this.getString(CONTAINERS_TOPIC_MAP_CONFIG));
313+
private Map<String, String> getContainersTopicMap() {
314+
List<String> topicToContainerMapList = convertToList(this.getString(CONTAINERS_TOPIC_MAP_CONFIG));
315+
return topicToContainerMapList
316+
.stream()
317+
.map(topicToContainerMapString -> topicToContainerMapString.split(CosmosSourceContainersConfig.CONTAINER_TOPIC_MAP_SEPARATOR))
318+
.collect(
319+
Collectors.toMap(
320+
containerTopicMapArray -> containerTopicMapArray[1],
321+
containerTopicMapArray -> containerTopicMapArray[0]));
314322
}
315323

316324
private CosmosMetadataConfig parseMetadataConfig() {

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceContainersConfig.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
77

88
import java.util.List;
9+
import java.util.Map;
910

1011
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
1112
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
@@ -16,21 +17,21 @@ public class CosmosSourceContainersConfig {
1617
private final String databaseName;
1718
private final boolean includeAllContainers;
1819
private final List<String> includedContainers;
19-
private final List<String> containersTopicMap;
20+
private final Map<String, String> containerToTopicMap;
2021

2122
public CosmosSourceContainersConfig(
2223
String databaseName,
2324
boolean includeAllContainers,
2425
List<String> includedContainers,
25-
List<String> containersTopicMap) {
26+
Map<String, String> containerToTopicMap) {
2627

2728
checkArgument(StringUtils.isNotEmpty(databaseName), "Argument 'databaseName' can not be null");
2829
checkNotNull(includedContainers, "Argument 'includedContainers' can not be null");
2930

3031
this.databaseName = databaseName;
3132
this.includeAllContainers = includeAllContainers;
3233
this.includedContainers = includedContainers;
33-
this.containersTopicMap = containersTopicMap;
34+
this.containerToTopicMap = containerToTopicMap;
3435
}
3536

3637
public String getDatabaseName() {
@@ -45,7 +46,7 @@ public List<String> getIncludedContainers() {
4546
return includedContainers;
4647
}
4748

48-
public List<String> getContainersTopicMap() {
49-
return containersTopicMap;
49+
public Map<String, String> getContainerToTopicMap() {
50+
return this.containerToTopicMap;
5051
}
5152
}

0 commit comments

Comments
 (0)