Suspend/pause file download over HTTP (presignURL) #156
Replies: 2 comments
-
Download from MinIO using presignURL. The presignURL MUST be valid during the whole download duration!
Example logic could look like the following (we can omit the logic for obtaining the presignURL): TransferState - persists transfer state in Mongo @Document(collection = "transfer_states")
/**
 * Persistent snapshot of one MinIO-to-MinIO transfer, stored in Mongo (see the
 * @Document annotation above) so an interrupted transfer can be resumed from the
 * last completed part instead of restarting from byte zero.
 */
public class TransferState {

    @Id
    private String id;
    /** Multipart upload id on the destination; null until the upload is created. */
    private String uploadId;
    /** Bytes confirmed downloaded AND uploaded as completed parts. */
    private long downloadedBytes;
    /** Number of parts completed so far (S3-style part numbers are 1-based). */
    private int partNumber;
    /** ETags of completed parts, in part order; required to complete the upload. */
    private List<String> etags = new ArrayList<>();
    private String sourceBucket;
    private String sourceObject;
    private String destBucket;
    private String destObject;
    /** Stamped by TransferStateRepository.save() on every persist. */
    private LocalDateTime lastUpdated;

    // Accessors added explicitly: the rest of the snippet calls these
    // getters/setters (setId, setLastUpdated, getUploadId, ...) but the class
    // as posted declared none and carries no Lombok @Data/@Getter/@Setter,
    // so it did not compile.
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getUploadId() { return uploadId; }
    public void setUploadId(String uploadId) { this.uploadId = uploadId; }
    public long getDownloadedBytes() { return downloadedBytes; }
    public void setDownloadedBytes(long downloadedBytes) { this.downloadedBytes = downloadedBytes; }
    public int getPartNumber() { return partNumber; }
    public void setPartNumber(int partNumber) { this.partNumber = partNumber; }
    public List<String> getEtags() { return etags; }
    public String getSourceBucket() { return sourceBucket; }
    public void setSourceBucket(String sourceBucket) { this.sourceBucket = sourceBucket; }
    public String getSourceObject() { return sourceObject; }
    public void setSourceObject(String sourceObject) { this.sourceObject = sourceObject; }
    public String getDestBucket() { return destBucket; }
    public void setDestBucket(String destBucket) { this.destBucket = destBucket; }
    public String getDestObject() { return destObject; }
    public void setDestObject(String destObject) { this.destObject = destObject; }
    public LocalDateTime getLastUpdated() { return lastUpdated; }
    public void setLastUpdated(LocalDateTime lastUpdated) { this.lastUpdated = lastUpdated; }
} TransferStateRepository - make changes to extend MongoRepository @Service
/**
 * Thin persistence layer for {@link TransferState}. Could alternatively extend a
 * Spring Data MongoRepository; kept as a MongoTemplate wrapper so save() can
 * stamp {@code lastUpdated} in one place.
 */
public class TransferStateRepository {

    private final MongoTemplate mongoTemplate;

    // Constructor injection instead of a field @Autowired: the dependency is
    // final and mandatory, and the class is testable without a Spring context.
    public TransferStateRepository(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    /** Persists the state, stamping lastUpdated with the current time. */
    public TransferState save(TransferState state) {
        state.setLastUpdated(LocalDateTime.now());
        return mongoTemplate.save(state);
    }

    /** Returns the state with the given id, or null when none exists. */
    public TransferState findById(String id) {
        return mongoTemplate.findById(id, TransferState.class);
    }

    /** Removes the state document with the given id (no-op when absent). */
    public void deleteById(String id) {
        mongoTemplate.remove(new Query(Criteria.where("_id").is(id)), TransferState.class);
    }
} public class PresignedPersistentDownloader implements Runnable {
// HTTP client used for the ranged GET requests against the presigned URL.
private final OkHttpClient client = new OkHttpClient();
// Source and destination MinIO endpoints; the copy streams through this process.
private final MinioClient sourceMinioClient;
private final MinioClient destMinioClient;
private final String sourceBucket;
private final String sourceObject;
private final String destBucket;
private final String destObject;
// Mongo id of the persisted TransferState document for this transfer.
private final String transferId;
private final TransferStateRepository stateRepository;
// 5 MiB — the minimum part size accepted by S3-compatible multipart uploads.
private final int partSize = 5 * 1024 * 1024;
// Cooperative control flags; volatile so the worker thread observes updates
// made by pause()/resume()/stop() from other threads.
private volatile boolean paused = false;
private volatile boolean stopped = false;
// Consecutive transient failures tolerated before the loop gives up.
private static final int MAX_RETRIES = 3;
/**
 * Creates a resumable downloader for one source-object / destination-object pair.
 *
 * Fix: the constructor must match the enclosing class name
 * ({@code PresignedPersistentDownloader}); it was declared as
 * {@code PresignedBucketDownloader}, which does not compile.
 *
 * @param transferId key of the persisted TransferState used to resume
 */
public PresignedPersistentDownloader(MinioClient sourceMinioClient,
                                     MinioClient destMinioClient,
                                     String sourceBucket,
                                     String sourceObject,
                                     String destBucket,
                                     String destObject,
                                     String transferId,
                                     TransferStateRepository stateRepository) {
    this.sourceMinioClient = sourceMinioClient;
    this.destMinioClient = destMinioClient;
    this.sourceBucket = sourceBucket;
    this.sourceObject = sourceObject;
    this.destBucket = destBucket;
    this.destObject = destObject;
    this.transferId = transferId;
    this.stateRepository = stateRepository;
}
/**
 * Loads the persisted state for this transfer, or creates, persists, and
 * returns a fresh zeroed record when none exists yet.
 */
private TransferState getOrCreateState() {
    TransferState existing = stateRepository.findById(transferId);
    if (existing != null) {
        return existing;
    }
    TransferState fresh = new TransferState();
    fresh.setId(transferId);
    fresh.setSourceBucket(sourceBucket);
    fresh.setSourceObject(sourceObject);
    fresh.setDestBucket(destBucket);
    fresh.setDestObject(destObject);
    fresh.setDownloadedBytes(0);
    fresh.setPartNumber(0);
    return stateRepository.save(fresh);
}
@Override
public void run() {
    TransferState state = getOrCreateState();
    int retryCount = 0;
    try {
        // Start the multipart upload once; the id is persisted so a later
        // restart of this transfer resumes instead of starting over.
        if (state.getUploadId() == null) {
            state.setUploadId(destMinioClient.createMultipartUpload(
                    CreateMultipartUploadArgs.builder()
                            .bucket(destBucket)
                            .object(destObject)
                            .build()));
            stateRepository.save(state);
        }
        // The object size is fixed for the life of the transfer; fetch it once
        // and use it for completion detection and to bound the final Range.
        final long totalSize = sourceMinioClient.statObject(
                StatObjectArgs.builder()
                        .bucket(sourceBucket)
                        .object(sourceObject)
                        .build()).size();
        while (!stopped && retryCount < MAX_RETRIES) {
            try {
                // Honor pause BEFORE starting the next part (the original only
                // checked after uploading one). The wait is guarded by the same
                // monitor resume() notifies on, and the flag is re-checked after
                // every wakeup (spurious wakeups, stop() while paused).
                synchronized (this) {
                    while (paused && !stopped) {
                        wait();
                    }
                }
                if (stopped) {
                    return; // state stays in Mongo so the transfer can resume later
                }
                if (state.getDownloadedBytes() >= totalSize) {
                    // Everything uploaded: stitch the parts and drop the state.
                    destMinioClient.completeMultipartUpload(
                            CompleteMultipartUploadArgs.builder()
                                    .bucket(destBucket)
                                    .object(destObject)
                                    .uploadId(state.getUploadId())
                                    .parts(createPartsList(state.getEtags()))
                                    .build());
                    stateRepository.deleteById(transferId);
                    return;
                }
                final long offset = state.getDownloadedBytes();
                // Clamp the range so the final part covers exactly the remaining
                // bytes instead of a full partSize.
                final long rangeEnd = Math.min(offset + partSize, totalSize) - 1;
                final long partLength = rangeEnd - offset + 1;
                Request request = new Request.Builder()
                        .url(getPresignedUrl())
                        // Always send a bounded Range so each HTTP request maps
                        // to exactly one upload part.
                        .addHeader("Range", "bytes=" + offset + "-" + rangeEnd)
                        .build();
                try (Response response = client.newCall(request).execute()) {
                    // isSuccessful() already covers 206 Partial Content, so no
                    // separate code check is needed.
                    if (!response.isSuccessful()) {
                        throw new IOException("Download failed: " + response);
                    }
                    ResponseBody body = response.body();
                    if (body == null) {
                        throw new IOException("Empty response");
                    }
                    try (InputStream in = body.byteStream()) {
                        UploadPartResponse uploadPartResponse = destMinioClient.uploadPart(
                                UploadPartArgs.builder()
                                        .bucket(destBucket)
                                        .object(destObject)
                                        .uploadId(state.getUploadId())
                                        .partNumber(state.getPartNumber() + 1)
                                        // Pass the real part length: the final
                                        // part is usually shorter than partSize.
                                        .stream(in, partLength, -1)
                                        .build());
                        state.getEtags().add(uploadPartResponse.etag());
                        state.setPartNumber(state.getPartNumber() + 1);
                        // Advance by the bytes this part actually covered (the
                        // original always added partSize, corrupting the resume
                        // offset on the last part).
                        state.setDownloadedBytes(offset + partLength);
                        stateRepository.save(state);
                    }
                }
                retryCount = 0; // a successful part resets the transient-failure budget
            } catch (InterruptedException ie) {
                // Restore the interrupt status and exit; keep state for resume.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                retryCount++;
                if (retryCount < MAX_RETRIES) {
                    try {
                        Thread.sleep(1000L * retryCount); // simple linear back-off
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
        // Retries exhausted (or stop requested): intentionally keep the persisted
        // state and the open multipart upload so the transfer can resume later.
    } catch (Exception e) {
        // Unrecoverable failure outside the retry loop (createMultipartUpload or
        // statObject failed): abort the upload and discard the now-useless state.
        try {
            if (state.getUploadId() != null) {
                destMinioClient.abortMultipartUpload(
                        AbortMultipartUploadArgs.builder()
                                .bucket(destBucket)
                                .object(destObject)
                                .uploadId(state.getUploadId())
                                .build());
            }
            stateRepository.deleteById(transferId);
        } catch (Exception abortException) {
            // Best-effort cleanup; TODO: log abortException instead of dropping it.
        }
    }
}
/** Converts the ordered etag list into 1-based Part entries for completion. */
private List<Part> createPartsList(List<String> etags) {
    List<Part> parts = new ArrayList<>(etags.size());
    int partNumber = 1;
    for (String etag : etags) {
        parts.add(new Part(partNumber++, etag));
    }
    return parts;
}
// NOTE(review): this nested class appears to be dead code — transfer progress
// is persisted via the Mongo-backed TransferState instead, and nothing in the
// snippet references DownloadState. Confirm and remove.
private static class DownloadState implements Serializable {
String uploadId;  // multipart upload id
long downloadedBytes;  // bytes confirmed transferred so far
int partNumber;  // last completed part number
List<String> etags = new ArrayList<>();  // etags of completed parts, in order
}
/**
 * Builds a presigned GET URL for the source object.
 *
 * Fix: the original referenced undefined names ({@code minioClient},
 * {@code bucketName}, {@code objectName}, {@code EXPIRY_MINUTES}); this class
 * declares {@code sourceMinioClient}, {@code sourceBucket}, {@code sourceObject}.
 *
 * NOTE: the URL must stay valid for the whole (possibly paused) transfer, so
 * the expiry should comfortably exceed the worst-case transfer duration.
 */
private String getPresignedUrl() throws Exception {
    final int expiryMinutes = 60; // TODO: tune to the expected transfer duration
    return sourceMinioClient.getPresignedObjectUrl(
            GetPresignedObjectUrlArgs.builder()
                    .method(Method.GET)
                    .bucket(sourceBucket)
                    .object(sourceObject)
                    .expiry(expiryMinutes, TimeUnit.MINUTES)
                    .build());
}
/**
 * Requests that the worker pause after the part currently in flight; takes
 * effect at the next pause checkpoint in run().
 */
public void pause() {
    paused = false | true; // volatile write is enough; the worker re-reads under the monitor
    paused = true;
}

/**
 * Resumes a paused worker. Synchronized so the flag flip and the notification
 * happen under the same monitor the worker waits on (avoids a lost wakeup),
 * and notifyAll() is used so no waiter can be missed.
 */
public synchronized void resume() {
    paused = false;
    notifyAll();
}

/** Stops the transfer permanently; also wakes the worker if it is paused. */
public void stop() {
    stopped = true;
    resume(); // wake up if paused
}
} Example usage: MinioClient sourceClient = MinioClient.builder()
.endpoint("http://source-minio")
.credentials("accessKey1", "secretKey1")
.build();
MinioClient destClient = MinioClient.builder()
.endpoint("http://dest-minio")
.credentials("accessKey2", "secretKey2")
.build();
@Autowired
private TransferStateRepository stateRepository;
// Create unique transfer ID
String transferId = UUID.randomUUID().toString();
PresignedPersistentDownloader downloader = new PresignedPersistentDownloader(
sourceClient,
destClient,
"source-bucket",
"large-file.zip",
"dest-bucket",
"copied-large-file.zip",
transferId,
stateRepository
);
Thread transferThread = new Thread(downloader);
transferThread.start(); |
Beta Was this translation helpful? Give feedback.
-
Optimized solution that does not rely on Range requests: HttpURLConnection connection = null;
// Resumable download: one ranged GET over the presigned URL, streamed into
// S3 multipart-upload parts; progress is persisted after every part.
try {
URL url = new URL(presignURL);
connection = (HttpURLConnection) url.openConnection();
connection.setConnectTimeout(DEFAULT_TIMEOUT);
connection.setReadTimeout(DEFAULT_TIMEOUT);
// Resume: ask the server for everything from the last persisted offset.
if (state.getDownloadedBytes() > 0) {
log.info("Resuming download from byte offset: {}", state.getDownloadedBytes());
connection.setRequestProperty("Range", "bytes=" + state.getDownloadedBytes() + "-");
}
int responseCode = connection.getResponseCode();
if (responseCode != HttpURLConnection.HTTP_OK && responseCode != HttpURLConnection.HTTP_PARTIAL) {
throw new IOException("Failed to get stream. HTTP response code: " + responseCode);
}
// NOTE(review): a 200 on a resume attempt means the server IGNORED the Range
// header; the bytes below would then be re-read from position 0 and appended
// after already-uploaded parts. This should trigger the skip-bytes fallback
// described later in this thread — confirm before relying on it.
if (responseCode == HttpURLConnection.HTTP_OK) {
state.setTotalBytes(connection.getContentLengthLong());
}
String contentType = connection.getContentType();
String contentDisposition = connection.getHeaderField(HttpHeaders.CONTENT_DISPOSITION);
// NOTE(review): inputStream is never closed; wrap it in try-with-resources
// (connection.disconnect() in the finally block does release it, but only
// implicitly).
InputStream inputStream = connection.getInputStream();
String uploadId = state.getUploadId();
long totalBytes = state.getDownloadedBytes();
log.info("Part Number: {}, total bytes: {}", state.getPartNumber(), totalBytes);
// First run only: create the multipart upload and persist its id so a
// restart can attach to the same upload.
if (state.getDownloadedBytes() == 0) {
log.info("Starting new download");
log.info("Starting multipart upload for key: {}", targetKey);
CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
.bucket(targetBucket)
.contentType(contentType)
.contentDisposition(contentDisposition)
.key(targetKey)
.build();
log.info("Creating multipart upload for key: {}", targetKey);
uploadId = s3AsyncClient.createMultipartUpload(createMultipartUploadRequest)
.join()
.uploadId();
state.setUploadId(uploadId);
}
// Rebuild the completed-parts list from persisted etags (1-based part numbers).
List<CompletedPart> completedParts = new ArrayList<>();
List<String> savedEtags = state.getEtags();
for (int i = 0; i < savedEtags.size(); i++) {
completedParts.add(CompletedPart.builder()
.partNumber(i + 1)
.eTag(savedEtags.get(i))
.build());
}
// Accumulate the stream into CHUNK_SIZE buffers; each full buffer becomes
// one upload part and one persisted checkpoint.
byte[] buffer = new byte[CHUNK_SIZE];
ByteArrayOutputStream accumulator = new ByteArrayOutputStream();
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) > 0) {
accumulator.write(buffer, 0, bytesRead);
if (accumulator.size() >= CHUNK_SIZE) {
// NOTE(review): parts rebuilt above are numbered i+1 while new parts use
// state.getPartNumber() directly — confirm incrementPartNumber() keeps
// these consistent and 1-based (S3 rejects part number 0).
String eTag = uploadPart(s3AsyncClient, targetBucket, targetKey, uploadId, state.getPartNumber(), accumulator.toByteArray());
completedParts.add(CompletedPart.builder()
.partNumber(state.getPartNumber())
.eTag(eTag)
.build());
// NOTE(review): accumulator.toByteArray() copies the buffer; accumulator.size()
// gives the same length without the copy.
totalBytes = totalBytes + accumulator.toByteArray().length;
state.setDownloadedBytes(totalBytes);
state.incrementPartNumber();
state.getEtags().add(eTag);
transferArtifactStateRepository.save(state);
accumulator.reset();
}
}
// Flush the final, possibly short, part (S3 allows the last part < minimum size).
if (accumulator.size() > 0) {
String eTag = uploadPart(s3AsyncClient, targetBucket, targetKey, uploadId, state.getPartNumber(), accumulator.toByteArray());
completedParts.add(CompletedPart.builder()
.partNumber(state.getPartNumber())
.eTag(eTag)
.build());
totalBytes += accumulator.size();
state.setDownloadedBytes(totalBytes);
state.incrementPartNumber();
transferArtifactStateRepository.save(state);
}
// All parts uploaded: complete the multipart upload in part order.
CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder()
.parts(completedParts)
.build();
CompleteMultipartUploadRequest completeMultipartUploadRequest =
CompleteMultipartUploadRequest.builder()
.bucket(targetBucket)
.key(targetKey)
.uploadId(uploadId)
.multipartUpload(completedMultipartUpload)
.build();
s3AsyncClient.completeMultipartUpload(completeMultipartUploadRequest).join();
log.info("Upload completed successfully.");
} catch (IOException e) {
log.error("Failed to download stream", e);
// NOTE(review): wrapping only e.getMessage() drops the cause chain; prefer
// passing e as the cause if DataTransferAPIException supports it.
throw new DataTransferAPIException(e.getMessage());
} finally {
// Always release the connection, even on failure.
if (connection != null) {
connection.disconnect();
}
} Updated TransferArtifactState public class TransferArtifactState {
@Id
private String id;
// Multipart upload id on the destination; null until the upload is created.
@Setter
private String uploadId;
// Bytes confirmed downloaded and uploaded as completed parts (resume offset).
@Setter
private long downloadedBytes;
// Total object size, taken from Content-Length on the first (200) response.
@Setter
private long totalBytes;
// Number of parts completed so far.
private int partNumber;
// ETags of completed parts, in part order; needed to complete the upload.
private List<String> etags = new ArrayList<>();
// NOTE(review): persisting the presignURL only helps while it is still valid;
// a resume after expiry needs a fresh URL.
private String presignURL;
private String destBucket;
private String destObject;
// Spring Data auditing fields (require auditing to be enabled).
@CreatedDate
private Instant issued;
@LastModifiedDate
private Instant modified;
@JsonIgnore
@CreatedBy
private String createdBy;
@JsonIgnore
@LastModifiedBy
private String lastModifiedBy;
// Optimistic locking guard for concurrent state updates.
@JsonIgnore
@Version
@Field("version")
private Long version; Proposed code works like already present code in S3ClientServiceImpl, it receives inputStream and process it using chunks. Current version stores information of processed bytes, so when it is stopped, it checks if we have started transfer process and if we have, it will send Range request to server, to fetch remaining data. Discussion should be done within team, to investigate possibility how can we wrap up current logic, in scope that it can be suspended/resumed/terminated, similar like Thread run/pause. We should add support, as fallback scenario, if server does not support Range requests, to fetch IS, and skip processed bytes: if (state.getDownloadedBytes() > 0) {
// Fallback when the server ignores Range requests: discard the already
// processed prefix of the stream before resuming uploads.
log.info("Skipping {} bytes to resume download", state.getDownloadedBytes());
long bytesToSkip = state.getDownloadedBytes();
long skipped = 0;
// Skip in chunks to handle large offsets
while (skipped < bytesToSkip) {
long skipResult = inputStream.skip(bytesToSkip - skipped);
// NOTE(review): skip() may legally return 0 without EOF; treating <= 0 as
// fatal can abort a healthy stream. Consider a read-and-discard fallback or
// InputStream.skipNBytes (Java 12+), which handles this correctly.
if (skipResult <= 0) {
throw new IOException("Failed to skip to position " + bytesToSkip);
}
skipped += skipResult;
}
log.info("Successfully skipped to position {}", skipped);
} |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
Consumer
Resumable (pause/continue) downloads can be achieved in Java using HTTP range requests, which are supported by many HTTP servers and endpoints.
Frameworks/Libraries:
Apache HttpClient
Supports HTTP range headers, allowing you to resume downloads by specifying the byte range.
OkHttp
Also supports range requests and is easy to use for resumable downloads.
The downloader runs in a separate thread and supports pause(), resume(), and stop().
Download progress is saved to a state file after each chunk.
On restart, the downloader loads the last saved progress and resumes.
The state file persists the URL, file path, and downloaded bytes.
Usage:
Provider side:
To support HTTP range requests for large file downloads, extend your controller and service to:
Improvements:
Controller that serves resumable file
Add @RequestHeader(value = "Range", required = false) to your method and pass it to the service.
Service offering static/local file
Beta Was this translation helpful? Give feedback.
All reactions