
Commit 403b061

Merge branch 'master' of github.com:scalableminds/webknossos into live-m3
2 parents: c157cbb + 7f7a50c

File tree

3 files changed: +59 −53 lines

webknossos-datastore/app/com/scalableminds/webknossos/datastore/controllers/DataSourceController.scala

Lines changed: 38 additions & 35 deletions
@@ -142,40 +142,43 @@ class DataSourceController @Inject()(
     */
   def uploadChunk(): Action[MultipartFormData[Files.TemporaryFile]] =
     Action.async(parse.multipartFormData) { implicit request =>
-      val uploadForm = Form(
-        tuple(
-          "resumableChunkNumber" -> number,
-          "resumableChunkSize" -> number,
-          "resumableCurrentChunkSize" -> number,
-          "resumableTotalChunks" -> longNumber,
-          "resumableIdentifier" -> nonEmptyText
-        )).fill((-1, -1, -1, -1, ""))
-
-      uploadForm
-        .bindFromRequest(request.body.dataParts)
-        .fold(
-          hasErrors = formWithErrors => Fox.successful(JsonBadRequest(formWithErrors.errors.head.message)),
-          success = {
-            case (chunkNumber, chunkSize, currentChunkSize, totalChunkCount, uploadFileId) =>
-              for {
-                datasetId <- uploadService
-                  .getDatasetIdByUploadId(uploadService.extractDatasetUploadId(uploadFileId)) ?~> "dataset.upload.validation.failed"
-                result <- accessTokenService.validateAccessFromTokenContext(UserAccessRequest.writeDataset(datasetId)) {
-                  for {
-                    isKnownUpload <- uploadService.isKnownUploadByFileId(uploadFileId)
-                    _ <- Fox.fromBool(isKnownUpload) ?~> "dataset.upload.validation.failed"
-                    chunkFile <- request.body.file("file").toFox ?~> "zip.file.notFound"
-                    _ <- uploadService.handleUploadChunk(uploadFileId,
-                                                         chunkSize,
-                                                         currentChunkSize,
-                                                         totalChunkCount,
-                                                         chunkNumber,
-                                                         new File(chunkFile.ref.path.toString))
-                  } yield Ok
-                }
-              } yield result
-          }
-        )
+      log(Some(slackNotificationService.noticeFailedUploadRequest)) {
+        val uploadForm = Form(
+          tuple(
+            "resumableChunkNumber" -> number,
+            "resumableChunkSize" -> number,
+            "resumableCurrentChunkSize" -> number,
+            "resumableTotalChunks" -> longNumber,
+            "resumableIdentifier" -> nonEmptyText
+          )).fill((-1, -1, -1, -1, ""))
+
+        uploadForm
+          .bindFromRequest(request.body.dataParts)
+          .fold(
+            hasErrors = formWithErrors => Fox.successful(JsonBadRequest(formWithErrors.errors.head.message)),
+            success = {
+              case (chunkNumber, chunkSize, currentChunkSize, totalChunkCount, uploadFileId) =>
+                for {
+                  datasetId <- uploadService
+                    .getDatasetIdByUploadId(uploadService.extractDatasetUploadId(uploadFileId)) ?~> "dataset.upload.validation.failed"
+                  result <- accessTokenService
+                    .validateAccessFromTokenContext(UserAccessRequest.writeDataset(datasetId)) {
+                      for {
+                        isKnownUpload <- uploadService.isKnownUploadByFileId(uploadFileId)
+                        _ <- Fox.fromBool(isKnownUpload) ?~> "dataset.upload.validation.failed"
+                        chunkFile <- request.body.file("file").toFox ?~> "zip.file.notFound"
+                        _ <- uploadService.handleUploadChunk(uploadFileId,
+                                                             chunkSize,
+                                                             currentChunkSize,
+                                                             totalChunkCount,
+                                                             chunkNumber,
+                                                             new File(chunkFile.ref.path.toString))
+                      } yield Ok
+                    }
+                } yield result
+            }
+          )
+      }
     }
 
   def testChunk(resumableChunkNumber: Int, resumableIdentifier: String): Action[AnyContent] =
@@ -193,7 +196,7 @@ class DataSourceController @Inject()(
     }
 
   def finishUpload(): Action[UploadInformation] = Action.async(validateJson[UploadInformation]) { implicit request =>
-    log(Some(slackNotificationService.noticeFailedFinishUpload)) {
+    log(Some(slackNotificationService.noticeFailedUploadRequest)) {
       logTime(slackNotificationService.noticeSlowRequest) {
        for {
          datasetId <- uploadService
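
The key change in this file: the whole body of uploadChunk is wrapped in log(Some(slackNotificationService.noticeFailedUploadRequest)) { ... }, so failed chunk uploads now report to Slack the same way failed finishUpload requests already did. Below is a minimal sketch of that wrap-and-notify pattern, with scala.concurrent.Future standing in for webknossos' Fox type; the names and signatures here are illustrative, not the project's actual API.

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object LogWrapperSketch {
  // Hypothetical notifier, playing the role of
  // slackNotificationService.noticeFailedUploadRequest.
  def noticeFailedUploadRequest(msg: String): Unit =
    println(s"[slack] Failed upload request: $msg")

  // Runs an async block; if it fails, the optional notifier receives the
  // error message, and the failure itself is passed through unchanged.
  def log[A](notifier: Option[String => Unit])(block: => Future[A])(
      implicit ec: ExecutionContext): Future[A] =
    block.andThen {
      case Failure(e) => notifier.foreach(_(e.getMessage))
      case Success(_) => ()
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val failing = log(Some(noticeFailedUploadRequest)) {
      Future.failed[Unit](new Exception("dataset.upload.validation.failed"))
    }
    Await.ready(failing, 1.second) // prints the [slack] line, then completes as failed
  }
}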

webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/uploading/UploadService.scala

Lines changed: 19 additions & 16 deletions
@@ -172,6 +172,9 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
                           reserveUploadAdditionalInfo: ReserveAdditionalInformation): Fox[Unit] =
     for {
       _ <- dataSourceService.assertDataDirWritable(reserveUploadInfo.organization)
+      newDataSourceId = DataSourceId(reserveUploadAdditionalInfo.directoryName, reserveUploadInfo.organization)
+      _ = logger.info(
+        f"Reserving ${uploadFullName(reserveUploadInfo.uploadId, reserveUploadAdditionalInfo.newDatasetId, newDataSourceId)}...")
       _ <- Fox.fromBool(
         !reserveUploadInfo.needsConversion.getOrElse(false) || !reserveUploadInfo.layersToLink
           .exists(_.nonEmpty)) ?~> "Cannot use linked layers if the dataset needs conversion"
@@ -186,7 +189,6 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
             .insertLong(redisKeyForCurrentUploadedTotalFileSizeInBytes(reserveUploadInfo.uploadId), 0L)
         ))
       }
-      newDataSourceId = DataSourceId(reserveUploadAdditionalInfo.directoryName, reserveUploadInfo.organization)
       _ <- runningUploadMetadataStore.insert(
         redisKeyForDataSourceId(reserveUploadInfo.uploadId),
         Json.stringify(Json.toJson(newDataSourceId))
@@ -205,8 +207,6 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
         redisKeyForLinkedLayerIdentifier(reserveUploadInfo.uploadId),
         Json.stringify(Json.toJson(LinkedLayerIdentifiers(reserveUploadInfo.layersToLink)))
       )
-      _ = logger.info(
-        f"Reserving ${uploadFullName(reserveUploadInfo.uploadId, reserveUploadAdditionalInfo.newDatasetId, newDataSourceId)}...")
     } yield ()
 
   def addUploadIdsToUnfinishedUploads(
@@ -278,6 +278,9 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
         .increaseBy(redisKeyForCurrentUploadedTotalFileSizeInBytes(uploadId), currentChunkSize)
         .flatMap(newTotalFileSizeInBytesOpt => {
           if (newTotalFileSizeInBytesOpt.getOrElse(0L) > maxFileSize) {
+            logger.warn(
+              s"Received upload chunk for $datasetId that pushes total file size to ${newTotalFileSizeInBytesOpt
+                .getOrElse(0L)}, which is more than reserved $maxFileSize. Aborting the upload.")
             cleanUpDatasetExceedingSize(uploadDir, uploadId).flatMap(_ =>
               Fox.failure("dataset.upload.moreBytesThanReserved"))
           } else {
@@ -327,10 +330,9 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
       } yield
         if (knownUpload) {
           logger.info(f"Cancelling ${uploadFullName(uploadId, datasetId, dataSourceId)}...")
-          for {
-            _ <- removeFromRedis(uploadId)
-            _ <- PathUtils.deleteDirectoryRecursively(uploadDirectoryFor(dataSourceId.organizationId, uploadId)).toFox
-          } yield ()
+          cleanUpUploadedDataset(uploadDirectoryFor(dataSourceId.organizationId, uploadId),
+                                 uploadId,
+                                 reason = "Cancelled by user")
         } else Fox.failure(s"Unknown upload")
     }
 
@@ -364,7 +366,7 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
       unpackToDir = unpackToDirFor(dataSourceId)
       _ <- ensureDirectoryBox(unpackToDir.getParent).toFox ?~> "dataset.import.fileAccessDenied"
       unpackResult <- unpackDataset(uploadDir, unpackToDir, datasetId).shiftBox
-      _ <- cleanUpUploadedDataset(uploadDir, uploadId)
+      _ <- cleanUpUploadedDataset(uploadDir, uploadId, reason = "Upload complete, data unpacked.")
       _ <- cleanUpOnFailure(unpackResult,
                             datasetId,
                             dataSourceId,
@@ -808,17 +810,19 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
       tryo(FileUtils.copyDirectory(uploadDir.toFile, backupDir.toFile))
     }
 
-  private def cleanUpUploadedDataset(uploadDir: Path, uploadId: String): Fox[Unit] = {
-    this.synchronized {
-      PathUtils.deleteDirectoryRecursively(uploadDir)
-    }
-    removeFromRedis(uploadId)
-  }
+  private def cleanUpUploadedDataset(uploadDir: Path, uploadId: String, reason: String): Fox[Unit] =
+    for {
+      _ <- Fox.successful(logger.info(s"Cleaning up uploaded dataset. Reason: $reason"))
+      _ <- removeFromRedis(uploadId)
+      _ <- this.synchronized {
+        PathUtils.deleteDirectoryRecursively(uploadDir).toFox
+      }
+    } yield ()
 
   private def cleanUpDatasetExceedingSize(uploadDir: Path, uploadId: String): Fox[Unit] =
     for {
       datasetId <- getDatasetIdByUploadId(uploadId)
-      _ <- cleanUpUploadedDataset(uploadDir, uploadId)
+      _ <- cleanUpUploadedDataset(uploadDir, uploadId, reason = "Exceeded reserved fileSize")
       _ <- remoteWebknossosClient.deleteDataset(datasetId)
     } yield ()
 
@@ -841,7 +845,6 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
       _ <- runningUploadMetadataStore.remove(redisKeyForLinkedLayerIdentifier(uploadId))
       _ <- runningUploadMetadataStore.remove(redisKeyForUploadId(dataSourceId))
       _ <- runningUploadMetadataStore.remove(redisKeyForFilePaths(uploadId))
-
     } yield ()
 
   private def cleanUpOrphanUploads(): Fox[Unit] =
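
The central refactor in UploadService is cleanUpUploadedDataset: it now takes a reason that gets logged, removes the Redis bookkeeping before deleting the upload directory, and sequences every step in one for-comprehension instead of mixing a synchronized side effect with the returned Fox. A simplified sketch of that shape, again with Future standing in for Fox and both helpers stubbed out:

import java.nio.file.{Path, Paths}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

class UploadCleanupSketch(implicit ec: ExecutionContext) {
  // Stand-in for the Redis metadata removal (removeFromRedis in the diff).
  private def removeFromRedis(uploadId: String): Future[Unit] =
    Future.successful(println(s"removed redis keys for $uploadId"))

  // Stand-in for PathUtils.deleteDirectoryRecursively(...).toFox.
  private def deleteDirectoryRecursively(dir: Path): Future[Unit] =
    Future.successful(println(s"deleted $dir"))

  // Every caller now states why cleanup happens ("Cancelled by user",
  // "Exceeded reserved fileSize", ...), which is what makes the new log
  // line useful when debugging aborted uploads.
  def cleanUpUploadedDataset(uploadDir: Path, uploadId: String, reason: String): Future[Unit] =
    for {
      _ <- Future.successful(println(s"Cleaning up uploaded dataset. Reason: $reason"))
      _ <- removeFromRedis(uploadId) // metadata removed before the directory, matching the order in the diff
      _ <- deleteDirectoryRecursively(uploadDir)
    } yield ()
}

object UploadCleanupSketch {
  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val cleanup = new UploadCleanupSketch().cleanUpUploadedDataset(
      Paths.get("/tmp/upload-123"), "upload-123", reason = "Cancelled by user")
    Await.result(cleanup, 1.second)
  }
}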

webknossos-datastore/app/com/scalableminds/webknossos/datastore/slacknotification/DSSlackNotificationService.scala

Lines changed: 2 additions & 2 deletions
@@ -23,9 +23,9 @@ class DSSlackNotificationService @Inject()(rpc: RPC, config: DataStoreConfig) ex
       msg = e.getMessage
     )
 
-  def noticeFailedFinishUpload(msg: String): Unit =
+  def noticeFailedUploadRequest(msg: String): Unit =
     slackClient.warn(
-      title = "Failed finishUpload request",
+      title = "Failed upload request",
       msg = msg
     )
 }
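
This rename ties the other two files together: noticeFailedFinishUpload covered only failed finishUpload requests, while the renamed noticeFailedUploadRequest (with its generalized Slack title) is now also passed to log(...) in uploadChunk, so both upload endpoints share one failure notifier.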
