@@ -34,23 +34,13 @@ import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
 import software.amazon.awssdk.core.checksums.RequestChecksumCalculation
 import software.amazon.awssdk.regions.Region
 import software.amazon.awssdk.services.s3.S3AsyncClient
-import software.amazon.awssdk.services.s3.model.{
-  CompleteMultipartUploadRequest,
-  CompletedMultipartUpload,
-  CompletedPart,
-  CreateMultipartUploadRequest,
-  UploadPartRequest
-}
 import software.amazon.awssdk.transfer.s3.S3TransferManager
 import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest
 
 import java.io.{File, RandomAccessFile}
 import java.net.URI
-import java.util
 import java.nio.file.{Files, Path}
-import java.util.stream.{Collectors, StreamSupport}
 import scala.concurrent.{ExecutionContext, Future}
-import scala.jdk.CollectionConverters.IterableHasAsJava
 import scala.jdk.FutureConverters._
 
 case class ReserveUploadInformation(
@@ -175,10 +165,6 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
     s"upload___${uploadId}___datasetId"
   private def redisKeyForFilePaths(uploadId: String): String =
     s"upload___${uploadId}___filePaths"
-  private def redisKeyForS3MultipartUploadId(uploadId: String, fileName: String): String =
-    s"upload___${uploadId}___file___${fileName}___s3MultipartUploadId"
-  private def redisKeyForS3PartETag(uploadId: String, fileName: String, partNumber: Long): String =
-    s"upload___${uploadId}___file___${fileName}___partETag___$partNumber"
 
   cleanUpOrphanUploads()
 
@@ -349,169 +335,6 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
     } else Fox.successful(())
   }
 
-  private lazy val s3UploadCredentialsOpt: Option[(String, String)] =
-    dataStoreConfig.Datastore.DataVaults.credentials.flatMap { credentialConfig =>
-      new CredentialConfigReader(credentialConfig).getCredential
-    }.collectFirst {
-      case S3AccessKeyCredential(credentialName, accessKeyId, secretAccessKey, _, _)
-          if dataStoreConfig.Datastore.S3Upload.credentialName == credentialName =>
-        (accessKeyId, secretAccessKey)
-    }
-  private lazy val s3Client: S3AsyncClient = S3AsyncClient
-    .builder()
-    .credentialsProvider(
-      StaticCredentialsProvider.create(
-        AwsBasicCredentials.builder
-          .accessKeyId(s3UploadCredentialsOpt.getOrElse(("", ""))._1)
-          .secretAccessKey(s3UploadCredentialsOpt.getOrElse(("", ""))._2)
-          .build()
-      ))
-    .crossRegionAccessEnabled(true)
-    .forcePathStyle(true)
-    .endpointOverride(new URI(dataStoreConfig.Datastore.S3Upload.endpoint))
-    .region(Region.US_EAST_1)
-    // Disabling checksum calculation prevents files being stored with Content Encoding "aws-chunked".
-    .requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED)
-    .build()
-
-  def handleUploadChunkAws(
-      uploadFileId: String,
-      chunkSize: Long,
-      currentChunkSize: Long,
-      totalChunkCount: Long,
-      currentChunkNumber: Long,
-      chunkFile: File,
-      bucketName: String,
-      objectKey: String
-  ): Fox[Unit] = {
-    val uploadId = extractDatasetUploadId(uploadFileId)
-
-    def getAllPartETags(uploadId: String, filePath: String, totalChunkCount: Long): Fox[Vector[(Int, String)]] =
-      for {
-        possibleEtags <- Fox.combined(
-          (1L to totalChunkCount).map(i =>
-            runningUploadMetadataStore.find(redisKeyForS3PartETag(uploadId, filePath, i)))
-        )
-        etagsWithIndex = possibleEtags.zipWithIndex
-        foundEtags = etagsWithIndex.collect {
-          case (Some(etag), idx) => (idx + 1, etag) // partNumber starts at 1
-        }
-      } yield foundEtags.toVector
-
-    for {
-      dataSourceId <- getDataSourceIdByUploadId(uploadId)
-      (filePath, uploadDir) <- getFilePathAndDirOfUploadId(uploadFileId)
-
-      isFileKnown <- runningUploadMetadataStore.contains(redisKeyForFileChunkCount(uploadId, filePath))
-      totalFileSizeInBytesOpt <- runningUploadMetadataStore.findLong(redisKeyForTotalFileSizeInBytes(uploadId))
-
-      _ <- Fox.runOptional(totalFileSizeInBytesOpt) { maxFileSize =>
-        runningUploadMetadataStore
-          .increaseBy(redisKeyForCurrentUploadedTotalFileSizeInBytes(uploadId), currentChunkSize)
-          .flatMap(newTotalFileSizeInBytesOpt => {
-            if (newTotalFileSizeInBytesOpt.getOrElse(0L) > maxFileSize) {
-              cleanUpDatasetExceedingSize(uploadDir, uploadId).flatMap(_ =>
-                Fox.failure("dataset.upload.moreBytesThanReserved"))
-            } else Fox.successful(())
-          })
-      }
-
-      // Initialize multipart upload on first chunk
-      _ <- Fox.runIf(!isFileKnown) {
-        for {
-          _ <- runningUploadMetadataStore.insertIntoSet(redisKeyForFileNameSet(uploadId), filePath)
-          _ <- runningUploadMetadataStore.insert(
-            redisKeyForFileChunkCount(uploadId, filePath),
-            String.valueOf(totalChunkCount)
-          )
-          // Start multipart upload
-          createResp <- Fox.fromFuture {
-            s3Client
-              .createMultipartUpload(
-                CreateMultipartUploadRequest.builder().bucket(bucketName).key(objectKey).build()
-              )
-              .asScala
-          }
-          _ <- runningUploadMetadataStore.insert(
-            redisKeyForS3MultipartUploadId(uploadId, filePath),
-            createResp.uploadId()
-          )
-        } yield ()
-      }
-
-      isNewChunk <- runningUploadMetadataStore.insertIntoSet(
-        redisKeyForFileChunkSet(uploadId, filePath),
-        String.valueOf(currentChunkNumber)
-      )
-
-    } yield {
-      if (isNewChunk) {
-        try {
-          val bytes = Files.readAllBytes(chunkFile.toPath)
-          for {
-            s3UploadIdOpt <- runningUploadMetadataStore.find(redisKeyForS3MultipartUploadId(uploadId, filePath))
-            s3UploadId <- s3UploadIdOpt.toFox ?~> s"No multipart uploadId found for $filePath"
-
-            // Upload part to S3
-            uploadResp <- Fox.fromFuture {
-              s3Client
-                .uploadPart(
-                  UploadPartRequest
-                    .builder()
-                    .bucket(bucketName)
-                    .key(objectKey)
-                    .uploadId(s3UploadId)
-                    .partNumber(currentChunkNumber.toInt)
-                    .contentLength(currentChunkSize)
-                    .build(),
-                  software.amazon.awssdk.core.async.AsyncRequestBody.fromBytes(bytes)
-                )
-                .asScala
-            }
-
-            // Store ETag for later completion
-            _ <- runningUploadMetadataStore.insert(
-              redisKeyForS3PartETag(uploadId, filePath, currentChunkNumber),
-              uploadResp.eTag()
-            )
-
-            // Complete multipart upload if all chunks uploaded
-            _ <- Fox.runIf(currentChunkNumber == totalChunkCount) {
-              for {
-                eTags <- getAllPartETags(uploadId, filePath, totalChunkCount)
-                completedParts: util.List[CompletedPart] = StreamSupport
-                  .stream(eTags.map {
-                    case (partNum, etag) =>
-                      CompletedPart.builder().partNumber(partNum).eTag(etag).build()
-                  }.asJava.spliterator(), false)
-                  .collect(Collectors.toList())
-                completeReq = CompleteMultipartUploadRequest
-                  .builder()
-                  .bucket(bucketName)
-                  .key(objectKey)
-                  .uploadId(s3UploadId)
-                  .multipartUpload(
-                    CompletedMultipartUpload.builder().parts(completedParts).build()
-                  )
-                  .build()
-                _ <- Fox.fromFuture(s3Client.completeMultipartUpload(completeReq).asScala)
-              } yield ()
-            }
-          } yield Fox.successful(())
-
-        } catch {
-          case e: Exception =>
-            runningUploadMetadataStore.removeFromSet(redisKeyForFileChunkSet(uploadId, filePath),
-                                                     String.valueOf(currentChunkNumber))
-            val errorMsg =
-              s"Error receiving chunk $currentChunkNumber for upload ${dataSourceId.directoryName}: ${e.getMessage}"
-            logger.warn(errorMsg)
-            Fox.failure(errorMsg)
-        }
-      } else Fox.successful(())
-    }
-  }
-
   def cancelUpload(cancelUploadInformation: CancelUploadInformation): Fox[Unit] = {
     val uploadId = cancelUploadInformation.uploadId
     for {
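The block removed above implemented chunked uploads directly against the S3 multipart-upload protocol: create the multipart upload on the first chunk, send every chunk as a numbered part while persisting its ETag in Redis, and complete the upload once the last chunk arrives. For reference, a minimal self-contained Scala sketch of that sequence, assuming an already configured S3AsyncClient; `client`, `bucket`, `key` and `parts` are illustrative names, not identifiers from this codebase, and the Redis bookkeeping of the removed code is deliberately left out:

```scala
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model._

import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._

// Uploads `parts` (each at least 5 MiB, except possibly the last) as a single S3 object.
def multipartUpload(client: S3AsyncClient, bucket: String, key: String, parts: Seq[Array[Byte]])(
    implicit ec: ExecutionContext): Future[Unit] =
  for {
    // 1. Start the multipart upload and remember its uploadId.
    created <- client
      .createMultipartUpload(CreateMultipartUploadRequest.builder().bucket(bucket).key(key).build())
      .asScala
    uploadId = created.uploadId()
    // 2. Upload each part (part numbers start at 1) and collect the returned ETags.
    completedParts <- Future.traverse(parts.zipWithIndex) {
      case (bytes, index) =>
        val partNumber = index + 1
        client
          .uploadPart(
            UploadPartRequest.builder().bucket(bucket).key(key).uploadId(uploadId).partNumber(partNumber).build(),
            AsyncRequestBody.fromBytes(bytes)
          )
          .asScala
          .map(resp => CompletedPart.builder().partNumber(partNumber).eTag(resp.eTag()).build())
    }
    // 3. Ask S3 to assemble the object from the uploaded parts, in part-number order.
    _ <- client
      .completeMultipartUpload(
        CompleteMultipartUploadRequest
          .builder()
          .bucket(bucket)
          .key(key)
          .uploadId(uploadId)
          .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts.asJava).build())
          .build())
      .asScala
  } yield ()
```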
@@ -567,7 +390,6 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
                                           unpackToDir,
                                           dataSourceId,
                                           linkedLayerInfo.layersToLink).shiftBox
-      // Post-processing needs to be handled differently for s3 uploads?
       _ <- cleanUpOnFailure(postProcessingResult,
                             dataSourceId,
                             datasetNeedsConversion,
@@ -580,7 +402,7 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
         s3ObjectKey = s"${dataStoreConfig.Datastore.S3Upload.objectKeyPrefix}/$uploadId/"
         _ <- uploadDirectoryToS3(unpackToDir, dataStoreConfig.Datastore.S3Upload.bucketName, s3ObjectKey)
         endPointHost = new URI(dataStoreConfig.Datastore.S3Upload.endpoint).getHost
-        s3DataSource <- dataSourceService.replacePaths(
+        s3DataSource <- dataSourceService.prependAllPaths(
           dataSource,
           newBasePath = s"s3://$endPointHost/${dataStoreConfig.Datastore.S3Upload.bucketName}/$s3ObjectKey")
         _ <- remoteWebknossosClient.updateDataSource(s3DataSource, datasetId, allowNewPaths = true)
@@ -670,6 +492,31 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
         exploreLocalLayerService.writeLocalDatasourceProperties(dataSource, path))
     } yield path
 
+  private lazy val s3UploadCredentialsOpt: Option[(String, String)] =
+    dataStoreConfig.Datastore.DataVaults.credentials.flatMap { credentialConfig =>
+      new CredentialConfigReader(credentialConfig).getCredential
+    }.collectFirst {
+      case S3AccessKeyCredential(credentialName, accessKeyId, secretAccessKey, _, _)
+          if dataStoreConfig.Datastore.S3Upload.credentialName == credentialName =>
+        (accessKeyId, secretAccessKey)
+    }
+  private lazy val s3Client: S3AsyncClient = S3AsyncClient
+    .builder()
+    .credentialsProvider(
+      StaticCredentialsProvider.create(
+        AwsBasicCredentials.builder
+          .accessKeyId(s3UploadCredentialsOpt.getOrElse(("", ""))._1)
+          .secretAccessKey(s3UploadCredentialsOpt.getOrElse(("", ""))._2)
+          .build()
+      ))
+    .crossRegionAccessEnabled(true)
+    .forcePathStyle(true)
+    .endpointOverride(new URI(dataStoreConfig.Datastore.S3Upload.endpoint))
+    .region(Region.US_EAST_1)
+    // Disabling checksum calculation prevents files being stored with Content Encoding "aws-chunked".
+    .requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED)
+    .build()
+
   private lazy val transferManager = S3TransferManager.builder().s3Client(s3Client).build()
 
   private def uploadDirectoryToS3(
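The credentials lookup and client construction are unchanged here, only moved below the unpacking helpers and wired into an S3TransferManager. The body of uploadDirectoryToS3 is not part of this diff; a plausible sketch of a directory upload via a transfer manager like the one built above (the `uploadDirectory` helper and the `localDir`/`prefix` parameters are illustrative, not taken from the codebase) might look like this:

```scala
import software.amazon.awssdk.transfer.s3.S3TransferManager
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest

import java.nio.file.Path
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._

// Recursively uploads localDir to s3://<bucket>/<prefix>... and fails if any single file transfer failed.
def uploadDirectory(transferManager: S3TransferManager, localDir: Path, bucket: String, prefix: String)(
    implicit ec: ExecutionContext): Future[Unit] =
  transferManager
    .uploadDirectory(
      UploadDirectoryRequest
        .builder()
        .source(localDir) // root directory to walk
        .bucket(bucket)
        .s3Prefix(prefix) // key prefix, e.g. "<objectKeyPrefix>/<uploadId>/"
        .build())
    .completionFuture()
    .asScala
    .flatMap { completed =>
      val failed = completed.failedTransfers().asScala
      if (failed.isEmpty) Future.unit
      else Future.failed(new Exception(s"${failed.size} file(s) failed to upload to s3://$bucket/$prefix"))
    }
```

Because the client is built with forcePathStyle(true) and an endpointOverride, the same transfer manager also works against S3-compatible object stores outside AWS.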
@@ -967,7 +814,6 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
       dataSourceId <- getDataSourceIdByUploadId(uploadId)
       _ <- runningUploadMetadataStore.remove(redisKeyForDataSourceId(uploadId))
       _ <- runningUploadMetadataStore.remove(redisKeyForDatasetId(uploadId))
-      // TODO: Remove S3 multipart upload if present
       _ <- runningUploadMetadataStore.remove(redisKeyForLinkedLayerIdentifier(uploadId))
       _ <- runningUploadMetadataStore.remove(redisKeyForUploadId(dataSourceId))
       _ <- runningUploadMetadataStore.remove(redisKeyForFilePaths(uploadId))