Skip to content

Commit f8aec02

Browse files
Fix MockPartialResponsePolicy to work with decoded response streams
The issue was that collectList() was consuming the entire decoded stream when the decoder policy was applied before the mock policy. Changed to stream-based limiting that works correctly whether the body is encoded or decoded. The limitStreamToBytes method manipulates the Flux directly without collecting all buffers, properly simulating network interruptions during streaming. Co-authored-by: gunjansingh-msft <[email protected]>
1 parent c5644ce commit f8aec02

File tree

1 file changed

+48
-45
lines changed

1 file changed

+48
-45
lines changed

sdk/storage/azure-storage-common/src/test-shared/java/com/azure/storage/common/test/shared/policy/MockPartialResponsePolicy.java

Lines changed: 48 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -68,55 +68,58 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
6868
return Mono.just(response);
6969
} else {
7070
this.tries -= 1;
71-
// Collect the body to be able to slice it properly
72-
return response.getBody().collectList().flatMap(bodyBuffers -> {
73-
if (bodyBuffers.isEmpty()) {
74-
// If no body was returned, don't attempt to slice a partial response
75-
return Mono.just(response);
76-
}
77-
78-
// Calculate total bytes available
79-
int totalBytes = bodyBuffers.stream().mapToInt(ByteBuffer::remaining).sum();
80-
81-
// Determine how many bytes to return (limited by maxBytesPerResponse)
82-
int bytesToReturn = Math.min(totalBytes, maxBytesPerResponse);
83-
84-
if (bytesToReturn >= totalBytes) {
85-
// Return all data and still throw error to simulate interruption during next chunk
86-
return Mono.just(new MockDownloadHttpResponse(response, 206,
87-
Flux.fromIterable(bodyBuffers)
88-
.concatWith(Flux.error(new IOException("Simulated timeout")))));
89-
}
90-
91-
// Create a new buffer with limited bytes
92-
ByteBuffer limited = ByteBuffer.allocate(bytesToReturn);
93-
int bytesCollected = 0;
94-
95-
for (ByteBuffer buffer : bodyBuffers) {
96-
int bufferRemaining = buffer.remaining();
97-
int bytesNeeded = bytesToReturn - bytesCollected;
98-
99-
if (bufferRemaining <= bytesNeeded) {
100-
// Take the entire buffer
101-
limited.put(buffer);
102-
bytesCollected += bufferRemaining;
103-
} else {
104-
// Take only part of this buffer
105-
ByteBuffer slice = buffer.duplicate();
106-
slice.limit(slice.position() + bytesNeeded);
107-
limited.put(slice);
108-
bytesCollected += bytesNeeded;
109-
break;
110-
}
71+
// Don't use collectList() as it would consume the entire stream.
72+
// Instead, manipulate the Flux directly to limit bytes before throwing error.
73+
// This works correctly whether the body is encoded or decoded.
74+
Flux<ByteBuffer> limitedBody = limitStreamToBytes(response.getBody(), maxBytesPerResponse);
75+
return Mono.just(new MockDownloadHttpResponse(response, 206, limitedBody));
76+
}
77+
});
78+
}
79+
80+
/**
81+
* Limits a Flux of ByteBuffers to emit at most maxBytes before throwing an IOException.
82+
* This works on the stream directly without collecting all buffers, allowing it to work
83+
* correctly whether the stream contains encoded or decoded data.
84+
*/
85+
private Flux<ByteBuffer> limitStreamToBytes(Flux<ByteBuffer> body, int maxBytes) {
86+
return Flux.defer(() -> {
87+
final long[] bytesEmitted = new long[]{0};
88+
return body.concatMap(buffer -> {
89+
if (buffer == null || !buffer.hasRemaining()) {
90+
return Flux.just(buffer);
91+
}
92+
93+
long remaining = maxBytes - bytesEmitted[0];
94+
if (remaining <= 0) {
95+
// Already emitted enough, throw error immediately
96+
return Flux.error(new IOException("Simulated timeout"));
97+
}
98+
99+
int bufferSize = buffer.remaining();
100+
if (bufferSize <= remaining) {
101+
// Emit the entire buffer
102+
bytesEmitted[0] += bufferSize;
103+
if (bytesEmitted[0] >= maxBytes) {
104+
// Hit the limit, emit this buffer then error
105+
return Flux.just(buffer).concatWith(Flux.error(new IOException("Simulated timeout")));
111106
}
107+
return Flux.just(buffer);
108+
} else {
109+
// Buffer is larger than remaining, need to slice it
110+
int bytesToEmit = (int) remaining;
111+
ByteBuffer slice = buffer.duplicate();
112+
slice.limit(slice.position() + bytesToEmit);
112113

114+
ByteBuffer limited = ByteBuffer.allocate(bytesToEmit);
115+
limited.put(slice);
113116
limited.flip();
114117

115-
// Return the limited buffer and simulate timeout
116-
return Mono.just(new MockDownloadHttpResponse(response, 206,
117-
Flux.just(limited).concatWith(Flux.error(new IOException("Simulated timeout")))));
118-
});
119-
}
118+
bytesEmitted[0] += bytesToEmit;
119+
// Emit the limited buffer then error
120+
return Flux.just(limited).concatWith(Flux.error(new IOException("Simulated timeout")));
121+
}
122+
});
120123
});
121124
}
122125

0 commit comments

Comments
 (0)