Skip to content

Commit 9ec0672

Browse files
Range based multipart download subscriber
1 parent 9e6bf61 commit 9ec0672

File tree

4 files changed

+836
-0
lines changed

4 files changed

+836
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.internal.multipart;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.regex.Matcher;
21+
import java.util.regex.Pattern;
22+
import org.reactivestreams.Subscriber;
23+
import org.reactivestreams.Subscription;
24+
import software.amazon.awssdk.annotations.Immutable;
25+
import software.amazon.awssdk.annotations.SdkInternalApi;
26+
import software.amazon.awssdk.annotations.ThreadSafe;
27+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
28+
import software.amazon.awssdk.services.s3.S3AsyncClient;
29+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
30+
import software.amazon.awssdk.services.s3.presignedurl.model.PresignedUrlDownloadRequest;
31+
import software.amazon.awssdk.utils.Logger;
32+
33+
/**
34+
* A subscriber implementation that will download all individual parts for a multipart presigned URL download request.
35+
* It receives individual {@link AsyncResponseTransformer} instances which will be used to perform the individual
36+
* range-based part requests using presigned URLs. This is a 'one-shot' class, it should <em>NOT</em> be reused
37+
* for more than one multipart download.
38+
*
39+
* <p>Unlike the standard {@link MultipartDownloaderSubscriber} which uses S3's native multipart API with part numbers,
40+
* this subscriber uses HTTP range requests against presigned URLs to achieve multipart download functionality.
41+
* <p>This implementation is thread-safe and handles concurrent part downloads while maintaining proper
42+
* ordering and validation of responses.</p>
43+
*/
44+
@ThreadSafe
45+
@Immutable
46+
@SdkInternalApi
47+
public class PresignedUrlMultipartDownloaderSubscriber
48+
implements Subscriber<AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>> {
49+
50+
private static final Logger log = Logger.loggerFor(PresignedUrlMultipartDownloaderSubscriber.class);
51+
private static final String BYTES_RANGE_PREFIX = "bytes=";
52+
private static final Pattern CONTENT_RANGE_PATTERN = Pattern.compile("bytes\\s+(\\d+)-(\\d+)/(\\d+)");
53+
54+
private final S3AsyncClient s3AsyncClient;
55+
private final PresignedUrlDownloadRequest presignedUrlDownloadRequest;
56+
private final long configuredPartSizeInBytes;
57+
private final int completedParts;
58+
private final CompletableFuture<Void> future;
59+
private final Object lock = new Object();
60+
private volatile MultipartDownloadState state;
61+
private Subscription subscription;
62+
63+
private static class MultipartDownloadState {
64+
final long totalContentLength;
65+
final long actualPartSizeInBytes;
66+
final int totalParts;
67+
final AtomicInteger completedParts;
68+
final String etag;
69+
70+
MultipartDownloadState(long totalLength, long partSize, int totalParts, String etag, int completedParts) {
71+
this.totalContentLength = totalLength;
72+
this.actualPartSizeInBytes = partSize;
73+
this.totalParts = totalParts;
74+
this.completedParts = new AtomicInteger(completedParts);
75+
this.etag = etag;
76+
}
77+
}
78+
79+
public PresignedUrlMultipartDownloaderSubscriber(
80+
S3AsyncClient s3AsyncClient,
81+
PresignedUrlDownloadRequest presignedUrlDownloadRequest,
82+
long configuredPartSizeInBytes) {
83+
this.s3AsyncClient = s3AsyncClient;
84+
this.presignedUrlDownloadRequest = presignedUrlDownloadRequest;
85+
this.configuredPartSizeInBytes = configuredPartSizeInBytes;
86+
this.completedParts = 0;
87+
this.future = new CompletableFuture<>();
88+
}
89+
90+
@Override
91+
public void onSubscribe(Subscription s) {
92+
synchronized (lock) {
93+
if (subscription != null) {
94+
s.cancel();
95+
return;
96+
}
97+
this.subscription = s;
98+
s.request(1);
99+
}
100+
}
101+
102+
@Override
103+
public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> asyncResponseTransformer) {
104+
if (asyncResponseTransformer == null) {
105+
subscription.cancel();
106+
throw new NullPointerException("onNext must not be called with null asyncResponseTransformer");
107+
}
108+
synchronized (lock) {
109+
if (state == null) {
110+
performSizeDiscoveryAndFirstPart(asyncResponseTransformer);
111+
} else {
112+
downloadNextPart(asyncResponseTransformer);
113+
}
114+
}
115+
}
116+
117+
private void performSizeDiscoveryAndFirstPart(AsyncResponseTransformer<GetObjectResponse,
118+
GetObjectResponse> asyncResponseTransformer) {
119+
if (completedParts > 0) {
120+
performSizeDiscoveryOnly(asyncResponseTransformer);
121+
return;
122+
}
123+
long endByte = configuredPartSizeInBytes - 1;
124+
String firstPartRange = String.format("%s0-%d", BYTES_RANGE_PREFIX, endByte);
125+
PresignedUrlDownloadRequest firstPartRequest = presignedUrlDownloadRequest.toBuilder()
126+
.range(firstPartRange)
127+
.build();
128+
s3AsyncClient.presignedUrlExtension().getObject(firstPartRequest, asyncResponseTransformer)
129+
.whenComplete((response, error) -> {
130+
if (error != null) {
131+
log.debug(() -> "Error encountered during first part request");
132+
onError(error);
133+
return;
134+
}
135+
try {
136+
String contentRange = response.contentRange();
137+
if (contentRange == null) {
138+
onError(new IllegalStateException("No Content-Range header in response"));
139+
return;
140+
}
141+
long totalSize = parseContentRangeForTotalSize(contentRange);
142+
if (totalSize <= configuredPartSizeInBytes) {
143+
subscription.cancel();
144+
return;
145+
}
146+
String etag = response.eTag();
147+
initializeStateAfterFirstPart(totalSize, etag);
148+
if (state.totalParts > 1) {
149+
subscription.request(1);
150+
} else {
151+
subscription.cancel();
152+
}
153+
} catch (Exception e) {
154+
log.debug(() -> "Error during first part processing", e);
155+
onError(e);
156+
}
157+
});
158+
}
159+
160+
private void performSizeDiscoveryOnly(
161+
AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> asyncResponseTransformer) {
162+
String sizeDiscoveryRange = String.format("%s0-0", BYTES_RANGE_PREFIX);
163+
PresignedUrlDownloadRequest sizeDiscoveryRequest = presignedUrlDownloadRequest.toBuilder()
164+
.range(sizeDiscoveryRange)
165+
.build();
166+
167+
s3AsyncClient.presignedUrlExtension().getObject(sizeDiscoveryRequest, asyncResponseTransformer)
168+
.whenComplete((response, error) -> {
169+
if (error != null) {
170+
log.debug(() -> "Error encountered during size discovery request");
171+
onError(error);
172+
return;
173+
}
174+
try {
175+
String contentRange = response.contentRange();
176+
if (contentRange == null) {
177+
onError(new IllegalStateException("No Content-Range header in response"));
178+
return;
179+
}
180+
long totalSize = parseContentRangeForTotalSize(contentRange);
181+
String etag = response.eTag();
182+
if (etag == null) {
183+
onError(new IllegalStateException("No ETag in response, cannot ensure consistency"));
184+
return;
185+
}
186+
int totalParts = calculateTotalParts(totalSize, configuredPartSizeInBytes);
187+
this.state = new MultipartDownloadState(totalSize, configuredPartSizeInBytes,
188+
totalParts, etag, completedParts);
189+
if (completedParts < state.totalParts) {
190+
subscription.request(1);
191+
} else {
192+
subscription.cancel();
193+
}
194+
} catch (Exception e) {
195+
log.debug(() -> "Error during size discovery processing", e);
196+
onError(e);
197+
}
198+
});
199+
}
200+
201+
private void downloadNextPart(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> transformer) {
202+
int nextPartIndex = state.completedParts.getAndIncrement();
203+
if (nextPartIndex >= state.totalParts) {
204+
subscription.cancel();
205+
return;
206+
}
207+
PresignedUrlDownloadRequest partRequest = createPartRequest(nextPartIndex);
208+
String expectedRange = partRequest.range();
209+
s3AsyncClient.presignedUrlExtension().getObject(partRequest, transformer)
210+
.whenComplete((response, error) -> {
211+
if (error != null) {
212+
log.debug(() -> "Error encountered during part request with range=" + expectedRange);
213+
onError(error);
214+
} else {
215+
try {
216+
validatePartResponse(response, nextPartIndex, expectedRange);
217+
int completedCount = nextPartIndex + 1;
218+
if (completedCount < state.totalParts) {
219+
subscription.request(1);
220+
} else {
221+
subscription.cancel();
222+
}
223+
} catch (Exception validationError) {
224+
log.debug(() -> "Validation failed for part " + (nextPartIndex + 1));
225+
onError(validationError);
226+
}
227+
}
228+
});
229+
}
230+
231+
private void initializeStateAfterFirstPart(long totalSize, String etag) {
232+
int totalParts = calculateTotalParts(totalSize, configuredPartSizeInBytes);
233+
this.state = new MultipartDownloadState(totalSize, configuredPartSizeInBytes, totalParts, etag, completedParts + 1);
234+
}
235+
236+
private long parseContentRangeForTotalSize(String contentRange) {
237+
Matcher matcher = CONTENT_RANGE_PATTERN.matcher(contentRange);
238+
if (!matcher.matches()) {
239+
throw new IllegalArgumentException("Invalid Content-Range header: " + contentRange);
240+
}
241+
return Long.parseLong(matcher.group(3));
242+
}
243+
244+
private int calculateTotalParts(long contentLength, long partSize) {
245+
return (int) Math.ceil((double) contentLength / partSize);
246+
}
247+
248+
private PresignedUrlDownloadRequest createPartRequest(int partIndex) {
249+
long startByte = partIndex * state.actualPartSizeInBytes;
250+
long endByte = Math.min(startByte + state.actualPartSizeInBytes - 1, state.totalContentLength - 1);
251+
String rangeHeader = BYTES_RANGE_PREFIX + startByte + "-" + endByte;
252+
253+
return presignedUrlDownloadRequest.toBuilder()
254+
.range(rangeHeader)
255+
.build();
256+
}
257+
258+
@Override
259+
public void onError(Throwable t) {
260+
log.debug(() -> "Error in multipart download", t);
261+
future.completeExceptionally(t);
262+
}
263+
264+
@Override
265+
public void onComplete() {
266+
future.complete(null);
267+
}
268+
269+
public CompletableFuture<Void> future() {
270+
return this.future;
271+
}
272+
273+
private void validatePartResponse(GetObjectResponse response, int partIndex, String expectedRange) {
274+
if (response == null) {
275+
throw new IllegalArgumentException("Response cannot be null");
276+
}
277+
String responseETag = response.eTag();
278+
if (responseETag != null && state.etag != null && !state.etag.equals(responseETag)) {
279+
throw new IllegalStateException("ETag mismatch - object may have changed during download");
280+
}
281+
}
282+
}

0 commit comments

Comments
 (0)