Skip to content

Commit c5644ce

Browse files
Revert to collectList approach with proper byte limiting for structured messages
The streaming approach had issues with buffer consumption. Reverted to collecting buffers first but with proper byte limiting (560 bytes default) to ensure at least one segment completes before interruption. This properly tests smart retry from segment boundaries. Co-authored-by: gunjansingh-msft <[email protected]>
1 parent 65ab1bf commit c5644ce

File tree

1 file changed

+48
-44
lines changed

1 file changed

+48
-44
lines changed

sdk/storage/azure-storage-common/src/test-shared/java/com/azure/storage/common/test/shared/policy/MockPartialResponsePolicy.java

Lines changed: 48 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -68,51 +68,55 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
6868
return Mono.just(response);
6969
} else {
7070
this.tries -= 1;
71-
// Simulate partial response by limiting the amount of data returned from the stream
72-
// before throwing an IOException to simulate a network interruption.
73-
// This tests smart retry behavior where downloads should resume from the last
74-
// complete segment boundary after each interruption.
75-
Flux<ByteBuffer> interruptedBody = limitAndInterruptStream(response.getBody(), maxBytesPerResponse);
76-
return Mono.just(new MockDownloadHttpResponse(response, 206, interruptedBody));
77-
}
78-
});
79-
}
80-
81-
/**
82-
* Limits a stream to return at most maxBytes before throwing an IOException.
83-
*/
84-
private Flux<ByteBuffer> limitAndInterruptStream(Flux<ByteBuffer> body, int maxBytes) {
85-
return Flux.defer(() -> {
86-
final int[] bytesEmitted = new int[] {0};
87-
return body.concatMap(buffer -> {
88-
int remaining = maxBytes - bytesEmitted[0];
89-
if (remaining <= 0) {
90-
// Already emitted enough bytes, throw error now
91-
return Flux.error(new IOException("Simulated timeout"));
92-
}
93-
94-
int bytesToEmit = Math.min(buffer.remaining(), remaining);
95-
if (bytesToEmit < buffer.remaining()) {
96-
// Need to slice the buffer
97-
ByteBuffer limited = ByteBuffer.allocate(bytesToEmit);
98-
int originalLimit = buffer.limit();
99-
buffer.limit(buffer.position() + bytesToEmit);
100-
limited.put(buffer);
101-
buffer.limit(originalLimit);
102-
limited.flip();
103-
bytesEmitted[0] += bytesToEmit;
104-
// Emit the limited buffer, then error
105-
return Flux.just(limited).concatWith(Flux.error(new IOException("Simulated timeout")));
106-
} else {
107-
// Emit the full buffer and continue
108-
bytesEmitted[0] += bytesToEmit;
109-
if (bytesEmitted[0] >= maxBytes) {
110-
// Reached the limit, emit this buffer then error
111-
return Flux.just(buffer).concatWith(Flux.error(new IOException("Simulated timeout")));
71+
// Collect the body to be able to slice it properly
72+
return response.getBody().collectList().flatMap(bodyBuffers -> {
73+
if (bodyBuffers.isEmpty()) {
74+
// If no body was returned, don't attempt to slice a partial response
75+
return Mono.just(response);
11276
}
113-
return Flux.just(buffer);
114-
}
115-
});
77+
78+
// Calculate total bytes available
79+
int totalBytes = bodyBuffers.stream().mapToInt(ByteBuffer::remaining).sum();
80+
81+
// Determine how many bytes to return (limited by maxBytesPerResponse)
82+
int bytesToReturn = Math.min(totalBytes, maxBytesPerResponse);
83+
84+
if (bytesToReturn >= totalBytes) {
85+
// Return all data and still throw error to simulate interruption during next chunk
86+
return Mono.just(new MockDownloadHttpResponse(response, 206,
87+
Flux.fromIterable(bodyBuffers)
88+
.concatWith(Flux.error(new IOException("Simulated timeout")))));
89+
}
90+
91+
// Create a new buffer with limited bytes
92+
ByteBuffer limited = ByteBuffer.allocate(bytesToReturn);
93+
int bytesCollected = 0;
94+
95+
for (ByteBuffer buffer : bodyBuffers) {
96+
int bufferRemaining = buffer.remaining();
97+
int bytesNeeded = bytesToReturn - bytesCollected;
98+
99+
if (bufferRemaining <= bytesNeeded) {
100+
// Take the entire buffer
101+
limited.put(buffer);
102+
bytesCollected += bufferRemaining;
103+
} else {
104+
// Take only part of this buffer
105+
ByteBuffer slice = buffer.duplicate();
106+
slice.limit(slice.position() + bytesNeeded);
107+
limited.put(slice);
108+
bytesCollected += bytesNeeded;
109+
break;
110+
}
111+
}
112+
113+
limited.flip();
114+
115+
// Return the limited buffer and simulate timeout
116+
return Mono.just(new MockDownloadHttpResponse(response, 206,
117+
Flux.just(limited).concatWith(Flux.error(new IOException("Simulated timeout")))));
118+
});
119+
}
116120
});
117121
}
118122

0 commit comments

Comments
 (0)