Skip to content

Commit a35bbba

Browse files
frcroth, normanrz, fm3, MichaelBuessemeyer
authored
Upload datasets to S3 (#8912)
### Description - If configured, uploaded datasets are forwarded to S3 after unzipping - layersToLink now reference the source dataset by datasetId (this introduces api version 11) - All newly uploaded datasets are virtual (meaning that the on-disk datasource-properties.json is removed and the database entry is the source of truth) - Only exception: datasets with needsConversion=true. Will be cleaned up in #8959 - layersToLink no longer creates symlinks but rather just entries in the DB, just like it works for uploadToPaths - Raw uploaded data is backed up inside of the .trash folder for debugging purposes. It gets cleared regularly anyway. ### Steps to test: - Configure S3 uploads using appropriate credentials in the application.conf - Upload a dataset - View that dataset - Also test that local filesystem case did not break - Test uploading zipped dataset and non-zipped dataset - Test tif upload with `yarn enable-jobs` and a worker - test upload from libs (ideally with layersToLink) ### TODOs: - [x] Handle uploading larger files with multipart upload - [x] Fix uploaded files having "aws-chunked" encoding - [x] Push updated application.conf without credentials - [x] Test with more datasets - [x] Do not upload files that are not referenced in layers (e.g. datasource properties.json files) - [x] use UPath for everything - [x] is there still a case where uploaded datasets should not be virtual? → no! - [x] Make all uploaded datasets virtual - [x] unify layers to link mechanism - [x] api versioning (layersToLink adapter) - [x] re-test with s3 - [x] delete local stuff or move to .trash? 
- [x] make sure both old and new convert_to_wkw variants are supported ### Issues: - fixes #8415 - fixes #8934 - fixes #8893 - fixes #8749 - Follow-up issue #8979 ------ - [x] Added changelog entry (create a `$PR_NUMBER.md` file in `unreleased_changes` or use `./tools/create-changelog-entry.py`) - [x] Added migration guide entry if applicable (edit the same file as for the changelog) - [x] Removed dev-only changes - [x] Considered [common edge cases](../blob/master/.github/common_edge_cases.md) - [x] Needs datastore update after deployment --------- Co-authored-by: Norman Rzepka <[email protected]> Co-authored-by: Florian M <[email protected]> Co-authored-by: Florian M <[email protected]> Co-authored-by: MichaelBuessemeyer <[email protected]>
1 parent bc63f8d commit a35bbba

29 files changed

+871
-576
lines changed

app/controllers/DatasetController.scala

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import com.scalableminds.webknossos.datastore.models.datasource.{
1515
LayerAttachmentType,
1616
UsableDataSource
1717
}
18+
import com.scalableminds.webknossos.datastore.services.uploading.LinkedLayerIdentifier
1819
import mail.{MailchimpClient, MailchimpTag}
1920
import models.analytics.{AnalyticsService, ChangeDatasetSettingsEvent, OpenDatasetEvent}
2021
import models.dataset._
@@ -25,6 +26,7 @@ import models.dataset.explore.{
2526
}
2627
import models.folder.FolderService
2728
import models.organization.OrganizationDAO
29+
import models.storage.UsedStorageService
2830
import models.team.{TeamDAO, TeamService}
2931
import models.user.{User, UserDAO, UserService}
3032
import play.api.i18n.{Messages, MessagesProvider}
@@ -54,12 +56,6 @@ object DatasetUpdateParameters extends TristateOptionJsonHelper {
5456
Json.configured(tristateOptionParsing).format[DatasetUpdateParameters]
5557
}
5658

57-
case class LinkedLayerIdentifier(datasetId: ObjectId, layerName: String, newLayerName: Option[String] = None)
58-
59-
object LinkedLayerIdentifier {
60-
implicit val jsonFormat: OFormat[LinkedLayerIdentifier] = Json.format[LinkedLayerIdentifier]
61-
}
62-
6359
case class ReserveDatasetUploadToPathsRequest(
6460
datasetName: String,
6561
layersToLink: Seq[LinkedLayerIdentifier],
@@ -121,7 +117,7 @@ object SegmentAnythingMaskParameters {
121117
implicit val jsonFormat: Format[SegmentAnythingMaskParameters] = Json.format[SegmentAnythingMaskParameters]
122118
}
123119

124-
case class DataSourceRegistrationInfo(dataSource: UsableDataSource, folderId: Option[String], dataStoreName: String)
120+
case class DataSourceRegistrationInfo(dataSource: UsableDataSource, folderId: Option[ObjectId], dataStoreName: String)
125121

126122
object DataSourceRegistrationInfo {
127123
implicit val jsonFormat: OFormat[DataSourceRegistrationInfo] = Json.format[DataSourceRegistrationInfo]
@@ -142,6 +138,7 @@ class DatasetController @Inject()(userService: UserService,
142138
folderService: FolderService,
143139
thumbnailService: ThumbnailService,
144140
thumbnailCachingService: ThumbnailCachingService,
141+
usedStorageService: UsedStorageService,
145142
conf: WkConf,
146143
authenticationService: AccessibleBySwitchingService,
147144
analyticsService: AnalyticsService,
@@ -208,12 +205,13 @@ class DatasetController @Inject()(userService: UserService,
208205
folderService.getOrCreateFromPathLiteral(folderPath, request.identity._organization)) ?~> "dataset.explore.autoAdd.getFolder.failed"
209206
_ <- datasetService.assertValidDatasetName(request.body.datasetName)
210207
_ <- Fox.serialCombined(dataSource.dataLayers)(layer => datasetService.assertValidLayerNameLax(layer.name))
211-
newDataset <- datasetService.createVirtualDataset(
208+
newDataset <- datasetService.createAndSetUpDataset(
212209
request.body.datasetName,
213210
dataStore,
214211
dataSource,
215-
folderIdOpt.map(_.toString),
216-
request.identity
212+
folderIdOpt,
213+
request.identity,
214+
isVirtual = true
217215
) ?~> "dataset.explore.autoAdd.failed"
218216
} yield Ok(Json.toJson(newDataset._id))
219217
}
@@ -229,13 +227,19 @@ class DatasetController @Inject()(userService: UserService,
229227
_ <- Fox.fromBool(isTeamManagerOrAdmin || user.isDatasetManager) ~> FORBIDDEN
230228
_ <- Fox.fromBool(request.body.dataSource.dataLayers.nonEmpty) ?~> "dataset.explore.zeroLayers"
231229
_ <- datasetService.validatePaths(request.body.dataSource.allExplicitPaths, dataStore) ?~> "dataSource.add.pathsNotAllowed"
232-
dataset <- datasetService.createVirtualDataset(
230+
dataset <- datasetService.createAndSetUpDataset(
233231
name,
234232
dataStore,
235233
request.body.dataSource,
236234
request.body.folderId,
237-
user
235+
user,
236+
isVirtual = true
238237
)
238+
_ = datasetService.trackNewDataset(dataset,
239+
user,
240+
needsConversion = false,
241+
datasetSizeBytes = 0,
242+
viaAddRoute = false)
239243
} yield Ok(Json.obj("newDatasetId" -> dataset._id))
240244
}
241245

@@ -654,6 +658,12 @@ class DatasetController @Inject()(userService: UserService,
654658
dataset.status == DataSourceStatus.notYetUploadedToPaths || dataset.status == DataSourceStatus.notYetUploaded) ?~> s"Dataset is not in uploading-to-paths status, got ${dataset.status}."
655659
_ <- Fox.fromBool(!dataset.isUsable) ?~> s"Dataset is already marked as usable."
656660
_ <- datasetDAO.updateDatasetStatusByDatasetId(datasetId, newStatus = "", isUsable = true)
661+
_ <- usedStorageService.refreshStorageReportForDataset(dataset)
662+
_ = datasetService.trackNewDataset(dataset,
663+
request.identity,
664+
needsConversion = false,
665+
datasetSizeBytes = 0,
666+
viaAddRoute = false)
657667
} yield Ok
658668
}
659669

app/controllers/WKRemoteDataStoreController.scala

Lines changed: 50 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,37 @@
11
package controllers
22

3-
import com.scalableminds.util.accesscontext.{AuthorizedAccessContext, GlobalAccessContext}
3+
import com.scalableminds.util.accesscontext.{AuthorizedAccessContext, DBAccessContext, GlobalAccessContext}
44
import com.scalableminds.util.objectid.ObjectId
55
import com.scalableminds.util.time.Instant
66
import com.scalableminds.util.tools.{Fox, Full}
77
import com.scalableminds.webknossos.datastore.controllers.JobExportProperties
88
import com.scalableminds.webknossos.datastore.helpers.{LayerMagLinkInfo, MagLinkInfo}
99
import com.scalableminds.webknossos.datastore.models.UnfinishedUpload
10-
import com.scalableminds.webknossos.datastore.models.datasource.{DataSourceId, DataSource}
10+
import com.scalableminds.webknossos.datastore.models.datasource.{
11+
DataSource,
12+
DataSourceId,
13+
DataSourceStatus,
14+
UnusableDataSource
15+
}
1116
import com.scalableminds.webknossos.datastore.services.{DataSourcePathInfo, DataStoreStatus}
1217
import com.scalableminds.webknossos.datastore.services.uploading.{
13-
LegacyLinkedLayerIdentifier,
18+
ReportDatasetUploadParameters,
1419
ReserveAdditionalInformation,
1520
ReserveUploadInformation
1621
}
1722
import com.typesafe.scalalogging.LazyLogging
18-
import mail.{MailchimpClient, MailchimpTag}
19-
import models.analytics.{AnalyticsService, UploadDatasetEvent}
2023
import models.annotation.AnnotationDAO
2124
import models.dataset._
2225
import models.dataset.credential.CredentialDAO
23-
import models.folder.FolderDAO
2426
import models.job.JobDAO
2527
import models.organization.OrganizationDAO
2628
import models.storage.UsedStorageService
2729
import models.team.TeamDAO
28-
import models.user.{MultiUserDAO, User, UserDAO, UserService}
29-
import play.api.i18n.{Messages, MessagesProvider}
30+
import models.user.UserDAO
31+
import play.api.i18n.Messages
3032
import play.api.libs.json.Json
3133
import play.api.mvc.{Action, AnyContent, PlayBodyParsers}
3234
import security.{WebknossosBearerTokenAuthenticatorService, WkSilhouetteEnvironment}
33-
import telemetry.SlackNotificationService
34-
import utils.WkConf
3535

3636
import scala.concurrent.duration.DurationInt
3737
import javax.inject.Inject
@@ -41,22 +41,16 @@ class WKRemoteDataStoreController @Inject()(
4141
datasetService: DatasetService,
4242
dataStoreService: DataStoreService,
4343
dataStoreDAO: DataStoreDAO,
44-
analyticsService: AnalyticsService,
45-
userService: UserService,
4644
organizationDAO: OrganizationDAO,
4745
usedStorageService: UsedStorageService,
46+
layerToLinkService: LayerToLinkService,
4847
datasetDAO: DatasetDAO,
4948
datasetLayerDAO: DatasetLayerDAO,
5049
userDAO: UserDAO,
51-
folderDAO: FolderDAO,
5250
teamDAO: TeamDAO,
5351
jobDAO: JobDAO,
54-
multiUserDAO: MultiUserDAO,
5552
credentialDAO: CredentialDAO,
5653
annotationDAO: AnnotationDAO,
57-
mailchimpClient: MailchimpClient,
58-
slackNotificationService: SlackNotificationService,
59-
conf: WkConf,
6054
wkSilhouetteEnvironment: WkSilhouetteEnvironment)(implicit ec: ExecutionContext, bodyParsers: PlayBodyParsers)
6155
extends Controller
6256
with LazyLogging {
@@ -79,26 +73,22 @@ class WKRemoteDataStoreController @Inject()(
7973
_ <- Fox.fromBool(organization._id == user._organization) ?~> "notAllowed" ~> FORBIDDEN
8074
_ <- datasetService.assertValidDatasetName(uploadInfo.name)
8175
_ <- Fox.fromBool(dataStore.onlyAllowedOrganization.forall(_ == organization._id)) ?~> "dataset.upload.Datastore.restricted"
82-
folderId = uploadInfo.folderId.getOrElse(organization._rootFolder)
83-
_ <- folderDAO.assertUpdateAccess(folderId)(AuthorizedAccessContext(user)) ?~> "folder.noWriteAccess"
84-
layersToLinkWithDirectoryName <- Fox.serialCombined(uploadInfo.layersToLink.getOrElse(List.empty))(l =>
85-
validateLayerToLink(l, user)) ?~> "dataset.upload.invalidLinkedLayers"
86-
newDatasetId = ObjectId.generate
76+
_ <- Fox.serialCombined(uploadInfo.layersToLink.getOrElse(List.empty))(l =>
77+
layerToLinkService.validateLayerToLink(l, user)) ?~> "dataset.upload.invalidLinkedLayers"
8778
_ <- Fox.runIf(request.body.requireUniqueName.getOrElse(false))(
8879
datasetService.assertNewDatasetNameUnique(request.body.name, organization._id))
89-
dataset <- datasetService.createPreliminaryDataset(newDatasetId,
90-
uploadInfo.name,
91-
datasetService.generateDirectoryName(uploadInfo.name,
92-
newDatasetId),
93-
uploadInfo.organization,
94-
dataStore) ?~> "dataset.upload.creation.failed"
95-
_ <- datasetDAO.updateFolder(dataset._id, folderId)(GlobalAccessContext)
80+
preliminaryDataSource = UnusableDataSource(DataSourceId("", ""), None, DataSourceStatus.notYetUploaded)
81+
dataset <- datasetService.createAndSetUpDataset(
82+
uploadInfo.name,
83+
dataStore,
84+
preliminaryDataSource,
85+
uploadInfo.folderId,
86+
user,
87+
// For the moment, the convert_to_wkw job can only fill the dataset if it is not virtual.
88+
isVirtual = !uploadInfo.needsConversion.getOrElse(false)
89+
) ?~> "dataset.upload.creation.failed"
9690
_ <- datasetService.addInitialTeams(dataset, uploadInfo.initialTeams, user)(AuthorizedAccessContext(user))
97-
_ <- datasetService.addUploader(dataset, user._id)(AuthorizedAccessContext(user))
98-
additionalInfo = ReserveAdditionalInformation(dataset._id,
99-
dataset.directoryName,
100-
if (layersToLinkWithDirectoryName.isEmpty) None
101-
else Some(layersToLinkWithDirectoryName))
91+
additionalInfo = ReserveAdditionalInformation(dataset._id, dataset.directoryName)
10292
} yield Ok(Json.toJson(additionalInfo))
10393
}
10494
}
@@ -132,62 +122,37 @@ class WKRemoteDataStoreController @Inject()(
132122
}
133123
}
134124

135-
private def validateLayerToLink(layerIdentifier: LegacyLinkedLayerIdentifier, requestingUser: User)(
136-
implicit ec: ExecutionContext,
137-
m: MessagesProvider): Fox[LegacyLinkedLayerIdentifier] =
138-
for {
139-
organization <- organizationDAO.findOne(layerIdentifier.getOrganizationId)(GlobalAccessContext) ?~> Messages(
140-
"organization.notFound",
141-
layerIdentifier.getOrganizationId) ~> NOT_FOUND
142-
datasetBox <- datasetDAO
143-
.findOneByNameAndOrganization(layerIdentifier.dataSetName, organization._id)(
144-
AuthorizedAccessContext(requestingUser))
145-
.shiftBox
146-
dataset <- datasetBox match {
147-
case Full(ds) => Fox.successful(ds)
148-
case _ =>
149-
ObjectId
150-
.fromString(layerIdentifier.dataSetName)
151-
.flatMap(interpretedAsId => datasetDAO.findOne(interpretedAsId)(AuthorizedAccessContext(requestingUser))) ?~> Messages(
152-
"dataset.notFound",
153-
layerIdentifier.dataSetName)
154-
}
155-
isTeamManagerOrAdmin <- userService.isTeamManagerOrAdminOfOrg(requestingUser, dataset._organization)
156-
_ <- Fox.fromBool(isTeamManagerOrAdmin || requestingUser.isDatasetManager || dataset.isPublic) ?~> "dataset.upload.linkRestricted"
157-
} yield layerIdentifier.copy(datasetDirectoryName = Some(dataset.directoryName))
158-
159125
def reportDatasetUpload(name: String,
160126
key: String,
161127
token: String,
162-
datasetDirectoryName: String,
163-
datasetSizeBytes: Long,
164-
needsConversion: Boolean,
165-
viaAddRoute: Boolean): Action[AnyContent] =
166-
Action.async { implicit request =>
167-
dataStoreService.validateAccess(name, key) { dataStore =>
128+
datasetId: ObjectId): Action[ReportDatasetUploadParameters] =
129+
Action.async(validateJson[ReportDatasetUploadParameters]) { implicit request =>
130+
dataStoreService.validateAccess(name, key) { _ =>
168131
for {
169132
user <- bearerTokenService.userForToken(token)
170-
dataset <- datasetDAO.findOneByDirectoryNameAndOrganization(datasetDirectoryName, user._organization)(
171-
GlobalAccessContext) ?~> Messages("dataset.notFound", datasetDirectoryName) ~> NOT_FOUND
172-
_ <- Fox.runIf(!needsConversion && !viaAddRoute)(usedStorageService.refreshStorageReportForDataset(dataset))
173-
_ <- Fox.runIf(!needsConversion)(logUploadToSlack(user, dataset._id, viaAddRoute))
174-
_ = analyticsService.track(UploadDatasetEvent(user, dataset, dataStore, datasetSizeBytes))
175-
_ = if (!needsConversion) mailchimpClient.tagUser(user, MailchimpTag.HasUploadedOwnDataset)
176-
} yield Ok(Json.obj("id" -> dataset._id))
133+
dataset <- datasetDAO.findOne(datasetId)(GlobalAccessContext) ?~> Messages("dataset.notFound", datasetId) ~> NOT_FOUND
134+
_ <- Fox.runIf(!request.body.needsConversion)(usedStorageService.refreshStorageReportForDataset(dataset))
135+
_ = datasetService.trackNewDataset(dataset,
136+
user,
137+
request.body.needsConversion,
138+
request.body.datasetSizeBytes,
139+
viaAddRoute = false)
140+
dataSourceWithLinkedLayersOpt <- Fox.runOptional(request.body.dataSourceOpt) {
141+
implicit val ctx: DBAccessContext = AuthorizedAccessContext(user)
142+
layerToLinkService.addLayersToLinkToDataSource(_, request.body.layersToLink)
143+
}
144+
_ <- Fox.runOptional(dataSourceWithLinkedLayersOpt) { dataSource =>
145+
logger.info(s"Updating dataset $datasetId in database after upload reported from datastore $name.")
146+
datasetDAO.updateDataSource(datasetId,
147+
dataset._dataStore,
148+
dataSource.hashCode(),
149+
dataSource,
150+
isUsable = true)(GlobalAccessContext)
151+
}
152+
} yield Ok
177153
}
178154
}
179155

180-
private def logUploadToSlack(user: User, datasetId: ObjectId, viaAddRoute: Boolean): Fox[Unit] =
181-
for {
182-
organization <- organizationDAO.findOne(user._organization)(GlobalAccessContext)
183-
multiUser <- multiUserDAO.findOne(user._multiUser)(GlobalAccessContext)
184-
resultLink = s"${conf.Http.uri}/datasets/$datasetId"
185-
addLabel = if (viaAddRoute) "(via explore+add)" else "(upload without conversion)"
186-
superUserLabel = if (multiUser.isSuperUser) " (for superuser)" else ""
187-
_ = slackNotificationService.info(s"Dataset added $addLabel$superUserLabel",
188-
s"For organization: ${organization.name}. <$resultLink|Result>")
189-
} yield ()
190-
191156
def statusUpdate(name: String, key: String): Action[DataStoreStatus] = Action.async(validateJson[DataStoreStatus]) {
192157
implicit request =>
193158
dataStoreService.validateAccess(name, key) { _ =>
@@ -242,11 +207,11 @@ class WKRemoteDataStoreController @Inject()(
242207
/**
243208
* Called by the datastore after a dataset has been deleted on disk.
244209
*/
245-
def deleteDataset(name: String, key: String): Action[DataSourceId] = Action.async(validateJson[DataSourceId]) {
210+
def deleteDataset(name: String, key: String): Action[ObjectId] = Action.async(validateJson[ObjectId]) {
246211
implicit request =>
247212
dataStoreService.validateAccess(name, key) { _ =>
248213
for {
249-
existingDatasetBox <- datasetDAO.findOneByDataSourceId(request.body)(GlobalAccessContext).shiftBox
214+
existingDatasetBox <- datasetDAO.findOne(request.body)(GlobalAccessContext).shiftBox
250215
_ <- existingDatasetBox match {
251216
case Full(dataset) =>
252217
for {
@@ -272,7 +237,7 @@ class WKRemoteDataStoreController @Inject()(
272237
"organization.notFound",
273238
organizationId) ~> NOT_FOUND
274239
dataset <- datasetDAO.findOneByNameAndOrganization(datasetDirectoryName, organization._id)(
275-
GlobalAccessContext)
240+
GlobalAccessContext) ?~> Messages("dataset.notFound", datasetDirectoryName)
276241
} yield Ok(Json.toJson(dataset._id))
277242
}
278243
}

app/models/dataset/ComposeService.scala

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,12 @@ class ComposeService @Inject()(datasetDAO: DatasetDAO, dataStoreDAO: DataStoreDA
4747
_ <- Fox.assertTrue(isComposable(composeRequest)) ?~> "Datasets are not composable, they are not on the same data store"
4848
dataSource <- createDatasource(composeRequest, composeRequest.newDatasetName, composeRequest.organizationId)
4949
dataStore <- dataStoreDAO.findOneWithUploadsAllowed
50-
dataset <- datasetService.createVirtualDataset(composeRequest.newDatasetName,
51-
dataStore,
52-
dataSource,
53-
Some(composeRequest.targetFolderId.toString),
54-
user)
50+
dataset <- datasetService.createAndSetUpDataset(composeRequest.newDatasetName,
51+
dataStore,
52+
dataSource,
53+
Some(composeRequest.targetFolderId),
54+
user,
55+
isVirtual = true)
5556

5657
} yield (dataSource, dataset._id)
5758

@@ -67,13 +68,9 @@ class ComposeService @Inject()(datasetDAO: DatasetDAO, dataStoreDAO: DataStoreDA
6768
case Some(c) => Some(c ++ composeLayer.transformations.toList)
6869
case None => Some(composeLayer.transformations.toList)
6970
}
70-
editedLayer: StaticLayer <- layer match {
71-
case l: StaticLayer =>
72-
Fox.successful(
73-
l.mapped(name = composeLayer.newName,
74-
coordinateTransformations = applyCoordinateTransformations(l.coordinateTransformations)))
75-
case _ => Fox.failure("Unsupported layer type for composition: " + layer.getClass.getSimpleName)
76-
}
71+
editedLayer = layer.mapped(name = composeLayer.newName,
72+
coordinateTransformations =
73+
applyCoordinateTransformations(layer.coordinateTransformations))
7774
} yield (editedLayer, usableDataSource.scale)
7875

7976
private def isComposable(composeRequest: ComposeRequest)(implicit ctx: DBAccessContext): Fox[Boolean] =

0 commit comments

Comments
 (0)