@@ -13,19 +13,38 @@ import com.scalableminds.webknossos.datastore.dataformats.{MagLocator, MappingPr
 import com.scalableminds.webknossos.datastore.helpers.{DatasetDeleter, IntervalScheduler}
 import com.scalableminds.webknossos.datastore.models.datasource._
 import com.scalableminds.webknossos.datastore.models.datasource.inbox.{InboxDataSource, UnusableDataSource}
-import com.scalableminds.webknossos.datastore.storage.{DataVaultService, RemoteSourceDescriptorService}
+import com.scalableminds.webknossos.datastore.storage.{
+  CredentialConfigReader,
+  DataVaultService,
+  RemoteSourceDescriptorService,
+  S3AccessKeyCredential
+}
 import com.typesafe.scalalogging.LazyLogging
 import com.scalableminds.util.tools.Box.tryo
 import com.scalableminds.util.tools._
+import com.scalableminds.webknossos.datastore.datavault.S3DataVault
 import play.api.inject.ApplicationLifecycle
 import play.api.libs.json.Json
+import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
+import software.amazon.awssdk.core.checksums.RequestChecksumCalculation
+import software.amazon.awssdk.regions.Region
+import software.amazon.awssdk.services.s3.S3AsyncClient
+import software.amazon.awssdk.services.s3.model.{
+  Delete,
+  DeleteObjectsRequest,
+  DeleteObjectsResponse,
+  ListObjectsV2Request,
+  ObjectIdentifier
+}
 
 import java.io.{File, FileWriter}
 import java.net.URI
 import java.nio.file.{Files, Path}
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
 import scala.io.Source
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
 
 class DataSourceService @Inject()(
     config: DataStoreConfig,
@@ -446,4 +465,126 @@ class DataSourceService @Inject()(
           remoteSourceDescriptorService.removeVaultFromCache(attachment)))
       } yield dataLayer.mags.length
     } yield removedEntriesList.sum
+
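+  // All global data vault credentials configured for this datastore, parsed once on first use.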
+  private lazy val globalCredentials = {
+    val res = config.Datastore.DataVaults.credentials.flatMap { credentialConfig =>
+      new CredentialConfigReader(credentialConfig).getCredential
+    }
+    logger.info(s"Parsed ${res.length} global data vault credentials from datastore config.")
+    res
+  }
+
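+  // Heuristic: a dataset counts as living in "controlled" S3 if the longest common prefix
+  // of all its explicit paths is an s3 URI that starts with a configured credential's name.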
+  def datasetInControlledS3(dataSource: DataSource) = {
+    def commonPrefix(strings: Seq[String]): String = {
+      if (strings.isEmpty) return ""
+
+      strings.reduce { (a, b) =>
+        a.zip(b).takeWhile { case (c1, c2) => c1 == c2 }.map(_._1).mkString
+      }
+    }
+
+    val allPaths = dataSource.allExplicitPaths
+    val sharedPath = commonPrefix(allPaths)
+    val matchingCredentials = globalCredentials.filter(c => sharedPath.startsWith(c.name))
+    matchingCredentials.nonEmpty && sharedPath.startsWith("s3")
+  }
+
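+  // Access key pair for the credential named in the datastore's S3 upload config, if present.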
+  private lazy val s3UploadCredentialsOpt: Option[(String, String)] =
+    config.Datastore.DataVaults.credentials.flatMap { credentialConfig =>
+      new CredentialConfigReader(credentialConfig).getCredential
+    }.collectFirst {
+      case S3AccessKeyCredential(credentialName, accessKeyId, secretAccessKey, _, _)
+          if config.Datastore.S3Upload.credentialName == credentialName =>
+        (accessKeyId, secretAccessKey)
+    }
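+
+  // Async client for the configured S3 upload endpoint; path-style addressing keeps it
+  // compatible with S3-compatible object stores. Falls back to empty credentials if none match.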
+  private lazy val s3Client: S3AsyncClient = S3AsyncClient
+    .builder()
+    .credentialsProvider(
+      StaticCredentialsProvider.create(
+        AwsBasicCredentials.builder
+          .accessKeyId(s3UploadCredentialsOpt.getOrElse(("", ""))._1)
+          .secretAccessKey(s3UploadCredentialsOpt.getOrElse(("", ""))._2)
+          .build()
+      ))
+    .crossRegionAccessEnabled(true)
+    .forcePathStyle(true)
+    .endpointOverride(new URI(config.Datastore.S3Upload.endpoint))
+    .region(Region.US_EAST_1)
+    // Disabling checksum calculation prevents files being stored with Content Encoding "aws-chunked".
+    .requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED)
+    .build()
+
+  def deleteFromControlledS3(dataSource: DataSource): Fox[Unit] = {
+    // TODO: Do we handle other datasets using the same layers?
+
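+    // Issues a single DeleteObjects request; S3 accepts at most 1000 keys per call.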
+    def deleteBatch(bucket: String, keys: Seq[String]): Fox[DeleteObjectsResponse] =
+      if (keys.isEmpty) Fox.empty
+      else {
+        Fox.fromFuture(
+          s3Client
+            .deleteObjects(
+              DeleteObjectsRequest
+                .builder()
+                .bucket(bucket)
+                .delete(
+                  Delete
+                    .builder()
+                    .objects(
+                      keys.map(k => ObjectIdentifier.builder().key(k).build()).asJava
+                    )
+                    .build()
+                )
+                .build()
+            )
+            .asScala)
+      }
+
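+    // Lists all object keys under the given prefix, following continuation tokens across pages.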
+    def listKeysAtPrefix(bucket: String, prefix: String): Fox[Seq[String]] = {
+      def listRec(continuationToken: Option[String], acc: Seq[String]): Fox[Seq[String]] = {
+        val builder = ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).maxKeys(1000)
+        val request = continuationToken match {
+          case Some(token) => builder.continuationToken(token).build()
+          case None        => builder.build()
+        }
+        for {
+          response <- Fox.fromFuture(s3Client.listObjectsV2(request).asScala)
+          keys = response.contents().asScala.map(_.key())
+          allKeys = acc ++ keys
+          result <- if (response.isTruncated) {
+            listRec(Option(response.nextContinuationToken()), allKeys)
+          } else {
+            Fox.successful(allKeys)
+          }
+        } yield result
+      }
+      listRec(None, Seq())
+    }
+
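+    // Resolve the bucket once from the first path, list every key under each path prefix,
+    // then delete the de-duplicated keys in batches of 1000 via deleteBatch.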
+    for {
+      _ <- Fox.successful(())
+      paths = dataSource.allExplicitPaths
+      // Assume everything is in the same bucket
+      firstPath <- paths.headOption.toFox ?~> "No explicit paths found for dataset in controlled S3"
+      bucket <- S3DataVault
+        .hostBucketFromUri(new URI(firstPath))
+        .toFox ?~> s"Could not determine S3 bucket from path $firstPath"
+      prefixes <- Fox.combined(paths.map(path => S3DataVault.objectKeyFromUri(new URI(path)).toFox))
+      keys: Seq[String] <- Fox.serialCombined(prefixes)(listKeysAtPrefix(bucket, _)).map(_.flatten)
+      uniqueKeys = keys.distinct
+      _ = logger.info(
+        s"Deleting ${uniqueKeys.length} objects from controlled S3 bucket $bucket for dataset ${dataSource.id}")
+      _ <- Fox.serialCombined(uniqueKeys.grouped(1000).toSeq)(deleteBatch(bucket, _)).map(_ => ())
+    } yield ()
+  }
 }