Skip to content

Commit f14d55b

Browse files
xinlian12annie-mac
andauthored
fixKafkaConnectorStuckIssue (#46378)
* fix connector stuck issue --------- Co-authored-by: annie-mac <[email protected]>
1 parent c1b5e6d commit f14d55b

File tree

4 files changed

+13
-1
lines changed

4 files changed

+13
-1
lines changed

sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Fixed an issue where `CosmosSourceConnector` got stuck when restart - See [PR 46378](https://github.com/Azure/azure-sdk-for-java/pull/46378)
1011

1112
#### Other Changes
1213

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import com.azure.cosmos.CosmosAsyncContainer;
77
import com.azure.cosmos.CosmosAsyncDatabase;
8+
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
89
import com.azure.cosmos.implementation.UUIDs;
910
import com.azure.cosmos.implementation.apachecommons.lang.RandomUtils;
1011
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
@@ -51,6 +52,11 @@ public final class CosmosSinkConnector extends SinkConnector implements AutoClos
5152
private String connectorName;
5253
private CosmosClientCacheItem cosmosClientItem;
5354

55+
static {
56+
//initialize all accessors from different threads can cause deadlock issues, so here we force loading ahead of time
57+
ImplementationBridgeHelpers.initializeAllAccessors();
58+
}
59+
5460
@Override
5561
public void start(Map<String, String> props) {
5662
LOGGER.info("Starting the kafka cosmos sink connector");

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ public final class CosmosSourceConnector extends SourceConnector implements Auto
8080
private IMetadataReader metadataReader;
8181
private String connectorName;
8282

83+
static {
84+
//initialize all accessors from different threads can cause deadlock issues, so here we force loading ahead of time
85+
ImplementationBridgeHelpers.initializeAllAccessors();
86+
}
87+
8388
@Override
8489
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
8590
return ExactlyOnceSupport.SUPPORTED;

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@
102102
public class ImplementationBridgeHelpers {
103103
private final static Logger logger = LoggerFactory.getLogger(ImplementationBridgeHelpers.class);
104104

105-
private static void initializeAllAccessors() {
105+
public static void initializeAllAccessors() {
106106
ModelBridgeInternal.initializeAllAccessors();
107107
UtilBridgeInternal.initializeAllAccessors();
108108
BridgeInternal.initializeAllAccessors();

0 commit comments

Comments
 (0)