@@ -417,46 +417,60 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
417417 .map (bodyByteBuf -> leakDetectionDebuggingEnabled
418418 ? bodyByteBuf .retain ().touch (this )
419419 : bodyByteBuf .retain ())
420- .publishOn (CosmosSchedulers .TRANSPORT_RESPONSE_BOUNDED_ELASTIC );
420+ .publishOn (CosmosSchedulers .TRANSPORT_RESPONSE_BOUNDED_ELASTIC )
421+ .doOnDiscard (ByteBuf .class , buf -> {
422+ if (buf .refCnt () > 0 ) {
423+ io .netty .util .ReferenceCountUtil .safeRelease (buf );
424+ }
425+ });
421426
422427 return contentObservable
423428 .map (content -> {
424429 if (leakDetectionDebuggingEnabled ) {
425430 content .touch (this );
426431 }
427432
428- // Capture transport client request timeline
429- ReactorNettyRequestRecord reactorNettyRequestRecord = httpResponse .request ().reactorNettyRequestRecord ();
430- if (reactorNettyRequestRecord != null ) {
431- reactorNettyRequestRecord .setTimeCompleted (Instant .now ());
432- }
433+ try {
434+ // Capture transport client request timeline
435+ ReactorNettyRequestRecord reactorNettyRequestRecord = httpResponse .request ().reactorNettyRequestRecord ();
436+ if (reactorNettyRequestRecord != null ) {
437+ reactorNettyRequestRecord .setTimeCompleted (Instant .now ());
438+ }
433439
434- StoreResponse rsp = request
435- .getEffectiveHttpTransportSerializer (this )
436- .unwrapToStoreResponse (httpRequest .uri ().toString (), request , httpResponseStatus , httpResponseHeaders , content );
440+ StoreResponse rsp = request
441+ .getEffectiveHttpTransportSerializer (this )
442+ .unwrapToStoreResponse (httpRequest .uri ().toString (), request , httpResponseStatus , httpResponseHeaders , content );
437443
438- if (reactorNettyRequestRecord != null ) {
439- rsp .setRequestTimeline (reactorNettyRequestRecord .takeTimelineSnapshot ());
444+ if (reactorNettyRequestRecord != null ) {
445+ rsp .setRequestTimeline (reactorNettyRequestRecord .takeTimelineSnapshot ());
440446
441- if (this .gatewayServerErrorInjector != null ) {
442- // only configure when fault injection is used
443- rsp .setFaultInjectionRuleId (
444- request
445- .faultInjectionRequestContext
446- .getFaultInjectionRuleId (reactorNettyRequestRecord .getTransportRequestId ()));
447+ if (this .gatewayServerErrorInjector != null ) {
448+ // only configure when fault injection is used
449+ rsp .setFaultInjectionRuleId (
450+ request
451+ .faultInjectionRequestContext
452+ .getFaultInjectionRuleId (reactorNettyRequestRecord .getTransportRequestId ()));
447453
448- rsp .setFaultInjectionRuleEvaluationResults (
449- request
450- .faultInjectionRequestContext
451- .getFaultInjectionRuleEvaluationResults (reactorNettyRequestRecord .getTransportRequestId ()));
454+ rsp .setFaultInjectionRuleEvaluationResults (
455+ request
456+ .faultInjectionRequestContext
457+ .getFaultInjectionRuleEvaluationResults (reactorNettyRequestRecord .getTransportRequestId ()));
458+ }
452459 }
453- }
454460
455- if (request .requestContext .cosmosDiagnostics != null ) {
456- BridgeInternal .recordGatewayResponse (request .requestContext .cosmosDiagnostics , request , rsp , globalEndpointManager );
457- }
461+ if (request .requestContext .cosmosDiagnostics != null ) {
462+ BridgeInternal .recordGatewayResponse (request .requestContext .cosmosDiagnostics , request , rsp , globalEndpointManager );
463+ }
458464
459- return rsp ;
465+ return rsp ;
466+ } catch (Throwable t ) {
467+ if (content .refCnt () > 0 ) {
468+ // Unwrap failed before StoreResponse took ownership -> release our retain
469+ io .netty .util .ReferenceCountUtil .safeRelease (content );
470+ }
471+
472+ throw t ;
473+ }
460474 })
461475 .single ();
462476
0 commit comments