@@ -13,19 +13,38 @@ import com.scalableminds.webknossos.datastore.dataformats.{MagLocator, MappingPr
 import com.scalableminds.webknossos.datastore.helpers.{DatasetDeleter, IntervalScheduler}
 import com.scalableminds.webknossos.datastore.models.datasource._
 import com.scalableminds.webknossos.datastore.models.datasource.inbox.{InboxDataSource, UnusableDataSource}
-import com.scalableminds.webknossos.datastore.storage.{DataVaultService, RemoteSourceDescriptorService}
+import com.scalableminds.webknossos.datastore.storage.{
+  CredentialConfigReader,
+  DataVaultService,
+  RemoteSourceDescriptorService,
+  S3AccessKeyCredential
+}
 import com.typesafe.scalalogging.LazyLogging
 import com.scalableminds.util.tools.Box.tryo
 import com.scalableminds.util.tools._
+import com.scalableminds.webknossos.datastore.datavault.S3DataVault
 import play.api.inject.ApplicationLifecycle
 import play.api.libs.json.Json
+import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
+import software.amazon.awssdk.core.checksums.RequestChecksumCalculation
+import software.amazon.awssdk.regions.Region
+import software.amazon.awssdk.services.s3.S3AsyncClient
+import software.amazon.awssdk.services.s3.model.{
+  Delete,
+  DeleteObjectsRequest,
+  DeleteObjectsResponse,
+  ListObjectsV2Request,
+  ObjectIdentifier
+}
 
 import java.io.{File, FileWriter}
 import java.net.URI
 import java.nio.file.{Files, Path}
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
 import scala.io.Source
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
 
 class DataSourceService @Inject()(
     config: DataStoreConfig,
@@ -446,4 +465,115 @@ class DataSourceService @Inject()(
           remoteSourceDescriptorService.removeVaultFromCache(attachment)))
       } yield dataLayer.mags.length
     } yield removedEntriesList.sum
+
+  private lazy val globalCredentials = {
+    val res = config.Datastore.DataVaults.credentials.flatMap { credentialConfig =>
+      new CredentialConfigReader(credentialConfig).getCredential
+    }
+    logger.info(s"Parsed ${res.length} global data vault credentials from datastore config.")
+    res
+  }
+
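+  // A dataset counts as living in controlled S3 if the common prefix of all its explicit paths
+  // is an s3 URI that matches the name of one of the globally configured credentials.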
+  def datasetInControlledS3(dataSource: DataSource): Boolean = {
+    def commonPrefix(strings: Seq[String]): String = {
+      if (strings.isEmpty) return ""
+
+      strings.reduce { (a, b) =>
+        a.zip(b).takeWhile { case (c1, c2) => c1 == c2 }.map(_._1).mkString
+      }
+    }
+
+    val allPaths = dataSource.allExplicitPaths
+    val sharedPath = commonPrefix(allPaths)
+    val matchingCredentials = globalCredentials.filter(c => sharedPath.startsWith(c.name))
+    matchingCredentials.nonEmpty && sharedPath.startsWith("s3")
+  }
+
+  private lazy val s3UploadCredentialsOpt: Option[(String, String)] =
+    config.Datastore.DataVaults.credentials.flatMap { credentialConfig =>
+      new CredentialConfigReader(credentialConfig).getCredential
+    }.collectFirst {
+      case S3AccessKeyCredential(credentialName, accessKeyId, secretAccessKey, _, _)
+          if config.Datastore.S3Upload.credentialName == credentialName =>
+        (accessKeyId, secretAccessKey)
+    }
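+  // Async S3 client for the controlled bucket; path-style access and the endpoint override
+  // allow talking to S3-compatible storage outside of AWS.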
+  private lazy val s3Client: S3AsyncClient = S3AsyncClient
+    .builder()
+    .credentialsProvider(
+      StaticCredentialsProvider.create(
+        AwsBasicCredentials.builder
+          .accessKeyId(s3UploadCredentialsOpt.getOrElse(("", ""))._1)
+          .secretAccessKey(s3UploadCredentialsOpt.getOrElse(("", ""))._2)
+          .build()
+      ))
+    .crossRegionAccessEnabled(true)
+    .forcePathStyle(true)
+    .endpointOverride(new URI(config.Datastore.S3Upload.endpoint))
+    .region(Region.US_EAST_1)
+    // Disabling checksum calculation prevents files being stored with Content Encoding "aws-chunked".
+    .requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED)
+    .build()
+
+  def deleteFromControlledS3(dataSource: DataSource): Fox[Unit] = {
+    // TODO: Do we handle other datasets using the same layers?
+
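+    // Deletes one batch of keys with a single DeleteObjects call (the S3 API accepts at most 1000 keys per request).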
+    def deleteBatch(bucket: String, keys: Seq[String]): Fox[DeleteObjectsResponse] =
+      if (keys.isEmpty) Fox.empty
+      else {
+        Fox.fromFuture(
+          s3Client
+            .deleteObjects(
+              DeleteObjectsRequest
+                .builder()
+                .bucket(bucket)
+                .delete(
+                  Delete
+                    .builder()
+                    .objects(
+                      keys.map(k => ObjectIdentifier.builder().key(k).build()).asJava
+                    )
+                    .build()
+                )
+                .build()
+            )
+            .asScala)
+      }
+
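+    // Collects all object keys under the given prefix, following continuation tokens across paginated responses.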
+    def listKeysAtPrefix(bucket: String, prefix: String): Fox[Seq[String]] = {
+      def listRec(continuationToken: Option[String], acc: Seq[String]): Fox[Seq[String]] = {
+        val builder = ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).maxKeys(1000)
+        val request = continuationToken match {
+          case Some(token) => builder.continuationToken(token).build()
+          case None        => builder.build()
+        }
+        for {
+          response <- Fox.fromFuture(s3Client.listObjectsV2(request).asScala)
+          keys = response.contents().asScala.map(_.key())
+          allKeys = acc ++ keys
+          result <- if (response.isTruncated) {
+            listRec(Option(response.nextContinuationToken()), allKeys)
+          } else {
+            Fox.successful(allKeys)
+          }
+        } yield result
+      }
+      listRec(None, Seq())
+    }
+
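+    // Derive the bucket from the first explicit path (all paths are assumed to share one bucket),
+    // list every key under each path prefix, then delete those keys in batches of 1000.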
+    for {
+      _ <- Fox.successful(())
+      paths = dataSource.allExplicitPaths
+      // Assume everything is in the same bucket
+      firstPath <- paths.headOption.toFox ?~> "No explicit paths found for dataset in controlled S3"
+      bucket <- S3DataVault
+        .hostBucketFromUri(new URI(firstPath))
+        .toFox ?~> s"Could not determine S3 bucket from path $firstPath"
+      prefixes <- Fox.combined(paths.map(path => S3DataVault.objectKeyFromUri(new URI(path)).toFox))
+      keys: Seq[String] <- Fox.serialCombined(prefixes)(listKeysAtPrefix(bucket, _)).map(_.flatten)
+      uniqueKeys = keys.distinct
+      _ = logger.info(
+        s"Deleting ${uniqueKeys.length} objects from controlled S3 bucket $bucket for dataset ${dataSource.id}")
+      _ <- Fox.serialCombined(uniqueKeys.grouped(1000).toSeq)(deleteBatch(bucket, _)).map(_ => ())
+    } yield ()
+  }
 }