@@ -107,13 +107,25 @@ private void applyStructuredMessage(HttpPipelineCallContext context) {
         BufferStagingArea stagingArea
             = new BufferStagingArea(STATIC_MAXIMUM_ENCODED_DATA_LENGTH, STATIC_MAXIMUM_ENCODED_DATA_LENGTH);
 
-        Flux<ByteBuffer> encodedBody = context.getHttpRequest()
-            .getBody()
-            .flatMapSequential(stagingArea::write, 1, 1)
-            .concatWith(Flux.defer(stagingArea::flush))
-            .concatMap(bufferAggregator -> bufferAggregator.asFlux().concatMap(structuredMessageEncoder::encode));
+        // reactive stream backpressure will not correctly propagate without limitRate
+        // Flux<ByteBuffer> encodedBody = Flux.from(context.getHttpRequest().getBody())
+        //     .limitRate(1) // force one-at-a-time from source
+        //     .flatMapSequential(stagingArea::write, 1, 1)
+        //     .concatWith(Flux.defer(stagingArea::flush))
+        //     .concatMap(bufferAggregator -> bufferAggregator.asFlux().concatMap(structuredMessageEncoder::encode));
+
+        // Flux<ByteBuffer> encodedBody = context.getHttpRequest()
+        //     .getBody()
+        //     .concatMap(buffer -> splitLargeBuffer(buffer, STATIC_MAXIMUM_ENCODED_DATA_LENGTH))
+        //     .limitRate(1)
+        //     .concatMap(structuredMessageEncoder::encode, 1);
+
+        // Flux<ByteBuffer> encodedBody = context.getHttpRequest()
+        //     .getBody()
+        //     .flatMapSequential(stagingArea::write, 1, 1)
+        //     .concatWith(Flux.defer(stagingArea::flush))
+        //     .concatMap(bufferAggregator -> bufferAggregator.asFlux().concatMap(structuredMessageEncoder::encode));
 
-        // Set the encoded body
         context.getHttpRequest().setBody(encodedBody);
 
         context.getHttpRequest()
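
On the limitRate note in the first commented-out variant above: limitRate(n) caps the demand that reaches the source at n elements per request, replenishing as items are consumed, no matter how much the downstream asks for. A minimal standalone Reactor sketch (illustrative only, not part of this change) that logs the demand the source actually sees:

import reactor.core.publisher.Flux;

public class LimitRateDemo {
    public static void main(String[] args) {
        // A source that logs every request() signal it receives from downstream.
        Flux<Integer> source = Flux.range(1, 8)
            .doOnRequest(n -> System.out.println("source sees request(" + n + ")"));

        // A plain subscribe() requests Long.MAX_VALUE, so the source sees one unbounded request.
        source.subscribe(i -> { });

        // With limitRate(1) the source instead sees request(1) for each element,
        // which is the one-buffer-at-a-time demand the commented pipeline is after.
        source.limitRate(1).subscribe(i -> { });
    }
}
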
@@ -127,4 +139,29 @@ private void applyStructuredMessage(HttpPipelineCallContext context) {
         //return structuredMessageEncoder.getMessageCRC64();
     }
 
+    private Flux<ByteBuffer> splitLargeBuffer(ByteBuffer buffer, int maxChunkSize) {
+        // For small buffers, copy to ensure no retention of larger upstream buffers
+        if (buffer.remaining() <= maxChunkSize) {
+            byte[] copy = new byte[buffer.remaining()];
+            buffer.get(copy);
+            return Flux.just(ByteBuffer.wrap(copy));
+        }
+
+        return Flux.generate(buffer::duplicate, (buf, sink) -> {
+            if (!buf.hasRemaining()) {
+                sink.complete();
+                return buf;
+            }
+
+            int chunkSize = Math.min(buf.remaining(), maxChunkSize);
+
+            // Copy data instead of slicing to avoid retaining original buffer
+            byte[] chunkData = new byte[chunkSize];
+            buf.get(chunkData);
+
+            sink.next(ByteBuffer.wrap(chunkData));
+            return buf;
+        });
+    }
+
 }
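
The splitLargeBuffer helper added above re-chunks an oversized ByteBuffer into defensive copies of at most maxChunkSize bytes, producing them lazily through Flux.generate. A small standalone sketch of the resulting behaviour (the helper body is repeated here, with its comments condensed, only so the example compiles on its own; the payload and chunk size are illustrative):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import reactor.core.publisher.Flux;

public class SplitLargeBufferDemo {
    public static void main(String[] args) {
        // A 10-byte payload split with maxChunkSize = 4 should come out as chunks of 4, 4 and 2 bytes.
        ByteBuffer payload = ByteBuffer.wrap("0123456789".getBytes(StandardCharsets.UTF_8));

        splitLargeBuffer(payload, 4)
            .map(ByteBuffer::remaining)
            .subscribe(size -> System.out.println("chunk of " + size + " bytes"));
    }

    // Same shape as the helper in the diff, copied so this sketch is self-contained.
    static Flux<ByteBuffer> splitLargeBuffer(ByteBuffer buffer, int maxChunkSize) {
        // Small buffers are copied whole so no larger upstream buffer is retained.
        if (buffer.remaining() <= maxChunkSize) {
            byte[] copy = new byte[buffer.remaining()];
            buffer.get(copy);
            return Flux.just(ByteBuffer.wrap(copy));
        }

        // Walk a duplicate of the buffer and emit one copied chunk per downstream request.
        return Flux.generate(buffer::duplicate, (buf, sink) -> {
            if (!buf.hasRemaining()) {
                sink.complete();
                return buf;
            }
            int chunkSize = Math.min(buf.remaining(), maxChunkSize);
            byte[] chunkData = new byte[chunkSize];
            buf.get(chunkData);
            sink.next(ByteBuffer.wrap(chunkData));
            return buf;
        });
    }
}

Because every emitted chunk is a fresh byte[] copy, downstream stages can hold chunks without pinning the original, potentially much larger, request buffer, which is the retention concern the in-code comments call out.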