Skip to content

Commit 02ee9f3

Browse files
Test fixes
1 parent f216f74 commit 02ee9f3

File tree

5 files changed

+85
-36
lines changed

5 files changed

+85
-36
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,12 @@ public class RetryContextOnDiagnosticTest extends TestSuiteBase {
9898
private AddressSelector addressSelector;
9999
private volatile AutoCloseable disableNettyLeakDetectionScope;
100100

101-
@BeforeClass(groups = "unit")
101+
@BeforeClass(groups = {"unit", "long-emulator"})
102102
public void beforeClass_DisableNettyLeakDetection() {
103103
this.disableNettyLeakDetectionScope = CosmosNettyLeakDetectorFactory.createDisableLeakDetectionScope();
104104
}
105105

106-
@AfterClass(groups = "unit", alwaysRun = true)
106+
@AfterClass(groups = {"unit", "long-emulator"}, alwaysRun = true)
107107
public void afterClass_ReactivateNettyLeakDetection() throws Exception {
108108
if (this.disableNettyLeakDetectionScope != null) {
109109
this.disableNettyLeakDetectionScope.close();

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OfferQueryTest.java

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -153,30 +153,37 @@ public void queryCollections_NoResults() throws Exception {
153153

154154
String query = "SELECT * from root r where r.id = '2'";
155155
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
156-
CosmosAsyncClient cosmosClient = new CosmosClientBuilder()
156+
try (CosmosAsyncClient cosmosClient = new CosmosClientBuilder()
157157
.key(TestConfigurations.MASTER_KEY)
158158
.endpoint(TestConfigurations.HOST)
159-
.buildAsyncClient();
160-
QueryFeedOperationState dummyState = new QueryFeedOperationState(
161-
cosmosClient,
162-
"SomeSpanName",
163-
"SomeDBName",
164-
"SomeContainerName",
165-
ResourceType.Document,
166-
OperationType.Query,
167-
null,
168-
options,
169-
new CosmosPagedFluxOptions()
170-
);
171-
Flux<FeedResponse<DocumentCollection>> queryObservable = client.queryCollections(getDatabaseLink(), query, dummyState);
172-
173-
FeedResponseListValidator<DocumentCollection> validator = new FeedResponseListValidator.Builder<DocumentCollection>()
174-
.containsExactly(new ArrayList<>())
175-
.numberOfPages(1)
176-
.pageSatisfy(0, new FeedResponseValidator.Builder<DocumentCollection>()
159+
.buildAsyncClient()) {
160+
161+
QueryFeedOperationState dummyState = new QueryFeedOperationState(
162+
cosmosClient,
163+
"SomeSpanName",
164+
"SomeDBName",
165+
"SomeContainerName",
166+
ResourceType.Document,
167+
OperationType.Query,
168+
null,
169+
options,
170+
new CosmosPagedFluxOptions()
171+
);
172+
173+
try {
174+
Flux<FeedResponse<DocumentCollection>> queryObservable = client.queryCollections(getDatabaseLink(), query, dummyState);
175+
176+
FeedResponseListValidator<DocumentCollection> validator = new FeedResponseListValidator.Builder<DocumentCollection>()
177+
.containsExactly(new ArrayList<>())
178+
.numberOfPages(1)
179+
.pageSatisfy(0, new FeedResponseValidator.Builder<DocumentCollection>()
177180
.requestChargeGreaterThanOrEqualTo(1.0).build())
178-
.build();
179-
validateQuerySuccess(queryObservable, validator);
181+
.build();
182+
validateQuerySuccess(queryObservable, validator);
183+
} finally {
184+
safeClose(dummyState);
185+
}
186+
}
180187
}
181188

182189
@BeforeClass(groups = { "query" }, timeOut = SETUP_TIMEOUT)

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -211,13 +211,20 @@ public StoreResponse unwrapToStoreResponse(
211211
retainedContent,
212212
"Argument 'retainedContent' must not be null - use empty ByteBuf when theres is no payload.");
213213

214+
if (leakDetectionDebuggingEnabled) {
215+
retainedContent.touch(
216+
"RxGatewayStoreModel.unwrapToStoreResponse before validate - refCnt: " + retainedContent.refCnt());
217+
logger.info("RxGatewayStoreModel.unwrapToStoreResponse before validate - refCnt: {}", retainedContent.refCnt());
218+
}
219+
214220
// If there is any error in the header response this throws exception
215221
validateOrThrow(request, HttpResponseStatus.valueOf(statusCode), headers, retainedContent);
216222

217223
int size;
218224
if ((size = retainedContent.readableBytes()) > 0) {
219225
if (leakDetectionDebuggingEnabled) {
220226
retainedContent.touch("RxGatewayStoreModel before creating StoreResponse - refCnt: " + retainedContent.refCnt());
227+
logger.info("RxGatewayStoreModel before creating StoreResponse - refCnt: {}", retainedContent.refCnt());
221228
}
222229

223230
return new StoreResponse(
@@ -419,34 +426,40 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
419426
.switchIfEmpty(Mono.just(Unpooled.EMPTY_BUFFER))
420427
.map(bodyByteBuf -> {
421428
if (leakDetectionDebuggingEnabled) {
422-
bodyByteBuf.touch("RxGatewayStoreModel - before retain - refCnt: " + bodyByteBuf.refCnt());
429+
bodyByteBuf.touch("RxGatewayStoreModel - buffer after aggregate before retain - refCnt: " + bodyByteBuf.refCnt());
430+
logger.info("RxGatewayStoreModel - buffer after aggregate before retain - refCnt: {}", bodyByteBuf.refCnt());
431+
}
432+
433+
if (bodyByteBuf != Unpooled.EMPTY_BUFFER) {
434+
bodyByteBuf.retain(); // +1 for our downstream work
423435
}
424436

425-
// if not empty this is the aggregate result - for which ownership was already transferred
426-
ByteBuf retainedContent = bodyByteBuf.retain();
427437
if (leakDetectionDebuggingEnabled) {
428-
retainedContent.touch("RxGatewayStoreModel - after retain - refCnt: " + retainedContent.refCnt());
438+
bodyByteBuf.touch("RxGatewayStoreModel - touch retained buffer - refCnt: " + bodyByteBuf.refCnt());
439+
logger.info("RxGatewayStoreModel - touch retained buffer - refCnt: {]", bodyByteBuf.refCnt());
429440
}
430441

431-
return retainedContent;
442+
return bodyByteBuf;
432443
})
433-
.publishOn(CosmosSchedulers.TRANSPORT_RESPONSE_BOUNDED_ELASTIC)
434444
.doOnDiscard(ByteBuf.class, buf -> {
435445
if (buf.refCnt() > 0) {
436446
if (leakDetectionDebuggingEnabled) {
437447
buf.touch("RxGatewayStoreModel - doOnDiscard - begin - refCnt: " + buf.refCnt());
448+
logger.info("RxGatewayStoreModel - doOnDiscard - begin - refCnt: {}", buf.refCnt());
438449
}
439450

440451
// there could be a race with the catch in the .map operator below
441452
// so, use safeRelease
442453
ReferenceCountUtil.safeRelease(buf);
443454
}
444-
});
455+
})
456+
.publishOn(CosmosSchedulers.TRANSPORT_RESPONSE_BOUNDED_ELASTIC);
445457

446458
return contentObservable
447459
.map(content -> {
448460
if (leakDetectionDebuggingEnabled) {
449461
content.touch("RxGatewayStoreModel - before capturing transport timeline - refCnt: " + content.refCnt());
462+
logger.info("RxGatewayStoreModel - before capturing transport timeline - refCnt: {}", content.refCnt());
450463
}
451464

452465
try {
@@ -458,6 +471,7 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
458471

459472
if (leakDetectionDebuggingEnabled) {
460473
content.touch("RxGatewayStoreModel - before creating StoreResponse - refCnt: " + content.refCnt());
474+
logger.info("RxGatewayStoreModel - before creating StoreResponse - refCnt: {}", content.refCnt());
461475
}
462476
StoreResponse rsp = request
463477
.getEffectiveHttpTransportSerializer(this)
@@ -489,6 +503,7 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
489503
if (content.refCnt() > 0) {
490504
if (leakDetectionDebuggingEnabled) {
491505
content.touch("RxGatewayStoreModel -exception creating StoreResponse - refCnt: " + content.refCnt());
506+
logger.info("RxGatewayStoreModel -exception creating StoreResponse - refCnt: {}", content.refCnt());
492507
}
493508
// Unwrap failed before StoreResponse took ownership -> release our retain
494509
// there could be a race with the doOnDiscard above - so, use safeRelease

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics;
1212
import com.fasterxml.jackson.databind.JsonNode;
1313
import io.netty.buffer.ByteBufInputStream;
14+
import io.netty.util.IllegalReferenceCountException;
1415
import org.slf4j.Logger;
1516
import org.slf4j.LoggerFactory;
1617

@@ -75,8 +76,10 @@ public StoreResponse(
7576
try {
7677
contentStream.close();
7778
} catch (Throwable e) {
78-
// Log as warning instead of debug to make ByteBuf leak issues more visible
79-
logger.warn("Failed to close content stream. This may cause a Netty ByteBuf leak.", e);
79+
if (!(e instanceof IllegalReferenceCountException)) {
80+
// Log as warning instead of debug to make ByteBuf leak issues more visible
81+
logger.warn("Failed to close content stream. This may cause a Netty ByteBuf leak.", e);
82+
}
8083
}
8184
}
8285
} else {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import io.netty.handler.codec.http.HttpMethod;
1212
import io.netty.handler.logging.LogLevel;
1313
import io.netty.resolver.DefaultAddressResolverGroup;
14+
import io.netty.util.ReferenceCountUtil;
15+
import io.netty.util.ResourceLeakDetector;
1416
import org.reactivestreams.Publisher;
1517
import org.reactivestreams.Subscription;
1618
import org.slf4j.Logger;
@@ -39,7 +41,8 @@
3941
* HttpClient that is implemented using reactor-netty.
4042
*/
4143
public class ReactorNettyClient implements HttpClient {
42-
44+
private static final boolean leakDetectionDebuggingEnabled = ResourceLeakDetector.getLevel().ordinal() >=
45+
ResourceLeakDetector.Level.ADVANCED.ordinal();
4346
private static final String REACTOR_NETTY_REQUEST_RECORD_KEY = "reactorNettyRequestRecordKey";
4447

4548
private static final Logger logger = LoggerFactory.getLogger(ReactorNettyClient.class.getSimpleName());
@@ -348,7 +351,15 @@ public HttpHeaders headers() {
348351
public Mono<ByteBuf> body() {
349352
return ByteBufFlux
350353
.fromInbound(
351-
bodyIntern().doOnDiscard(ByteBuf.class, io.netty.util.ReferenceCountUtil::safeRelease)
354+
bodyIntern().doOnDiscard(
355+
ByteBuf.class,
356+
buf -> {
357+
if (leakDetectionDebuggingEnabled && buf.refCnt() > 0) {
358+
buf.touch("ReactorNettyHttpResponse.body - onDiscard - refCnt: " + buf.refCnt());
359+
logger.info("ReactorNettyHttpResponse.body - onDiscard - refCnt: {}", buf.refCnt());
360+
ReferenceCountUtil.safeRelease(buf);
361+
}
362+
})
352363
)
353364
.aggregate()
354365
.doOnSubscribe(this::updateSubscriptionState);
@@ -400,8 +411,21 @@ private void releaseOnNotSubscribedResponse(ReactorNettyResponseState reactorNet
400411
if (logger.isDebugEnabled()) {
401412
logger.debug("Releasing body, not yet subscribed");
402413
}
403-
this.bodyIntern()
404-
.doOnNext(io.netty.util.ReferenceCountUtil::safeRelease)
414+
415+
if (leakDetectionDebuggingEnabled) {
416+
logger.info("Releasing body, not yet subscribed");
417+
}
418+
419+
body()
420+
.map(buf -> {
421+
if (leakDetectionDebuggingEnabled && buf.refCnt() > 0) {
422+
buf.touch("ReactorNettyHttpResponse.releaseOnNotSubscribedResponse - refCnt: " + buf.refCnt());
423+
logger.info("ReactorNettyHttpResponse.releaseOnNotSubscribedResponse - refCnt: {}", buf.refCnt());
424+
ReferenceCountUtil.safeRelease(buf);
425+
}
426+
427+
return buf;
428+
})
405429
.subscribe(v -> {}, ex -> {}, () -> {});
406430
}
407431
}

0 commit comments

Comments
 (0)