@@ -5,6 +5,7 @@ import com.scalableminds.util.accesscontext.TokenContext
55import com .scalableminds .util .io .PathUtils .ensureDirectoryBox
66import com .scalableminds .util .io .{PathUtils , ZipIO }
77import com .scalableminds .util .objectid .ObjectId
8+ import com .scalableminds .util .time .Instant
89import com .scalableminds .util .tools .Box .tryo
910import com .scalableminds .util .tools ._
1011import com .scalableminds .webknossos .datastore .DataStoreConfig
@@ -15,6 +16,7 @@ import com.scalableminds.webknossos.datastore.datareaders.n5.{N5Header, N5Metada
1516import com .scalableminds .webknossos .datastore .datareaders .precomputed .PrecomputedHeader .FILENAME_INFO
1617import com .scalableminds .webknossos .datastore .datareaders .zarr .NgffMetadata .FILENAME_DOT_ZATTRS
1718import com .scalableminds .webknossos .datastore .datareaders .zarr .ZarrHeader .FILENAME_DOT_ZARRAY
19+ import com .scalableminds .webknossos .datastore .datavault .S3DataVault
1820import com .scalableminds .webknossos .datastore .explore .ExploreLocalLayerService
1921import com .scalableminds .webknossos .datastore .helpers .{DatasetDeleter , DirectoryConstants }
2022import com .scalableminds .webknossos .datastore .models .UnfinishedUpload
@@ -396,19 +398,20 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
396398 datasetNeedsConversion,
397399 label = s " processing dataset at $unpackToDir" )
398400 dataSource = dataSourceService.dataSourceFromDir(unpackToDir, dataSourceId.organizationId)
399- datasetSizeBytes : Long <- if (uploadToS3) {
401+ datasetSizeBytes : Long <- if (uploadToS3 && ! datasetNeedsConversion ) {
400402 for {
401403 _ <- Fox .successful(())
402- _ = logger.info(
403- s " Starting upload of dataset ${dataSourceId.organizationId} / ${dataSourceId.directoryName} to S3. " )
404+ beforeS3Upload = Instant .now
405+ s3UploadBucket <- s3UploadBucketOpt.toFox
404406 s3ObjectKey = s " ${dataStoreConfig.Datastore .S3Upload .objectKeyPrefix}/ $uploadId/ "
405- _ <- uploadDirectoryToS3(unpackToDir, dataSource, dataStoreConfig.Datastore .S3Upload .bucketName, s3ObjectKey)
406- _ = logger.info(
407- s " Finished upload of dataset ${dataSourceId.organizationId}/ ${dataSourceId.directoryName} to S3. " )
408- endPointHost = new URI (dataStoreConfig.Datastore .S3Upload .endpoint).getHost
409- s3DataSource <- dataSourceService.prependAllPaths(
410- dataSource,
411- newBasePath = s " s3:// $endPointHost/ ${dataStoreConfig.Datastore .S3Upload .bucketName}/ $s3ObjectKey" )
407+ _ <- uploadDirectoryToS3(unpackToDir, dataSource, s3UploadBucket, s3ObjectKey)
408+ _ = Instant .logSince(beforeS3Upload,
409+ s " Upload of dataset ${dataSourceId.organizationId}/ ${dataSourceId.directoryName} to S3 " ,
410+ logger)
411+ endPointHost = new URI (dataStoreConfig.Datastore .S3Upload .credentialName).getHost
412+ s3DataSource <- dataSourceService.prependAllPaths(dataSource,
413+ newBasePath =
414+ s " s3:// $endPointHost/ $s3UploadBucket/ $s3ObjectKey" )
412415 _ <- remoteWebknossosClient.updateDataSource(s3DataSource, datasetId, allowNewPaths = true )
413416 datasetSize <- tryo(FileUtils .sizeOfDirectoryAsBigInteger(new File (unpackToDir.toString)).longValue).toFox
414417 _ = this .synchronized {
@@ -504,24 +507,41 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
504507 if dataStoreConfig.Datastore .S3Upload .credentialName == credentialName =>
505508 (accessKeyId, secretAccessKey)
506509 }
507- private lazy val s3Client : S3AsyncClient = S3AsyncClient
508- .builder()
509- .credentialsProvider(
510- StaticCredentialsProvider .create(
511- AwsBasicCredentials .builder
512- .accessKeyId(s3UploadCredentialsOpt.getOrElse((" " , " " ))._1)
513- .secretAccessKey(s3UploadCredentialsOpt.getOrElse((" " , " " ))._2)
514- .build()
515- ))
516- .crossRegionAccessEnabled(true )
517- .forcePathStyle(true )
518- .endpointOverride(new URI (dataStoreConfig.Datastore .S3Upload .endpoint))
519- .region(Region .US_EAST_1 )
520- // Disabling checksum calculation prevents files being stored with Content Encoding "aws-chunked".
521- .requestChecksumCalculation(RequestChecksumCalculation .WHEN_REQUIRED )
522- .build()
523-
524- private lazy val transferManager = S3TransferManager .builder().s3Client(s3Client).build()
510+ private lazy val s3UploadBucketOpt : Option [String ] =
511+ S3DataVault .hostBucketFromUri(new URI (dataStoreConfig.Datastore .S3Upload .credentialName))
512+ private lazy val s3UploadEndpoint : URI = {
513+ val credentialUri = new URI (dataStoreConfig.Datastore .S3Upload .credentialName)
514+ new URI (
515+ " https" ,
516+ null ,
517+ credentialUri.getHost,
518+ - 1 ,
519+ null ,
520+ null ,
521+ null
522+ )
523+ }
524+ private lazy val s3ClientBox : Box [S3AsyncClient ] = for {
525+ accessKeyId <- Box (s3UploadCredentialsOpt.map(_._1))
526+ secretAccessKey <- Box (s3UploadCredentialsOpt.map(_._2))
527+ } yield
528+ S3AsyncClient
529+ .builder()
530+ .credentialsProvider(
531+ StaticCredentialsProvider .create(
532+ AwsBasicCredentials .builder.accessKeyId(accessKeyId).secretAccessKey(secretAccessKey).build()
533+ ))
534+ .crossRegionAccessEnabled(true )
535+ .forcePathStyle(true )
536+ .endpointOverride(s3UploadEndpoint)
537+ .region(Region .US_EAST_1 )
538+ // Disabling checksum calculation prevents files being stored with Content Encoding "aws-chunked".
539+ .requestChecksumCalculation(RequestChecksumCalculation .WHEN_REQUIRED )
540+ .build()
541+
542+ private lazy val transferManagerBox : Box [S3TransferManager ] = for {
543+ client <- s3ClientBox
544+ } yield S3TransferManager .builder().s3Client(client).build()
525545
526546 private def uploadDirectoryToS3 (
527547 dataDir : Path ,
@@ -542,6 +562,7 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
542562 logger.warn(s " Could not delete file $file before upload to S3: ${e.getMessage}" )
543563 }
544564 })
565+ transferManager <- transferManagerBox.toFox ?~> " S3 upload is not properly configured, cannot get S3 client"
545566 directoryUpload = transferManager.uploadDirectory(
546567 UploadDirectoryRequest .builder().bucket(bucketName).s3Prefix(prefix).source(dataDir).build()
547568 )
@@ -620,15 +641,16 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
620641 datasetNeedsConversion : Boolean ,
621642 uploadToS3 : Boolean ): Path = {
622643 val dataSourceDir = {
623- if (uploadToS3 )
624- s3UploadDirectory (dataSourceId.organizationId, dataSourceId.directoryName)
644+ if (datasetNeedsConversion )
645+ dataBaseDir.resolve (dataSourceId.organizationId).resolve(forConversionDir).resolve( dataSourceId.directoryName)
625646 else {
626- if (datasetNeedsConversion )
627- dataBaseDir.resolve (dataSourceId.organizationId).resolve(forConversionDir).resolve( dataSourceId.directoryName)
647+ if (uploadToS3 )
648+ s3UploadDirectory (dataSourceId.organizationId, dataSourceId.directoryName)
628649 else
629650 dataBaseDir.resolve(dataSourceId.organizationId).resolve(dataSourceId.directoryName)
630651 }
631652 }
653+
632654 dataSourceDir
633655 }
634656
0 commit comments