Asynchronous Download API endpoint #153
IgorBalog-Eng
started this conversation in
Ideas
Replies: 2 comments
-
Starting-point implementation for tracking transfers, with logic for terminating a transfer and cleaning up the bucket in case of interruption/termination.

DataTransferAPIService:

public JsonNode terminateTransfer(String transferProcessId) {
    TransferProcess transferProcess = findTransferProcessById(transferProcessId);
    // First terminate any active transfer
    transferStatusTracker.terminateTransfer(transferProcessId);
    // ... remaining logic
}

Consider refactoring DataTransferAPIService, since its constructor takes a lot of parameters (if a refactor is applicable).
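The `transferStatusTracker` used above is not shown in this thread. As a rough sketch of what such a tracker might look like, assuming it only needs to remember the running transfer per process id and cancel it on request (the class name `TransferStatusTrackerSketch` and the `register`, `complete` and `isActive` methods are hypothetical):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

// Hypothetical sketch: the real TransferStatusTracker is not shown in this thread.
final class TransferStatusTrackerSketch {

    // transferProcessId -> handle of the transfer that is currently running
    private final Map<String, Future<?>> activeTransfers = new ConcurrentHashMap<>();

    /** Remembers a running transfer so it can be terminated later. */
    void register(String transferProcessId, Future<?> transfer) {
        activeTransfers.put(transferProcessId, transfer);
    }

    /** Forgets a transfer once it has finished on its own. */
    void complete(String transferProcessId) {
        activeTransfers.remove(transferProcessId);
    }

    boolean isActive(String transferProcessId) {
        return activeTransfers.containsKey(transferProcessId);
    }

    /**
     * Requests cancellation of the transfer if one is active.
     *
     * @return true if an active transfer was found and cancellation was requested
     */
    boolean terminateTransfer(String transferProcessId) {
        Future<?> transfer = activeTransfers.remove(transferProcessId);
        if (transfer == null) {
            return false;
        }
        transfer.cancel(true);
        return true;
    }
}
```

One caveat with this approach: `cancel(true)` on a `CompletableFuture` only marks the future as cancelled and does not interrupt the thread doing the work, whereas a `Future` returned by `ExecutorService.submit` does get its worker thread interrupted. Which handle to register depends on how the download loop is expected to notice termination.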
-
An improvement that uses async calls in a different way:

public interface DataTransferStrategy {
    CompletableFuture<Void> transfer(TransferProcess transferProcess);
}

Strategy:

@Service
@Slf4j
public class HttpPullTransferStrategy implements DataTransferStrategy {
    @Override
    public CompletableFuture<Void> transfer(TransferProcess transferProcess) {
        log.info("Executing HTTP PULL transfer for process {}", transferProcess.getId());
        String authorization = extractAuthorization(transferProcess);
        return downloadAndUploadToS3(
                transferProcess.getDataAddress().getEndpoint(),
                authorization,
                transferProcess.getId()
        ).thenAccept(key ->
                log.info("Stored transfer process id - {} data!", key));
    }
    // Rest of the class remains the same
}

Service:

public CompletableFuture<Void> downloadData(String transferProcessId) {
    TransferProcess transferProcess = findTransferProcessById(transferProcessId);
    if (!transferProcess.getState().equals(TransferState.STARTED)) {
        log.error("Download aborted, Transfer Process is not in STARTED state");
        return CompletableFuture.failedFuture(
                new DataTransferAPIException("Download aborted, Transfer Process is not in STARTED state"));
    }
    policyCheck(transferProcess);
    log.info("Starting download transfer process id - {} data...", transferProcessId);
    // Get appropriate strategy and execute transfer
    DataTransferStrategy strategy = dataTransferStrategyFactory.getStrategy(transferProcess.getFormat());
    return strategy.transfer(transferProcess)
            .thenAccept(unused -> {
                // Copy the existing process and mark it as downloaded
                TransferProcess transferProcessWithData = TransferProcess.Builder.newInstance()
                        .id(transferProcess.getId())
                        .agreementId(transferProcess.getAgreementId())
                        .consumerPid(transferProcess.getConsumerPid())
                        .providerPid(transferProcess.getProviderPid())
                        .callbackAddress(transferProcess.getCallbackAddress())
                        .dataAddress(transferProcess.getDataAddress())
                        .isDownloaded(true)
                        .dataId(transferProcessId)
                        .format(transferProcess.getFormat())
                        .state(transferProcess.getState())
                        .role(transferProcess.getRole())
                        .datasetId(transferProcess.getDatasetId())
                        .created(transferProcess.getCreated())
                        .createdBy(transferProcess.getCreatedBy())
                        .modified(transferProcess.getModified())
                        .lastModifiedBy(transferProcess.getLastModifiedBy())
                        .version(transferProcess.getVersion())
                        .build();
                transferProcessRepository.save(transferProcessWithData);
                log.info("Transfer process {} updated with downloaded status", transferProcessId);
            });
}

Controller:

@GetMapping(path = { "/{transferProcessId}/download" })
public ResponseEntity<GenericApiResponse<String>> downloadData(
        @PathVariable String transferProcessId) {
    log.info("Initiating async download for transfer process id - {}", transferProcessId);
    // Fire and forget: the outcome is only logged, the HTTP response does not wait for it
    apiService.downloadData(transferProcessId)
            .whenComplete((result, throwable) -> {
                if (throwable != null) {
                    log.error("Download failed for process {}: {}",
                            transferProcessId, throwable.getMessage(), throwable);
                } else {
                    log.info("Download completed successfully for process {}", transferProcessId);
                }
            });
    return ResponseEntity.accepted()
            .contentType(MediaType.APPLICATION_JSON)
            .body(GenericApiResponse.success(
                    "Download started for transfer process " + transferProcessId,
                    "Request accepted"));
}
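The controller above can only return 202 immediately if the blocking download/upload inside `downloadAndUploadToS3` (whose body is not shown here) runs off the request thread. A minimal sketch of one way to do that, assuming a dedicated bounded thread pool is acceptable; the class `AsyncTransferRunnerSketch` and the `maxParallelTransfers` parameter are hypothetical names used only for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical helper: runs a blocking transfer on its own pool instead of the servlet thread.
final class AsyncTransferRunnerSketch {

    private final ExecutorService transferPool;

    AsyncTransferRunnerSketch(int maxParallelTransfers) {
        // Bounded pool so that many large downloads cannot exhaust the application's threads.
        // Daemon threads are used here only so the sketch never keeps the JVM alive.
        this.transferPool = Executors.newFixedThreadPool(maxParallelTransfers, runnable -> {
            Thread worker = new Thread(runnable, "transfer-worker");
            worker.setDaemon(true);
            return worker;
        });
    }

    /**
     * Submits the blocking transfer and returns immediately.
     * The supplier is expected to return the storage key of the uploaded object.
     */
    CompletableFuture<String> run(Supplier<String> blockingTransfer) {
        return CompletableFuture.supplyAsync(blockingTransfer, transferPool);
    }

    void shutdown() {
        transferPool.shutdown();
    }
}
```

An exception thrown by the supplier completes the returned future exceptionally, so a `whenComplete` handler like the one in the controller would still see the failure.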
-
The current implementation of the API download endpoint is synchronous: when it downloads a large file that takes several minutes to transfer, the controller's response is blocked until the transfer finishes.
We should improve this logic to return an OK response saying that the transfer has started (possibly with some additional information) and release the servlet thread.
This will also require changes in the UI, since it currently relies on the transfer completing (the response from the controller) to handle the logic for displaying buttons.
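Once the controller no longer waits for the transfer, the UI needs some other way to learn when it has finished. One option, sketched here under the assumption that polling a status value is acceptable, is to record the outcome of each download future in a small registry. The class `DownloadStatusRegistrySketch` and its `Status` values are hypothetical and not part of the existing code base:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry the UI could poll through a status endpoint.
final class DownloadStatusRegistrySketch {

    enum Status { NOT_STARTED, IN_PROGRESS, COMPLETED, FAILED }

    private final Map<String, Status> statuses = new ConcurrentHashMap<>();

    /** Marks the download as in progress and records its outcome when the future completes. */
    void track(String transferProcessId, CompletableFuture<?> download) {
        statuses.put(transferProcessId, Status.IN_PROGRESS);
        download.whenComplete((result, error) ->
                statuses.put(transferProcessId, error == null ? Status.COMPLETED : Status.FAILED));
    }

    /** Returns the last known status, or NOT_STARTED for an unknown id. */
    Status statusOf(String transferProcessId) {
        return statuses.getOrDefault(transferProcessId, Status.NOT_STARTED);
    }
}
```

The service already persists `isDownloaded(true)` on success, so the UI could alternatively poll the transfer process itself; an in-memory registry like this one is lost on restart and would mainly help to distinguish "still running" from "failed".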
Proposal:
Phase 1:
Phase 2:
Once the async download is completed, we should add logic to terminate a transfer: stop the data transfer currently in progress and clean up the bucket (the parts already imported).
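As a rough illustration of the cleanup step, the sketch below attaches a cleanup callback to the transfer future so that a cancelled or failed transfer removes whatever was already written. `PartStore` and `deleteParts` are hypothetical stand-ins for the real bucket client, which is not shown in this discussion:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of "terminate, then clean up the bucket".
final class TerminationCleanupSketch {

    /** Hypothetical abstraction over the bucket: removes parts already imported for a transfer. */
    interface PartStore {
        void deleteParts(String transferProcessId);
    }

    /**
     * Returns a future that mirrors the transfer and, if the transfer is cancelled
     * or fails, asks the store to delete the parts uploaded so far.
     */
    static CompletableFuture<Void> withCleanup(String transferProcessId,
                                               CompletableFuture<Void> transfer,
                                               PartStore partStore) {
        return transfer.whenComplete((unused, error) -> {
            if (error != null) {
                // A cancelled future reaches this branch with a CancellationException.
                partStore.deleteParts(transferProcessId);
            }
        });
    }
}
```

With this shape, calling `cancel(true)` on the original transfer future triggers the cleanup, while a transfer that completes normally leaves the stored data untouched. The upload loop itself still has to stop writing, which cancelling a `CompletableFuture` does not do on its own.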
Investigate whether it is possible to initiate TERMINATE from the PROVIDER side, since the CONSUMER is the one that handles the data transfer (download) using the presigned URL. Should be something like