Skip to content

Commit 71d8642

Browse files
Fixing test regressions (Azure#45798)
1 parent e9e619a commit 71d8642

File tree

2 files changed

+9
-6
lines changed

2 files changed

+9
-6
lines changed

sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,7 @@ private void setupDefaultConnectorInternalStatesWithMetadataKafkaReader(
756756
CosmosClientCache.getCosmosClient(
757757
cosmosSourceConfig.getAccountConfig(),
758758
"testKafkaConnector");
759-
KafkaCosmosReflectionUtils.setCosmosClient(sourceConnector, clientCacheItem.getClient());
759+
KafkaCosmosReflectionUtils.setCosmosClientCacheItem(sourceConnector, clientCacheItem);
760760

761761
InMemoryStorageReader inMemoryStorageReader = new InMemoryStorageReader();
762762
MetadataKafkaStorageManager metadataReader = new MetadataKafkaStorageManager(inMemoryStorageReader);
@@ -792,7 +792,7 @@ private void setupDefaultConnectorInternalStatesWithMetadataCosmosReader(
792792
CosmosClientCache.getCosmosClient(
793793
cosmosSourceConfig.getAccountConfig(),
794794
"testKafkaConnector");
795-
KafkaCosmosReflectionUtils.setCosmosClient(sourceConnector, clientCacheItem.getClient());
795+
KafkaCosmosReflectionUtils.setCosmosClientCacheItem(sourceConnector, clientCacheItem);
796796

797797
CosmosAsyncContainer container = clientCacheItem.getClient().getDatabase(databaseName).getContainer(containerName);
798798
MetadataCosmosStorageManager cosmosStorageManager = new MetadataCosmosStorageManager(container);

sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/KafkaCosmosReflectionUtils.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package com.azure.cosmos.kafka.connect;
55

66
import com.azure.cosmos.CosmosAsyncClient;
7+
import com.azure.cosmos.kafka.connect.implementation.CosmosClientCacheItem;
78
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask;
89
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig;
910
import com.azure.cosmos.kafka.connect.implementation.source.IMetadataReader;
@@ -31,8 +32,8 @@ private static <T> T get(Object object, String fieldName) {
3132
}
3233
}
3334

34-
public static void setCosmosClient(CosmosSourceConnector sourceConnector, CosmosAsyncClient cosmosAsyncClient) {
35-
set(sourceConnector, cosmosAsyncClient,"cosmosClient");
35+
public static void setCosmosClientCacheItem(CosmosSourceConnector sourceConnector, CosmosClientCacheItem clientCacheItem) {
36+
set(sourceConnector, clientCacheItem,"cosmosClientItem");
3637
}
3738

3839
public static void setCosmosSourceConfig(CosmosSourceConnector sourceConnector, CosmosSourceConfig sourceConfig) {
@@ -58,7 +59,8 @@ public static void setMetadataMonitorThread(
5859
}
5960

6061
public static CosmosAsyncClient getCosmosClient(CosmosSourceConnector sourceConnector) {
61-
return get(sourceConnector,"cosmosClient");
62+
CosmosClientCacheItem clientCacheItem = get(sourceConnector,"cosmosClientItem");
63+
return clientCacheItem.getClient();
6264
}
6365

6466
public static MetadataKafkaStorageManager getKafkaOffsetStorageReader(CosmosSourceConnector sourceConnector) {
@@ -74,7 +76,8 @@ public static void setSinkTaskContext(CosmosSinkTask sinkTask, SinkTaskContext s
7476
}
7577

7678
public static CosmosAsyncClient getSinkTaskCosmosClient(CosmosSinkTask sinkTask) {
77-
return get(sinkTask,"cosmosClient");
79+
CosmosClientCacheItem clientCacheItem = get(sinkTask,"cosmosClientItem");
80+
return clientCacheItem.getClient();
7881
}
7982

8083
public static void setConnectorName(CosmosSourceConnector sourceConnector, String connectorName) {

0 commit comments

Comments
 (0)