@@ -23,9 +23,26 @@ public class MockPartialResponsePolicy implements HttpPipelinePolicy {
2323 static final HttpHeaderName RANGE_HEADER = HttpHeaderName .RANGE ;
2424 private int tries ;
2525 private final List <String > rangeHeaders = new ArrayList <>();
26+ private final int maxBytesPerResponse ; // Maximum bytes to return before simulating timeout
2627
28+ /**
29+ * Creates a MockPartialResponsePolicy that simulates network interruptions.
30+ *
31+ * @param tries Number of times to simulate interruptions (0 = no interruptions)
32+ */
2733 public MockPartialResponsePolicy (int tries ) {
34+ this (tries , 560 ); // Default: return up to 560 bytes before interrupting (enough for 1 segment + header)
35+ }
36+
37+ /**
38+ * Creates a MockPartialResponsePolicy with configurable interruption behavior.
39+ *
40+ * @param tries Number of times to simulate interruptions (0 = no interruptions)
41+ * @param maxBytesPerResponse Maximum bytes to return in each interrupted response
42+ */
43+ public MockPartialResponsePolicy (int tries , int maxBytesPerResponse ) {
2844 this .tries = tries ;
45+ this .maxBytesPerResponse = maxBytesPerResponse ;
2946 }
3047
3148 @ Override
@@ -51,14 +68,56 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
5168 return Mono .just (response );
5269 } else {
5370 this .tries -= 1 ;
54- // Simulate partial response by taking only the first buffer from the stream and immediately
55- // throwing an error to simulate a network interruption. This tests smart retry behavior.
56- Flux <ByteBuffer > interruptedBody = response .getBody ().take (1 ).concatWith (Flux .error (new IOException ("Simulated timeout" )));
71+ // Simulate partial response by limiting the amount of data returned from the stream
72+ // before throwing an IOException to simulate a network interruption.
73+ // This tests smart retry behavior where downloads should resume from the last
74+ // complete segment boundary after each interruption.
75+ Flux <ByteBuffer > interruptedBody = limitAndInterruptStream (response .getBody (), maxBytesPerResponse );
5776 return Mono .just (new MockDownloadHttpResponse (response , 206 , interruptedBody ));
5877 }
5978 });
6079 }
6180
81+ /**
82+ * Limits a stream to return at most maxBytes before throwing an IOException.
83+ */
84+ private Flux <ByteBuffer > limitAndInterruptStream (Flux <ByteBuffer > body , int maxBytes ) {
85+ return Flux .defer (() -> {
86+ final int [] bytesEmitted = new int [] {0 };
87+ return body .concatMap (buffer -> {
88+ int remaining = maxBytes - bytesEmitted [0 ];
89+ if (remaining <= 0 ) {
90+ // Already emitted enough bytes, throw error now
91+ return Flux .error (new IOException ("Simulated timeout" ));
92+ }
93+
94+ int bytesToEmit = Math .min (buffer .remaining (), remaining );
95+ if (bytesToEmit < buffer .remaining ()) {
96+ // Need to slice the buffer
97+ ByteBuffer limited = ByteBuffer .allocate (bytesToEmit );
98+ int originalLimit = buffer .limit ();
99+ buffer .limit (buffer .position () + bytesToEmit );
100+ limited .put (buffer );
101+ buffer .limit (originalLimit );
102+ limited .flip ();
103+ bytesEmitted [0 ] += bytesToEmit ;
104+ // Emit the limited buffer, then error
105+ return Flux .just (limited ).concatWith (Flux .error (new IOException ("Simulated timeout" )));
106+ } else {
107+ // Emit the full buffer and continue
108+ bytesEmitted [0 ] += bytesToEmit ;
109+ if (bytesEmitted [0 ] >= maxBytes ) {
110+ // Reached the limit, emit this buffer then error
111+ return Flux .just (buffer ).concatWith (Flux .error (new IOException ("Simulated timeout" )));
112+ }
113+ return Flux .just (buffer );
114+ }
115+ });
116+ });
117+ }
118+ });
119+ }
120+
62121 public int getTriesRemaining () {
63122 return tries ;
64123 }
0 commit comments