
Commit f90002e

Use S3 transfer manager for file uploads
1 parent 6e62928 commit f90002e
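
Replaces the hand-rolled per-file PutObject loop in UploadService with the AWS SDK's S3TransferManager, which handles concurrency and multipart uploads (addressing the previous TODOs about large files and limiting concurrent uploads). Also moves the DataSourceRegistrationInfo case class from DSRemoteWebknossosClient to DatasetController and factors the AWS SDK version in Dependencies.scala into a shared awsVersion constant.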

4 files changed (+23, -38 lines)

4 files changed

+23
-38
lines changed

app/controllers/DatasetController.scala

Lines changed: 7 additions & 2 deletions
```diff
@@ -7,7 +7,7 @@ import com.scalableminds.util.objectid.ObjectId
 import com.scalableminds.util.time.Instant
 import com.scalableminds.util.tools.{Fox, TristateOptionJsonHelper}
 import com.scalableminds.webknossos.datastore.models.AdditionalCoordinate
-import com.scalableminds.webknossos.datastore.models.datasource.ElementClass
+import com.scalableminds.webknossos.datastore.models.datasource.{DataSource, ElementClass}
 import mail.{MailchimpClient, MailchimpTag}
 import models.analytics.{AnalyticsService, ChangeDatasetSettingsEvent, OpenDatasetEvent}
 import models.dataset._
@@ -21,7 +21,6 @@ import models.organization.OrganizationDAO
 import models.team.{TeamDAO, TeamService}
 import models.user.{User, UserDAO, UserService}
 import com.scalableminds.util.tools.{Empty, Failure, Full}
-import com.scalableminds.webknossos.datastore.services.DataSourceRegistrationInfo
 import play.api.i18n.{Messages, MessagesProvider}
 import play.api.libs.functional.syntax._
 import play.api.libs.json._
@@ -72,6 +71,12 @@ object SegmentAnythingMaskParameters {
   implicit val jsonFormat: Format[SegmentAnythingMaskParameters] = Json.format[SegmentAnythingMaskParameters]
 }
 
+case class DataSourceRegistrationInfo(dataSource: DataSource, folderId: Option[String], dataStoreName: String)
+
+object DataSourceRegistrationInfo {
+  implicit val jsonFormat: OFormat[DataSourceRegistrationInfo] = Json.format[DataSourceRegistrationInfo]
+}
+
 class DatasetController @Inject()(userService: UserService,
                                   userDAO: UserDAO,
                                   datasetService: DatasetService,
```
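
For context, the derived OFormat gives the registration payload a flat JSON shape following the three case class fields. A minimal sketch of the serialization, using a hypothetical simplified stand-in for the real (much richer) webknossos DataSource type:

```scala
import play.api.libs.json.{Json, OFormat}

// Hypothetical stand-in; the real DataSource carries layers, scale, etc.
case class DataSourceStub(id: String)
object DataSourceStub {
  implicit val jsonFormat: OFormat[DataSourceStub] = Json.format[DataSourceStub]
}

case class DataSourceRegistrationInfo(dataSource: DataSourceStub, folderId: Option[String], dataStoreName: String)
object DataSourceRegistrationInfo {
  implicit val jsonFormat: OFormat[DataSourceRegistrationInfo] = Json.format[DataSourceRegistrationInfo]
}

object RegistrationInfoDemo extends App {
  val info = DataSourceRegistrationInfo(DataSourceStub("demo-dataset"), Some("folder-1"), "localhost-datastore")
  // Prints: {"dataSource":{"id":"demo-dataset"},"folderId":"folder-1","dataStoreName":"localhost-datastore"}
  // A folderId of None would simply be omitted from the output.
  println(Json.toJson(info))
}
```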

project/Dependencies.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -5,6 +5,7 @@ object Dependencies {
   private val silhouetteVersion = "10.0.3"
   private val brotliVersion = "1.19.0"
   private val slickVersion = "3.5.2"
+  private val awsVersion = "2.32.24"
   private val scalapbVersion = scalapb.compiler.Version.scalapbVersion
   private val grpcVersion = scalapb.compiler.Version.grpcJavaVersion
 
@@ -55,7 +56,9 @@ object Dependencies {
     // MultiArray (ndarray) handles. import ucar
     "edu.ucar" % "cdm-core" % "5.4.2",
     // Amazon S3 cloud storage client. import software.amazon.awssdk
-    "software.amazon.awssdk" % "s3" % "2.32.24",
+    "software.amazon.awssdk" % "s3" % awsVersion,
+    // AWS Transfer Manager for multipart uploads. import software.amazon.awssdk.transfer.s3
+    "software.amazon.awssdk" % "s3-transfer-manager" % awsVersion,
     // Google cloud storage client. import com.google.cloud.storage, import com.google.auth.oauth2
     "com.google.cloud" % "google-cloud-storage" % "2.55.0",
     // Blosc compression. import dev.zarr.bloscjava
```
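
Sharing the awsVersion constant pins the s3 and s3-transfer-manager artifacts to the same SDK release, so a future version bump cannot leave the two out of sync.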

webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/DSRemoteWebknossosClient.scala

Lines changed: 0 additions & 6 deletions
```diff
@@ -50,12 +50,6 @@ object MagPathInfo {
   implicit val jsonFormat: OFormat[MagPathInfo] = Json.format[MagPathInfo]
 }
 
-case class DataSourceRegistrationInfo(dataSource: DataSource, folderId: Option[String], dataStoreName: String)
-
-object DataSourceRegistrationInfo {
-  implicit val jsonFormat: OFormat[DataSourceRegistrationInfo] = Json.format[DataSourceRegistrationInfo]
-}
-
 trait RemoteWebknossosClient {
   def requestUserAccess(accessRequest: UserAccessRequest)(implicit tc: TokenContext): Fox[UserAccessAnswer]
 }
```

webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/uploading/UploadService.scala

Lines changed: 12 additions & 29 deletions
```diff
@@ -31,7 +31,6 @@ import com.typesafe.scalalogging.LazyLogging
 import org.apache.commons.io.FileUtils
 import play.api.libs.json.{Json, OFormat, Reads}
 import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
-import software.amazon.awssdk.core.async.AsyncRequestBody
 import software.amazon.awssdk.core.checksums.RequestChecksumCalculation
 import software.amazon.awssdk.regions.Region
 import software.amazon.awssdk.services.s3.S3AsyncClient
@@ -40,9 +39,10 @@ import software.amazon.awssdk.services.s3.model.{
   CompletedMultipartUpload,
   CompletedPart,
   CreateMultipartUploadRequest,
-  PutObjectRequest,
   UploadPartRequest
 }
+import software.amazon.awssdk.transfer.s3.S3TransferManager
+import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest
 
 import java.io.{File, RandomAccessFile}
 import java.net.URI
@@ -584,7 +584,6 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
         dataSource,
         newBasePath = s"s3://$endPointHost/${dataStoreConfig.Datastore.S3Upload.bucketName}/$s3ObjectKey")
       _ <- remoteWebknossosClient.updateDataSource(s3DataSource, datasetId, allowNewPaths = true)
-      // TODO: Is uploaded dataset size the same as local dataset size?
       datasetSize <- tryo(FileUtils.sizeOfDirectoryAsBigInteger(new File(unpackToDir.toString)).longValue).toFox
       _ = this.synchronized {
         PathUtils.deleteDirectoryRecursively(unpackToDir)
@@ -671,38 +670,22 @@ class UploadService @Inject()(dataSourceService: DataSourceService,
         exploreLocalLayerService.writeLocalDatasourceProperties(dataSource, path))
     } yield path
 
+  private lazy val transferManager = S3TransferManager.builder().s3Client(s3Client).build()
+
   private def uploadDirectoryToS3(
       dataDir: Path,
       bucketName: String,
       prefix: String
   ): Fox[Unit] =
     for {
-      files <- PathUtils.listFilesRecursive(dataDir, silent = false, maxDepth = 20).toFox
-      uploadFoxes = files.map(filePath => {
-        val relPath = dataDir.relativize(filePath).toString.replace("\\", "/")
-        val s3Key = s"$prefix$relPath"
-
-        // TODO: For large files, consider using multipart upload
-        logger.info("Uploading file to S3: " + filePath)
-        val bytes = Files.readAllBytes(filePath)
-        logger.info(s"Dataset Upload: Uploading ${bytes.length} bytes to s3://$bucketName/$s3Key")
-        val startTime = System.currentTimeMillis()
-        logger.info(s"Starting upload of $filePath to S3 at $startTime")
-
-        for {
-          _ <- Fox.fromFuture {
-            s3Client
-              .putObject(
-                PutObjectRequest.builder().bucket(bucketName).key(s3Key).build(),
-                AsyncRequestBody.fromBytes(bytes)
-              )
-              .asScala
-          } ?~> s"Failed to upload file $filePath to S3"
-        } yield ()
-      })
-      // TODO: Limit number of concurrent uploads?
-      _ <- Fox.combined(uploadFoxes)
-      _ = logger.info(s"Finished uploading directory to S3 at ${System.currentTimeMillis()}")
+      _ <- Fox.successful(())
+      directoryUpload = transferManager.uploadDirectory(
+        UploadDirectoryRequest.builder().bucket(bucketName).s3Prefix(prefix).source(dataDir).build()
+      )
+      completedUpload <- Fox.fromFuture(directoryUpload.completionFuture().asScala)
+      failedTransfers = completedUpload.failedTransfers()
+      _ <- Fox.fromBool(failedTransfers.isEmpty) ?~>
+        s"Some files failed to upload to S3: $failedTransfers"
     } yield ()
 
   private def cleanUpOnFailure[T](result: Box[T],
```
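
To illustrate what the transfer manager does here: uploadDirectory walks the source directory, derives each object key from the prefix and the file's path relative to the source, and runs the per-file transfers in parallel (with multipart uploads when the underlying async client supports them), collecting per-file failures instead of failing the whole future. A minimal, self-contained sketch outside the Fox/webknossos machinery, with bucket, prefix, and local path as illustrative placeholders:

```scala
import java.nio.file.Paths

import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.transfer.s3.S3TransferManager
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest

object UploadDirectorySketch {
  def main(args: Array[String]): Unit = {
    // Region and credentials come from the default provider chains here;
    // the webknossos service instead configures an endpoint and static credentials.
    val s3Client        = S3AsyncClient.builder().build()
    val transferManager = S3TransferManager.builder().s3Client(s3Client).build()

    val directoryUpload = transferManager.uploadDirectory(
      UploadDirectoryRequest
        .builder()
        .bucket("my-bucket")            // placeholder bucket
        .s3Prefix("datasets/demo/")     // object keys derive from prefix + relative path
        .source(Paths.get("/tmp/demo")) // placeholder local directory
        .build()
    )

    // completionFuture() is a CompletableFuture; join() blocks until every
    // per-file transfer has finished. Individual failures do not fail the
    // future; they are reported via failedTransfers().
    val completed = directoryUpload.completionFuture().join()
    if (!completed.failedTransfers().isEmpty)
      sys.error(s"Some files failed to upload: ${completed.failedTransfers()}")

    transferManager.close()
    s3Client.close()
  }
}
```

The production code wraps the same completionFuture in a Fox and turns a non-empty failedTransfers list into a failure message, rather than blocking with join().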
