@@ -21,6 +21,7 @@ import com.scalableminds.webknossos.datastore.models.UnfinishedUpload
2121import com .scalableminds .webknossos .datastore .models .datasource .UsableDataSource .FILENAME_DATASOURCE_PROPERTIES_JSON
2222import com .scalableminds .webknossos .datastore .models .datasource ._
2323import com .scalableminds .webknossos .datastore .services .{DSRemoteWebknossosClient , DataSourceService }
24+ import com .scalableminds .webknossos .datastore .slacknotification .DSSlackNotificationService
2425import com .scalableminds .webknossos .datastore .storage .{
2526 CredentialConfigReader ,
2627 DataStoreRedisStore ,
@@ -42,6 +43,7 @@ import java.net.URI
4243import java .nio .file .{Files , Path }
4344import scala .concurrent .{ExecutionContext , Future }
4445import scala .jdk .FutureConverters ._
46+ import scala .util .Try
4547
4648case class ReserveUploadInformation (
4749 uploadId : String , // upload id that was also used in chunk upload (this time without file paths)
@@ -105,6 +107,7 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
105107 remoteSourceDescriptorService : RemoteSourceDescriptorService ,
106108 exploreLocalLayerService : ExploreLocalLayerService ,
107109 dataStoreConfig : DataStoreConfig ,
110+ slackNotificationService : DSSlackNotificationService ,
108111 val remoteWebknossosClient : DSRemoteWebknossosClient )(implicit ec : ExecutionContext )
109112 extends DatasetDeleter
110113 with DirectoryConstants
@@ -144,6 +147,8 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
144147 s " upload___ ${uploadId}___datasetId "
145148 private def redisKeyForFilePaths (uploadId : String ): String =
146149 s " upload___ ${uploadId}___filePaths "
150+ private def redisKeyForReportedTooLargeUpload (uploadId : String ): String =
151+ s " upload___ ${uploadId}___tooLargeUpload "
147152
148153 cleanUpOrphanUploads()
149154
@@ -203,6 +208,7 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
203208 )
204209 filePaths = Json .stringify(Json .toJson(reserveUploadInfo.filePaths.getOrElse(List .empty)))
205210 _ <- runningUploadMetadataStore.insert(redisKeyForFilePaths(reserveUploadInfo.uploadId), filePaths)
211+ _ <- runningUploadMetadataStore.insert(redisKeyForReportedTooLargeUpload(reserveUploadInfo.uploadId), " false" )
206212 _ <- runningUploadMetadataStore.insert(
207213 redisKeyForLinkedLayerIdentifier(reserveUploadInfo.uploadId),
208214 Json .stringify(Json .toJson(LinkedLayerIdentifiers (reserveUploadInfo.layersToLink)))
@@ -273,16 +279,23 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
273279 (filePath, uploadDir) <- getFilePathAndDirOfUploadId(uploadFileId)
274280 isFileKnown <- runningUploadMetadataStore.contains(redisKeyForFileChunkCount(uploadId, filePath))
275281 totalFileSizeInBytesOpt <- runningUploadMetadataStore.findLong(redisKeyForTotalFileSizeInBytes(uploadId))
282+ alreadyNotifiedAboutExceedingLimitOpt <- runningUploadMetadataStore.find(
283+ redisKeyForReportedTooLargeUpload(uploadId))
276284 _ <- Fox .runOptional(totalFileSizeInBytesOpt) { maxFileSize =>
277285 runningUploadMetadataStore
278286 .increaseBy(redisKeyForCurrentUploadedTotalFileSizeInBytes(uploadId), currentChunkSize)
279287 .flatMap(newTotalFileSizeInBytesOpt => {
280288 if (newTotalFileSizeInBytesOpt.getOrElse(0L ) > maxFileSize) {
289+ runningUploadMetadataStore.insert(redisKeyForReportedTooLargeUpload(uploadId), " true" )
281290 logger.warn(
282291 s " Received upload chunk for $datasetId that pushes total file size to ${newTotalFileSizeInBytesOpt
283- .getOrElse(0L )}, which is more than reserved $maxFileSize. Aborting the upload. " )
284- cleanUpDatasetExceedingSize(uploadDir, uploadId).flatMap(_ =>
285- Fox .failure(" dataset.upload.moreBytesThanReserved" ))
292+ .getOrElse(0L )}, which is more than reserved $maxFileSize. Allowing upload for now. " )
293+ if (! alreadyNotifiedAboutExceedingLimitOpt.exists(s => Try (s.toBoolean).getOrElse(false ))) {
294+ slackNotificationService.noticeTooLargeUploadChunkRequest(
295+ s " Received upload chunk for $datasetId that pushes total file size to ${newTotalFileSizeInBytesOpt
296+ .getOrElse(0L )}, which is more than reserved $maxFileSize. " )
297+ }
298+ Fox .successful(())
286299 } else {
287300 Fox .successful(())
288301 }
@@ -339,18 +352,6 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
339352 private def uploadFullName (uploadId : String , datasetId : ObjectId , dataSourceId : DataSourceId ) =
340353 s " upload $uploadId of dataset $datasetId ( $dataSourceId) "
341354
342- private def assertWithinRequestedFileSizeAndCleanUpOtherwise (uploadDir : Path , uploadId : String ): Fox [Unit ] =
343- for {
344- totalFileSizeInBytesOpt <- runningUploadMetadataStore.find(redisKeyForTotalFileSizeInBytes(uploadId))
345- _ <- Fox .runOptional(totalFileSizeInBytesOpt) { maxFileSize =>
346- tryo(FileUtils .sizeOfDirectoryAsBigInteger(uploadDir.toFile).longValue).toFox.map(actualFileSize =>
347- if (actualFileSize > maxFileSize.toLong) {
348- cleanUpDatasetExceedingSize(uploadDir, uploadId)
349- Fox .failure(s " Uploaded dataset exceeds the maximum allowed size of $maxFileSize bytes " )
350- } else Fox .successful(()))
351- }
352- } yield ()
353-
354355 def finishUpload (uploadInformation : UploadInformation , datasetId : ObjectId )(implicit tc : TokenContext ): Fox [Unit ] = {
355356 val uploadId = uploadInformation.uploadId
356357
@@ -361,11 +362,25 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
361362 needsConversion = uploadInformation.needsConversion.getOrElse(false )
362363 uploadDir = uploadDirectoryFor(dataSourceId.organizationId, uploadId)
363364 _ <- backupRawUploadedData(uploadDir, uploadBackupDirectoryFor(dataSourceId.organizationId, uploadId), datasetId).toFox
364- _ <- assertWithinRequestedFileSizeAndCleanUpOtherwise(uploadDir, uploadId)
365+ // Temporarily disabled till reserved size for upload is fixed.
366+ // _ <- assertWithinRequestedFileSizeAndCleanUpOtherwise(uploadDir, uploadId)
365367 _ <- checkAllChunksUploaded(uploadId)
366368 unpackToDir = unpackToDirFor(dataSourceId)
367369 _ <- ensureDirectoryBox(unpackToDir.getParent).toFox ?~> " dataset.import.fileAccessDenied"
368370 unpackResult <- unpackDataset(uploadDir, unpackToDir, datasetId).shiftBox
371+ reservedTotalFileSizeInBytesOpt <- runningUploadMetadataStore.findLong(redisKeyForTotalFileSizeInBytes(uploadId))
372+ actualUploadedFileSizeInBytesOpt <- runningUploadMetadataStore.findLong(
373+ redisKeyForCurrentUploadedTotalFileSizeInBytes(uploadId))
374+ // Logging successful uploads which exceeded size to notice how large the difference is. Should be removed later.
375+ _ = reservedTotalFileSizeInBytesOpt.foreach(reservedBytes =>
376+ actualUploadedFileSizeInBytesOpt.foreach(actualBytes => {
377+ if (actualBytes > reservedBytes) {
378+ logger.warn(
379+ s " Finished upload for $datasetId that exceeded reserved upload size. $reservedBytes bytes were reserved but $actualBytes were uploaded according to redis store. " )
380+ slackNotificationService.noticeTooLargeUploadChunkRequest(
381+ s " Finished upload for $datasetId that exceeded reserved upload size. $reservedBytes bytes were reserved but $actualBytes were uploaded according to redis store. " )
382+ }
383+ }))
369384 _ <- cleanUpUploadedDataset(uploadDir, uploadId, reason = " Upload complete, data unpacked." )
370385 _ <- cleanUpOnFailure(unpackResult,
371386 datasetId,
@@ -819,13 +834,6 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
819834 }
820835 } yield ()
821836
822- private def cleanUpDatasetExceedingSize (uploadDir : Path , uploadId : String ): Fox [Unit ] =
823- for {
824- datasetId <- getDatasetIdByUploadId(uploadId)
825- _ <- cleanUpUploadedDataset(uploadDir, uploadId, reason = " Exceeded reserved fileSize" )
826- _ <- remoteWebknossosClient.deleteDataset(datasetId)
827- } yield ()
828-
829837 private def removeFromRedis (uploadId : String ): Fox [Unit ] =
830838 for {
831839 _ <- runningUploadMetadataStore.remove(redisKeyForFileCount(uploadId))
@@ -845,6 +853,7 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
845853 _ <- runningUploadMetadataStore.remove(redisKeyForLinkedLayerIdentifier(uploadId))
846854 _ <- runningUploadMetadataStore.remove(redisKeyForUploadId(dataSourceId))
847855 _ <- runningUploadMetadataStore.remove(redisKeyForFilePaths(uploadId))
856+ _ <- runningUploadMetadataStore.remove(redisKeyForReportedTooLargeUpload(uploadId))
848857 } yield ()
849858
850859 private def cleanUpOrphanUploads (): Fox [Unit ] =
0 commit comments