Skip to content

Commit 6c103b8

Browse files
tvaron3 and xinlian12 authored
Connector Stuck on Incorrect Container Names (#41160)
* connector not starting with incorrect container names * Updated changelog and reverted some changes * Add sink connector to also fail if tried to create with wrong container name. * remove unnecessary imports * Revert unnecessary changes * Update sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md Co-authored-by: Annie Liang <[email protected]> * testing database call * revert test change and fixed ci and live tests * reacting to comments * reacting to comments * reacting to comments * revert ps script * revert ps script * revert ps script * revert ps script * created container registry and added relevant docker images --------- Co-authored-by: Annie Liang <[email protected]>
1 parent 27b69d2 commit 6c103b8

File tree

10 files changed

+207
-19
lines changed

10 files changed

+207
-19
lines changed

sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Connector status appears as `FAILED` when attempting to create a connector with incorrect container names - See [PR 41160](https://github.com/Azure/azure-sdk-for-java/pull/41160)
1011

1112
#### Other Changes
1213

sdk/cosmos/azure-cosmos-kafka-connect/src/docker/startup.ps1

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,4 @@ Write-Host "Building Cosmos DB Kafka Connect Docker image"
2626
docker build . -t cosmosdb-kafka-connect:latest
2727

2828
Write-Host "Starting Docker Compose..."
29-
docker-compose up -d
29+
docker-compose up -d

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33

44
package com.azure.cosmos.kafka.connect;
55

6+
import com.azure.cosmos.CosmosAsyncClient;
67
import com.azure.cosmos.implementation.apachecommons.lang.RandomUtils;
8+
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
79
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
810
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig;
11+
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkContainersConfig;
912
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask;
1013
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTaskConfig;
1114
import org.apache.kafka.common.config.Config;
@@ -22,6 +25,7 @@
2225
import java.util.function.Function;
2326
import java.util.stream.Collectors;
2427

28+
import static com.azure.cosmos.kafka.connect.implementation.CosmosContainerUtils.validateContainers;
2529
import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateCosmosAccountAuthConfig;
2630
import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig;
2731
import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateWriteConfig;
@@ -41,6 +45,11 @@ public void start(Map<String, String> props) {
4145
LOGGER.info("Starting the kafka cosmos sink connector");
4246
this.sinkConfig = new CosmosSinkConfig(props);
4347
this.connectorName = props.containsKey(CONNECTOR_NAME) ? props.get(CONNECTOR_NAME).toString() : "EMPTY";
48+
CosmosSinkContainersConfig containersConfig = this.sinkConfig.getContainersConfig();
49+
CosmosAsyncClient cosmosAsyncClient = CosmosClientStore.getCosmosClient(this.sinkConfig.getAccountConfig(), this.connectorName);
50+
validateContainers(new ArrayList<>(containersConfig.getTopicToContainerMap().values()), cosmosAsyncClient,
51+
containersConfig.getDatabaseName());
52+
cosmosAsyncClient.close();
4453
}
4554

4655
@Override

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
1515
import com.azure.cosmos.kafka.connect.implementation.source.CosmosMetadataStorageType;
1616
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig;
17+
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceContainersConfig;
1718
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceTask;
1819
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceTaskConfig;
1920
import com.azure.cosmos.kafka.connect.implementation.source.FeedRangeContinuationTopicOffset;
@@ -52,6 +53,7 @@
5253
import java.util.function.Function;
5354
import java.util.stream.Collectors;
5455

56+
import static com.azure.cosmos.kafka.connect.implementation.CosmosContainerUtils.validateContainers;
5557
import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateCosmosAccountAuthConfig;
5658
import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig;
5759

@@ -76,6 +78,9 @@ public void start(Map<String, String> props) {
7678
this.config = new CosmosSourceConfig(props);
7779
this.connectorName = props.containsKey(CONNECTOR_NAME) ? props.get(CONNECTOR_NAME).toString() : "EMPTY";
7880
this.cosmosClient = CosmosClientStore.getCosmosClient(this.config.getAccountConfig(), connectorName);
81+
CosmosSourceContainersConfig containersConfig = this.config.getContainersConfig();
82+
validateContainers(containersConfig.getIncludedContainers(),
83+
this.cosmosClient, containersConfig.getDatabaseName());
7984

8085
// IMPORTANT: sequence matters
8186
this.kafkaOffsetStorageReader = new MetadataKafkaStorageManager(this.context().offsetStorageReader());
@@ -247,6 +252,9 @@ private List<Map<String, String>> getFeedRangeTaskConfigs(List<FeedRangeTaskUnit
247252

248253
private Pair<MetadataTaskUnit, List<FeedRangeTaskUnit>> getAllTaskUnits() {
249254
List<CosmosContainerProperties> allContainers = this.monitorThread.getAllContainers().block();
255+
if (allContainers.isEmpty()) {
256+
throw new ConnectException("Some of the containers specified in the config were not found in the database.");
257+
}
250258
Map<String, String> containerTopicMap = this.getContainersTopicMap(allContainers);
251259
List<FeedRangeTaskUnit> allFeedRangeTaskUnits = new ArrayList<>();
252260
Map<String, List<FeedRange>> updatedContainerToFeedRangesMap = new ConcurrentHashMap<>();
@@ -462,7 +470,7 @@ private Map<String, String> getContainersTopicMap(List<CosmosContainerProperties
462470

463471
Map<String, String> effectiveContainersTopicMap = new HashMap<>();
464472
allContainers.forEach(containerProperties -> {
465-
// by default, we are using container id as the topic name as well unless customer override through containers.topicMap
473+
// by default, we are using container id as the topic name as well unless customer override through containers.topicMap
466474
if (topicMapFromConfig.containsKey(containerProperties.getId())) {
467475
effectiveContainersTopicMap.put(
468476
containerProperties.getId(),
@@ -473,7 +481,7 @@ private Map<String, String> getContainersTopicMap(List<CosmosContainerProperties
473481
containerProperties.getId());
474482
}
475483
});
476-
484+
477485
return effectiveContainersTopicMap;
478486
}
479487

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.kafka.connect.implementation;
5+
6+
import com.azure.cosmos.CosmosAsyncClient;
7+
import com.azure.cosmos.models.CosmosContainerProperties;
8+
import com.azure.cosmos.models.SqlParameter;
9+
import com.azure.cosmos.models.SqlQuerySpec;
10+
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
14+
public class CosmosContainerUtils {
15+
16+
public static void validateContainers(List<String> containerNames, CosmosAsyncClient cosmosAsyncClient, String databaseName) {
17+
StringBuilder queryStringBuilder = new StringBuilder();
18+
List<SqlParameter> parameters = new ArrayList<>();
19+
20+
queryStringBuilder.append("SELECT * FROM c WHERE c.id IN ( ");
21+
for (int i = 0; i < containerNames.size(); i++) {
22+
String idValue = containerNames.get(i);
23+
String idParamName = "@param" + i;
24+
25+
parameters.add(new SqlParameter(idParamName, idValue));
26+
queryStringBuilder.append(idParamName);
27+
28+
if (i < containerNames.size() - 1) {
29+
queryStringBuilder.append(", ");
30+
}
31+
}
32+
queryStringBuilder.append(" )");
33+
List<CosmosContainerProperties> cosmosContainerProperties = cosmosAsyncClient.getDatabase(databaseName)
34+
.queryContainers(new SqlQuerySpec(queryStringBuilder.toString(), parameters))
35+
.byPage()
36+
.flatMapIterable(response -> response.getResults())
37+
.collectList()
38+
.onErrorMap(throwable -> KafkaCosmosExceptionsHelper.convertToConnectException(throwable, "validateContainers failed.")).block();
39+
if (cosmosContainerProperties.isEmpty() || cosmosContainerProperties.size() != containerNames.size()) {
40+
throw new IllegalStateException("Containers specified in the config do not exist in the CosmosDB account.");
41+
}
42+
}
43+
}
Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
11
#!/usr/bin/env pwsh
22
$ErrorActionPreference='Stop'
3-
cd $PSScriptRoot
3+
Set-Location $PSScriptRoot
44

55
Write-Host "Deleting prior Cosmos DB connectors..."
6-
rm -rf "$PSScriptRoot/src/test/connectorPlugins/connectors"
7-
New-Item -Path "$PSScriptRoot/src/test/connectorPlugins" -ItemType "directory" -Name "connectors" -Force | Out-Null
6+
Remove-Item -Recurse -Force "$PSScriptRoot/connectors" -ErrorAction SilentlyContinue
7+
New-Item -Path "$PSScriptRoot" -ItemType "directory" -Name "connectors" -Force | Out-Null
88

99
Write-Host "Rebuilding Cosmos DB connectors..."
10-
mvn clean package -DskipTests -Dmaven.javadoc.skip
11-
Get-ChildItem -Path $PSScriptRoot/target -Filter "azure-cosmos-kafka-connect-*.jar" | Where-Object { $_.Name -notlike "azure-cosmos-kafka-connect-*-sources.jar" } | Copy-Item -Destination $PSScriptRoot/src/test/connectorPlugins/connectors
12-
cd $PSScriptRoot/src/test/connectorPlugins
10+
Set-Location $PSScriptRoot/../../..
11+
mvn --% clean package -DskipTests -Dmaven.javadoc.skip
12+
Get-ChildItem -Path $PSScriptRoot/../../../target -Filter "azure-cosmos-kafka-connect-*.jar" | Where-Object { $_.Name -notlike "azure-cosmos-kafka-connect-*-sources.jar" } | Copy-Item -Destination $PSScriptRoot/connectors
1313

1414
Write-Host "Adding custom Insert UUID SMT"
15-
cd $PSScriptRoot/src/test/connectorPlugins/connectors
16-
git clone https://github.com/confluentinc/kafka-connect-insert-uuid.git insertuuid -q && cd insertuuid
15+
Set-Location $PSScriptRoot/connectors
16+
git clone https://github.com/confluentinc/kafka-connect-insert-uuid.git insertuuid -q
17+
Set-Location insertuuid
1718
mvn clean package -DskipTests=true
18-
copy target\*.jar $PSScriptRoot/src/test/connectorPlugins/connectors
19-
rm -rf "$PSScriptRoot/src/test/connectorPlugins/connectors/insertuuid"
20-
cd $PSScriptRoot
19+
Copy-Item target\*.jar $PSScriptRoot/connectors
20+
Set-Location $PSScriptRoot
21+
Remove-Item -Recurse -Force "$PSScriptRoot/connectors/insertuuid"

sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSinkConnectorITest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.connect.storage.StringConverter;
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
26+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorStatus;
2627
import org.testng.annotations.AfterClass;
2728
import org.testng.annotations.BeforeClass;
2829
import org.testng.annotations.DataProvider;
@@ -147,6 +148,61 @@ public void sinkToSingleContainer(boolean useMasterKey) throws InterruptedExcept
147148
}
148149
}
149150

151+
@Test(groups = { "kafka-integration" }, dataProvider = "sinkAuthParameterProvider")
152+
public void createConnectorWithWrongContainerName(boolean useMasterKey) {
153+
154+
logger.info("sinkToSingleContainer " + useMasterKey);
155+
Map<String, String> sinkConnectorConfig = new HashMap<>();
156+
String wrongContainerName = "wrongContainerName";
157+
String topicName = wrongContainerName + "-" + UUID.randomUUID();
158+
159+
sinkConnectorConfig.put("topics", topicName);
160+
sinkConnectorConfig.put("value.converter", JsonConverter.class.getName());
161+
sinkConnectorConfig.put("value.converter.schemas.enable", "false");
162+
sinkConnectorConfig.put("key.converter", StringConverter.class.getName());
163+
sinkConnectorConfig.put("connector.class", "com.azure.cosmos.kafka.connect.CosmosSinkConnector");
164+
sinkConnectorConfig.put("azure.cosmos.account.endpoint", KafkaCosmosTestConfigurations.HOST);
165+
sinkConnectorConfig.put("azure.cosmos.application.name", "Test");
166+
sinkConnectorConfig.put("azure.cosmos.sink.database.name", databaseName);
167+
sinkConnectorConfig.put("azure.cosmos.sink.containers.topicMap", topicName + "#" + wrongContainerName);
168+
169+
if (useMasterKey) {
170+
sinkConnectorConfig.put("azure.cosmos.account.key", KafkaCosmosTestConfigurations.MASTER_KEY);
171+
} else {
172+
sinkConnectorConfig.put("azure.cosmos.auth.type", CosmosAuthType.SERVICE_PRINCIPAL.getName());
173+
sinkConnectorConfig.put("azure.cosmos.account.tenantId", KafkaCosmosTestConfigurations.ACCOUNT_TENANT_ID);
174+
sinkConnectorConfig.put("azure.cosmos.auth.aad.clientId", KafkaCosmosTestConfigurations.ACCOUNT_AAD_CLIENT_ID);
175+
sinkConnectorConfig.put("azure.cosmos.auth.aad.clientSecret", KafkaCosmosTestConfigurations.ACCOUNT_AAD_CLIENT_SECRET);
176+
}
177+
178+
// Create topic ahead of time
179+
kafkaCosmosConnectContainer.createTopic(topicName, 1);
180+
181+
String connectorName = "simpleTest-" + UUID.randomUUID();
182+
try {
183+
// register the sink connector
184+
logger.info("Registering connector " + connectorName);
185+
kafkaCosmosConnectContainer.registerConnector(connectorName, sinkConnectorConfig);
186+
187+
// give some time for the connector to start up
188+
Thread.sleep(10000);
189+
// verify connector tasks
190+
ConnectorStatus connectorStatus = kafkaCosmosConnectContainer.getConnectorStatus(connectorName);
191+
assertThat(connectorStatus.getConnector().get("state").equals("FAILED")).isTrue();
192+
assertThat(connectorStatus.getConnector().get("trace")
193+
.contains("java.lang.IllegalStateException: Containers specified in the config do not exist in the CosmosDB account.")).isTrue();
194+
195+
} catch (InterruptedException e) {
196+
throw new RuntimeException(e);
197+
} finally {
198+
// IMPORTANT: remove the connector after use
199+
if (kafkaCosmosConnectContainer != null) {
200+
kafkaCosmosConnectContainer.deleteTopic(topicName);
201+
kafkaCosmosConnectContainer.deleteConnector(connectorName);
202+
}
203+
}
204+
}
205+
150206
@Test(groups = { "kafka-integration" }, timeOut = 10 * TIMEOUT)
151207
public void postAvroMessage() throws InterruptedException {
152208
Map<String, String> sinkConnectorConfig = new HashMap<>();

sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSinkConnectorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void requiredConfig() {
8787
assertThat(errorMessages.get("azure.cosmos.sink.containers.topicMap").size()).isGreaterThan(0);
8888
}
8989

90-
@Test(groups = "unit")
90+
@Test(groups = "kafka-emulator")
9191
public void taskConfigs() {
9292
CosmosSinkConnector sinkConnector = new CosmosSinkConnector();
9393
String connectorName = "test";

sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,72 @@ public void readFromSingleContainer(boolean useMasterKey, CosmosMetadataStorageT
224224
}
225225
}
226226

227+
@Test(groups = { "kafka-integration" }, dataProvider = "sourceAuthParameterProvider")
228+
public void createConnectorWithWrongContainerName(boolean useMasterKey, CosmosMetadataStorageType metadataStorageType) {
229+
230+
logger.info("read from single container " + useMasterKey);
231+
String wrongContainerName = "wrongContainerName";
232+
String topicName = wrongContainerName + "-" + UUID.randomUUID();
233+
String metadataStorageName = "Metadata-" + UUID.randomUUID();
234+
235+
Map<String, String> sourceConnectorConfig = new HashMap<>();
236+
sourceConnectorConfig.put("connector.class", "com.azure.cosmos.kafka.connect.CosmosSourceConnector");
237+
sourceConnectorConfig.put("azure.cosmos.account.endpoint", KafkaCosmosTestConfigurations.HOST);
238+
sourceConnectorConfig.put("azure.cosmos.application.name", "Test");
239+
sourceConnectorConfig.put("azure.cosmos.source.database.name", databaseName);
240+
sourceConnectorConfig.put("azure.cosmos.source.containers.includeAll", "false");
241+
sourceConnectorConfig.put("azure.cosmos.source.containers.includedList", wrongContainerName);
242+
sourceConnectorConfig.put("azure.cosmos.source.containers.topicMap", topicName + "#" + wrongContainerName);
243+
244+
if (useMasterKey) {
245+
sourceConnectorConfig.put("azure.cosmos.account.key", KafkaCosmosTestConfigurations.MASTER_KEY);
246+
} else {
247+
sourceConnectorConfig.put("azure.cosmos.auth.type", CosmosAuthType.SERVICE_PRINCIPAL.getName());
248+
sourceConnectorConfig.put("azure.cosmos.account.tenantId", KafkaCosmosTestConfigurations.ACCOUNT_TENANT_ID);
249+
sourceConnectorConfig.put("azure.cosmos.auth.aad.clientId", KafkaCosmosTestConfigurations.ACCOUNT_AAD_CLIENT_ID);
250+
sourceConnectorConfig.put("azure.cosmos.auth.aad.clientSecret", KafkaCosmosTestConfigurations.ACCOUNT_AAD_CLIENT_SECRET);
251+
}
252+
253+
if (metadataStorageType == CosmosMetadataStorageType.COSMOS) {
254+
sourceConnectorConfig.put("azure.cosmos.source.metadata.storage.name", metadataStorageName);
255+
sourceConnectorConfig.put("azure.cosmos.source.metadata.storage.type", CosmosMetadataStorageType.COSMOS.getName());
256+
}
257+
258+
// Create topic ahead of time
259+
kafkaCosmosConnectContainer.createTopic(topicName, 1);
260+
String connectorName = "simpleTest-" + UUID.randomUUID();
261+
262+
try {
263+
// if using cosmos container to persiste the metadata, pre-create it
264+
if (metadataStorageType == CosmosMetadataStorageType.COSMOS) {
265+
logger.info("Creating metadata container");
266+
client.getDatabase(databaseName)
267+
.createContainerIfNotExists(metadataStorageName, "/id")
268+
.block();
269+
}
270+
271+
kafkaCosmosConnectContainer.registerConnector(connectorName, sourceConnectorConfig);
272+
273+
// give some time for the connector to start up
274+
Thread.sleep(10000);
275+
// verify connector tasks
276+
ConnectorStatus connectorStatus = kafkaCosmosConnectContainer.getConnectorStatus(connectorName);
277+
assertThat(connectorStatus.getConnector().get("state").equals("FAILED")).isTrue();
278+
assertThat(connectorStatus.getConnector().get("trace")
279+
.contains("java.lang.IllegalStateException: Containers specified in the config do not exist in the CosmosDB account.")).isTrue();
280+
} catch (InterruptedException e) {
281+
throw new RuntimeException(e);
282+
} finally {
283+
if (client != null) {
284+
285+
// delete the metadata container if created
286+
if (metadataStorageType == CosmosMetadataStorageType.COSMOS) {
287+
client.getDatabase(databaseName).getContainer(metadataStorageName).delete().block();
288+
}
289+
}
290+
}
291+
}
292+
227293
@Test(groups = { "kafka-integration" }, dataProvider = "metadataCosmosStorageParameterProvider", timeOut = 2 * TIMEOUT)
228294
public void connectorStart_metadata_cosmosStorageType(
229295
boolean useMasterKey,

0 commit comments

Comments
 (0)