Skip to content

Commit c1210cf

Browse files
xinlian12annie-macannie-mac
authored
Fix for connection state listener (Azure#27242)
* collect connection state listner metrics * move connection state listener to RntbdRequestManager level, which improvements the handling when the connection is idle Co-authored-by: annie-mac <[email protected]> Co-authored-by: annie-mac <[email protected]>
1 parent 162b541 commit c1210cf

18 files changed

+206
-81
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,8 @@
55
#### Features Added
66
* Added Beta API `continueOnInitError` in `ThroughputControlGroupConfigBuilder` - See [PR 27702](https://github.com/Azure/azure-sdk-for-java/pull/27702)
77

8-
#### Breaking Changes
9-
108
#### Bugs Fixed
11-
12-
#### Other Changes
9+
* Added improvement for handling idle connection close event when `connectionEndpointRediscoveryEnabled` is enabled - See [PR 27242](https://github.com/Azure/azure-sdk-for-java/pull/27242)
1310

1411
### 4.28.1 (2022-04-08)
1512
#### Other Changes

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressResolver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public Mono<AddressInformation[]> resolveAsync(
8585
}
8686

8787
@Override
88-
public void updateAddresses(RxDocumentServiceRequest request, URI serverKey) {
88+
public int updateAddresses(URI serverKey) {
8989
throw new NotImplementedException("updateAddresses() is not supported in AddressResolver");
9090
}
9191

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.Objects;
6060
import java.util.Set;
6161
import java.util.concurrent.ConcurrentHashMap;
62+
import java.util.concurrent.atomic.AtomicInteger;
6263
import java.util.stream.Collectors;
6364

6465
public class GatewayAddressCache implements IAddressCache {
@@ -167,10 +168,12 @@ public GatewayAddressCache(
167168
}
168169

169170
@Override
170-
public void updateAddresses(final URI serverKey) {
171+
public int updateAddresses(final URI serverKey) {
171172

172173
Objects.requireNonNull(serverKey, "expected non-null serverKey");
173174

175+
AtomicInteger updatedCacheEntryCount = new AtomicInteger(0);
176+
174177
if (this.tcpConnectionEndpointRediscoveryEnabled) {
175178
this.serverPartitionAddressToPkRangeIdMap.computeIfPresent(serverKey, (uri, partitionKeyRangeIdentitySet) -> {
176179

@@ -180,6 +183,8 @@ public void updateAddresses(final URI serverKey) {
180183
} else {
181184
this.serverPartitionAddressCache.remove(partitionKeyRangeIdentity);
182185
}
186+
187+
updatedCacheEntryCount.incrementAndGet();
183188
}
184189

185190
return null;
@@ -188,6 +193,7 @@ public void updateAddresses(final URI serverKey) {
188193
logger.warn("tcpConnectionEndpointRediscovery is not enabled, should not reach here.");
189194
}
190195

196+
return updatedCacheEntryCount.get();
191197
}
192198

193199
@Override

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.Map;
3333
import java.util.Objects;
3434
import java.util.concurrent.ConcurrentHashMap;
35+
import java.util.concurrent.atomic.AtomicInteger;
3536
import java.util.stream.Collectors;
3637

3738
public class GlobalAddressResolver implements IAddressResolver {
@@ -110,23 +111,23 @@ Mono<Void> openAsync(DocumentCollection collection) {
110111
}
111112

112113
@Override
113-
public void updateAddresses(final RxDocumentServiceRequest request, final URI serverKey) {
114+
public int updateAddresses(final URI serverKey) {
114115

115-
Objects.requireNonNull(request, "expected non-null request");
116116
Objects.requireNonNull(serverKey, "expected non-null serverKey");
117117

118-
if (this.tcpConnectionEndpointRediscoveryEnabled) {
119-
URI serviceEndpoint = this.endpointManager.resolveServiceEndpoint(request);
120-
this.addressCacheByEndpoint.computeIfPresent(serviceEndpoint, (ignored, endpointCache) -> {
118+
AtomicInteger updatedCount = new AtomicInteger(0);
121119

120+
if (this.tcpConnectionEndpointRediscoveryEnabled) {
121+
for (EndpointCache endpointCache : this.addressCacheByEndpoint.values()) {
122122
final GatewayAddressCache addressCache = endpointCache.addressCache;
123-
addressCache.updateAddresses(serverKey);
124123

125-
return endpointCache;
126-
});
124+
updatedCount.accumulateAndGet(addressCache.updateAddresses(serverKey), (oldValue, newValue) -> oldValue + newValue);
125+
}
127126
} else {
128127
logger.warn("tcpConnectionEndpointRediscovery is not enabled, should not reach here.");
129128
}
129+
130+
return updatedCount.get();
130131
}
131132

132133
@Override

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public interface IAddressCache {
1717
*
1818
*
1919
*/
20-
void updateAddresses(URI serverKey);
20+
int updateAddresses(URI serverKey);
2121

2222
/**
2323
* Resolves physical addresses by either PartitionKeyRangeIdentity.

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressResolver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ Mono<AddressInformation[]> resolveAsync(
1414
RxDocumentServiceRequest request,
1515
boolean forceRefreshPartitionAddresses);
1616

17-
void updateAddresses(RxDocumentServiceRequest request, URI serverKey);
17+
int updateAddresses(URI serverKey);
1818
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHandler.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,18 @@ public class RntbdClientChannelHandler extends ChannelInitializer<Channel> imple
2828
private static final Logger logger = LoggerFactory.getLogger(RntbdClientChannelHandler.class);
2929
private final ChannelHealthChecker healthChecker;
3030
private final Config config;
31+
private final RntbdConnectionStateListener connectionStateListener;
3132

32-
RntbdClientChannelHandler(final Config config, final ChannelHealthChecker healthChecker) {
33+
RntbdClientChannelHandler(
34+
final Config config,
35+
final ChannelHealthChecker healthChecker,
36+
final RntbdConnectionStateListener connectionStateListener) {
3337
checkNotNull(healthChecker, "expected non-null healthChecker");
3438
checkNotNull(config, "expected non-null config");
39+
3540
this.healthChecker = healthChecker;
3641
this.config = config;
42+
this.connectionStateListener = connectionStateListener;
3743
}
3844

3945
/**
@@ -98,7 +104,9 @@ protected void initChannel(final Channel channel) {
98104

99105
final RntbdRequestManager requestManager = new RntbdRequestManager(
100106
this.healthChecker,
101-
this.config.maxRequestsPerChannel());
107+
this.config.maxRequestsPerChannel(),
108+
this.connectionStateListener);
109+
102110
final long idleConnectionTimerResolutionInNanos = config.idleConnectionTimerResolutionInNanos();
103111
final ChannelPipeline pipeline = channel.pipeline();
104112

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,30 +183,39 @@ public final class RntbdClientChannelPool implements ChannelPool {
183183

184184
private final ScheduledFuture<?> pendingAcquisitionExpirationFuture;
185185
private final ClientTelemetry clientTelemetry;
186+
186187
/**
187188
* Initializes a newly created {@link RntbdClientChannelPool} instance.
188189
*
189190
* @param bootstrap the {@link Bootstrap} that is used for connections.
190191
* @param config the {@link Config} that is used for the channel pool instance created.
192+
* @param clientTelemetry the {@link ClientTelemetry} that is used to track client telemetry related metrics.
193+
* @param connectionStateListener the {@link RntbdConnectionStateListener}.
191194
*/
192-
RntbdClientChannelPool(final RntbdServiceEndpoint endpoint, final Bootstrap bootstrap, final Config config, final ClientTelemetry clientTelemetry) {
193-
this(endpoint, bootstrap, config, new RntbdClientChannelHealthChecker(config), clientTelemetry);
195+
RntbdClientChannelPool(
196+
final RntbdServiceEndpoint endpoint,
197+
final Bootstrap bootstrap,
198+
final Config config,
199+
final ClientTelemetry clientTelemetry,
200+
final RntbdConnectionStateListener connectionStateListener) {
201+
this(endpoint, bootstrap, config, new RntbdClientChannelHealthChecker(config), clientTelemetry, connectionStateListener);
194202
}
195203

196204
private RntbdClientChannelPool(
197205
final RntbdServiceEndpoint endpoint,
198206
final Bootstrap bootstrap,
199207
final Config config,
200208
final RntbdClientChannelHealthChecker healthChecker,
201-
final ClientTelemetry clientTelemetry) {
209+
final ClientTelemetry clientTelemetry,
210+
final RntbdConnectionStateListener connectionStateListener) {
202211

203212
checkNotNull(endpoint, "expected non-null endpoint");
204213
checkNotNull(bootstrap, "expected non-null bootstrap");
205214
checkNotNull(config, "expected non-null config");
206215
checkNotNull(healthChecker, "expected non-null healthChecker");
207216

208217
this.endpoint = endpoint;
209-
this.poolHandler = new RntbdClientChannelHandler(config, healthChecker);
218+
this.poolHandler = new RntbdClientChannelHandler(config, healthChecker, connectionStateListener);
210219
this.executor = bootstrap.config().group().next();
211220
this.healthChecker = healthChecker;
212221

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConnectionStateListener.java

Lines changed: 25 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33

44
package com.azure.cosmos.implementation.directconnectivity.rntbd;
55

6-
import com.azure.cosmos.implementation.GoneException;
7-
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
86
import com.azure.cosmos.implementation.directconnectivity.IAddressResolver;
97
import org.slf4j.Logger;
108
import org.slf4j.LoggerFactory;
@@ -22,6 +20,7 @@ public class RntbdConnectionStateListener {
2220

2321
private final IAddressResolver addressResolver;
2422
private final RntbdEndpoint endpoint;
23+
private final RntbdConnectionStateListenerMetrics metrics;
2524

2625
// endregion
2726

@@ -30,55 +29,45 @@ public class RntbdConnectionStateListener {
3029
public RntbdConnectionStateListener(final IAddressResolver addressResolver, final RntbdEndpoint endpoint) {
3130
this.addressResolver = checkNotNull(addressResolver, "expected non-null addressResolver");
3231
this.endpoint = checkNotNull(endpoint, "expected non-null endpoint");
32+
this.metrics = new RntbdConnectionStateListenerMetrics();
3333
}
3434

3535
// endregion
3636

3737
// region Methods
3838

39-
public void onException(final RxDocumentServiceRequest request, Throwable exception) {
40-
checkNotNull(request, "expect non-null request");
39+
public void onException(Throwable exception) {
4140
checkNotNull(exception, "expect non-null exception");
4241

43-
if (exception instanceof GoneException) {
44-
final Throwable cause = exception.getCause();
45-
46-
if (cause != null) {
47-
48-
// GoneException was produced by the client, not the server
49-
//
50-
// This could occur for example:
51-
//
52-
// * an operation fails due to an IOException which indicates a connection reset by the server,
53-
// * a channel closes unexpectedly because the server stopped taking requests, or
54-
// * an error was detected by the transport client (e.g., IllegalStateException)
55-
// * a request timed out in pending acquisition queue
56-
// * a request failed fast in admission control layer due to high load
57-
// * channel connect timed out
58-
//
59-
// Currently, only ClosedChannelException will raise onConnectionEvent since it is more sure of a signal the server is going down.
60-
61-
if (cause instanceof IOException) {
62-
63-
if (cause instanceof ClosedChannelException) {
64-
this.onConnectionEvent(RntbdConnectionEvent.READ_EOF, request, exception);
65-
} else {
66-
if (logger.isDebugEnabled()) {
67-
logger.debug("Will not raise the connection state change event for error {}", cause);
68-
}
69-
}
42+
this.metrics.record();
43+
44+
// * An operation could fail due to an IOException which indicates a connection reset by the server,
45+
// * or a channel closes unexpectedly because the server stopped taking requests
46+
//
47+
// Currently, only ClosedChannelException will raise onConnectionEvent since it is more sure of a signal the server is going down.
48+
49+
if (exception instanceof IOException) {
50+
51+
if (exception instanceof ClosedChannelException) {
52+
this.metrics.recordAddressUpdated(this.onConnectionEvent(RntbdConnectionEvent.READ_EOF, exception));
53+
} else {
54+
if (logger.isDebugEnabled()) {
55+
logger.debug("Will not raise the connection state change event for error", exception);
7056
}
7157
}
7258
}
7359
}
7460

61+
public RntbdConnectionStateListenerMetrics getMetrics() {
62+
return this.metrics;
63+
}
64+
7565
// endregion
7666

7767
// region Privates
7868

79-
private void onConnectionEvent(final RntbdConnectionEvent event, final RxDocumentServiceRequest request, final Throwable exception) {
69+
private int onConnectionEvent(final RntbdConnectionEvent event, final Throwable exception) {
8070

81-
checkNotNull(request, "expected non-null request");
8271
checkNotNull(exception, "expected non-null exception");
8372

8473
if (event == RntbdConnectionEvent.READ_EOF) {
@@ -92,11 +81,13 @@ private void onConnectionEvent(final RntbdConnectionEvent event, final RxDocumen
9281
RntbdObjectMapper.toJson(exception));
9382
}
9483

95-
this.addressResolver.updateAddresses(request, this.endpoint.serverKey());
84+
return this.addressResolver.updateAddresses(this.endpoint.serverKey());
9685
} else {
9786
logger.warn("Endpoint closed while onConnectionEvent: {}", this.endpoint);
9887
}
9988
}
89+
90+
return 0;
10091
}
10192
// endregion
10293
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.implementation.directconnectivity.rntbd;
5+
6+
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
7+
import com.fasterxml.jackson.core.JsonGenerator;
8+
import com.fasterxml.jackson.databind.SerializerProvider;
9+
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
import java.io.IOException;
14+
import java.io.Serializable;
15+
import java.time.Instant;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
@JsonSerialize(using = RntbdConnectionStateListenerMetrics.RntbdConnectionStateListenerMetricsJsonSerializer.class)
19+
public final class RntbdConnectionStateListenerMetrics implements Serializable {
20+
private static final long serialVersionUID = 1L;
21+
private static final Logger logger = LoggerFactory.getLogger(RntbdConnectionStateListenerMetrics.class);
22+
23+
private final AtomicReference<Instant> lastCallTimestamp;
24+
private final AtomicReference<Pair<Instant, Integer>> lastActionableContext;
25+
26+
public RntbdConnectionStateListenerMetrics() {
27+
28+
this.lastCallTimestamp = new AtomicReference<>();
29+
this.lastActionableContext = new AtomicReference<>();
30+
}
31+
32+
public void recordAddressUpdated(int addressEntryUpdatedCount) {
33+
this.lastActionableContext.set(Pair.of(this.lastCallTimestamp.get(), addressEntryUpdatedCount));
34+
}
35+
36+
public void record() {
37+
this.lastCallTimestamp.set(Instant.now());
38+
}
39+
40+
final static class RntbdConnectionStateListenerMetricsJsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer<RntbdConnectionStateListenerMetrics> {
41+
42+
public RntbdConnectionStateListenerMetricsJsonSerializer() {
43+
}
44+
45+
@Override
46+
public void serialize(RntbdConnectionStateListenerMetrics metrics, JsonGenerator writer, SerializerProvider serializers) throws IOException {
47+
writer.writeStartObject();
48+
49+
writer.writeStringField(
50+
"lastCallTimestamp",
51+
metrics.lastCallTimestamp.get() == null ? "N/A" : metrics.lastCallTimestamp.toString());
52+
53+
if (metrics.lastActionableContext.get() != null) {
54+
writer.writeStringField("lastActionableContext", metrics.lastActionableContext.get().toString());
55+
}
56+
57+
writer.writeEndObject();
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)