@@ -417,60 +417,46 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
417417 .map (bodyByteBuf -> leakDetectionDebuggingEnabled
418418 ? bodyByteBuf .retain ().touch (this )
419419 : bodyByteBuf .retain ())
420- .publishOn (CosmosSchedulers .TRANSPORT_RESPONSE_BOUNDED_ELASTIC )
421- .doOnDiscard (ByteBuf .class , buf -> {
422- if (buf .refCnt () > 0 ) {
423- io .netty .util .ReferenceCountUtil .safeRelease (buf );
424- }
425- });
420+ .publishOn (CosmosSchedulers .TRANSPORT_RESPONSE_BOUNDED_ELASTIC );
426421
427422 return contentObservable
428423 .map (content -> {
429424 if (leakDetectionDebuggingEnabled ) {
430425 content .touch (this );
431426 }
432427
433- try {
434- // Capture transport client request timeline
435- ReactorNettyRequestRecord reactorNettyRequestRecord = httpResponse .request ().reactorNettyRequestRecord ();
436- if (reactorNettyRequestRecord != null ) {
437- reactorNettyRequestRecord .setTimeCompleted (Instant .now ());
438- }
439-
440- StoreResponse rsp = request
441- .getEffectiveHttpTransportSerializer (this )
442- .unwrapToStoreResponse (httpRequest .uri ().toString (), request , httpResponseStatus , httpResponseHeaders , content );
443-
444- if (reactorNettyRequestRecord != null ) {
445- rsp .setRequestTimeline (reactorNettyRequestRecord .takeTimelineSnapshot ());
428+ // Capture transport client request timeline
429+ ReactorNettyRequestRecord reactorNettyRequestRecord = httpResponse .request ().reactorNettyRequestRecord ();
430+ if (reactorNettyRequestRecord != null ) {
431+ reactorNettyRequestRecord .setTimeCompleted (Instant .now ());
432+ }
446433
447- if (this .gatewayServerErrorInjector != null ) {
448- // only configure when fault injection is used
449- rsp .setFaultInjectionRuleId (
450- request
451- .faultInjectionRequestContext
452- .getFaultInjectionRuleId (reactorNettyRequestRecord .getTransportRequestId ()));
434+ StoreResponse rsp = request
435+ .getEffectiveHttpTransportSerializer (this )
436+ .unwrapToStoreResponse (httpRequest .uri ().toString (), request , httpResponseStatus , httpResponseHeaders , content );
453437
454- rsp .setFaultInjectionRuleEvaluationResults (
455- request
456- .faultInjectionRequestContext
457- .getFaultInjectionRuleEvaluationResults (reactorNettyRequestRecord .getTransportRequestId ()));
458- }
459- }
438+ if (reactorNettyRequestRecord != null ) {
439+ rsp .setRequestTimeline (reactorNettyRequestRecord .takeTimelineSnapshot ());
460440
461- if (request .requestContext .cosmosDiagnostics != null ) {
462- BridgeInternal .recordGatewayResponse (request .requestContext .cosmosDiagnostics , request , rsp , globalEndpointManager );
463- }
441+ if (this .gatewayServerErrorInjector != null ) {
442+ // only configure when fault injection is used
443+ rsp .setFaultInjectionRuleId (
444+ request
445+ .faultInjectionRequestContext
446+ .getFaultInjectionRuleId (reactorNettyRequestRecord .getTransportRequestId ()));
464447
465- return rsp ;
466- } catch (Throwable t ) {
467- if (content .refCnt () > 0 ) {
468- // Unwrap failed before StoreResponse took ownership -> release our retain
469- io .netty .util .ReferenceCountUtil .safeRelease (content );
448+ rsp .setFaultInjectionRuleEvaluationResults (
449+ request
450+ .faultInjectionRequestContext
451+ .getFaultInjectionRuleEvaluationResults (reactorNettyRequestRecord .getTransportRequestId ()));
470452 }
453+ }
471454
472- throw t ;
455+ if (request .requestContext .cosmosDiagnostics != null ) {
456+ BridgeInternal .recordGatewayResponse (request .requestContext .cosmosDiagnostics , request , rsp , globalEndpointManager );
473457 }
458+
459+ return rsp ;
474460 })
475461 .single ();
476462
0 commit comments