diff --git a/app/controllers/DatasetController.scala b/app/controllers/DatasetController.scala index edb062c2086..6e351cce986 100755 --- a/app/controllers/DatasetController.scala +++ b/app/controllers/DatasetController.scala @@ -35,6 +35,7 @@ import play.api.libs.json._ import play.api.mvc.{Action, AnyContent, PlayBodyParsers} import play.silhouette.api.Silhouette import security.{AccessibleBySwitchingService, URLSharing, WkEnv} +import telemetry.SlackNotificationService import utils.{MetadataAssertions, WkConf} import javax.inject.Inject @@ -140,6 +141,7 @@ class DatasetController @Inject()(userService: UserService, thumbnailCachingService: ThumbnailCachingService, usedStorageService: UsedStorageService, conf: WkConf, + slackNotificationService: SlackNotificationService, authenticationService: AccessibleBySwitchingService, analyticsService: AnalyticsService, mailchimpClient: MailchimpClient, @@ -581,15 +583,23 @@ class DatasetController @Inject()(userService: UserService, } } - def deleteOnDisk(datasetId: ObjectId): Action[AnyContent] = + def delete(datasetId: ObjectId): Action[AnyContent] = sil.SecuredAction.async { implicit request => - for { - dataset <- datasetDAO.findOne(datasetId) ?~> notFoundMessage(datasetId.toString) ~> NOT_FOUND - _ <- Fox.fromBool(conf.Features.allowDeleteDatasets) ?~> "dataset.delete.disabled" - _ <- Fox.assertTrue(datasetService.isEditableBy(dataset, Some(request.identity))) ?~> "notAllowed" ~> FORBIDDEN - _ <- Fox.fromBool(request.identity.isAdminOf(dataset._organization)) ~> FORBIDDEN - _ <- datasetService.deleteVirtualOrDiskDataset(dataset) - } yield Ok + log() { + logTime(slackNotificationService.noticeSlowRequest) { + for { + dataset <- datasetDAO.findOne(datasetId) ?~> notFoundMessage(datasetId.toString) ~> NOT_FOUND + _ <- Fox.fromBool(conf.Features.allowDeleteDatasets) ?~> "dataset.delete.disabled" + _ <- Fox.assertTrue(datasetService.isEditableBy(dataset, Some(request.identity))) ?~> "notAllowed" ~> FORBIDDEN + _ <- Fox.fromBool(request.identity.isAdminOf(dataset._organization)) ?~> "delete.mustBeOrganizationAdmin" ~> FORBIDDEN + before = Instant.now + _ = logger.info( + s"Deleting dataset $datasetId (isVirtual=${dataset.isVirtual}) as requested by user ${request.identity._id}...") + _ <- datasetService.deleteDataset(dataset) + _ = Instant.logSince(before, s"Deleting dataset $datasetId") + } yield Ok + } + } } def compose(): Action[ComposeRequest] = diff --git a/app/controllers/WKRemoteDataStoreController.scala b/app/controllers/WKRemoteDataStoreController.scala index 9564ac496c2..37c89d7097d 100644 --- a/app/controllers/WKRemoteDataStoreController.scala +++ b/app/controllers/WKRemoteDataStoreController.scala @@ -3,9 +3,8 @@ package controllers import com.scalableminds.util.accesscontext.{AuthorizedAccessContext, DBAccessContext, GlobalAccessContext} import com.scalableminds.util.objectid.ObjectId import com.scalableminds.util.time.Instant -import com.scalableminds.util.tools.{Fox, Full} +import com.scalableminds.util.tools.Fox import com.scalableminds.webknossos.datastore.controllers.JobExportProperties -import com.scalableminds.webknossos.datastore.helpers.{LayerMagLinkInfo, MagLinkInfo} import com.scalableminds.webknossos.datastore.models.UnfinishedUpload import com.scalableminds.webknossos.datastore.models.datasource.{ DataSource, @@ -20,7 +19,6 @@ import com.scalableminds.webknossos.datastore.services.uploading.{ ReserveUploadInformation } import com.typesafe.scalalogging.LazyLogging -import models.annotation.AnnotationDAO import 
models.dataset._ import models.dataset.credential.CredentialDAO import models.job.JobDAO @@ -50,7 +48,6 @@ class WKRemoteDataStoreController @Inject()( teamDAO: TeamDAO, jobDAO: JobDAO, credentialDAO: CredentialDAO, - annotationDAO: AnnotationDAO, wkSilhouetteEnvironment: WkSilhouetteEnvironment)(implicit ec: ExecutionContext, bodyParsers: PlayBodyParsers) extends Controller with LazyLogging { @@ -209,17 +206,7 @@ class WKRemoteDataStoreController @Inject()( implicit request => dataStoreService.validateAccess(name, key) { _ => for { - existingDatasetBox <- datasetDAO.findOne(request.body)(GlobalAccessContext).shiftBox - _ <- existingDatasetBox match { - case Full(dataset) => - for { - annotationCount <- annotationDAO.countAllByDataset(dataset._id)(GlobalAccessContext) - _ = datasetDAO - .deleteDataset(dataset._id, onlyMarkAsDeleted = annotationCount > 0) - .flatMap(_ => usedStorageService.refreshStorageReportForDataset(dataset)) - } yield () - case _ => Fox.successful(()) - } + _ <- datasetService.deleteDatasetFromDB(request.body) } yield Ok } } @@ -240,21 +227,6 @@ class WKRemoteDataStoreController @Inject()( } } - def getPaths(name: String, key: String, datasetId: ObjectId): Action[AnyContent] = - Action.async { implicit request => - dataStoreService.validateAccess(name, key) { _ => - for { - dataset <- datasetDAO.findOne(datasetId)(GlobalAccessContext) ?~> Messages("dataset.notFound", datasetId) ~> NOT_FOUND - layers <- datasetLayerDAO.findAllForDataset(dataset._id) - magsAndLinkedMags <- Fox.serialCombined(layers)(l => datasetService.getPathsForDataLayer(dataset._id, l.name)) - magLinkInfos = magsAndLinkedMags.map(_.map { case (mag, linkedMags) => MagLinkInfo(mag, linkedMags) }) - layersAndMagLinkInfos = layers.zip(magLinkInfos).map { - case (layer, magLinkInfo) => LayerMagLinkInfo(layer.name, magLinkInfo) - } - } yield Ok(Json.toJson(layersAndMagLinkInfos)) - } - } - def getDataSource(name: String, key: String, datasetId: ObjectId): Action[AnyContent] = Action.async { implicit request => dataStoreService.validateAccess(name, key) { _ => diff --git a/app/models/dataset/Dataset.scala b/app/models/dataset/Dataset.scala index a10dc03c31a..321cf77d9d1 100755 --- a/app/models/dataset/Dataset.scala +++ b/app/models/dataset/Dataset.scala @@ -7,7 +7,7 @@ import com.scalableminds.util.time.Instant import com.scalableminds.util.tools.{Fox, JsonHelper} import com.scalableminds.webknossos.datastore.dataformats.MagLocator import com.scalableminds.webknossos.datastore.datareaders.AxisOrder -import com.scalableminds.webknossos.datastore.helpers.{DataSourceMagInfo, UPath} +import com.scalableminds.webknossos.datastore.helpers.UPath import com.scalableminds.webknossos.datastore.models.{LengthUnit, VoxelSize} import com.scalableminds.webknossos.datastore.models.datasource.DatasetViewConfiguration.DatasetViewConfiguration import com.scalableminds.webknossos.datastore.models.datasource.LayerViewConfiguration.LayerViewConfiguration @@ -868,33 +868,42 @@ class DatasetMagsDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionConte } ) - private def rowsToMagInfos(rows: Vector[DataSourceMagRow]): List[DataSourceMagInfo] = { - val mags = rows.map(_.mag) - val dataSources = rows.map(row => DataSourceId(row.directoryName, row._organization)) - rows.toList.zip(mags).zip(dataSources).map { - case ((row, mag), dataSource) => - DataSourceMagInfo(dataSource, row.dataLayerName, mag, row.path, row.realPath, row.hasLocalData) - } - } - - def findPathsForDatasetAndDatalayer(datasetId: ObjectId, dataLayerName: 
String): Fox[List[DataSourceMagInfo]] = + def findMagPathsUsedOnlyByThisDataset(datasetId: ObjectId): Fox[Seq[UPath]] = for { - rows <- run(q"""SELECT _dataset, dataLayerName, mag, path, realPath, hasLocalData, _organization, directoryName - FROM webknossos.dataset_mags - INNER JOIN webknossos.datasets ON webknossos.dataset_mags._dataset = webknossos.datasets._id - WHERE _dataset = $datasetId - AND dataLayerName = $dataLayerName""".as[DataSourceMagRow]) - magInfos = rowsToMagInfos(rows) - } yield magInfos - - def findAllByRealPath(realPath: String): Fox[List[DataSourceMagInfo]] = - for { - rows <- run(q"""SELECT _dataset, dataLayerName, mag, path, realPath, hasLocalData, _organization, directoryName - FROM webknossos.dataset_mags - INNER JOIN webknossos.datasets ON webknossos.dataset_mags._dataset = webknossos.datasets._id - WHERE realPath = $realPath""".as[DataSourceMagRow]) - magInfos = rowsToMagInfos(rows) - } yield magInfos + pathsStrOpts <- run(q""" + SELECT m1.path FROM webknossos.dataset_mags m1 + WHERE m1._dataset = $datasetId + AND m1.path IS NOT NULL + AND NOT EXISTS ( + SELECT m2.path + FROM webknossos.dataset_mags m2 + WHERE m2._dataset != $datasetId + AND ( + m2.path = m1.path + OR ( + m2.realpath IS NOT NULL AND m2.realpath = m1.realpath + ) + ) + ) + """.as[Option[String]]) + paths <- pathsStrOpts.flatten.map(UPath.fromString).toList.toSingleBox("Invalid UPath").toFox + } yield paths + + def findDatasetsWithMagsInDir(absolutePath: UPath, + dataStore: DataStore, + ignoredDataset: ObjectId): Fox[Seq[ObjectId]] = { + // ensure trailing slash on absolutePath to avoid string prefix false positives + val absolutePathWithTrailingSlash = + if (absolutePath.toString.endsWith("/")) absolutePath.toString else absolutePath.toString + "/" + run(q""" + SELECT d._id FROM webknossos.dataset_mags m + JOIN webknossos.datasets d ON m._dataset = d._id + WHERE m.realpath IS NOT NULL + AND starts_with(m.realpath, $absolutePathWithTrailingSlash) + AND d._id != $ignoredDataset + AND d._datastore = ${dataStore.name.trim} + """.as[ObjectId]) + } private def parseMagLocator(row: DatasetMagsRow): Fox[MagLocator] = for { @@ -1265,6 +1274,36 @@ class DatasetLayerAttachmentsDAO @Inject()(sqlClient: SqlClient)(implicit ec: Ex ${datasetIdOpt.map(datasetId => q"AND ranked._dataset = $datasetId").getOrElse(q"")}; """.as[StorageRelevantDataLayerAttachment]) } yield storageRelevantAttachments.toList + + def findAttachmentPathsUsedOnlyByThisDataset(datasetId: ObjectId): Fox[Seq[UPath]] = + for { + pathsStr <- run(q""" + SELECT a1.path FROM webknossos.dataset_layer_attachments a1 + WHERE a1._dataset = $datasetId + AND NOT EXISTS ( + SELECT a2.path + FROM webknossos.dataset_layer_attachments a2 + WHERE a2._dataset != $datasetId + AND a2.path = a1.path + ) + """.as[String]) + paths <- pathsStr.map(UPath.fromString).toList.toSingleBox("Invalid UPath").toFox + } yield paths + + def findDatasetsWithAttachmentsInDir(absolutePath: UPath, + dataStore: DataStore, + ignoredDataset: ObjectId): Fox[Seq[ObjectId]] = { + // ensure trailing slash on absolutePath to avoid string prefix false positives + val absolutePathWithTrailingSlash = + if (absolutePath.toString.endsWith("/")) absolutePath.toString else absolutePath.toString + "/" + run(q""" + SELECT d._id FROM webknossos.dataset_layer_attachments a + JOIN webknossos.datasets d ON a._dataset = d._id + WHERE starts_with(a.path, $absolutePathWithTrailingSlash) + AND d._id != $ignoredDataset + AND d._datastore = ${dataStore.name.trim} + """.as[ObjectId]) + } } class 
DatasetCoordinateTransformationsDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) diff --git a/app/models/dataset/DatasetService.scala b/app/models/dataset/DatasetService.scala index f028ea1119e..e47c90d8a11 100644 --- a/app/models/dataset/DatasetService.scala +++ b/app/models/dataset/DatasetService.scala @@ -4,7 +4,7 @@ import com.scalableminds.util.accesscontext.{AuthorizedAccessContext, DBAccessCo import com.scalableminds.util.objectid.ObjectId import com.scalableminds.util.time.Instant import com.scalableminds.util.tools.{Empty, EmptyBox, Fox, FoxImplicits, Full, JsonHelper, TextUtils} -import com.scalableminds.webknossos.datastore.helpers.{DataSourceMagInfo, UPath} +import com.scalableminds.webknossos.datastore.helpers.UPath import com.scalableminds.webknossos.datastore.models.datasource.{ DataSource, DataSourceId, @@ -25,6 +25,8 @@ import models.user.{MultiUserDAO, User, UserService} import com.scalableminds.webknossos.datastore.controllers.PathValidationResult import mail.{MailchimpClient, MailchimpTag} import models.analytics.{AnalyticsService, UploadDatasetEvent} +import models.annotation.AnnotationDAO +import models.storage.UsedStorageService import play.api.http.Status.NOT_FOUND import play.api.i18n.{Messages, MessagesProvider} import play.api.libs.json.{JsObject, Json} @@ -42,6 +44,7 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO, datasetLastUsedTimesDAO: DatasetLastUsedTimesDAO, datasetDataLayerDAO: DatasetLayerDAO, datasetMagsDAO: DatasetMagsDAO, + datasetLayerAttachmentsDAO: DatasetLayerAttachmentsDAO, teamDAO: TeamDAO, folderDAO: FolderDAO, multiUserDAO: MultiUserDAO, @@ -52,6 +55,8 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO, teamService: TeamService, thumbnailCachingService: ThumbnailCachingService, userService: UserService, + annotationDAO: AnnotationDAO, + usedStorageService: UsedStorageService, conf: WkConf, rpc: RPC)(implicit ec: ExecutionContext) extends FoxImplicits @@ -492,28 +497,6 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO, _ <- Fox.serialCombined(pathInfos)(updateRealPath) } yield () - /** - * Returns a list of tuples, where the first element is the magInfo and the second element is a list of all magInfos - * that share the same realPath but have a different dataSourceId. For each mag in the data layer there is one tuple. 
- * @param datasetId id of the dataset - * @param layerName name of the layer in the dataset - * @return - */ - def getPathsForDataLayer(datasetId: ObjectId, - layerName: String): Fox[List[(DataSourceMagInfo, List[DataSourceMagInfo])]] = - for { - magInfos <- datasetMagsDAO.findPathsForDatasetAndDatalayer(datasetId, layerName) - magInfosAndLinkedMags <- Fox.serialCombined(magInfos)(magInfo => - magInfo.realPath match { - case Some(realPath) => - for { - pathInfos <- datasetMagsDAO.findAllByRealPath(realPath) - filteredPathInfos = pathInfos.filter(_.dataSourceId != magInfo.dataSourceId) - } yield (magInfo, filteredPathInfos) - case None => Fox.successful((magInfo, List())) - }) - } yield magInfosAndLinkedMags - def validatePaths(paths: Seq[UPath], dataStore: DataStore): Fox[Unit] = for { _ <- Fox.successful(()) @@ -525,18 +508,55 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO, }) } yield () - def deleteVirtualOrDiskDataset(dataset: Dataset)(implicit ctx: DBAccessContext): Fox[Unit] = + def deleteDataset(dataset: Dataset)(implicit ctx: DBAccessContext): Fox[Unit] = for { + datastoreClient <- clientFor(dataset) _ <- if (dataset.isVirtual) { - // At this point, we should also free space in S3 once implemented. - // Right now, we can just mark the dataset as deleted in the database. - datasetDAO.deleteDataset(dataset._id, onlyMarkAsDeleted = true) + for { + magPathsUsedOnlyByThisDataset <- datasetMagsDAO.findMagPathsUsedOnlyByThisDataset(dataset._id) + attachmentPathsUsedOnlyByThisDataset <- datasetLayerAttachmentsDAO.findAttachmentPathsUsedOnlyByThisDataset( + dataset._id) + pathsUsedOnlyByThisDataset = magPathsUsedOnlyByThisDataset ++ attachmentPathsUsedOnlyByThisDataset + // Note that the datastore only deletes local paths and paths on our managed S3 cloud storage + _ <- datastoreClient.deletePaths(pathsUsedOnlyByThisDataset) + } yield () } else { for { - datastoreClient <- clientFor(dataset) - _ <- datastoreClient.deleteOnDisk(dataset._id) + datastoreBaseDirStr <- datastoreClient.getBaseDirAbsolute + datastoreBaseDir <- UPath.fromString(datastoreBaseDirStr).toFox + datasetDir = datastoreBaseDir / dataset._organization / dataset.directoryName + datastore <- dataStoreFor(dataset) + datasetsUsingDataFromThisDir <- findDatasetsUsingDataFromDir(datasetDir, datastore, dataset._id) + _ <- Fox.fromBool(datasetsUsingDataFromThisDir.isEmpty) ?~> s"Cannot delete dataset because ${datasetsUsingDataFromThisDir.length} other datasets reference its data: ${datasetsUsingDataFromThisDir + .mkString(",")}" + _ <- datastoreClient.deleteOnDisk(dataset._id) ?~> "dataset.delete.failed" } yield () - } ?~> "dataset.delete.failed" + } + _ <- deleteDatasetFromDB(dataset._id) + } yield () + + private def findDatasetsUsingDataFromDir(directory: UPath, + dataStore: DataStore, + ignoredDatasetId: ObjectId): Fox[Seq[ObjectId]] = + for { + datasetsWithMagsInDir <- datasetMagsDAO.findDatasetsWithMagsInDir(directory, dataStore, ignoredDatasetId) + datasetsWithAttachmentsInDir <- datasetLayerAttachmentsDAO.findDatasetsWithAttachmentsInDir(directory, + dataStore, + ignoredDatasetId) + } yield (datasetsWithMagsInDir ++ datasetsWithAttachmentsInDir).distinct + + def deleteDatasetFromDB(datasetId: ObjectId): Fox[Unit] = + for { + existingDatasetBox <- datasetDAO.findOne(datasetId)(GlobalAccessContext).shiftBox + _ <- existingDatasetBox match { + case Full(dataset) => + for { + annotationCount <- annotationDAO.countAllByDataset(dataset._id)(GlobalAccessContext) + _ <- datasetDAO.deleteDataset(dataset._id, 
onlyMarkAsDeleted = annotationCount > 0) + _ <- usedStorageService.refreshStorageReportForDataset(dataset) + } yield () + case _ => Fox.successful(()) + } } yield () def generateDirectoryName(datasetName: String, datasetId: ObjectId): String = diff --git a/app/models/dataset/WKRemoteDataStoreClient.scala b/app/models/dataset/WKRemoteDataStoreClient.scala index 62136f78635..36256e7921c 100644 --- a/app/models/dataset/WKRemoteDataStoreClient.scala +++ b/app/models/dataset/WKRemoteDataStoreClient.scala @@ -126,4 +126,16 @@ class WKRemoteDataStoreClient(dataStore: DataStore, rpc: RPC) extends LazyLoggin .delete() } yield () + lazy val getBaseDirAbsolute: Fox[String] = + rpc(s"${dataStore.url}/data/datasets/baseDirAbsolute") + .addQueryParam("token", RpcTokenHolder.webknossosToken) + .getWithJsonResponse[String] + + def deletePaths(paths: Seq[UPath]): Fox[Unit] = + for { + _ <- rpc(s"${dataStore.url}/data/datasets/deletePaths") + .addQueryParam("token", RpcTokenHolder.webknossosToken) + .deleteJson(paths) + } yield () + } diff --git a/app/models/storage/UsedStorageService.scala b/app/models/storage/UsedStorageService.scala index bdbefbfbc5b..d4a4f9db436 100644 --- a/app/models/storage/UsedStorageService.scala +++ b/app/models/storage/UsedStorageService.scala @@ -16,7 +16,6 @@ import models.dataset.{ Dataset, DatasetLayerAttachmentsDAO, DatasetMagsDAO, - DatasetService, StorageRelevantDataLayerAttachment, WKRemoteDataStoreClient } @@ -34,7 +33,6 @@ import scala.concurrent.duration._ class UsedStorageService @Inject()(val actorSystem: ActorSystem, val lifecycle: ApplicationLifecycle, organizationDAO: OrganizationDAO, - datasetService: DatasetService, dataStoreDAO: DataStoreDAO, datasetMagDAO: DatasetMagsDAO, datasetLayerAttachmentsDAO: DatasetLayerAttachmentsDAO, @@ -213,7 +211,7 @@ class UsedStorageService @Inject()(val actorSystem: ActorSystem, def refreshStorageReportForDataset(dataset: Dataset): Fox[Unit] = for { _ <- Fox.successful(()) - dataStore <- datasetService.dataStoreFor(dataset) + dataStore <- dataStoreDAO.findOneByName(dataset._dataStore.trim) ?~> "datastore.notFound" _ <- if (dataStore.reportUsedStorageEnabled) { for { organization <- organizationDAO.findOne(dataset._organization) diff --git a/conf/webknossos.latest.routes b/conf/webknossos.latest.routes index 1df504fefee..904378478f1 100644 --- a/conf/webknossos.latest.routes +++ b/conf/webknossos.latest.routes @@ -103,7 +103,7 @@ GET /datasets/:datasetId/layers/:layer/thumbnail POST /datasets/:datasetId/layers/:layer/segmentAnythingMask controllers.DatasetController.segmentAnythingMask(datasetId: ObjectId, layer: String, intensityMin: Option[Float], intensityMax: Option[Float]) PUT /datasets/:datasetId/clearThumbnailCache controllers.DatasetController.removeFromThumbnailCache(datasetId: ObjectId) GET /datasets/:datasetName/isValidNewName controllers.DatasetController.isValidNewName(datasetName: String) -DELETE /datasets/:datasetId/deleteOnDisk controllers.DatasetController.deleteOnDisk(datasetId: ObjectId) +DELETE /datasets/:datasetId controllers.DatasetController.delete(datasetId: ObjectId) POST /datasets/:datasetId/reserveAttachmentUploadToPath controllers.DatasetController.reserveAttachmentUploadToPath(datasetId: ObjectId) POST /datasets/:datasetId/finishAttachmentUploadToPath controllers.DatasetController.finishAttachmentUploadToPath(datasetId: ObjectId) POST /datasets/:datasetId/reserveUploadToPathsForPreliminary controllers.DatasetController.reserveUploadToPathsForPreliminary(datasetId: ObjectId) @@ -126,7 +126,6 @@ GET 
/datastores PUT /datastores/:name/datasource controllers.WKRemoteDataStoreController.updateOne(name: String, key: String) PUT /datastores/:name/datasources controllers.WKRemoteDataStoreController.updateAll(name: String, key: String, organizationId: Option[String]) PUT /datastores/:name/datasources/paths controllers.WKRemoteDataStoreController.updatePaths(name: String, key: String) -GET /datastores/:name/datasources/:datasetId/paths controllers.WKRemoteDataStoreController.getPaths(name: String, key: String, datasetId: ObjectId) GET /datastores/:name/datasources/:datasetId controllers.WKRemoteDataStoreController.getDataSource(name: String, key: String, datasetId: ObjectId) PUT /datastores/:name/datasources/:datasetId controllers.WKRemoteDataStoreController.updateDataSource(name: String, key: String, datasetId: ObjectId) PATCH /datastores/:name/status controllers.WKRemoteDataStoreController.statusUpdate(name: String, key: String) diff --git a/frontend/javascripts/admin/rest_api.ts b/frontend/javascripts/admin/rest_api.ts index fa31bab42c5..46a9767a327 100644 --- a/frontend/javascripts/admin/rest_api.ts +++ b/frontend/javascripts/admin/rest_api.ts @@ -1379,7 +1379,7 @@ export async function triggerDatasetClearCache( } export async function deleteDatasetOnDisk(datasetId: string): Promise { - await Request.triggerRequest(`/api/datasets/${datasetId}/deleteOnDisk`, { + await Request.triggerRequest(`/api/datasets/${datasetId}`, { method: "DELETE", }); } diff --git a/frontend/javascripts/test/backend-snapshot-tests/__snapshots__/datasets.e2e.ts.snap b/frontend/javascripts/test/backend-snapshot-tests/__snapshots__/datasets.e2e.ts.snap index 86501468034..5aaffe23e01 100644 --- a/frontend/javascripts/test/backend-snapshot-tests/__snapshots__/datasets.e2e.ts.snap +++ b/frontend/javascripts/test/backend-snapshot-tests/__snapshots__/datasets.e2e.ts.snap @@ -1,33 +1,5 @@ // Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html -exports[`Dataset API (E2E) > Dataset Paths 1`] = ` -[ - { - "layerName": "segmentation", - "magLinkInfos": [ - { - "linkedMags": [], - "mag": { - "dataLayerName": "segmentation", - "dataSourceId": { - "name": "test-dataset", - "team": "Organization_X", - }, - "hasLocalData": true, - "mag": [ - 1, - 1, - 1, - ], - "path": "Organization_X/test-dataset/segmentation/1", - "realPath": "Organization_X/test-dataset/segmentation/1", - }, - }, - ], - }, -] -`; - exports[`Dataset API (E2E) > Zarr 3 streaming 1`] = `"{"zarr_format":3,"node_type":"group","attributes":{"ome":{"version":"0.5","multiscales":[{"name":"segmentation","axes":[{"name":"c","type":"channel"},{"name":"x","type":"space","unit":"nanometer"},{"name":"y","type":"space","unit":"nanometer"},{"name":"z","type":"space","unit":"nanometer"}],"datasets":[{"path":"1","coordinateTransformations":[{"type":"scale","scale":[1,11.24,11.24,28]}]}]}]}}}"`; exports[`Dataset API (E2E) > Zarr 3 streaming 2`] = `"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAEAAAABAAAAAQAAAAEAAAABAAAAAQAAAAEAAAABAAAAAQAAAAEAAAABAAAAAQAAAAEAAAA="`; diff --git a/frontend/javascripts/test/backend-snapshot-tests/datasets.e2e.ts b/frontend/javascripts/test/backend-snapshot-tests/datasets.e2e.ts index bea1e7509e8..fd4a88c7f26 100644 --- a/frontend/javascripts/test/backend-snapshot-tests/datasets.e2e.ts +++ b/frontend/javascripts/test/backend-snapshot-tests/datasets.e2e.ts @@ -174,38 +174,6 @@ describe("Dataset API (E2E)", () => { expect(base64).toMatchSnapshot(); }); - it("Dataset Paths", async () => { - 
const datasetId = await getTestDatasetId(); - const paths = await fetch( - `/api/datastores/localhost/datasources/${datasetId}/paths?key=something-secure`, - ); - const pathsJson = await paths.json(); - - // Dataset paths are absolute, we will relativize them here to make the snapshot stable - const makeRelative = (path: string) => - path.split("Organization_X")[1] ? "Organization_X" + path.split("Organization_X")[1] : path; - - interface MagLink { - mag: { - path: string; - realPath: string; - }; - } - - interface PathInfo { - magLinkInfos: MagLink[]; - } - - pathsJson.forEach((pathInfo: PathInfo) => - pathInfo.magLinkInfos.forEach((magLink: MagLink) => { - magLink.mag.path = makeRelative(magLink.mag.path); - magLink.mag.realPath = makeRelative(magLink.mag.realPath); - }), - ); - - expect(pathsJson).toMatchSnapshot(); - }); - /** * WARNING: This test creates a side effect by uploading and saving a dataset in your binaryData folder. * There is no clean up after the test, and the dataset will remain after each test run. diff --git a/test/backend/UPathTestSuite.scala b/test/backend/UPathTestSuite.scala index b1de1d825d4..b8ef5f806e4 100644 --- a/test/backend/UPathTestSuite.scala +++ b/test/backend/UPathTestSuite.scala @@ -129,6 +129,8 @@ class UPathTestSuite extends PlaySpec { "correctly answer startsWith" in { assert(UPath.fromStringUnsafe("relative/somewhere").startsWith(UPath.fromStringUnsafe("relative"))) assert(!UPath.fromStringUnsafe("relative/somewhere").startsWith(UPath.fromStringUnsafe("elsewhere"))) + // startsWith compares actual parents, not string prefix! + assert(!UPath.fromStringUnsafe("relativeElsewhere").startsWith(UPath.fromStringUnsafe("relative"))) assert(UPath.fromStringUnsafe("/absolute/somewhere").startsWith(UPath.fromStringUnsafe("/absolute"))) assert(!UPath.fromStringUnsafe("/absolute/somewhere").startsWith(UPath.fromStringUnsafe("/elsewhere"))) assert(!UPath.fromStringUnsafe("/absolute/somewhere").startsWith(UPath.fromStringUnsafe("https://example.com"))) @@ -136,6 +138,11 @@ class UPathTestSuite extends PlaySpec { UPath .fromStringUnsafe("https://example.com/path/somewhere") .startsWith(UPath.fromStringUnsafe("https://example.com/path"))) + // startsWith compares actual parents, not string prefix! + assert( + !UPath + .fromStringUnsafe("https://example.com/pathSomewhereElse") + .startsWith(UPath.fromStringUnsafe("https://example.com/path"))) } } diff --git a/unreleased_changes/8924.md b/unreleased_changes/8924.md new file mode 100644 index 00000000000..72437c593cd --- /dev/null +++ b/unreleased_changes/8924.md @@ -0,0 +1,5 @@ +### Added +- Datasets stored on managed S3 can now also be deleted there. + +### Changed +- Deleting datasets on disk whose layers are still referenced by symlinks of other datasets is now blocked. 
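Side note on the "now blocked" entry above: the reference check behind it leans on two safeguards visible elsewhere in this diff — UPath.startsWith now compares path segments rather than raw string prefixes (covered by the new UPathTestSuite cases), and the SQL starts_with queries append a trailing slash to the dataset directory before matching realpath strings. The following is a minimal sketch, not part of the patch, showing why segment-wise comparison matters; it uses only the UPath API that appears in this diff (fromStringUnsafe, the / operator, startsWith, toString), and the concrete directory names are invented for illustration.

// Sketch only: demonstrates segment-wise prefix semantics vs. raw string prefixes.
// The paths below are hypothetical examples, not values from the patch.
import com.scalableminds.webknossos.datastore.helpers.UPath

object PrefixCheckSketch {
  def main(args: Array[String]): Unit = {
    val datasetDir = UPath.fromStringUnsafe("/binaryData/Organization_X/test-dataset")
    // a mag inside the dataset directory: a genuine reference
    val inside = datasetDir / "segmentation" / "1"
    // a sibling dataset whose name merely starts with the same characters
    val lookalike = UPath.fromStringUnsafe("/binaryData/Organization_X/test-dataset-copy/segmentation/1")

    println(inside.startsWith(datasetDir))    // true: "test-dataset" is an actual parent segment
    println(lookalike.startsWith(datasetDir)) // false: "test-dataset-copy" is a different segment

    // A raw string prefix check would wrongly treat the lookalike as a reference:
    println(lookalike.toString.startsWith(datasetDir.toString)) // true (false positive)
  }
}

The same reasoning motivates the trailing-slash guard in findDatasetsWithMagsInDir and findDatasetsWithAttachmentsInDir, where the comparison happens on realpath strings in SQL and cannot use UPath's segment-aware startsWith.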
diff --git a/util/src/main/scala/com/scalableminds/util/io/PathUtils.scala b/util/src/main/scala/com/scalableminds/util/io/PathUtils.scala index 264739a12be..bbdfa2a81fe 100644 --- a/util/src/main/scala/com/scalableminds/util/io/PathUtils.scala +++ b/util/src/main/scala/com/scalableminds/util/io/PathUtils.scala @@ -11,9 +11,7 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala import scala.reflect.io.Directory import scala.util.Random -object PathUtils extends PathUtils - -trait PathUtils extends LazyLogging { +object PathUtils extends LazyLogging { private def directoryFilter(path: Path): Boolean = Files.isDirectory(path) && !Files.isHidden(path) diff --git a/util/src/main/scala/com/scalableminds/util/tools/Fox.scala b/util/src/main/scala/com/scalableminds/util/tools/Fox.scala index 7037dc97ca5..f911b4a841f 100644 --- a/util/src/main/scala/com/scalableminds/util/tools/Fox.scala +++ b/util/src/main/scala/com/scalableminds/util/tools/Fox.scala @@ -1,7 +1,5 @@ package com.scalableminds.util.tools -import com.scalableminds.util.tools.{Box, Empty, Failure, Full, ParamFailure} - import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.{Success, Try} diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/DataStoreModule.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/DataStoreModule.scala index d789656afe6..f6fed3de6c0 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/DataStoreModule.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/DataStoreModule.scala @@ -38,6 +38,7 @@ class DataStoreModule extends AbstractModule { bind(classOf[DataStoreConfig]).asEagerSingleton() bind(classOf[DataStoreAccessTokenService]).asEagerSingleton() bind(classOf[ActorSystem]).annotatedWith(Names.named("webknossos-datastore")).toInstance(actorSystem) + bind(classOf[ManagedS3Service]).asEagerSingleton() bind(classOf[UploadService]).asEagerSingleton() bind(classOf[DataSourceService]).asEagerSingleton() bind(classOf[DataVaultService]).asEagerSingleton() diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/controllers/DataSourceController.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/controllers/DataSourceController.scala index 714fc0103e7..cbc3c47eb61 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/controllers/DataSourceController.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/controllers/DataSourceController.scala @@ -87,6 +87,12 @@ class DataSourceController @Inject()( override def allowRemoteOrigin: Boolean = true + def baseDirAbsolute: Action[AnyContent] = Action.async { implicit request => + accessTokenService.validateAccessFromTokenContext(UserAccessRequest.webknossos) { + Fox.successful(Ok(Json.toJson(dataSourceService.dataBaseDir.toAbsolutePath.toString))) + } + } + def triggerInboxCheckBlocking(organizationId: Option[String]): Action[AnyContent] = Action.async { implicit request => accessTokenService.validateAccessFromTokenContext( organizationId @@ -406,24 +412,24 @@ class DataSourceController @Inject()( def deleteOnDisk(datasetId: ObjectId): Action[AnyContent] = Action.async { implicit request => - accessTokenService.validateAccessFromTokenContext(UserAccessRequest.deleteDataset(datasetId)) { + accessTokenService.validateAccessFromTokenContext(UserAccessRequest.webknossos) { for { - dataSource <- datasetCache.getById(datasetId) ~> NOT_FOUND + 
dataSource <- dsRemoteWebknossosClient.getDataSource(datasetId) ~> NOT_FOUND dataSourceId = dataSource.id - _ <- if (dataSourceService.existsOnDisk(dataSourceId)) { - for { - _ <- dataSourceService.deleteOnDisk( - dataSourceId.organizationId, - dataSourceId.directoryName, - Some(datasetId), - reason = Some("the user wants to delete the dataset")) ?~> "dataset.delete.failed" - _ <- dsRemoteWebknossosClient.deleteDataset(datasetId) - } yield () - } else - for { - _ <- dsRemoteWebknossosClient.deleteDataset(datasetId) - _ = logger.warn(s"Tried to delete dataset ${dataSource.id} ($datasetId), but is not present on disk.") - } yield () + _ <- dataSourceService.deleteOnDisk( + datasetId, + dataSourceId.organizationId, + dataSourceId.directoryName, + reason = Some("the user wants to delete the dataset")) ?~> "dataset.delete.failed" + } yield Ok + } + } + + def deletePaths(): Action[Seq[UPath]] = + Action.async(validateJson[Seq[UPath]]) { implicit request => + accessTokenService.validateAccessFromTokenContext(UserAccessRequest.webknossos) { + for { + _ <- dataSourceService.deletePathsFromDiskOrManagedS3(request.body) } yield Ok } } diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/S3DataVault.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/S3DataVault.scala index b7fc5f4dc84..7255c68f3e7 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/S3DataVault.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/datavault/S3DataVault.scala @@ -8,8 +8,8 @@ import com.scalableminds.webknossos.datastore.storage.{ S3AccessKeyCredential } import com.scalableminds.util.tools.Box.tryo -import com.scalableminds.util.tools.{Box, Empty, Full, Failure => BoxFailure} -import com.scalableminds.webknossos.datastore.helpers.UPath +import com.scalableminds.util.tools.{Empty, Full, Failure => BoxFailure} +import com.scalableminds.webknossos.datastore.helpers.{S3UriUtils, UPath} import org.apache.commons.lang3.builder.HashCodeBuilder import play.api.libs.ws.WSClient import software.amazon.awssdk.auth.credentials.{ @@ -49,7 +49,7 @@ class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential], implicit val ec: ExecutionContext) extends DataVault with FoxImplicits { - private lazy val bucketName = S3DataVault.hostBucketFromUri(uri) match { + private lazy val bucketName = S3UriUtils.hostBucketFromUri(uri) match { case Some(value) => value case None => throw new Exception(s"Could not parse S3 bucket for ${uri.toString}") } @@ -109,7 +109,7 @@ class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential], implicit ec: ExecutionContext, tc: TokenContext): Fox[(Array[Byte], Encoding.Value)] = for { - objectKey <- S3DataVault.objectKeyFromUri(path.toRemoteUriUnsafe).toFox + objectKey <- S3UriUtils.objectKeyFromUri(path.toRemoteUriUnsafe).toFox request = range match { case StartEnd(r) => getRangeRequest(bucketName, objectKey, r) case SuffixLength(l) => getSuffixRangeRequest(bucketName, objectKey, l) @@ -121,7 +121,7 @@ class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential], override def listDirectory(path: VaultPath, maxItems: Int)(implicit ec: ExecutionContext): Fox[List[VaultPath]] = for { - prefixKey <- S3DataVault.objectKeyFromUri(path.toRemoteUriUnsafe).toFox + prefixKey <- S3UriUtils.objectKeyFromUri(path.toRemoteUriUnsafe).toFox s3SubPrefixKeys <- getObjectSummaries(bucketName, prefixKey, maxItems) vaultPaths <- tryo(s3SubPrefixKeys.map(key => new 
VaultPath(UPath.fromStringUnsafe(s"${uri.getScheme}://$bucketName/$key"), this))).toFox @@ -159,7 +159,7 @@ class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential], } for { - rawPrefix <- S3DataVault.objectKeyFromUri(path.toRemoteUriUnsafe).toFox + rawPrefix <- S3UriUtils.objectKeyFromUri(path.toRemoteUriUnsafe).toFox // add a trailing slash only if it's missing prefixKey = if (rawPrefix.endsWith("/")) rawPrefix else rawPrefix + "/" client <- clientFox @@ -191,41 +191,6 @@ object S3DataVault { new S3DataVault(credential, credentializedUpath.upath.toRemoteUriUnsafe, ws, ec) } - def hostBucketFromUri(uri: URI): Option[String] = { - val host = uri.getHost - if (isShortStyle(uri)) { // assume host is omitted from uri, shortcut form s3://bucket/key - Some(host) - } else if (isVirtualHostedStyle(uri)) { - Some(host.substring(0, host.length - ".s3.amazonaws.com".length)) - } else if (isPathStyle(uri)) { - Some(uri.getPath.substring(1).split("/")(0)) - } else { - None - } - } - - // https://bucket-name.s3.region-code.amazonaws.com/key-name - private def isVirtualHostedStyle(uri: URI): Boolean = - uri.getHost.endsWith(".s3.amazonaws.com") - - // https://s3.region-code.amazonaws.com/bucket-name/key-name - private def isPathStyle(uri: URI): Boolean = - uri.getHost.matches("s3(.[\\w\\-_]+)?.amazonaws.com") || - (!uri.getHost.contains("amazonaws.com") && uri.getHost.contains(".")) - - // S3://bucket-name/key-name - private def isShortStyle(uri: URI): Boolean = - !uri.getHost.contains(".") - - private def objectKeyFromUri(uri: URI): Box[String] = - if (isVirtualHostedStyle(uri)) { - Full(uri.getPath) - } else if (isPathStyle(uri)) { - Full(uri.getPath.substring(1).split("/").tail.mkString("/")) - } else if (isShortStyle(uri)) { - Full(uri.getPath.tail) - } else BoxFailure(s"Not a valid s3 uri: $uri") - private def getCredentialsProvider(credentialOpt: Option[S3AccessKeyCredential]): AwsCredentialsProvider = credentialOpt match { case Some(s3AccessKeyCredential: S3AccessKeyCredential) => @@ -240,9 +205,6 @@ object S3DataVault { AnonymousCredentialsProvider.create() } - private def isNonAmazonHost(uri: URI): Boolean = - (isPathStyle(uri) && !uri.getHost.endsWith(".amazonaws.com")) || uri.getHost == "localhost" - private def determineProtocol(uri: URI, ws: WSClient)(implicit ec: ExecutionContext): Fox[String] = { // If the endpoint supports HTTPS, use it. Otherwise, use HTTP. 
val httpsUri = new URI("https", uri.getAuthority, "", "", "") @@ -261,7 +223,7 @@ object S3DataVault { implicit ec: ExecutionContext): Fox[S3AsyncClient] = { val basic = S3AsyncClient.builder().credentialsProvider(getCredentialsProvider(credentialOpt)).crossRegionAccessEnabled(true) - if (isNonAmazonHost(uri)) { + if (S3UriUtils.isNonAmazonHost(uri)) { for { protocol <- determineProtocol(uri, ws) } yield diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/DatasetDeleter.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/DatasetDeleter.scala index f953fb77083..118c178adb4 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/DatasetDeleter.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/DatasetDeleter.scala @@ -1,332 +1,57 @@ package com.scalableminds.webknossos.datastore.helpers +import com.scalableminds.util.io.PathUtils import com.scalableminds.util.objectid.ObjectId -import com.scalableminds.util.tools.{Fox, FoxImplicits, JsonHelper} -import com.scalableminds.webknossos.datastore.models.datasource.{DataSourceId, StaticLayer, UsableDataSource} -import com.scalableminds.webknossos.datastore.services.{DSRemoteWebknossosClient, DataSourceToDiskWriter} +import com.scalableminds.util.tools.{Fox, FoxImplicits, Full} import com.typesafe.scalalogging.LazyLogging -import com.scalableminds.util.tools.Box.tryo -import com.scalableminds.util.tools.{Box, Full} -import org.apache.commons.io.FileUtils -import java.io.File import java.nio.file.{Files, Path} import scala.annotation.tailrec import scala.concurrent.ExecutionContext -trait DatasetDeleter extends LazyLogging with DirectoryConstants with FoxImplicits with DataSourceToDiskWriter { +trait DatasetDeleter extends LazyLogging with DirectoryConstants with FoxImplicits { def dataBaseDir: Path - def existsOnDisk(dataSourceId: DataSourceId, isInConversion: Boolean = false): Boolean = { + def deleteOnDisk(datasetId: ObjectId, + organizationId: String, + datasetName: String, + isInConversion: Boolean = false, + reason: Option[String] = None)(implicit ec: ExecutionContext): Fox[Unit] = { + val dataSourcePath = - if (isInConversion) - dataBaseDir.resolve(dataSourceId.organizationId).resolve(forConversionDir).resolve(dataSourceId.directoryName) - else dataBaseDir.resolve(dataSourceId.organizationId).resolve(dataSourceId.directoryName) + if (isInConversion) dataBaseDir.resolve(organizationId).resolve(forConversionDir).resolve(datasetName) + else dataBaseDir.resolve(organizationId).resolve(datasetName) + + if (Files.exists(dataSourcePath)) { + val trashPath: Path = dataBaseDir.resolve(organizationId).resolve(trashDir) + val targetPath = trashPath.resolve(datasetName) + PathUtils.ensureDirectory(trashPath) - Files.exists(dataSourcePath) + logger.info( + s"Deleting dataset $datasetId by moving it from $dataSourcePath to $targetPath ${reason.map(r => s"because $r").getOrElse("...")}") + deleteWithRetry(dataSourcePath, targetPath) + } else { + logger.info( + s"Dataset deletion requested for dataset $datasetId at $dataSourcePath, but it does not exist. Skipping deletion on disk.") + Fox.successful(()) + } } - def deleteOnDisk( - organizationId: String, - datasetName: String, - datasetId: Option[ObjectId], // Is only set for datasets that are already registered in WK. In this case, we query WK using this id for symlink paths and move them. 
- isInConversion: Boolean = false, - reason: Option[String] = None)(implicit ec: ExecutionContext): Fox[Unit] = { - @tailrec - def deleteWithRetry(sourcePath: Path, targetPath: Path, retryCount: Int = 0): Fox[Unit] = + @tailrec + private def deleteWithRetry(sourcePath: Path, targetPath: Path, retryCount: Int = 0)( + implicit ec: ExecutionContext): Fox[Unit] = + if (retryCount > 15) { + Fox.failure(s"Deleting dataset failed: too many retries.") + } else { try { val deduplicatedTargetPath = if (retryCount == 0) targetPath else targetPath.resolveSibling(f"${targetPath.getFileName} ($retryCount)") - val path = Files.move(sourcePath, deduplicatedTargetPath) - if (path == null) { - throw new Exception("Deleting dataset failed") - } - logger.info(s"Successfully moved dataset from $sourcePath to $targetPath...") + Files.move(sourcePath, deduplicatedTargetPath) + logger.info(s"Successfully moved dataset from $sourcePath to $targetPath.") Fox.successful(()) } catch { case _: java.nio.file.FileAlreadyExistsException => deleteWithRetry(sourcePath, targetPath, retryCount + 1) case e: Exception => Fox.failure(s"Deleting dataset failed: ${e.toString}", Full(e)) } - - def moveToTrash(organizationId: String, - datasetName: String, - dataSourcePath: Path, - reason: Option[String]): Fox[Unit] = - if (Files.exists(dataSourcePath)) { - val trashPath: Path = dataBaseDir.resolve(organizationId).resolve(trashDir) - val targetPath = trashPath.resolve(datasetName) - new File(trashPath.toString).mkdirs() - - logger.info( - s"Deleting dataset by moving it from $dataSourcePath to $targetPath ${reason.map(r => s"because $r").getOrElse("...")}") - deleteWithRetry(dataSourcePath, targetPath) - } else { - Fox.successful(logger.info( - s"Dataset deletion requested for dataset at $dataSourcePath, but it does not exist. 
Skipping deletion on disk.")) - } - - val dataSourcePath = - if (isInConversion) dataBaseDir.resolve(organizationId).resolve(forConversionDir).resolve(datasetName) - else dataBaseDir.resolve(organizationId).resolve(datasetName) - - for { - _ <- Fox.runOptional(datasetId)(d => moveSymlinks(organizationId, datasetName, d)) ?~> "Failed to remake symlinks" - _ <- moveToTrash(organizationId, datasetName, dataSourcePath, reason) - } yield () - } - - def remoteWebknossosClient: DSRemoteWebknossosClient - - // Handle references to layers and mags that are deleted - - private def moveSymlinks(organizationId: String, datasetName: String, datasetId: ObjectId)( - implicit ec: ExecutionContext) = - for { - dataSourceId <- Fox.successful(DataSourceId(datasetName, organizationId)) - layersAndLinkedMags <- remoteWebknossosClient.fetchPaths(datasetId) - exceptionBoxes = layersAndLinkedMags.map(layerMagLinkInfo => - handleLayerSymlinks(dataSourceId, layerMagLinkInfo.layerName, layerMagLinkInfo.magLinkInfos.toList)) - _ <- Fox.assertNoFailure(exceptionBoxes) ?~> "Failed to move symlinks" - affectedDataSources = layersAndLinkedMags - .flatMap(_.magLinkInfos.map(m => m.linkedMags.map(_.dataSourceId))) - .flatten - _ <- updateDatasourceProperties(affectedDataSources) - } yield () - - private def getFullyLinkedLayers(linkedMags: List[MagLinkInfo]): Seq[(DataSourceId, String)] = { - val allMagsLocal = linkedMags.forall(_.mag.hasLocalData) - val allLinkedDatasetLayers = linkedMags.map(_.linkedMags.map(lm => (lm.dataSourceId, lm.dataLayerName))) - // Get combinations of datasourceId, layerName that link to EVERY mag - val linkedToByAllMags = - if (allLinkedDatasetLayers.isEmpty) Seq() - else allLinkedDatasetLayers.reduce((a, b) => a.intersect(b)) - if (allMagsLocal && linkedToByAllMags.nonEmpty) { - linkedToByAllMags - } else { - Seq() - } - } - - private def relativizeSymlinkPath(targetPath: Path, originPath: Path): Path = { - val absoluteTargetPath = targetPath.toAbsolutePath - val relativeTargetPath = originPath.getParent.toAbsolutePath.relativize(absoluteTargetPath) - relativeTargetPath - } - - private def getPossibleMagPaths(basePath: Path, magInfo: DataSourceMagInfo): List[Path] = { - val layerPath = basePath - .resolve(magInfo.dataSourceId.organizationId) - .resolve(magInfo.dataSourceId.directoryName) - .resolve(magInfo.dataLayerName) - List(layerPath.resolve(magInfo.mag.toMagLiteral(allowScalar = true)), - layerPath.resolve(magInfo.mag.toMagLiteral(allowScalar = false))) - } - - private def updateDatasourceProperties(dataSourceIds: List[DataSourceId])( - implicit ec: ExecutionContext): Fox[List[Unit]] = - // We need to update locally explored datasets, since they now may have symlinks where previously they only had the - // path property set. 
- Fox.serialCombined(dataSourceIds)(dataSourceId => { - val dataSourcePath = dataBaseDir.resolve(dataSourceId.organizationId).resolve(dataSourceId.directoryName) - val propertiesPath = dataSourcePath.resolve(UsableDataSource.FILENAME_DATASOURCE_PROPERTIES_JSON) - if (Files.exists(propertiesPath)) { - JsonHelper.parseFromFileAs[UsableDataSource](propertiesPath, dataBaseDir) match { - case Full(dataSource) => - val updatedDataSource = dataSource.copy( - id = dataSourceId, - dataLayers = dataSource.dataLayers.map { - case dl: StaticLayer => - if (dl.mags.forall(_.path.exists(p => p.isLocal && p.isAbsolute))) { - // Setting path to None means using resolution of layer/mag directories to access data - dl.mapped(magMapping = _.copy(path = None)) - } else { - dl - } - case dl => dl - } - ) - // Write properties back - updateDataSourceOnDisk(updatedDataSource, expectExisting = true, validate = false) - case _ => Fox.successful(()) - } - } else Fox.successful(()) - }) - - private def updateMagSymlinks(targetMagPath: Path, linkedMag: DataSourceMagInfo): Unit = { - val linkedMagPaths = getPossibleMagPaths(dataBaseDir, linkedMag) - // Before deleting, check write permissions at linkedMagPath - if (!Files.isWritable(linkedMagPaths.head.getParent)) { - throw new Exception(s"Cannot update symlink at ${linkedMagPaths.head}, no write permissions!") - } - val existingLinkedMagPath = linkedMagPaths.find(p => Files.exists(p) || Files.isSymbolicLink(p)) - - existingLinkedMagPath match { - case Some(linkedMagPath) => - Files.delete(linkedMagPath) - logger.info(s"Deleting symlink and recreating it at $linkedMagPath") - Files.createSymbolicLink(linkedMagPath, relativizeSymlinkPath(targetMagPath, linkedMagPath)) - case None => - val linkedMagPath = linkedMagPaths.head - if (!Files.exists(linkedMagPath) && linkedMag.path == linkedMag.realPath) { - // This is the case for locally explored datasets - // Since locally explored datasets are always fully linked layers when explored, this case can - // only happen if one of the mags was manually edited in the properties file. - Files.createSymbolicLink(linkedMagPath, relativizeSymlinkPath(targetMagPath, linkedMagPath)) - } else { - logger.warn(s"Trying to recreate symlink at mag $linkedMagPath, but it does not exist!") - } - } - } - - private def moveLayer(sourceDataSource: DataSourceId, - sourceLayer: String, - fullLayerLinks: Seq[(DataSourceId, String)], - layerMags: List[MagLinkInfo]): Unit = { - // Move layer on disk - val layerPath = - dataBaseDir.resolve(sourceDataSource.organizationId).resolve(sourceDataSource.directoryName).resolve(sourceLayer) - - if (fullLayerLinks.isEmpty) { - throw new IllegalArgumentException( - s"Cannot move layer $sourceLayer from $sourceDataSource, no fully linked layers provided!") - } - - // Select one of the fully linked layers as target to move layer to - // Selection of the first one is arbitrary, is there anything to distinguish between them? 
- val target = fullLayerLinks.head - val moveToDataSource = target._1 - val moveToDataLayer = target._2 - val targetPath = dataBaseDir - .resolve(moveToDataSource.organizationId) - .resolve(moveToDataSource.directoryName) - .resolve(moveToDataLayer) - - // Before deleting, check write permissions at targetPath - if (!Files.isWritable(targetPath.getParent)) { - throw new Exception(s"Cannot move layer $sourceLayer to $targetPath, no write permissions!") - } - - logger.info( - s"Found complete symlinks to layer; Moving layer $sourceLayer from $sourceDataSource to $moveToDataSource/$moveToDataLayer") - if (Files.exists(targetPath) && Files.isSymbolicLink(targetPath)) { - Files.delete(targetPath) - } - if (Files.exists(targetPath) && Files.isDirectory(targetPath)) { - // This happens when the fully linked layer consists of mag symlinks. The directory exists and is full of symlinked mags. - // We need to delete the directory before moving the layer. - FileUtils.deleteDirectory(targetPath.toFile) - } - Files.move(layerPath, targetPath) - - // All symlinks are now broken, we need to recreate them - // There may be more layers that are "fully linked", where we need to add only one symlink - - fullLayerLinks.tail.foreach { linkedLayer => - val linkedLayerPath = - dataBaseDir.resolve(linkedLayer._1.organizationId).resolve(linkedLayer._1.directoryName).resolve(linkedLayer._2) - // Before deleting, check write permissions at linkedLayerPath - if (!Files.isWritable(linkedLayerPath.getParent)) { - throw new Exception(s"Cannot move layer $sourceLayer to $targetPath, no write permissions!") - } - if (Files.exists(linkedLayerPath) || Files.isSymbolicLink(linkedLayerPath)) { - // Two cases exist here: 1. The layer is a regular directory where each mag is a symlink - // 2. The layer is a symlink to the other layer itself. - // We can handle both by deleting the layer and creating a new symlink. - if (Files.isDirectory(linkedLayerPath)) { // Case 1 - FileUtils.deleteDirectory(linkedLayerPath.toFile) - } else { // Case 2 - Files.delete(linkedLayerPath) - } - logger.info( - s"Deleting existing symlink(s) at $linkedLayerPath linking to $sourceDataSource/$sourceLayer, creating new symlink") - Files.createSymbolicLink(linkedLayerPath, relativizeSymlinkPath(targetPath, linkedLayerPath)) - } else { - if (!Files.exists(linkedLayerPath)) { - // This happens when the layer is a locally explored dataset, where the path is directly written into the properties - // and no layer directory actually exists. 
- Files.createSymbolicLink(linkedLayerPath, relativizeSymlinkPath(targetPath, linkedLayerPath)) - } else { - // This should not happen, since we got the info from WK that a layer exists here - logger.warn(s"Trying to recreate symlink at layer $linkedLayerPath, but it does not exist!") - } - } - } - - // For every mag that linked to this layer, we need to update the symlink - // We need to discard the already handled mags (fully linked layers) - - layerMags.foreach { magLinkInfo => - val mag = magLinkInfo.mag - val newMagPath = - Seq(targetPath.resolve(mag.mag.toMagLiteral(true)), targetPath.resolve(mag.mag.toMagLiteral(false))) - .find(Files.exists(_)) - .getOrElse( - throw new Exception(s"Cleaning up move failed for $mag, no local data found ${targetPath.resolve(mag.mag - .toMagLiteral(true))} or ${targetPath.resolve(mag.mag.toMagLiteral(false))}, failed to create symlink!")) - magLinkInfo.linkedMags - .filter(linkedMag => !fullLayerLinks.contains((linkedMag.dataSourceId, linkedMag.dataLayerName))) // Filter out mags that are fully linked layers, we already handled them - .foreach { linkedMag => - updateMagSymlinks(newMagPath, linkedMag) - } } - } - - private def handleLayerSymlinks(dataSourceId: DataSourceId, - layerName: String, - linkedMags: List[MagLinkInfo]): Box[Unit] = - tryo { - val fullyLinkedLayers = getFullyLinkedLayers(linkedMags) - if (fullyLinkedLayers.nonEmpty) { - moveLayer(dataSourceId, layerName, fullyLinkedLayers, linkedMags) - } else { - logger.info(s"Found incomplete symlinks to layer; Moving mags from $dataSourceId to other datasets") - linkedMags.foreach { magLinkInfo => - val magToDelete = magLinkInfo.mag - if (magLinkInfo.linkedMags.nonEmpty) { - if (magToDelete.hasLocalData) { - // Move mag to a different dataset - val magPath = getPossibleMagPaths(dataBaseDir, magToDelete).find(Files.exists(_)).getOrElse { - throw new IllegalArgumentException( - s"Cannot move mag $magToDelete, no local data found at ${magToDelete.path}!") - } - // Select an arbitrary linked mag to move to - val target = magLinkInfo.linkedMags.head - val possibleMagTargetPaths = getPossibleMagPaths(dataBaseDir, target) - - // Before deleting, check write permissions at targetPath - if (!Files.isWritable(possibleMagTargetPaths.head.getParent)) { - throw new Exception( - s"Cannot move mag $magToDelete to ${possibleMagTargetPaths.head.getParent}, no write permissions!") - } - - val targetPathExistingSymlink = possibleMagTargetPaths.find(Files.isSymbolicLink) - targetPathExistingSymlink match { - case Some(targetPath) => - logger.info( - s"Deleting existing symlink at $targetPath linking to ${Files.readSymbolicLink(targetPath)}") - Files.delete(targetPath) - case _ => () - } - val targetPath = targetPathExistingSymlink.getOrElse(possibleMagTargetPaths.head) - Files.move(magPath, targetPath) - - // Move all symlinks to this mag to link to the moved mag - magLinkInfo.linkedMags.tail.foreach { linkedMag => - updateMagSymlinks(targetPath, linkedMag) - } - } else { - // The mag has no local data but there are links to it... - // Mags without local data are either - // 1. remote and thus they have no mags that can be linked to (but also we do not need to delete anything more here) - // 2. are links themselves to other mags. In this case, there can't be any links to this here since they - // would be resolved to the other mag. - // 3. locally explored datasets. They don't have layer directories that could have symlinks to them, so - // this is also not a problem. - // So this should not happen. 
- logger.warn(s"Trying to move mag $magToDelete, but it has no local data!") - } - } - } - } - } } diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/S3UriUtils.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/S3UriUtils.scala new file mode 100644 index 00000000000..290c6363599 --- /dev/null +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/S3UriUtils.scala @@ -0,0 +1,52 @@ +package com.scalableminds.webknossos.datastore.helpers + +import com.scalableminds.util.tools.{Box, Full, Failure} + +import java.net.URI + +object S3UriUtils { + + def hostBucketFromUri(uri: URI): Option[String] = { + val host = uri.getHost + if (host == null) { + None + } else if (isShortStyle(uri)) { // assume host is omitted from uri, shortcut form s3://bucket/key + Some(host) + } else if (isVirtualHostedStyle(uri)) { + Some(host.substring(0, host.length - ".s3.amazonaws.com".length)) + } else if (isPathStyle(uri)) { + Some(uri.getPath.substring(1).split("/")(0)) + } else { + None + } + } + + def hostBucketFromUpath(path: UPath): Option[String] = + hostBucketFromUri(path.toRemoteUriUnsafe) + + // https://bucket-name.s3.region-code.amazonaws.com/key-name + private def isVirtualHostedStyle(uri: URI): Boolean = + uri.getHost.endsWith(".s3.amazonaws.com") + + // https://s3.region-code.amazonaws.com/bucket-name/key-name + private def isPathStyle(uri: URI): Boolean = + uri.getHost.matches("s3(.[\\w\\-_]+)?.amazonaws.com") || + (!uri.getHost.contains("amazonaws.com") && uri.getHost.contains(".")) + + // S3://bucket-name/key-name + private def isShortStyle(uri: URI): Boolean = + !uri.getHost.contains(".") + + def objectKeyFromUri(uri: URI): Box[String] = + if (isVirtualHostedStyle(uri)) { + Full(uri.getPath) + } else if (isPathStyle(uri)) { + Full(uri.getPath.substring(1).split("/").tail.mkString("/")) + } else if (isShortStyle(uri)) { + Full(uri.getPath.tail) + } else Failure(s"Not a valid s3 uri: $uri") + + def isNonAmazonHost(uri: URI): Boolean = + (isPathStyle(uri) && !uri.getHost.endsWith(".amazonaws.com")) || uri.getHost == "localhost" + +} diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/UPath.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/UPath.scala index 3d8f9ee2c63..f852a2ee9ea 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/UPath.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/helpers/UPath.scala @@ -179,7 +179,7 @@ private case class RemoteUPath(scheme: String, segments: Seq[String]) extends UP override def basename: String = segments.findLast(_.nonEmpty).getOrElse("") - override def parent: UPath = + override def parent: RemoteUPath = // < 2 check to avoid deleting “authority” (hostname:port) if (segments.length < 2) this else RemoteUPath(scheme, segments.dropRight(1)) @@ -196,8 +196,11 @@ private case class RemoteUPath(scheme: String, segments: Seq[String]) extends UP override def toAbsolute: UPath = this def startsWith(other: UPath): Boolean = other match { - case otherRemote: RemoteUPath => - this.normalize.toString.startsWith(otherRemote.normalize.toString) + case otherRemote: RemoteUPath => { + val thisNormalized = this.normalize + val otherNormalized = otherRemote.normalize + thisNormalized.scheme == otherNormalized.scheme && thisNormalized.segments.startsWith(otherNormalized.segments) + } case _ => false } diff --git 
a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/rpc/RPCRequest.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/rpc/RPCRequest.scala index 7fca9631dcd..4b3ff59c383 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/rpc/RPCRequest.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/rpc/RPCRequest.scala @@ -235,6 +235,12 @@ class RPCRequest(val id: Int, val url: String, wsClient: WSClient)(implicit ec: performRequest } + def deleteJson[T: Writes](body: T): Fox[WSResponse] = { + request = + request.addHttpHeaders(HeaderNames.CONTENT_TYPE -> jsonMimeType).withBody(Json.toJson(body)).withMethod("DELETE") + performRequest + } + def delete(): Fox[WSResponse] = { request = request.withMethod("DELETE") performRequest diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DSRemoteWebknossosClient.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DSRemoteWebknossosClient.scala index e5aa55d92fc..b30caef7ac8 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DSRemoteWebknossosClient.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DSRemoteWebknossosClient.scala @@ -10,7 +10,7 @@ import com.scalableminds.util.objectid.ObjectId import com.scalableminds.util.tools.{Fox, FoxImplicits} import com.scalableminds.webknossos.datastore.DataStoreConfig import com.scalableminds.webknossos.datastore.controllers.JobExportProperties -import com.scalableminds.webknossos.datastore.helpers.{IntervalScheduler, LayerMagLinkInfo, UPath} +import com.scalableminds.webknossos.datastore.helpers.{IntervalScheduler, UPath} import com.scalableminds.webknossos.datastore.models.UnfinishedUpload import com.scalableminds.webknossos.datastore.models.annotation.AnnotationSource import com.scalableminds.webknossos.datastore.models.datasource.{DataSource, DataSourceId} @@ -114,11 +114,6 @@ class DSRemoteWebknossosClient @Inject()( .silent .putJson(dataSourcePaths) - def fetchPaths(datasetId: ObjectId): Fox[List[LayerMagLinkInfo]] = - rpc(s"$webknossosUri/api/datastores/$dataStoreName/datasources/$datasetId/paths") - .addQueryParam("key", dataStoreKey) - .getWithJsonResponse[List[LayerMagLinkInfo]] - def reserveDataSourceUpload(info: ReserveUploadInformation)( implicit tc: TokenContext): Fox[ReserveAdditionalInformation] = for { diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceService.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceService.scala index a2786bb24a2..2d057f8410d 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceService.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceService.scala @@ -24,6 +24,7 @@ import scala.concurrent.duration._ class DataSourceService @Inject()( config: DataStoreConfig, + managedS3Service: ManagedS3Service, dataVaultService: DataVaultService, val remoteWebknossosClient: DSRemoteWebknossosClient, val lifecycle: ApplicationLifecycle, @@ -296,4 +297,15 @@ class DataSourceService @Inject()( } yield dataLayer.mags.length } yield removedEntriesList.sum + def deletePathsFromDiskOrManagedS3(paths: Seq[UPath]): Fox[Unit] = { + val localPaths = paths.filter(_.isLocal).flatMap(_.toLocalPath) + val managedS3Paths = paths.filter(managedS3Service.pathIsInManagedS3) + for { + _ <- 
Fox.serialCombined(localPaths)(PathUtils.deleteDirectoryRecursively(_).toFox) + _ <- managedS3Service.deletePaths(managedS3Paths) + } yield () + } + + def existsOnDisk(dataSourceId: DataSourceId): Boolean = + Files.exists(dataBaseDir.resolve(dataSourceId.organizationId).resolve(dataSourceId.directoryName)) } diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceToDiskWriter.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceToDiskWriter.scala index 51ab729324d..7d7b775e1b0 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceToDiskWriter.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DataSourceToDiskWriter.scala @@ -12,7 +12,7 @@ import java.nio.file.{Files, Path} import scala.concurrent.ExecutionContext import scala.io.Source -trait DataSourceToDiskWriter extends PathUtils with DataSourceValidation with FoxImplicits { +trait DataSourceToDiskWriter extends DataSourceValidation with FoxImplicits { private val propertiesFileName = Path.of(UsableDataSource.FILENAME_DATASOURCE_PROPERTIES_JSON) private val logFileName = Path.of("datasource-properties-backups.log") @@ -27,7 +27,7 @@ trait DataSourceToDiskWriter extends PathUtils with DataSourceValidation with Fo for { _ <- Fox.runIf(validate)(assertValidDataSource(dataSource).toFox) propertiesFile = dataSourcePath.resolve(propertiesFileName) - _ <- Fox.runIf(!expectExisting)(ensureDirectoryBox(dataSourcePath).toFox) + _ <- Fox.runIf(!expectExisting)(PathUtils.ensureDirectoryBox(dataSourcePath).toFox) _ <- Fox.runIf(!expectExisting)(Fox.fromBool(!Files.exists(propertiesFile))) ?~> "dataSource.alreadyPresent" _ <- Fox.runIf(expectExisting)(backupPreviousProperties(dataSourcePath).toFox) ?~> "Could not update datasource-properties.json" dataSourceWithRelativizedPaths = relativizePathsOfDataSource(dataSourcePath, dataSource) diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/ManagedS3Service.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/ManagedS3Service.scala new file mode 100644 index 00000000000..21e97f2e628 --- /dev/null +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/ManagedS3Service.scala @@ -0,0 +1,157 @@ +package com.scalableminds.webknossos.datastore.services + +import com.scalableminds.util.tools.{Box, Fox, FoxImplicits} +import com.scalableminds.util.tools.Box.tryo +import com.scalableminds.webknossos.datastore.DataStoreConfig +import com.scalableminds.webknossos.datastore.helpers.{PathSchemes, S3UriUtils, UPath} +import com.scalableminds.webknossos.datastore.storage.{CredentialConfigReader, S3AccessKeyCredential} +import com.typesafe.scalalogging.LazyLogging +import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider} +import software.amazon.awssdk.core.checksums.RequestChecksumCalculation +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.s3.S3AsyncClient +import software.amazon.awssdk.services.s3.model.{ + Delete, + DeleteObjectsRequest, + DeleteObjectsResponse, + ListObjectsV2Request, + ObjectIdentifier +} +import software.amazon.awssdk.transfer.s3.S3TransferManager + +import java.net.URI +import javax.inject.Inject +import scala.concurrent.ExecutionContext +import scala.jdk.CollectionConverters._ +import scala.jdk.FutureConverters._ + +class ManagedS3Service @Inject()(dataStoreConfig: DataStoreConfig) 
extends FoxImplicits with LazyLogging { + + private lazy val s3UploadCredentialsOpt: Option[(String, String)] = + dataStoreConfig.Datastore.DataVaults.credentials.flatMap { credentialConfig => + new CredentialConfigReader(credentialConfig).getCredential + }.collectFirst { + case S3AccessKeyCredential(credentialName, accessKeyId, secretAccessKey, _, _) + if dataStoreConfig.Datastore.S3Upload.credentialName == credentialName => + (accessKeyId, secretAccessKey) + } + + lazy val s3UploadBucketOpt: Option[String] = + // by convention, the credentialName is the S3 URI so we can extract the bucket from it. + S3UriUtils.hostBucketFromUri(new URI(dataStoreConfig.Datastore.S3Upload.credentialName)) + + private lazy val s3UploadEndpoint: URI = { + // by convention, the credentialName is the S3 URI so we can extract the bucket from it. + val credentialUri = new URI(dataStoreConfig.Datastore.S3Upload.credentialName) + new URI( + "https", + null, + credentialUri.getHost, + -1, + null, + null, + null + ) + } + + private lazy val s3ClientBox: Box[S3AsyncClient] = for { + accessKeyId <- Box(s3UploadCredentialsOpt.map(_._1)) + secretAccessKey <- Box(s3UploadCredentialsOpt.map(_._2)) + client <- tryo( + S3AsyncClient + .builder() + .credentialsProvider(StaticCredentialsProvider.create( + AwsBasicCredentials.builder.accessKeyId(accessKeyId).secretAccessKey(secretAccessKey).build() + )) + .crossRegionAccessEnabled(true) + .forcePathStyle(true) + .endpointOverride(s3UploadEndpoint) + .region(Region.US_EAST_1) + // Disabling checksum calculation prevents files being stored with Content Encoding "aws-chunked". + .requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED) + .build()) + } yield client + + lazy val transferManagerBox: Box[S3TransferManager] = for { + client <- s3ClientBox + } yield S3TransferManager.builder().s3Client(client).build() + + def deletePaths(paths: Seq[UPath])(implicit ec: ExecutionContext): Fox[Unit] = { + val pathsByBucket: Map[Option[String], Seq[UPath]] = paths.groupBy(S3UriUtils.hostBucketFromUpath) + for { + _ <- Fox.serialCombined(pathsByBucket.keys) { bucket: Option[String] => + deleteS3PathsOnBucket(bucket, pathsByBucket(bucket)) + } + } yield () + } + + private def deleteS3PathsOnBucket(bucketOpt: Option[String], paths: Seq[UPath])( + implicit ec: ExecutionContext): Fox[Unit] = + for { + bucket <- bucketOpt.toFox ?~> "Could not determine S3 bucket from UPath" + s3Client <- s3ClientBox.toFox ?~> "No managed s3 client configured" + prefixes <- Fox.combined(paths.map(path => S3UriUtils.objectKeyFromUri(path.toRemoteUriUnsafe).toFox)) + keys: Seq[String] <- Fox.serialCombined(prefixes)(listKeysAtPrefix(s3Client, bucket, _)).map(_.flatten) + uniqueKeys = keys.distinct + _ = logger.info(s"Deleting ${uniqueKeys.length} objects from managed S3 bucket $bucket...") + _ <- Fox.serialCombined(uniqueKeys.grouped(1000).toSeq)(deleteBatch(s3Client, bucket, _)).map(_ => ()) + _ = logger.info(s"Successfully deleted ${uniqueKeys.length} objects from managed S3 bucket $bucket.") + } yield () + + private def deleteBatch(s3Client: S3AsyncClient, bucket: String, keys: Seq[String])( + implicit ec: ExecutionContext): Fox[DeleteObjectsResponse] = + if (keys.isEmpty) Fox.empty + else { + Fox.fromFuture( + s3Client + .deleteObjects( + DeleteObjectsRequest + .builder() + .bucket(bucket) + .delete( + Delete + .builder() + .objects( + keys.map(k => ObjectIdentifier.builder().key(k).build()).asJava + ) + .build() + ) + .build() + ) + .asScala) + } + + private def listKeysAtPrefix(s3Client: 
S3AsyncClient, bucket: String, prefix: String)( + implicit ec: ExecutionContext): Fox[Seq[String]] = { + def listRecursive(continuationToken: Option[String], acc: Seq[String]): Fox[Seq[String]] = { + val builder = ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).maxKeys(1000) + continuationToken.foreach(builder.continuationToken) + val request = builder.build() + for { + response <- Fox.fromFuture(s3Client.listObjectsV2(request).asScala) + keys = response.contents().asScala.map(_.key()) + allKeys = acc ++ keys + result <- if (response.isTruncated) { + listRecursive(Option(response.nextContinuationToken()), allKeys) + } else { + Fox.successful(allKeys) + } + } yield result + } + + listRecursive(None, Seq()) + } + + private lazy val globalCredentials = { + val res = dataStoreConfig.Datastore.DataVaults.credentials.flatMap { credentialConfig => + new CredentialConfigReader(credentialConfig).getCredential + } + logger.info(s"Parsed ${res.length} global data vault credentials from datastore config.") + res + } + + def pathIsInManagedS3(path: UPath): Boolean = + path.getScheme.contains(PathSchemes.schemeS3) && globalCredentials.exists(c => + UPath.fromString(c.name).map(path.startsWith).getOrElse(false)) + +} diff --git a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/uploading/UploadService.scala b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/uploading/UploadService.scala index 430182aab2d..22c660c7c84 100644 --- a/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/uploading/UploadService.scala +++ b/webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/uploading/UploadService.scala @@ -15,28 +15,17 @@ import com.scalableminds.webknossos.datastore.datareaders.precomputed.Precompute import com.scalableminds.webknossos.datastore.datareaders.zarr.NgffMetadata.FILENAME_DOT_ZATTRS import com.scalableminds.webknossos.datastore.datareaders.zarr.ZarrHeader.FILENAME_DOT_ZARRAY import com.scalableminds.webknossos.datastore.datareaders.zarr3.Zarr3ArrayHeader.FILENAME_ZARR_JSON -import com.scalableminds.webknossos.datastore.datavault.S3DataVault import com.scalableminds.webknossos.datastore.explore.ExploreLocalLayerService import com.scalableminds.webknossos.datastore.helpers.{DatasetDeleter, DirectoryConstants, UPath} import com.scalableminds.webknossos.datastore.models.UnfinishedUpload import com.scalableminds.webknossos.datastore.models.datasource.UsableDataSource.FILENAME_DATASOURCE_PROPERTIES_JSON import com.scalableminds.webknossos.datastore.models.datasource._ -import com.scalableminds.webknossos.datastore.services.{DSRemoteWebknossosClient, DataSourceService} +import com.scalableminds.webknossos.datastore.services.{DSRemoteWebknossosClient, DataSourceService, ManagedS3Service} +import com.scalableminds.webknossos.datastore.storage.{DataStoreRedisStore, DataVaultService} import com.scalableminds.webknossos.datastore.slacknotification.DSSlackNotificationService -import com.scalableminds.webknossos.datastore.storage.{ - CredentialConfigReader, - DataStoreRedisStore, - DataVaultService, - S3AccessKeyCredential -} import com.typesafe.scalalogging.LazyLogging import org.apache.commons.io.FileUtils import play.api.libs.json.{Json, OFormat, Reads} -import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider} -import software.amazon.awssdk.core.checksums.RequestChecksumCalculation -import software.amazon.awssdk.regions.Region -import 
software.amazon.awssdk.services.s3.S3AsyncClient -import software.amazon.awssdk.transfer.s3.S3TransferManager import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest import java.io.{File, RandomAccessFile} @@ -108,6 +97,7 @@ class UploadService @Inject()(dataSourceService: DataSourceService, dataVaultService: DataVaultService, exploreLocalLayerService: ExploreLocalLayerService, dataStoreConfig: DataStoreConfig, + managedS3Service: ManagedS3Service, slackNotificationService: DSSlackNotificationService, val remoteWebknossosClient: DSRemoteWebknossosClient)(implicit ec: ExecutionContext) extends DatasetDeleter @@ -369,7 +359,7 @@ class UploadService @Inject()(dataSourceService: DataSourceService, // _ <- assertWithinRequestedFileSizeAndCleanUpOtherwise(uploadDir, uploadId) _ <- checkAllChunksUploaded(uploadId) unpackToDir = unpackToDirFor(dataSourceId) - _ <- ensureDirectoryBox(unpackToDir.getParent).toFox ?~> "dataset.import.fileAccessDenied" + _ <- PathUtils.ensureDirectoryBox(unpackToDir.getParent).toFox ?~> "dataset.import.fileAccessDenied" unpackResult <- unpackDataset(uploadDir, unpackToDir, datasetId).shiftBox reservedTotalFileSizeInBytesOpt <- runningUploadMetadataStore.findLong(redisKeyForTotalFileSizeInBytes(uploadId)) actualUploadedFileSizeInBytesOpt <- runningUploadMetadataStore.findLong( @@ -446,7 +436,7 @@ class UploadService @Inject()(dataSourceService: DataSourceService, _ <- deleteFilesNotReferencedInDataSource(unpackedDir, usableDataSourceFromDir) newBasePath <- if (dataStoreConfig.Datastore.S3Upload.enabled) { for { - s3UploadBucket <- s3UploadBucketOpt.toFox + s3UploadBucket <- managedS3Service.s3UploadBucketOpt.toFox _ = logger.info(s"finishUpload for $datasetId: Copying data to s3 bucket $s3UploadBucket...") beforeS3Upload = Instant.now s3ObjectKey = s"${dataStoreConfig.Datastore.S3Upload.objectKeyPrefix}/${dataSourceId.organizationId}/${dataSourceId.directoryName}/" @@ -541,56 +531,13 @@ class UploadService @Inject()(dataSourceService: DataSourceService, exploreLocalLayerService.writeLocalDatasourceProperties(dataSource, path)) } yield path - private lazy val s3UploadCredentialsOpt: Option[(String, String)] = - dataStoreConfig.Datastore.DataVaults.credentials.flatMap { credentialConfig => - new CredentialConfigReader(credentialConfig).getCredential - }.collectFirst { - case S3AccessKeyCredential(credentialName, accessKeyId, secretAccessKey, _, _) - if dataStoreConfig.Datastore.S3Upload.credentialName == credentialName => - (accessKeyId, secretAccessKey) - } - - private lazy val s3UploadBucketOpt: Option[String] = - S3DataVault.hostBucketFromUri(new URI(dataStoreConfig.Datastore.S3Upload.credentialName)) - - private lazy val s3UploadEndpoint: URI = { - val credentialUri = new URI(dataStoreConfig.Datastore.S3Upload.credentialName) - new URI( - "https", - null, - credentialUri.getHost, - -1, - null, - null, - null - ) - } - - private lazy val getS3TransferManager: Box[S3TransferManager] = for { - accessKeyId <- Box(s3UploadCredentialsOpt.map(_._1)) - secretAccessKey <- Box(s3UploadCredentialsOpt.map(_._2)) - client <- tryo( - S3AsyncClient - .builder() - .credentialsProvider(StaticCredentialsProvider.create( - AwsBasicCredentials.builder.accessKeyId(accessKeyId).secretAccessKey(secretAccessKey).build() - )) - .crossRegionAccessEnabled(true) - .forcePathStyle(true) - .endpointOverride(s3UploadEndpoint) - .region(Region.US_EAST_1) - // Disabling checksum calculation prevents files being stored with Content Encoding "aws-chunked". 
- .requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED) - .build()) - } yield S3TransferManager.builder().s3Client(client).build() - private def uploadDirectoryToS3( dataDir: Path, bucketName: String, prefix: String ): Fox[Unit] = for { - transferManager <- getS3TransferManager.toFox ?~> "S3 upload is not properly configured, cannot get S3 client" + transferManager <- managedS3Service.transferManagerBox.toFox ?~> "S3 upload is not properly configured, cannot get S3 client" directoryUpload = transferManager.uploadDirectory( UploadDirectoryRequest.builder().bucket(bucketName).s3Prefix(prefix).source(dataDir).build() ) @@ -631,17 +578,17 @@ class UploadService @Inject()(dataSourceService: DataSourceService, case Full(_) => Fox.successful(()) case Empty => - deleteOnDisk(dataSourceId.organizationId, + deleteOnDisk(datasetId, + dataSourceId.organizationId, dataSourceId.directoryName, - None, needsConversion, Some("the upload failed")) Fox.failure(s"Unknown error $label") case Failure(msg, e, _) => logger.warn(s"Error while $label: $msg, $e") - deleteOnDisk(dataSourceId.organizationId, + deleteOnDisk(datasetId, + dataSourceId.organizationId, dataSourceId.directoryName, - None, needsConversion, Some("the upload failed")) remoteWebknossosClient.deleteDataset(datasetId) diff --git a/webknossos-datastore/conf/datastore.latest.routes b/webknossos-datastore/conf/datastore.latest.routes index 605e9f5e3e7..74b1a25d7c9 100644 --- a/webknossos-datastore/conf/datastore.latest.routes +++ b/webknossos-datastore/conf/datastore.latest.routes @@ -104,15 +104,16 @@ POST /datasets/:datasetId/layers/:dataLayerName/segmentStatistics/surfa GET /datasets @com.scalableminds.webknossos.datastore.controllers.DataSourceController.testChunk(resumableChunkNumber: Int, resumableIdentifier: String) POST /datasets @com.scalableminds.webknossos.datastore.controllers.DataSourceController.uploadChunk() GET /datasets/getUnfinishedUploads @com.scalableminds.webknossos.datastore.controllers.DataSourceController.getUnfinishedUploads(organizationName: String) +GET /datasets/baseDirAbsolute @com.scalableminds.webknossos.datastore.controllers.DataSourceController.baseDirAbsolute POST /datasets/reserveUpload @com.scalableminds.webknossos.datastore.controllers.DataSourceController.reserveUpload() POST /datasets/finishUpload @com.scalableminds.webknossos.datastore.controllers.DataSourceController.finishUpload() POST /datasets/cancelUpload @com.scalableminds.webknossos.datastore.controllers.DataSourceController.cancelUpload() POST /datasets/measureUsedStorage/:organizationId @com.scalableminds.webknossos.datastore.controllers.DataSourceController.measureUsedStorage(organizationId: String) PUT /datasets/:datasetId @com.scalableminds.webknossos.datastore.controllers.DataSourceController.updateOnDisk(datasetId: ObjectId) DELETE /datasets/:datasetId/deleteOnDisk @com.scalableminds.webknossos.datastore.controllers.DataSourceController.deleteOnDisk(datasetId: ObjectId) +DELETE /datasets/deletePaths @com.scalableminds.webknossos.datastore.controllers.DataSourceController.deletePaths() POST /datasets/exploreRemote @com.scalableminds.webknossos.datastore.controllers.DataSourceController.exploreRemoteDataset() POST /datasets/validatePaths @com.scalableminds.webknossos.datastore.controllers.DataSourceController.validatePaths() - DELETE /datasets/:datasetId @com.scalableminds.webknossos.datastore.controllers.DataSourceController.invalidateCache(datasetId: ObjectId) # Actions
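
Note on the new S3UriUtils helper: the bucket and object key are derived purely from the shape of the URI, so all three supported styles can be exercised directly. The sketch below is illustrative only; the bucket and key names are invented, and the expected values in the comments are inferred from the parsing rules added in this diff, not taken from test output:

import java.net.URI
import com.scalableminds.webknossos.datastore.helpers.S3UriUtils

object S3UriUtilsExamples extends App {
  // Virtual-hosted style: the bucket is the host prefix before ".s3.amazonaws.com".
  println(S3UriUtils.hostBucketFromUri(new URI("https://my-bucket.s3.amazonaws.com/some/key")))              // expected: Some(my-bucket)
  // Path style: the bucket is the first path segment after the endpoint host.
  println(S3UriUtils.hostBucketFromUri(new URI("https://s3.eu-central-1.amazonaws.com/my-bucket/some/key"))) // expected: Some(my-bucket)
  // Short style: the URI "host" is the bucket itself.
  println(S3UriUtils.hostBucketFromUri(new URI("s3://my-bucket/some/key")))                                  // expected: Some(my-bucket)
  // Object keys are returned without the bucket or the leading slash for short and path style.
  println(S3UriUtils.objectKeyFromUri(new URI("s3://my-bucket/some/key")))                                   // expected: Full(some/key)
  println(S3UriUtils.objectKeyFromUri(new URI("https://s3.eu-central-1.amazonaws.com/my-bucket/some/key")))  // expected: Full(some/key)
}
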
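Note on the RemoteUPath.startsWith change: comparing normalized strings could match across segment boundaries, so the prefix check now compares the scheme plus whole segments. This matters for ManagedS3Service.pathIsInManagedS3, which uses startsWith to decide whether a path lives under a managed bucket prefix. A standalone sketch of the idea, with simplified segment splitting and made-up paths (not the actual UPath API):

object SegmentPrefixSketch extends App {
  // Simplified: split a remote URI into its scheme and non-empty path segments.
  private def segments(uri: String): (String, Seq[String]) = {
    val Array(scheme, rest) = uri.split("://", 2)
    (scheme, rest.split("/").toSeq.filter(_.nonEmpty))
  }

  // Prefix check on whole segments, analogous to the check this diff introduces.
  private def startsWithSegmentWise(path: String, prefix: String): Boolean = {
    val (scheme, segs) = segments(path)
    val (prefixScheme, prefixSegs) = segments(prefix)
    scheme == prefixScheme && segs.startsWith(prefixSegs)
  }

  val path = "s3://bucket/dataset-12/color/1"
  println(path.startsWith("s3://bucket/dataset-1"))              // true: a plain string prefix matches mid-segment
  println(startsWithSegmentWise(path, "s3://bucket/dataset-1"))  // false: "dataset-12" is not the segment "dataset-1"
  println(startsWithSegmentWise(path, "s3://bucket/dataset-12")) // true
}
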