@@ -54,27 +54,14 @@ public class PresignedUrlMultipartDownloaderSubscriber
54
54
private final S3AsyncClient s3AsyncClient ;
55
55
private final PresignedUrlDownloadRequest presignedUrlDownloadRequest ;
56
56
private final long configuredPartSizeInBytes ;
57
- private final int completedParts ;
58
57
private final CompletableFuture <Void > future ;
59
58
private final Object lock = new Object ();
60
- private volatile MultipartDownloadState state ;
61
- private Subscription subscription ;
59
+ private final AtomicInteger completedParts ;
62
60
63
- private static class MultipartDownloadState {
64
- final long totalContentLength ;
65
- final long actualPartSizeInBytes ;
66
- final int totalParts ;
67
- final AtomicInteger completedParts ;
68
- final String etag ;
69
-
70
- MultipartDownloadState (long totalLength , long partSize , int totalParts , String etag , int completedParts ) {
71
- this .totalContentLength = totalLength ;
72
- this .actualPartSizeInBytes = partSize ;
73
- this .totalParts = totalParts ;
74
- this .completedParts = new AtomicInteger (completedParts );
75
- this .etag = etag ;
76
- }
77
- }
61
+ private volatile Long totalContentLength ;
62
+ private volatile Integer totalParts ;
63
+ private volatile String eTag ;
64
+ private volatile Subscription subscription ;
78
65
79
66
public PresignedUrlMultipartDownloaderSubscriber (
80
67
S3AsyncClient s3AsyncClient ,
@@ -83,7 +70,7 @@ public PresignedUrlMultipartDownloaderSubscriber(
83
70
this .s3AsyncClient = s3AsyncClient ;
84
71
this .presignedUrlDownloadRequest = presignedUrlDownloadRequest ;
85
72
this .configuredPartSizeInBytes = configuredPartSizeInBytes ;
86
- this .completedParts = 0 ;
73
+ this .completedParts = new AtomicInteger ( 0 ) ;
87
74
this .future = new CompletableFuture <>();
88
75
}
89
76
@@ -102,135 +89,87 @@ public void onSubscribe(Subscription s) {
102
89
@ Override
103
90
public void onNext (AsyncResponseTransformer <GetObjectResponse , GetObjectResponse > asyncResponseTransformer ) {
104
91
if (asyncResponseTransformer == null ) {
105
- subscription .cancel ();
106
92
throw new NullPointerException ("onNext must not be called with null asyncResponseTransformer" );
107
93
}
94
+
95
+ int nextPartIndex ;
108
96
synchronized (lock ) {
109
- if (state == null ) {
110
- performSizeDiscoveryAndFirstPart (asyncResponseTransformer );
111
- } else {
112
- downloadNextPart (asyncResponseTransformer );
97
+ nextPartIndex = completedParts .get ();
98
+ if (totalParts != null && nextPartIndex >= totalParts ) {
99
+ log .debug (() -> String .format ("Completing multipart download after a total of %d parts downloaded." , totalParts ));
100
+ subscription .cancel ();
101
+ return ;
113
102
}
103
+ completedParts .incrementAndGet ();
114
104
}
105
+
106
+ makeRangeRequest (nextPartIndex , asyncResponseTransformer );
115
107
}
116
108
117
- private void performSizeDiscoveryAndFirstPart (AsyncResponseTransformer <GetObjectResponse ,
118
- GetObjectResponse > asyncResponseTransformer ) {
119
- if (completedParts > 0 ) {
120
- performSizeDiscoveryOnly (asyncResponseTransformer );
121
- return ;
122
- }
123
- long endByte = configuredPartSizeInBytes - 1 ;
124
- String firstPartRange = String .format ("%s0-%d" , BYTES_RANGE_PREFIX , endByte );
125
- PresignedUrlDownloadRequest firstPartRequest = presignedUrlDownloadRequest .toBuilder ()
126
- .range (firstPartRange )
127
- .build ();
128
- s3AsyncClient .presignedUrlExtension ().getObject (firstPartRequest , asyncResponseTransformer )
129
- .whenComplete ((response , error ) -> {
130
- if (error != null ) {
131
- log .debug (() -> "Error encountered during first part request" );
132
- onError (error );
133
- return ;
134
- }
135
- try {
136
- String contentRange = response .contentRange ();
137
- if (contentRange == null ) {
138
- onError (new IllegalStateException ("No Content-Range header in response" ));
139
- return ;
140
- }
141
- long totalSize = parseContentRangeForTotalSize (contentRange );
142
- if (totalSize <= configuredPartSizeInBytes ) {
143
- subscription .cancel ();
144
- return ;
145
- }
146
- String etag = response .eTag ();
147
- initializeStateAfterFirstPart (totalSize , etag );
148
- if (state .totalParts > 1 ) {
149
- subscription .request (1 );
150
- } else {
151
- subscription .cancel ();
152
- }
153
- } catch (Exception e ) {
154
- log .debug (() -> "Error during first part processing" , e );
155
- onError (e );
156
- }
157
- });
158
- }
159
-
160
- private void performSizeDiscoveryOnly (
161
- AsyncResponseTransformer <GetObjectResponse , GetObjectResponse > asyncResponseTransformer ) {
162
- String sizeDiscoveryRange = String .format ("%s0-0" , BYTES_RANGE_PREFIX );
163
- PresignedUrlDownloadRequest sizeDiscoveryRequest = presignedUrlDownloadRequest .toBuilder ()
164
- .range (sizeDiscoveryRange )
165
- .build ();
166
-
167
- s3AsyncClient .presignedUrlExtension ().getObject (sizeDiscoveryRequest , asyncResponseTransformer )
168
- .whenComplete ((response , error ) -> {
169
- if (error != null ) {
170
- log .debug (() -> "Error encountered during size discovery request" );
171
- onError (error );
172
- return ;
173
- }
174
- try {
175
- String contentRange = response .contentRange ();
176
- if (contentRange == null ) {
177
- onError (new IllegalStateException ("No Content-Range header in response" ));
178
- return ;
179
- }
180
- long totalSize = parseContentRangeForTotalSize (contentRange );
181
- String etag = response .eTag ();
182
- if (etag == null ) {
183
- onError (new IllegalStateException ("No ETag in response, cannot ensure consistency" ));
184
- return ;
185
- }
186
- int totalParts = calculateTotalParts (totalSize , configuredPartSizeInBytes );
187
- this .state = new MultipartDownloadState (totalSize , configuredPartSizeInBytes ,
188
- totalParts , etag , completedParts );
189
- if (completedParts < state .totalParts ) {
190
- subscription .request (1 );
191
- } else {
192
- subscription .cancel ();
193
- }
194
- } catch (Exception e ) {
195
- log .debug (() -> "Error during size discovery processing" , e );
196
- onError (e );
197
- }
198
- });
109
+ private void makeRangeRequest (int partIndex ,
110
+ AsyncResponseTransformer <GetObjectResponse ,
111
+ GetObjectResponse > asyncResponseTransformer ) {
112
+ PresignedUrlDownloadRequest partRequest = createPartRequest (partIndex );
113
+ log .debug (() -> "Sending range request for part " + partIndex + " with range=" + partRequest .range ());
114
+
115
+ s3AsyncClient .presignedUrlExtension ()
116
+ .getObject (partRequest , asyncResponseTransformer )
117
+ .whenComplete ((response , error ) -> {
118
+ if (error != null ) {
119
+ log .debug (() -> "Error encountered during part request for part " + partIndex );
120
+ handleError (error );
121
+ return ;
122
+ }
123
+ requestMoreIfNeeded (response );
124
+ });
199
125
}
200
126
201
- private void downloadNextPart (AsyncResponseTransformer <GetObjectResponse , GetObjectResponse > transformer ) {
202
- int nextPartIndex = state .completedParts .getAndIncrement ();
203
- if (nextPartIndex >= state .totalParts ) {
204
- subscription .cancel ();
205
- return ;
127
+ private void requestMoreIfNeeded (GetObjectResponse response ) {
128
+ int totalComplete = completedParts .get ();
129
+ log .debug (() -> String .format ("Completed part %d" , totalComplete ));
130
+
131
+ synchronized (lock ) {
132
+ if (eTag == null ) {
133
+ this .eTag = response .eTag ();
134
+ log .debug (() -> String .format ("Multipart object ETag: %s" , this .eTag ));
135
+ } else if (response .eTag () != null && !eTag .equals (response .eTag ())) {
136
+ handleError (new IllegalStateException ("ETag mismatch - object may have changed during download" ));
137
+ return ;
138
+ }
139
+ if (totalContentLength == null && response .contentRange () != null ) {
140
+ try {
141
+ validateResponse (response );
142
+ long totalSize = parseContentRangeForTotalSize (response .contentRange ());
143
+ int calculatedTotalParts = calculateTotalParts (totalSize , configuredPartSizeInBytes );
144
+ this .totalContentLength = totalSize ;
145
+ this .totalParts = calculatedTotalParts ;
146
+ log .debug (() -> String .format ("Total content length: %d, Total parts: %d" , totalSize , calculatedTotalParts ));
147
+ } catch (Exception e ) {
148
+ log .debug (() -> "Failed to parse content range" , e );
149
+ handleError (e );
150
+ return ;
151
+ }
152
+ }
153
+ if (totalParts != null && totalParts > 1 && totalComplete < totalParts ) {
154
+ subscription .request (1 );
155
+ } else {
156
+ log .debug (() -> String .format ("Completing multipart download after a total of %d parts downloaded." , totalParts ));
157
+ subscription .cancel ();
158
+ }
206
159
}
207
- PresignedUrlDownloadRequest partRequest = createPartRequest (nextPartIndex );
208
- String expectedRange = partRequest .range ();
209
- s3AsyncClient .presignedUrlExtension ().getObject (partRequest , transformer )
210
- .whenComplete ((response , error ) -> {
211
- if (error != null ) {
212
- log .debug (() -> "Error encountered during part request with range=" + expectedRange );
213
- onError (error );
214
- } else {
215
- try {
216
- validatePartResponse (response , nextPartIndex , expectedRange );
217
- int completedCount = nextPartIndex + 1 ;
218
- if (completedCount < state .totalParts ) {
219
- subscription .request (1 );
220
- } else {
221
- subscription .cancel ();
222
- }
223
- } catch (Exception validationError ) {
224
- log .debug (() -> "Validation failed for part " + (nextPartIndex + 1 ));
225
- onError (validationError );
226
- }
227
- }
228
- });
229
160
}
230
161
231
- private void initializeStateAfterFirstPart (long totalSize , String etag ) {
232
- int totalParts = calculateTotalParts (totalSize , configuredPartSizeInBytes );
233
- this .state = new MultipartDownloadState (totalSize , configuredPartSizeInBytes , totalParts , etag , completedParts + 1 );
162
+ private void validateResponse (GetObjectResponse response ) {
163
+ if (response == null ) {
164
+ throw new IllegalStateException ("Response cannot be null" );
165
+ }
166
+ if (response .contentRange () == null ) {
167
+ throw new IllegalStateException ("No Content-Range header in response" );
168
+ }
169
+ Long contentLength = response .contentLength ();
170
+ if (contentLength == null || contentLength <= 0 ) {
171
+ throw new IllegalStateException ("Invalid or missing Content-Length in response" );
172
+ }
234
173
}
235
174
236
175
private long parseContentRangeForTotalSize (String contentRange ) {
@@ -246,15 +185,29 @@ private int calculateTotalParts(long contentLength, long partSize) {
246
185
}
247
186
248
187
private PresignedUrlDownloadRequest createPartRequest (int partIndex ) {
249
- long startByte = partIndex * state .actualPartSizeInBytes ;
250
- long endByte = Math .min (startByte + state .actualPartSizeInBytes - 1 , state .totalContentLength - 1 );
188
+ long startByte = partIndex * configuredPartSizeInBytes ;
189
+ long endByte ;
190
+
191
+ if (totalContentLength != null ) {
192
+ endByte = Math .min (startByte + configuredPartSizeInBytes - 1 , totalContentLength - 1 );
193
+ } else {
194
+ endByte = startByte + configuredPartSizeInBytes - 1 ;
195
+ }
251
196
String rangeHeader = BYTES_RANGE_PREFIX + startByte + "-" + endByte ;
252
-
253
197
return presignedUrlDownloadRequest .toBuilder ()
254
198
.range (rangeHeader )
255
199
.build ();
256
200
}
257
201
202
+ private void handleError (Throwable t ) {
203
+ synchronized (lock ) {
204
+ if (subscription != null ) {
205
+ subscription .cancel ();
206
+ }
207
+ }
208
+ onError (t );
209
+ }
210
+
258
211
@ Override
259
212
public void onError (Throwable t ) {
260
213
log .debug (() -> "Error in multipart download" , t );
@@ -269,14 +222,4 @@ public void onComplete() {
269
222
public CompletableFuture <Void > future () {
270
223
return this .future ;
271
224
}
272
-
273
- private void validatePartResponse (GetObjectResponse response , int partIndex , String expectedRange ) {
274
- if (response == null ) {
275
- throw new IllegalArgumentException ("Response cannot be null" );
276
- }
277
- String responseETag = response .eTag ();
278
- if (responseETag != null && state .etag != null && !state .etag .equals (responseETag )) {
279
- throw new IllegalStateException ("ETag mismatch - object may have changed during download" );
280
- }
281
- }
282
225
}
0 commit comments