1313
1414#ifdef USE_AWS
1515
16+ #include < aws/core/utils/threading/Executor.h>
17+
1618#include " cloud/aws/aws_file.h"
1719#include " cloud/aws/aws_kafka.h"
1820#include " cloud/aws/aws_kinesis.h"
@@ -125,16 +127,42 @@ detail::JobExecutor* GetJobExecutor() {
125127 return &executor;
126128}
127129
128- class AwsS3ClientWrapper ::Timer {
130+ Aws::Utils::Threading::Executor* GetAwsTransferManagerExecutor () {
131+ static Aws::Utils::Threading::PooledThreadExecutor executor (8 );
132+ return &executor;
133+ }
134+
135+ Aws::String ToAwsString (const std::string& s) {
136+ return Aws::String (s.data (), s.size ());
137+ }
138+
139+ template <typename T>
140+ void SetEncryptionParameters (const CloudEnvOptions& cloud_env_options,
141+ T& put_request) {
142+ if (cloud_env_options.server_side_encryption ) {
143+ if (cloud_env_options.encryption_key_id .empty ()) {
144+ put_request.SetServerSideEncryption (
145+ Aws::S3::Model::ServerSideEncryption::AES256);
146+ } else {
147+ put_request.SetServerSideEncryption (
148+ Aws::S3::Model::ServerSideEncryption::aws_kms);
149+ put_request.SetSSEKMSKeyId (cloud_env_options.encryption_key_id .c_str ());
150+ }
151+ }
152+ }
153+
154+ class CloudRequestCallbackGuard {
129155 public:
130- Timer (CloudRequestCallback* callback, CloudRequestOpType type ,
131- uint64_t size = 0 )
156+ CloudRequestCallbackGuard (CloudRequestCallback* callback,
157+ CloudRequestOpType type, uint64_t size = 0 )
132158 : callback_(callback), type_(type), size_(size), start_(now()) {}
133- ~Timer () {
159+
160+ ~CloudRequestCallbackGuard () {
134161 if (callback_) {
135162 (*callback_)(type_, size_, now () - start_, success_);
136163 }
137164 }
165+
138166 void SetSize (uint64_t size) { size_ = size; }
139167 void SetSuccess (bool success) { success_ = success; }
140168
@@ -153,22 +181,24 @@ class AwsS3ClientWrapper::Timer {
153181};
154182
155183AwsS3ClientWrapper::AwsS3ClientWrapper (
156- std::unique_ptr <Aws::S3::S3Client> client,
184+ std::shared_ptr <Aws::S3::S3Client> client,
157185 std::shared_ptr<CloudRequestCallback> cloud_request_callback)
158186 : client_(std::move(client)),
159187 cloud_request_callback_ (std::move(cloud_request_callback)) {}
160188
161189Aws::S3::Model::ListObjectsOutcome AwsS3ClientWrapper::ListObjects (
162190 const Aws::S3::Model::ListObjectsRequest& request) {
163- Timer t (cloud_request_callback_.get (), CloudRequestOpType::kListOp );
191+ CloudRequestCallbackGuard t (cloud_request_callback_.get (),
192+ CloudRequestOpType::kListOp );
164193 auto outcome = client_->ListObjects (request);
165194 t.SetSuccess (outcome.IsSuccess ());
166195 return outcome;
167196}
168197
169198Aws::S3::Model::CreateBucketOutcome AwsS3ClientWrapper::CreateBucket (
170199 const Aws::S3::Model::CreateBucketRequest& request) {
171- Timer t (cloud_request_callback_.get (), CloudRequestOpType::kCreateOp );
200+ CloudRequestCallbackGuard t (cloud_request_callback_.get (),
201+ CloudRequestOpType::kCreateOp );
172202 return client_->CreateBucket (request);
173203}
174204
@@ -179,23 +209,26 @@ Aws::S3::Model::HeadBucketOutcome AwsS3ClientWrapper::HeadBucket(
179209
180210Aws::S3::Model::DeleteObjectOutcome AwsS3ClientWrapper::DeleteObject (
181211 const Aws::S3::Model::DeleteObjectRequest& request) {
182- Timer t (cloud_request_callback_.get (), CloudRequestOpType::kDeleteOp );
212+ CloudRequestCallbackGuard t (cloud_request_callback_.get (),
213+ CloudRequestOpType::kDeleteOp );
183214 auto outcome = client_->DeleteObject (request);
184215 t.SetSuccess (outcome.IsSuccess ());
185216 return outcome;
186217}
187218
188219Aws::S3::Model::CopyObjectOutcome AwsS3ClientWrapper::CopyObject (
189220 const Aws::S3::Model::CopyObjectRequest& request) {
190- Timer t (cloud_request_callback_.get (), CloudRequestOpType::kCopyOp );
221+ CloudRequestCallbackGuard t (cloud_request_callback_.get (),
222+ CloudRequestOpType::kCopyOp );
191223 auto outcome = client_->CopyObject (request);
192224 t.SetSuccess (outcome.IsSuccess ());
193225 return outcome;
194226}
195227
196228Aws::S3::Model::GetObjectOutcome AwsS3ClientWrapper::GetObject (
197229 const Aws::S3::Model::GetObjectRequest& request) {
198- Timer t (cloud_request_callback_.get (), CloudRequestOpType::kReadOp );
230+ CloudRequestCallbackGuard t (cloud_request_callback_.get (),
231+ CloudRequestOpType::kReadOp );
199232 auto outcome = client_->GetObject (request);
200233 if (outcome.IsSuccess ()) {
201234 t.SetSize (outcome.GetResult ().GetContentLength ());
@@ -206,16 +239,17 @@ Aws::S3::Model::GetObjectOutcome AwsS3ClientWrapper::GetObject(
206239
207240Aws::S3::Model::PutObjectOutcome AwsS3ClientWrapper::PutObject (
208241 const Aws::S3::Model::PutObjectRequest& request, uint64_t size_hint) {
209- Timer t (cloud_request_callback_.get (), CloudRequestOpType:: kWriteOp ,
210- size_hint);
242+ CloudRequestCallbackGuard t (cloud_request_callback_.get (),
243+ CloudRequestOpType:: kWriteOp , size_hint);
211244 auto outcome = client_->PutObject (request);
212245 t.SetSuccess (outcome.IsSuccess ());
213246 return outcome;
214247}
215248
216249Aws::S3::Model::HeadObjectOutcome AwsS3ClientWrapper::HeadObject (
217250 const Aws::S3::Model::HeadObjectRequest& request) {
218- Timer t (cloud_request_callback_.get (), CloudRequestOpType::kInfoOp );
251+ CloudRequestCallbackGuard t (cloud_request_callback_.get (),
252+ CloudRequestOpType::kInfoOp );
219253 auto outcome = client_->HeadObject (request);
220254 t.SetSuccess (outcome.IsSuccess ());
221255 return outcome;
@@ -332,14 +366,25 @@ AwsEnv::AwsEnv(Env* underlying_env, const std::string& src_bucket_prefix,
332366 GetBucketLocationConstraintForName (config.region );
333367
334368 {
335- unique_ptr<Aws::S3::S3Client> s3client (
336- creds ? new Aws::S3::S3Client (*creds, config)
337- : new Aws::S3::S3Client (config));
369+ auto s3client = creds ? std::make_shared<Aws::S3::S3Client>(*creds, config)
370+ : std::make_shared<Aws::S3::S3Client>(config);
338371
339372 s3client_ = std::make_shared<AwsS3ClientWrapper>(
340373 std::move (s3client), cloud_env_options.cloud_request_callback );
341374 }
342375
376+ {
377+ Aws::Transfer::TransferManagerConfiguration transferManagerConfig (
378+ GetAwsTransferManagerExecutor ());
379+ transferManagerConfig.s3Client = s3client_->GetClient ();
380+ SetEncryptionParameters (cloud_env_options,
381+ transferManagerConfig.putObjectTemplate );
382+ SetEncryptionParameters (
383+ cloud_env_options, transferManagerConfig.createMultipartUploadTemplate );
384+ awsTransferManager_ =
385+ Aws::Transfer::TransferManager::Create (transferManagerConfig);
386+ }
387+
343388 // create dest bucket if specified
344389 if (has_dest_bucket_) {
345390 if (S3WritableFile::BucketExistsInS3 (s3client_, GetDestBucketPrefix (),
@@ -1460,7 +1505,7 @@ Status AwsEnv::SaveDbid(const std::string& dbid, const std::string& dirname) {
14601505 put_request.SetBucket (bucket);
14611506 put_request.SetKey (key);
14621507 put_request.SetMetadata (metadata);
1463- SetEncryptionParameters (put_request);
1508+ SetEncryptionParameters (cloud_env_options, put_request);
14641509
14651510 Aws::S3::Model::PutObjectOutcome put_outcome =
14661511 s3client_->PutObject (put_request);
@@ -1644,20 +1689,19 @@ Status AwsEnv::GetObject(const std::string& bucket_name_prefix,
16441689 std::string tmp_destination = local_destination + " .tmp" ;
16451690 auto s3_bucket = GetAwsBucket (bucket_name_prefix);
16461691
1647- Aws::S3::Model::GetObjectRequest getObjectRequest;
1648- getObjectRequest.SetBucket (s3_bucket);
1649- getObjectRequest.SetKey (
1650- Aws::String (bucket_object_path.data (), bucket_object_path.size ()));
1651- getObjectRequest.SetResponseStreamFactory ([tmp_destination]() {
1652- return Aws::New<Aws::FStream>(Aws::Utils::ARRAY_ALLOCATION_TAG,
1653- tmp_destination, std::ios_base::out);
1654- });
1655- auto get_outcome = s3client_->GetObject (getObjectRequest);
1656-
1657- bool isSuccess = get_outcome.IsSuccess ();
1658- if (!isSuccess) {
1692+ CloudRequestCallbackGuard guard (
1693+ cloud_env_options.cloud_request_callback .get (),
1694+ CloudRequestOpType::kReadOp );
1695+
1696+ auto transferHandle = awsTransferManager_->DownloadFile (
1697+ s3_bucket, ToAwsString (bucket_object_path),
1698+ ToAwsString (tmp_destination));
1699+
1700+ transferHandle->WaitUntilFinished ();
1701+
1702+ if (transferHandle->GetStatus () != Aws::Transfer::TransferStatus::COMPLETED) {
16591703 localenv->DeleteFile (tmp_destination);
1660- const auto & error = get_outcome. GetError ();
1704+ const auto & error = transferHandle-> GetLastError ();
16611705 std::string errmsg (error.GetMessage ().c_str (), error.GetMessage ().size ());
16621706 Log (InfoLogLevel::ERROR_LEVEL, info_log_,
16631707 " [s3] GetObject %s/%s error %s." , s3_bucket.c_str (),
@@ -1671,21 +1715,23 @@ Status AwsEnv::GetObject(const std::string& bucket_name_prefix,
16711715 return Status::IOError (std::move (errmsg));
16721716 }
16731717
1718+ guard.SetSize (transferHandle->GetBytesTotalSize ());
1719+ guard.SetSuccess (true );
1720+
16741721 // Check if our local file is the same as S3 promised
16751722 uint64_t file_size{0 };
16761723 auto s = localenv->GetFileSize (tmp_destination, &file_size);
16771724 if (!s.ok ()) {
16781725 return s;
16791726 }
1680- if (static_cast <int64_t >(file_size) !=
1681- get_outcome.GetResult ().GetContentLength ()) {
1727+ if (file_size != transferHandle->GetBytesTotalSize ()) {
16821728 localenv->DeleteFile (tmp_destination);
16831729 s = Status::IOError (" Partial download of a file " + local_destination);
16841730 Log (InfoLogLevel::ERROR_LEVEL, info_log_,
16851731 " [s3] GetObject %s/%s local size %ld != cloud size "
16861732 " %ld. %s" ,
16871733 s3_bucket.c_str (), bucket_object_path.c_str (), file_size,
1688- get_outcome. GetResult (). GetContentLength (), s.ToString ().c_str ());
1734+ transferHandle-> GetBytesTotalSize (), s.ToString ().c_str ());
16891735 }
16901736
16911737 if (s.ok ()) {
@@ -1715,51 +1761,33 @@ Status AwsEnv::PutObject(const std::string& local_file,
17151761 return Status::IOError (local_file + " Zero size." );
17161762 }
17171763
1718- auto input_data = Aws::MakeShared<Aws::FStream>(
1719- bucket_object_path.c_str (), local_file.c_str (),
1720- std::ios_base::in | std::ios_base::out);
1764+ auto s3_bucket = GetAwsBucket (bucket_name_prefix);
17211765
1722- // Copy entire file into S3.
1723- // Writes to an S3 object are atomic.
1724- Aws::S3::Model::PutObjectRequest put_request;
1725- put_request.SetBucket (GetAwsBucket (bucket_name_prefix));
1726- put_request.SetKey (
1727- Aws::String (bucket_object_path.data (), bucket_object_path.size ()));
1728- put_request.SetBody (input_data);
1729- SetEncryptionParameters (put_request);
1766+ CloudRequestCallbackGuard guard (
1767+ cloud_env_options.cloud_request_callback .get (),
1768+ CloudRequestOpType::kWriteOp , fsize);
17301769
1731- Aws::S3::Model::PutObjectOutcome put_outcome =
1732- s3client_->PutObject (put_request, fsize);
1733- bool isSuccess = put_outcome.IsSuccess ();
1734- if (!isSuccess) {
1735- const Aws::Client::AWSError<Aws::S3::S3Errors>& error =
1736- put_outcome.GetError ();
1770+ auto transferHandle = awsTransferManager_->UploadFile (
1771+ ToAwsString (local_file), s3_bucket, ToAwsString (bucket_object_path),
1772+ Aws::DEFAULT_CONTENT_TYPE, Aws::Map<Aws::String, Aws::String>());
1773+
1774+ transferHandle->WaitUntilFinished ();
1775+
1776+ if (transferHandle->GetStatus () != Aws::Transfer::TransferStatus::COMPLETED) {
1777+ auto error = transferHandle->GetLastError ();
17371778 std::string errmsg (error.GetMessage ().c_str (), error.GetMessage ().size ());
17381779 st = Status::IOError (local_file, errmsg);
17391780 }
17401781
1782+ guard.SetSuccess (st.ok ());
1783+
17411784 Log (InfoLogLevel::INFO_LEVEL, info_log_,
1742- " [s3] PutObject %s/%s, size %zu, status %s" ,
1743- put_request.GetBucket ().c_str (), bucket_object_path.c_str (), fsize,
1744- st.ToString ().c_str ());
1785+ " [s3] PutObject %s/%s, size %zu, status %s" , s3_bucket.c_str (),
1786+ bucket_object_path.c_str (), fsize, st.ToString ().c_str ());
17451787
17461788 return st;
17471789}
17481790
1749- void AwsEnv::SetEncryptionParameters (
1750- Aws::S3::Model::PutObjectRequest& put_request) const {
1751- if (cloud_env_options.server_side_encryption ) {
1752- if (cloud_env_options.encryption_key_id .empty ()) {
1753- put_request.SetServerSideEncryption (
1754- Aws::S3::Model::ServerSideEncryption::AES256);
1755- } else {
1756- put_request.SetServerSideEncryption (
1757- Aws::S3::Model::ServerSideEncryption::aws_kms);
1758- put_request.SetSSEKMSKeyId (cloud_env_options.encryption_key_id .c_str ());
1759- }
1760- }
1761- }
1762-
17631791//
17641792// prepends the configured src object path name
17651793//
0 commit comments