
Commit 7f7a50c

More logging for dataset upload (#8999)
Some more logging (both to Slack and to stdout) for uploads, especially when uploads are aborted due to file size problems. Such aborts are my current theory for why redis entries were removed before some uploads were complete.

------

- [x] Removed dev-only changes like prints and application.conf edits
- [x] Considered [common edge cases](../blob/master/.github/common_edge_cases.md)
- [x] Needs datastore update after deployment
1 parent 65a5965 commit 7f7a50c
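
The commit wraps the chunk-upload action in the datastore's `log` helper, which mirrors request failures to stdout and, when a notifier is supplied, to Slack. As a minimal sketch of that wrapper pattern — using plain `Future` instead of webknossos' `Fox` type, with a hypothetical trait and signature, not the actual helper:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Failure

// Hypothetical stand-in for the datastore's request-logging mixin.
trait RequestLogging {
  // Runs a block; on failure, writes the message to stdout and forwards
  // it to the optional Slack notifier as well.
  def log[A](slackNotifier: Option[String => Unit])(block: => Future[A])(
      implicit ec: ExecutionContext): Future[A] = {
    val result = block
    result.onComplete {
      case Failure(e) =>
        println(s"Request failed: ${e.getMessage}") // stdout logging
        slackNotifier.foreach(notify => notify(e.getMessage)) // Slack logging
      case _ => ()
    }
    result
  }
}
```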

3 files changed (+59 lines, -53 lines)

webknossos-datastore/app/com/scalableminds/webknossos/datastore/controllers/DataSourceController.scala

Lines changed: 38 additions & 35 deletions
```diff
@@ -142,40 +142,43 @@ class DataSourceController @Inject()(
    */
   def uploadChunk(): Action[MultipartFormData[Files.TemporaryFile]] =
     Action.async(parse.multipartFormData) { implicit request =>
-      val uploadForm = Form(
-        tuple(
-          "resumableChunkNumber" -> number,
-          "resumableChunkSize" -> number,
-          "resumableCurrentChunkSize" -> number,
-          "resumableTotalChunks" -> longNumber,
-          "resumableIdentifier" -> nonEmptyText
-        )).fill((-1, -1, -1, -1, ""))
-
-      uploadForm
-        .bindFromRequest(request.body.dataParts)
-        .fold(
-          hasErrors = formWithErrors => Fox.successful(JsonBadRequest(formWithErrors.errors.head.message)),
-          success = {
-            case (chunkNumber, chunkSize, currentChunkSize, totalChunkCount, uploadFileId) =>
-              for {
-                datasetId <- uploadService
-                  .getDatasetIdByUploadId(uploadService.extractDatasetUploadId(uploadFileId)) ?~> "dataset.upload.validation.failed"
-                result <- accessTokenService.validateAccessFromTokenContext(UserAccessRequest.writeDataset(datasetId)) {
-                  for {
-                    isKnownUpload <- uploadService.isKnownUploadByFileId(uploadFileId)
-                    _ <- Fox.fromBool(isKnownUpload) ?~> "dataset.upload.validation.failed"
-                    chunkFile <- request.body.file("file").toFox ?~> "zip.file.notFound"
-                    _ <- uploadService.handleUploadChunk(uploadFileId,
-                                                         chunkSize,
-                                                         currentChunkSize,
-                                                         totalChunkCount,
-                                                         chunkNumber,
-                                                         new File(chunkFile.ref.path.toString))
-                  } yield Ok
-                }
-              } yield result
-          }
-        )
+      log(Some(slackNotificationService.noticeFailedUploadRequest)) {
+        val uploadForm = Form(
+          tuple(
+            "resumableChunkNumber" -> number,
+            "resumableChunkSize" -> number,
+            "resumableCurrentChunkSize" -> number,
+            "resumableTotalChunks" -> longNumber,
+            "resumableIdentifier" -> nonEmptyText
+          )).fill((-1, -1, -1, -1, ""))
+
+        uploadForm
+          .bindFromRequest(request.body.dataParts)
+          .fold(
+            hasErrors = formWithErrors => Fox.successful(JsonBadRequest(formWithErrors.errors.head.message)),
+            success = {
+              case (chunkNumber, chunkSize, currentChunkSize, totalChunkCount, uploadFileId) =>
+                for {
+                  datasetId <- uploadService
+                    .getDatasetIdByUploadId(uploadService.extractDatasetUploadId(uploadFileId)) ?~> "dataset.upload.validation.failed"
+                  result <- accessTokenService
+                    .validateAccessFromTokenContext(UserAccessRequest.writeDataset(datasetId)) {
+                      for {
+                        isKnownUpload <- uploadService.isKnownUploadByFileId(uploadFileId)
+                        _ <- Fox.fromBool(isKnownUpload) ?~> "dataset.upload.validation.failed"
+                        chunkFile <- request.body.file("file").toFox ?~> "zip.file.notFound"
+                        _ <- uploadService.handleUploadChunk(uploadFileId,
+                                                             chunkSize,
+                                                             currentChunkSize,
+                                                             totalChunkCount,
+                                                             chunkNumber,
+                                                             new File(chunkFile.ref.path.toString))
+                      } yield Ok
+                    }
+                } yield result
+            }
+          )
+      }
     }
 
   def testChunk(resumableChunkNumber: Int, resumableIdentifier: String): Action[AnyContent] =
@@ -193,7 +196,7 @@ class DataSourceController @Inject()(
   }
 
   def finishUpload(): Action[UploadInformation] = Action.async(validateJson[UploadInformation]) { implicit request =>
-    log(Some(slackNotificationService.noticeFailedFinishUpload)) {
+    log(Some(slackNotificationService.noticeFailedUploadRequest)) {
       logTime(slackNotificationService.noticeSlowRequest) {
         for {
           datasetId <- uploadService
```

webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/uploading/UploadService.scala

Lines changed: 19 additions & 16 deletions
```diff
@@ -172,6 +172,9 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
                     reserveUploadAdditionalInfo: ReserveAdditionalInformation): Fox[Unit] =
     for {
       _ <- dataSourceService.assertDataDirWritable(reserveUploadInfo.organization)
+      newDataSourceId = DataSourceId(reserveUploadAdditionalInfo.directoryName, reserveUploadInfo.organization)
+      _ = logger.info(
+        f"Reserving ${uploadFullName(reserveUploadInfo.uploadId, reserveUploadAdditionalInfo.newDatasetId, newDataSourceId)}...")
       _ <- Fox.fromBool(
         !reserveUploadInfo.needsConversion.getOrElse(false) || !reserveUploadInfo.layersToLink
           .exists(_.nonEmpty)) ?~> "Cannot use linked layers if the dataset needs conversion"
@@ -186,7 +189,6 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
             .insertLong(redisKeyForCurrentUploadedTotalFileSizeInBytes(reserveUploadInfo.uploadId), 0L)
         ))
       }
-      newDataSourceId = DataSourceId(reserveUploadAdditionalInfo.directoryName, reserveUploadInfo.organization)
       _ <- runningUploadMetadataStore.insert(
         redisKeyForDataSourceId(reserveUploadInfo.uploadId),
         Json.stringify(Json.toJson(newDataSourceId))
@@ -205,8 +207,6 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
         redisKeyForLinkedLayerIdentifier(reserveUploadInfo.uploadId),
         Json.stringify(Json.toJson(LinkedLayerIdentifiers(reserveUploadInfo.layersToLink)))
       )
-      _ = logger.info(
-        f"Reserving ${uploadFullName(reserveUploadInfo.uploadId, reserveUploadAdditionalInfo.newDatasetId, newDataSourceId)}...")
     } yield ()
 
   def addUploadIdsToUnfinishedUploads(
@@ -278,6 +278,9 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
           .increaseBy(redisKeyForCurrentUploadedTotalFileSizeInBytes(uploadId), currentChunkSize)
           .flatMap(newTotalFileSizeInBytesOpt => {
             if (newTotalFileSizeInBytesOpt.getOrElse(0L) > maxFileSize) {
+              logger.warn(
+                s"Received upload chunk for $datasetId that pushes total file size to ${newTotalFileSizeInBytesOpt
+                  .getOrElse(0L)}, which is more than reserved $maxFileSize. Aborting the upload.")
               cleanUpDatasetExceedingSize(uploadDir, uploadId).flatMap(_ =>
                 Fox.failure("dataset.upload.moreBytesThanReserved"))
             } else {
@@ -327,10 +330,9 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
     } yield
       if (knownUpload) {
         logger.info(f"Cancelling ${uploadFullName(uploadId, datasetId, dataSourceId)}...")
-        for {
-          _ <- removeFromRedis(uploadId)
-          _ <- PathUtils.deleteDirectoryRecursively(uploadDirectoryFor(dataSourceId.organizationId, uploadId)).toFox
-        } yield ()
+        cleanUpUploadedDataset(uploadDirectoryFor(dataSourceId.organizationId, uploadId),
+                               uploadId,
+                               reason = "Cancelled by user")
       } else Fox.failure(s"Unknown upload")
   }
 
@@ -364,7 +366,7 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
       unpackToDir = unpackToDirFor(dataSourceId)
       _ <- ensureDirectoryBox(unpackToDir.getParent).toFox ?~> "dataset.import.fileAccessDenied"
       unpackResult <- unpackDataset(uploadDir, unpackToDir, datasetId).shiftBox
-      _ <- cleanUpUploadedDataset(uploadDir, uploadId)
+      _ <- cleanUpUploadedDataset(uploadDir, uploadId, reason = "Upload complete, data unpacked.")
       _ <- cleanUpOnFailure(unpackResult,
                             datasetId,
                             dataSourceId,
@@ -808,17 +810,19 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
       tryo(FileUtils.copyDirectory(uploadDir.toFile, backupDir.toFile))
     }
 
-  private def cleanUpUploadedDataset(uploadDir: Path, uploadId: String): Fox[Unit] = {
-    this.synchronized {
-      PathUtils.deleteDirectoryRecursively(uploadDir)
-    }
-    removeFromRedis(uploadId)
-  }
+  private def cleanUpUploadedDataset(uploadDir: Path, uploadId: String, reason: String): Fox[Unit] =
+    for {
+      _ <- Fox.successful(logger.info(s"Cleaning up uploaded dataset. Reason: $reason"))
+      _ <- removeFromRedis(uploadId)
+      _ <- this.synchronized {
+        PathUtils.deleteDirectoryRecursively(uploadDir).toFox
+      }
+    } yield ()
 
   private def cleanUpDatasetExceedingSize(uploadDir: Path, uploadId: String): Fox[Unit] =
     for {
       datasetId <- getDatasetIdByUploadId(uploadId)
-      _ <- cleanUpUploadedDataset(uploadDir, uploadId)
+      _ <- cleanUpUploadedDataset(uploadDir, uploadId, reason = "Exceeded reserved fileSize")
       _ <- remoteWebknossosClient.deleteDataset(datasetId)
     } yield ()
 
@@ -841,7 +845,6 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
       _ <- runningUploadMetadataStore.remove(redisKeyForLinkedLayerIdentifier(uploadId))
       _ <- runningUploadMetadataStore.remove(redisKeyForUploadId(dataSourceId))
       _ <- runningUploadMetadataStore.remove(redisKeyForFilePaths(uploadId))
-
     } yield ()
 
   private def cleanUpOrphanUploads(): Fox[Unit] =
```

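The size check above builds on an atomic counter: each chunk increments a per-upload redis key (`increaseBy`), and the request is aborted once the running total exceeds the reserved size. A rough sketch of that pattern with an in-memory map standing in for redis — the class and method names here are made up for illustration:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// In-memory stand-in for the redis-backed metadata store. The real
// service increments a per-upload key, so concurrent chunk requests
// all observe a consistent running total for the upload.
class UploadSizeTracker(reservedSizes: Map[String, Long]) {
  private val totals = new ConcurrentHashMap[String, AtomicLong]()

  // Adds a chunk's size to the upload's running total. Returns
  // Left(warning) when the total exceeds what was reserved — the
  // condition that now triggers the logger.warn in the diff above.
  def addChunk(uploadId: String, chunkSize: Long): Either[String, Long] = {
    val newTotal =
      totals.computeIfAbsent(uploadId, _ => new AtomicLong(0L)).addAndGet(chunkSize)
    val reserved = reservedSizes.getOrElse(uploadId, Long.MaxValue)
    if (newTotal > reserved)
      Left(s"Total file size $newTotal exceeds reserved $reserved for $uploadId. Aborting the upload.")
    else Right(newTotal)
  }
}
```

Note also that `cleanUpUploadedDataset` now takes a `reason` and logs it, so every cleanup path (size exceeded, cancelled by user, normal completion after unpacking) is traceable in the logs.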
webknossos-datastore/app/com/scalableminds/webknossos/datastore/slacknotification/DSSlackNotificationService.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -23,9 +23,9 @@ class DSSlackNotificationService @Inject()(rpc: RPC, config: DataStoreConfig) ex
       msg = e.getMessage
     )
 
-  def noticeFailedFinishUpload(msg: String): Unit =
+  def noticeFailedUploadRequest(msg: String): Unit =
     slackClient.warn(
-      title = "Failed finishUpload request",
+      title = "Failed upload request",
       msg = msg
     )
 }
```

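For reference, the renamed notifier delegates to a Slack client with a `warn(title, msg)` shape; it now covers any failed upload request (uploadChunk and finishUpload alike), not just finishUpload. A hypothetical minimal version — the real `DSSlackNotificationService` presumably posts via the injected `rpc` and `config` — might look like:

```scala
// Hypothetical minimal Slack client; stands in for the real one,
// which posts to a configured webhook.
class SlackClient(post: String => Unit) {
  def warn(title: String, msg: String): Unit =
    post(s":warning: *$title*\n$msg")
}

// Sketch of the renamed notifier from the diff above.
class SlackNotificationSketch(slackClient: SlackClient) {
  def noticeFailedUploadRequest(msg: String): Unit =
    slackClient.warn(title = "Failed upload request", msg = msg)
}
```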