Commit 3e0b67b

xinlian12 and annie-mac authored
addCosmosClientCacheInKafkaConnector (#45633)
* add cosmos client cache in kafka connector --------- Co-authored-by: annie-mac <[email protected]>
1 parent 8d9e62d commit 3e0b67b

20 files changed (+1255, -249 lines)

sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -7,6 +7,7 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Fixed an issue where Cosmos client is not being closed properly when connector failed to start - See [PR 45633](https://github.com/Azure/azure-sdk-for-java/pull/45633)
 
 #### Other Changes
 
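Note: the CosmosClientCache and CosmosClientCacheItem classes that the connectors below switch to are not included in this excerpt; only the calls getCosmosClient(...), releaseCosmosClient(...), getClient(), and getClientConfig() are visible in the diffs. As a rough mental model only, here is a minimal sketch of a reference-counted client cache with that acquire/release shape; every name in it (ClientCacheSketch, ClientConfig, Client, CacheItem, getClient, releaseClient) is a hypothetical stand-in, not the SDK implementation.

// Minimal sketch of a reference-counted client cache (NOT the real CosmosClientCache);
// ClientConfig and Client stand in for the SDK's account config and CosmosAsyncClient.
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public final class ClientCacheSketch {

    /** Hypothetical stand-in for the account/client configuration used as the cache key. */
    public static final class ClientConfig {
        private final String endpoint;

        public ClientConfig(String endpoint) {
            this.endpoint = endpoint;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof ClientConfig && Objects.equals(((ClientConfig) o).endpoint, this.endpoint);
        }

        @Override
        public int hashCode() {
            return Objects.hash(endpoint);
        }
    }

    /** Hypothetical stand-in for CosmosAsyncClient. */
    public static final class Client implements AutoCloseable {
        @Override
        public void close() {
            // release sockets, thread pools, etc.
        }
    }

    /** Cache entry pairing a client with the config it was created from, plus a usage count. */
    public static final class CacheItem {
        private final Client client;
        private final ClientConfig clientConfig;
        private int refCount;

        CacheItem(Client client, ClientConfig clientConfig) {
            this.client = client;
            this.clientConfig = clientConfig;
        }

        public Client getClient() {
            return client;
        }

        public ClientConfig getClientConfig() {
            return clientConfig;
        }
    }

    private static final Map<ClientConfig, CacheItem> CACHE = new HashMap<>();

    /** Return the cached client for this config, creating it on first use. */
    public static synchronized CacheItem getClient(ClientConfig config) {
        CacheItem item = CACHE.computeIfAbsent(config, c -> new CacheItem(new Client(), c));
        item.refCount++;
        return item;
    }

    /** Drop one reference; close and evict the client once nobody uses it anymore. */
    public static synchronized void releaseClient(ClientConfig config) {
        CacheItem item = CACHE.get(config);
        if (item == null) {
            return;
        }
        if (--item.refCount == 0) {
            CACHE.remove(config);
            item.getClient().close();
        }
    }
}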
sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java

Lines changed: 48 additions & 26 deletions
@@ -3,13 +3,13 @@
 
 package com.azure.cosmos.kafka.connect;
 
-import com.azure.cosmos.CosmosAsyncClient;
 import com.azure.cosmos.CosmosAsyncContainer;
 import com.azure.cosmos.CosmosAsyncDatabase;
 import com.azure.cosmos.implementation.UUIDs;
 import com.azure.cosmos.implementation.apachecommons.lang.RandomUtils;
 import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
-import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
+import com.azure.cosmos.kafka.connect.implementation.CosmosClientCache;
+import com.azure.cosmos.kafka.connect.implementation.CosmosClientCacheItem;
 import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig;
 import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
 import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
@@ -49,20 +49,30 @@ public final class CosmosSinkConnector extends SinkConnector implements AutoClos
 
     private CosmosSinkConfig sinkConfig;
     private String connectorName;
-    private CosmosAsyncClient cosmosClient;
+    private CosmosClientCacheItem cosmosClientItem;
 
     @Override
     public void start(Map<String, String> props) {
         LOGGER.info("Starting the kafka cosmos sink connector");
-        this.sinkConfig = new CosmosSinkConfig(props);
-        this.connectorName = props.containsKey(CONNECTOR_NAME) ? props.get(CONNECTOR_NAME).toString() : "EMPTY";
-        CosmosSinkContainersConfig containersConfig = this.sinkConfig.getContainersConfig();
-        this.cosmosClient =
-            CosmosClientStore.getCosmosClient(this.sinkConfig.getAccountConfig(), this.connectorName);
-        validateDatabaseAndContainers(
-            new ArrayList<>(containersConfig.getTopicToContainerMap().values()),
-            this.cosmosClient,
-            containersConfig.getDatabaseName());
+        try {
+            this.sinkConfig = new CosmosSinkConfig(props);
+            this.connectorName = props.containsKey(CONNECTOR_NAME) ? props.get(CONNECTOR_NAME).toString() : "EMPTY";
+            CosmosSinkContainersConfig containersConfig = this.sinkConfig.getContainersConfig();
+            this.cosmosClientItem =
+                CosmosClientCache.getCosmosClient(this.sinkConfig.getAccountConfig(), this.connectorName);
+            validateDatabaseAndContainers(
+                new ArrayList<>(containersConfig.getTopicToContainerMap().values()),
+                this.cosmosClientItem.getClient(),
+                containersConfig.getDatabaseName());
+        } catch (Exception e) {
+            LOGGER.warn("Error starting the kafka cosmos sink connector", e);
+            // if connector failed to start, release initialized resources here
+            this.cleanup();
+
+            // re-throw the exception back to kafka
+            throw e;
+        }
+
     }
 
     @Override
@@ -105,7 +115,10 @@ private String getClientMetadataCachesSnapshotString() {
     private String getClientMetadataCachesSnapshotString() {
         CosmosSinkContainersConfig containersConfig = this.sinkConfig.getContainersConfig();
         List<String> containerNames = new ArrayList<>(containersConfig.getTopicToContainerMap().values());
-        CosmosAsyncDatabase database = this.cosmosClient.getDatabase(containersConfig.getDatabaseName());
+        CosmosAsyncDatabase database =
+            this.cosmosClientItem
+                .getClient()
+                .getDatabase(containersConfig.getDatabaseName());
 
         // read a random item from each container to populate the collection cache
         for (String containerName : containerNames) {
@@ -118,40 +131,43 @@ private String getClientMetadataCachesSnapshotString() {
         if (cosmosThroughputControlConfig.isThroughputControlEnabled()) {
             if (cosmosThroughputControlConfig.getThroughputControlAccountConfig() == null) {
                 CosmosAsyncContainer throughputControlContainer =
-                    this.cosmosClient
+                    this.cosmosClientItem
+                        .getClient()
                         .getDatabase(cosmosThroughputControlConfig.getGlobalThroughputControlDatabaseName())
                         .getContainer(cosmosThroughputControlConfig.getGlobalThroughputControlContainerName());
                 readRandomItemFromContainer(throughputControlContainer);
             }
         }
 
-        return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(this.cosmosClient);
+        return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(this.cosmosClientItem.getClient());
     }
 
     private String getThroughputControlClientMetadataCachesSnapshotString() {
-        CosmosAsyncClient throughputControlClient = null;
+        CosmosClientCacheItem throughputControlClientItem = null;
         CosmosThroughputControlConfig throughputControlConfig = this.sinkConfig.getThroughputControlConfig();
 
         try {
             if (throughputControlConfig.isThroughputControlEnabled()
                 && throughputControlConfig.getThroughputControlAccountConfig() != null) {
-                throughputControlClient = CosmosClientStore.getCosmosClient(
+                throughputControlClientItem = CosmosClientCache.getCosmosClient(
                     throughputControlConfig.getThroughputControlAccountConfig(),
                     this.connectorName
                 );
            }
 
-            if (throughputControlClient != null) {
+            if (throughputControlClientItem != null) {
                 readRandomItemFromContainer(
-                    throughputControlClient
+                    throughputControlClientItem
+                        .getClient()
                         .getDatabase(throughputControlConfig.getGlobalThroughputControlDatabaseName())
                         .getContainer(throughputControlConfig.getGlobalThroughputControlContainerName()));
+                return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(throughputControlClientItem.getClient());
             }
-            return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(throughputControlClient);
 
+            return null;
         } finally {
-            if (throughputControlClient != null) {
-                throughputControlClient.close();
+            if (throughputControlClientItem != null) {
+                CosmosClientCache.releaseCosmosClient(throughputControlClientItem.getClientConfig());
             }
         }
     }
@@ -169,12 +185,18 @@ private void readRandomItemFromContainer(CosmosAsyncContainer container) {
         }
     }
 
+    private void cleanup() {
+        LOGGER.info("Cleaning up CosmosSinkConnector");
+        if (this.cosmosClientItem != null) {
+            CosmosClientCache.releaseCosmosClient(this.cosmosClientItem.getClientConfig());
+            this.cosmosClientItem = null;
+        }
+    }
+
     @Override
     public void stop() {
-        if (this.cosmosClient != null) {
-            this.cosmosClient.close();
-        }
-        LOGGER.info("Kafka Cosmos sink connector {} is stopped.");
+        LOGGER.info("Stopping Kafka CosmosDB sink connector");
+        cleanup();
     }
 
     @Override

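The start()/stop()/cleanup() shape above boils down to: acquire the cached client inside a try, release it and re-throw if startup fails, and route stop() through the same cleanup() so the release happens exactly once. A small hypothetical illustration of that contract, built on the ClientCacheSketch stand-ins from the earlier note rather than the real connector classes:

// Hypothetical illustration of the shared cleanup path: start() releases the cached client
// on failure and re-throws, stop() reuses the same cleanup(); types come from ClientCacheSketch.
public final class ConnectorCleanupSketch {

    private ClientCacheSketch.CacheItem clientItem;

    public void start(ClientCacheSketch.ClientConfig config, boolean failValidation) {
        try {
            this.clientItem = ClientCacheSketch.getClient(config);
            if (failValidation) {
                // stands in for validateDatabaseAndContainers(...) throwing
                throw new IllegalStateException("database/container validation failed");
            }
        } catch (Exception e) {
            // release whatever was initialized, then surface the failure to the runtime
            this.cleanup();
            throw e;
        }
    }

    public void stop() {
        this.cleanup();
    }

    private void cleanup() {
        if (this.clientItem != null) {
            ClientCacheSketch.releaseClient(this.clientItem.getClientConfig());
            this.clientItem = null; // makes cleanup() safe to call more than once
        }
    }

    public static void main(String[] args) {
        ConnectorCleanupSketch connector = new ConnectorCleanupSketch();
        try {
            connector.start(new ClientCacheSketch.ClientConfig("https://example-account"), true);
        } catch (IllegalStateException expected) {
            // start failed, but the cached client was already released in cleanup()
        }
        connector.stop(); // no-op: clientItem was nulled out, nothing is released twice
    }
}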
sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java

Lines changed: 76 additions & 46 deletions
@@ -3,15 +3,15 @@
 
 package com.azure.cosmos.kafka.connect;
 
-import com.azure.cosmos.CosmosAsyncClient;
 import com.azure.cosmos.CosmosAsyncContainer;
 import com.azure.cosmos.CosmosAsyncDatabase;
 import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
 import com.azure.cosmos.implementation.UUIDs;
 import com.azure.cosmos.implementation.apachecommons.lang.RandomUtils;
 import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
 import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
-import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
+import com.azure.cosmos.kafka.connect.implementation.CosmosClientCache;
+import com.azure.cosmos.kafka.connect.implementation.CosmosClientCacheItem;
 import com.azure.cosmos.kafka.connect.implementation.CosmosMasterKeyAuthConfig;
 import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig;
 import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
@@ -74,7 +74,7 @@ public final class CosmosSourceConnector extends SourceConnector implements Auto
     private static final int METADATA_CONTAINER_DEFAULT_RU_CONFIG = 4000;
 
     private CosmosSourceConfig config;
-    private CosmosAsyncClient cosmosClient;
+    private CosmosClientCacheItem cosmosClientItem;
     private MetadataMonitorThread monitorThread;
     private MetadataKafkaStorageManager kafkaOffsetStorageReader;
     private IMetadataReader metadataReader;
@@ -88,28 +88,37 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig
     @Override
     public void start(Map<String, String> props) {
         LOGGER.info("Starting the kafka cosmos source connector");
-        this.config = new CosmosSourceConfig(props);
-        this.connectorName = props.containsKey(CONNECTOR_NAME) ? props.get(CONNECTOR_NAME).toString() : "EMPTY";
-        this.cosmosClient = CosmosClientStore.getCosmosClient(this.config.getAccountConfig(), connectorName);
-        CosmosSourceContainersConfig containersConfig = this.config.getContainersConfig();
-        validateDatabaseAndContainers(
-            containersConfig.getIncludedContainers(),
-            this.cosmosClient,
-            containersConfig.getDatabaseName());
-
-        // IMPORTANT: sequence matters
-        this.kafkaOffsetStorageReader = new MetadataKafkaStorageManager(this.context().offsetStorageReader());
-        this.metadataReader = this.getMetadataReader();
-        this.monitorThread = new MetadataMonitorThread(
-            connectorName,
-            this.config.getContainersConfig(),
-            this.config.getMetadataConfig(),
-            this.context(),
-            this.metadataReader,
-            this.cosmosClient
-        );
-
-        this.monitorThread.start();
+        try {
+            this.config = new CosmosSourceConfig(props);
+            this.connectorName = props.containsKey(CONNECTOR_NAME) ? props.get(CONNECTOR_NAME).toString() : "EMPTY";
+            this.cosmosClientItem = CosmosClientCache.getCosmosClient(this.config.getAccountConfig(), connectorName);
+            CosmosSourceContainersConfig containersConfig = this.config.getContainersConfig();
+            validateDatabaseAndContainers(
+                containersConfig.getIncludedContainers(),
+                this.cosmosClientItem.getClient(),
+                containersConfig.getDatabaseName());
+
+            // IMPORTANT: sequence matters
+            this.kafkaOffsetStorageReader = new MetadataKafkaStorageManager(this.context().offsetStorageReader());
+            this.metadataReader = this.getMetadataReader();
+            this.monitorThread = new MetadataMonitorThread(
+                connectorName,
+                this.config.getContainersConfig(),
+                this.config.getMetadataConfig(),
+                this.context(),
+                this.metadataReader,
+                this.cosmosClientItem.getClient()
+            );
+
+            this.monitorThread.start();
+        } catch (Exception e) {
+            // if the connector failed to start, release initialized resources here
+            LOGGER.warn("Error starting the kafka cosmos sink connector", e);
+            this.cleanup();
+            // re-throw the exception back to kafka
+            throw e;
+        }
+
     }
 
     @Override
@@ -151,18 +160,26 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
         return taskConfigs;
     }
 
-    @Override
-    public void stop() {
-        LOGGER.info("Stopping Kafka CosmosDB source connector");
-        if (this.cosmosClient != null) {
-            LOGGER.debug("Closing cosmos client");
-            this.cosmosClient.close();
-        }
+    private void cleanup() {
+        LOGGER.info("Cleaning up CosmosSourceConnector");
 
+        // Close monitor thread first since it uses the cosmos client
         if (this.monitorThread != null) {
            LOGGER.debug("Closing monitoring thread");
            this.monitorThread.close();
         }
+
+        if (this.cosmosClientItem != null) {
+            LOGGER.debug("Releasing cosmos client");
+            CosmosClientCache.releaseCosmosClient(this.cosmosClientItem.getClientConfig());
+            this.cosmosClientItem = null;
+        }
+    }
+
+    @Override
+    public void stop() {
+        LOGGER.info("Stopping Kafka CosmosDB source connector");
+        cleanup();
     }
 
     @Override
@@ -181,7 +198,8 @@ private IMetadataReader getMetadataReader() {
                 return this.kafkaOffsetStorageReader;
             case COSMOS:
                 CosmosAsyncContainer metadataContainer =
-                    this.cosmosClient
+                    this.cosmosClientItem
+                        .getClient()
                         .getDatabase(this.config.getContainersConfig().getDatabaseName())
                         .getContainer(this.config.getMetadataConfig().getStorageName());
                 // validate the metadata container config
@@ -230,7 +248,8 @@ private Mono<CosmosContainerResponse> createMetadataContainer() {
     }
 
     private Mono<CosmosContainerResponse> createMetadataContainer(Integer throughput) {
-        return this.cosmosClient
+        return this.cosmosClientItem
+            .getClient()
             .getDatabase(this.config.getContainersConfig().getDatabaseName())
             .createContainer(
                 this.config.getMetadataConfig().getStorageName(),
@@ -363,7 +382,10 @@ private Map<FeedRange, KafkaCosmosChangeFeedState> getEffectiveFeedRangesContinu
             .block().v;
 
         Map<FeedRange, KafkaCosmosChangeFeedState> effectiveFeedRangesContinuationMap = new LinkedHashMap<>();
-        CosmosAsyncContainer container = this.cosmosClient.getDatabase(databaseName).getContainer(containerProperties.getId());
+        CosmosAsyncContainer container =
+            this.cosmosClientItem
+                .getClient()
+                .getDatabase(databaseName).getContainer(containerProperties.getId());
 
         Flux.fromIterable(containerFeedRanges)
             .flatMap(containerFeedRange -> {
@@ -493,7 +515,8 @@ private KafkaCosmosChangeFeedState getContinuationStateFromOffset(
     }
 
     private List<FeedRange> getFeedRanges(CosmosContainerProperties containerProperties) {
-        return this.cosmosClient
+        return this.cosmosClientItem
+            .getClient()
             .getDatabase(this.config.getContainersConfig().getDatabaseName())
             .getContainer(containerProperties.getId())
             .getFeedRanges()
@@ -543,7 +566,10 @@ private String getClientMetadataCachesSnapshotString() {
                 .map(CosmosContainerProperties::getId)
                 .collect(Collectors.toList()))
             .block();
-        CosmosAsyncDatabase database = this.cosmosClient.getDatabase(containersConfig.getDatabaseName());
+        CosmosAsyncDatabase database =
+            this.cosmosClientItem
+                .getClient()
+                .getDatabase(containersConfig.getDatabaseName());
 
         // read a random item from each container to populate the collection cache
         for (String containerName : containerNames) {
@@ -556,7 +582,8 @@ private String getClientMetadataCachesSnapshotString() {
         if (cosmosThroughputControlConfig.isThroughputControlEnabled()) {
             if (cosmosThroughputControlConfig.getThroughputControlAccountConfig() == null) {
                 CosmosAsyncContainer throughputControlContainer =
-                    this.cosmosClient
+                    this.cosmosClientItem
+                        .getClient()
                         .getDatabase(cosmosThroughputControlConfig.getGlobalThroughputControlDatabaseName())
                         .getContainer(cosmosThroughputControlConfig.getGlobalThroughputControlContainerName());
                 readRandomItemFromContainer(throughputControlContainer);
@@ -568,33 +595,36 @@ private String getClientMetadataCachesSnapshotString() {
             readRandomItemFromContainer(database.getContainer(this.config.getMetadataConfig().getStorageName()));
         }
 
-        return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(this.cosmosClient);
+        return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(this.cosmosClientItem.getClient());
     }
 
     private String getThroughputControlClientMetadataCachesSnapshotString() {
-        CosmosAsyncClient throughputControlClient = null;
+        CosmosClientCacheItem throughputControlClientItem = null;
         try {
             CosmosThroughputControlConfig throughputControlConfig = this.config.getThroughputControlConfig();
             if (throughputControlConfig.isThroughputControlEnabled()
                 && throughputControlConfig.getThroughputControlAccountConfig() != null) {
-                throughputControlClient = CosmosClientStore.getCosmosClient(
+                throughputControlClientItem = CosmosClientCache.getCosmosClient(
                     this.config.getThroughputControlConfig().getThroughputControlAccountConfig(),
                     this.connectorName
                 );
             }
 
-            if (throughputControlClient != null) {
+            if (throughputControlClientItem != null) {
                 this.readRandomItemFromContainer(
-                    throughputControlClient
+                    throughputControlClientItem
+                        .getClient()
                         .getDatabase(throughputControlConfig.getGlobalThroughputControlDatabaseName())
                         .getContainer(throughputControlConfig.getGlobalThroughputControlContainerName())
                 );
+                return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(throughputControlClientItem.getClient());
            }
 
-            return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(throughputControlClient);
+            return null;
+
         } finally {
-            if (throughputControlClient != null) {
-                throughputControlClient.close();
+            if (throughputControlClientItem != null) {
+                CosmosClientCache.releaseCosmosClient(throughputControlClientItem.getClientConfig());
            }
        }
    }

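Both connectors' getThroughputControlClientMetadataCachesSnapshotString() now serialize the metadata caches while the short-lived throughput-control client is still held, and the finally block hands the client back to the cache instead of closing it directly. A stripped-down sketch of that acquire/use/release-in-finally shape, again using the hypothetical ClientCacheSketch stand-ins; snapshotOf(...) is a placeholder for the real KafkaCosmosUtils call:

// Sketch of the acquire/use/release-in-finally shape used by the snapshot helpers above;
// types come from the hypothetical ClientCacheSketch, not the real SDK classes.
public final class ThroughputControlSnapshotSketch {

    public static String snapshot(ClientCacheSketch.ClientConfig throughputControlConfig, boolean enabled) {
        ClientCacheSketch.CacheItem item = null;
        try {
            if (enabled) {
                // borrow a client from the cache only when a dedicated account is configured
                item = ClientCacheSketch.getClient(throughputControlConfig);
            }
            if (item != null) {
                // take the snapshot while this method still holds the client
                return snapshotOf(item.getClient());
            }
            return null;
        } finally {
            if (item != null) {
                // hand the client back to the cache; it is only closed once unused everywhere
                ClientCacheSketch.releaseClient(item.getClientConfig());
            }
        }
    }

    private static String snapshotOf(ClientCacheSketch.Client client) {
        return "metadata-cache-snapshot"; // placeholder for the serialized cache state
    }
}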