1313
1414#ifdef USE_AWS
1515
16- #include < aws/core/utils/threading/Executor.h>
17-
1816#include " cloud/aws/aws_file.h"
1917#include " cloud/aws/aws_kafka.h"
2018#include " cloud/aws/aws_kinesis.h"
@@ -127,42 +125,16 @@ detail::JobExecutor* GetJobExecutor() {
127125 return &executor;
128126}
129127
130- Aws::Utils::Threading::Executor* GetAwsTransferManagerExecutor () {
131- static Aws::Utils::Threading::PooledThreadExecutor executor (8 );
132- return &executor;
133- }
134-
135- Aws::String ToAwsString (const std::string& s) {
136- return Aws::String (s.data (), s.size ());
137- }
138-
139- template <typename T>
140- void SetEncryptionParameters (const CloudEnvOptions& cloud_env_options,
141- T& put_request) {
142- if (cloud_env_options.server_side_encryption ) {
143- if (cloud_env_options.encryption_key_id .empty ()) {
144- put_request.SetServerSideEncryption (
145- Aws::S3::Model::ServerSideEncryption::AES256);
146- } else {
147- put_request.SetServerSideEncryption (
148- Aws::S3::Model::ServerSideEncryption::aws_kms);
149- put_request.SetSSEKMSKeyId (cloud_env_options.encryption_key_id .c_str ());
150- }
151- }
152- }
153-
154- class CloudRequestCallbackGuard {
128+ class AwsS3ClientWrapper ::Timer {
155129 public:
156- CloudRequestCallbackGuard (CloudRequestCallback* callback,
157- CloudRequestOpType type, uint64_t size = 0 )
130+ Timer (CloudRequestCallback* callback, CloudRequestOpType type ,
131+ uint64_t size = 0 )
158132 : callback_(callback), type_(type), size_(size), start_(now()) {}
159-
160- ~CloudRequestCallbackGuard () {
133+ ~Timer () {
161134 if (callback_) {
162135 (*callback_)(type_, size_, now () - start_, success_);
163136 }
164137 }
165-
166138 void SetSize (uint64_t size) { size_ = size; }
167139 void SetSuccess (bool success) { success_ = success; }
168140
@@ -181,24 +153,22 @@ class CloudRequestCallbackGuard {
181153};
182154
183155AwsS3ClientWrapper::AwsS3ClientWrapper (
184- std::shared_ptr <Aws::S3::S3Client> client,
156+ std::unique_ptr <Aws::S3::S3Client> client,
185157 std::shared_ptr<CloudRequestCallback> cloud_request_callback)
186158 : client_(std::move(client)),
187159 cloud_request_callback_ (std::move(cloud_request_callback)) {}
188160
189161Aws::S3::Model::ListObjectsOutcome AwsS3ClientWrapper::ListObjects (
190162 const Aws::S3::Model::ListObjectsRequest& request) {
191- CloudRequestCallbackGuard t (cloud_request_callback_.get (),
192- CloudRequestOpType::kListOp );
163+ Timer t (cloud_request_callback_.get (), CloudRequestOpType::kListOp );
193164 auto outcome = client_->ListObjects (request);
194165 t.SetSuccess (outcome.IsSuccess ());
195166 return outcome;
196167}
197168
198169Aws::S3::Model::CreateBucketOutcome AwsS3ClientWrapper::CreateBucket (
199170 const Aws::S3::Model::CreateBucketRequest& request) {
200- CloudRequestCallbackGuard t (cloud_request_callback_.get (),
201- CloudRequestOpType::kCreateOp );
171+ Timer t (cloud_request_callback_.get (), CloudRequestOpType::kCreateOp );
202172 return client_->CreateBucket (request);
203173}
204174
@@ -209,26 +179,23 @@ Aws::S3::Model::HeadBucketOutcome AwsS3ClientWrapper::HeadBucket(
209179
210180Aws::S3::Model::DeleteObjectOutcome AwsS3ClientWrapper::DeleteObject (
211181 const Aws::S3::Model::DeleteObjectRequest& request) {
212- CloudRequestCallbackGuard t (cloud_request_callback_.get (),
213- CloudRequestOpType::kDeleteOp );
182+ Timer t (cloud_request_callback_.get (), CloudRequestOpType::kDeleteOp );
214183 auto outcome = client_->DeleteObject (request);
215184 t.SetSuccess (outcome.IsSuccess ());
216185 return outcome;
217186}
218187
219188Aws::S3::Model::CopyObjectOutcome AwsS3ClientWrapper::CopyObject (
220189 const Aws::S3::Model::CopyObjectRequest& request) {
221- CloudRequestCallbackGuard t (cloud_request_callback_.get (),
222- CloudRequestOpType::kCopyOp );
190+ Timer t (cloud_request_callback_.get (), CloudRequestOpType::kCopyOp );
223191 auto outcome = client_->CopyObject (request);
224192 t.SetSuccess (outcome.IsSuccess ());
225193 return outcome;
226194}
227195
228196Aws::S3::Model::GetObjectOutcome AwsS3ClientWrapper::GetObject (
229197 const Aws::S3::Model::GetObjectRequest& request) {
230- CloudRequestCallbackGuard t (cloud_request_callback_.get (),
231- CloudRequestOpType::kReadOp );
198+ Timer t (cloud_request_callback_.get (), CloudRequestOpType::kReadOp );
232199 auto outcome = client_->GetObject (request);
233200 if (outcome.IsSuccess ()) {
234201 t.SetSize (outcome.GetResult ().GetContentLength ());
@@ -239,17 +206,16 @@ Aws::S3::Model::GetObjectOutcome AwsS3ClientWrapper::GetObject(
239206
240207Aws::S3::Model::PutObjectOutcome AwsS3ClientWrapper::PutObject (
241208 const Aws::S3::Model::PutObjectRequest& request, uint64_t size_hint) {
242- CloudRequestCallbackGuard t (cloud_request_callback_.get (),
243- CloudRequestOpType:: kWriteOp , size_hint);
209+ Timer t (cloud_request_callback_.get (), CloudRequestOpType:: kWriteOp ,
210+ size_hint);
244211 auto outcome = client_->PutObject (request);
245212 t.SetSuccess (outcome.IsSuccess ());
246213 return outcome;
247214}
248215
249216Aws::S3::Model::HeadObjectOutcome AwsS3ClientWrapper::HeadObject (
250217 const Aws::S3::Model::HeadObjectRequest& request) {
251- CloudRequestCallbackGuard t (cloud_request_callback_.get (),
252- CloudRequestOpType::kInfoOp );
218+ Timer t (cloud_request_callback_.get (), CloudRequestOpType::kInfoOp );
253219 auto outcome = client_->HeadObject (request);
254220 t.SetSuccess (outcome.IsSuccess ());
255221 return outcome;
@@ -366,25 +332,14 @@ AwsEnv::AwsEnv(Env* underlying_env, const std::string& src_bucket_prefix,
366332 GetBucketLocationConstraintForName (config.region );
367333
368334 {
369- auto s3client = creds ? std::make_shared<Aws::S3::S3Client>(*creds, config)
370- : std::make_shared<Aws::S3::S3Client>(config);
335+ unique_ptr<Aws::S3::S3Client> s3client (
336+ creds ? new Aws::S3::S3Client (*creds, config)
337+ : new Aws::S3::S3Client (config));
371338
372339 s3client_ = std::make_shared<AwsS3ClientWrapper>(
373340 std::move (s3client), cloud_env_options.cloud_request_callback );
374341 }
375342
376- {
377- Aws::Transfer::TransferManagerConfiguration transferManagerConfig (
378- GetAwsTransferManagerExecutor ());
379- transferManagerConfig.s3Client = s3client_->GetClient ();
380- SetEncryptionParameters (cloud_env_options,
381- transferManagerConfig.putObjectTemplate );
382- SetEncryptionParameters (
383- cloud_env_options, transferManagerConfig.createMultipartUploadTemplate );
384- awsTransferManager_ =
385- Aws::Transfer::TransferManager::Create (transferManagerConfig);
386- }
387-
388343 // create dest bucket if specified
389344 if (has_dest_bucket_) {
390345 if (S3WritableFile::BucketExistsInS3 (s3client_, GetDestBucketPrefix (),
@@ -1505,7 +1460,7 @@ Status AwsEnv::SaveDbid(const std::string& dbid, const std::string& dirname) {
15051460 put_request.SetBucket (bucket);
15061461 put_request.SetKey (key);
15071462 put_request.SetMetadata (metadata);
1508- SetEncryptionParameters (cloud_env_options, put_request);
1463+ SetEncryptionParameters (put_request);
15091464
15101465 Aws::S3::Model::PutObjectOutcome put_outcome =
15111466 s3client_->PutObject (put_request);
@@ -1689,19 +1644,20 @@ Status AwsEnv::GetObject(const std::string& bucket_name_prefix,
16891644 std::string tmp_destination = local_destination + " .tmp" ;
16901645 auto s3_bucket = GetAwsBucket (bucket_name_prefix);
16911646
1692- CloudRequestCallbackGuard guard (
1693- cloud_env_options.cloud_request_callback .get (),
1694- CloudRequestOpType::kReadOp );
1695-
1696- auto transferHandle = awsTransferManager_->DownloadFile (
1697- s3_bucket, ToAwsString (bucket_object_path),
1698- ToAwsString (tmp_destination));
1699-
1700- transferHandle->WaitUntilFinished ();
1701-
1702- if (transferHandle->GetStatus () != Aws::Transfer::TransferStatus::COMPLETED) {
1647+ Aws::S3::Model::GetObjectRequest getObjectRequest;
1648+ getObjectRequest.SetBucket (s3_bucket);
1649+ getObjectRequest.SetKey (
1650+ Aws::String (bucket_object_path.data (), bucket_object_path.size ()));
1651+ getObjectRequest.SetResponseStreamFactory ([tmp_destination]() {
1652+ return Aws::New<Aws::FStream>(Aws::Utils::ARRAY_ALLOCATION_TAG,
1653+ tmp_destination, std::ios_base::out);
1654+ });
1655+ auto get_outcome = s3client_->GetObject (getObjectRequest);
1656+
1657+ bool isSuccess = get_outcome.IsSuccess ();
1658+ if (!isSuccess) {
17031659 localenv->DeleteFile (tmp_destination);
1704- const auto & error = transferHandle-> GetLastError ();
1660+ const auto & error = get_outcome. GetError ();
17051661 std::string errmsg (error.GetMessage ().c_str (), error.GetMessage ().size ());
17061662 Log (InfoLogLevel::ERROR_LEVEL, info_log_,
17071663 " [s3] GetObject %s/%s error %s." , s3_bucket.c_str (),
@@ -1715,23 +1671,21 @@ Status AwsEnv::GetObject(const std::string& bucket_name_prefix,
17151671 return Status::IOError (std::move (errmsg));
17161672 }
17171673
1718- guard.SetSize (transferHandle->GetBytesTotalSize ());
1719- guard.SetSuccess (true );
1720-
17211674 // Check if our local file is the same as S3 promised
17221675 uint64_t file_size{0 };
17231676 auto s = localenv->GetFileSize (tmp_destination, &file_size);
17241677 if (!s.ok ()) {
17251678 return s;
17261679 }
1727- if (file_size != transferHandle->GetBytesTotalSize ()) {
1680+ if (static_cast <int64_t >(file_size) !=
1681+ get_outcome.GetResult ().GetContentLength ()) {
17281682 localenv->DeleteFile (tmp_destination);
17291683 s = Status::IOError (" Partial download of a file " + local_destination);
17301684 Log (InfoLogLevel::ERROR_LEVEL, info_log_,
17311685 " [s3] GetObject %s/%s local size %ld != cloud size "
17321686 " %ld. %s" ,
17331687 s3_bucket.c_str (), bucket_object_path.c_str (), file_size,
1734- transferHandle-> GetBytesTotalSize (), s.ToString ().c_str ());
1688+ get_outcome. GetResult (). GetContentLength (), s.ToString ().c_str ());
17351689 }
17361690
17371691 if (s.ok ()) {
@@ -1761,33 +1715,51 @@ Status AwsEnv::PutObject(const std::string& local_file,
17611715 return Status::IOError (local_file + " Zero size." );
17621716 }
17631717
1764- auto s3_bucket = GetAwsBucket (bucket_name_prefix);
1765-
1766- CloudRequestCallbackGuard guard (
1767- cloud_env_options.cloud_request_callback .get (),
1768- CloudRequestOpType::kWriteOp , fsize);
1718+ auto input_data = Aws::MakeShared<Aws::FStream>(
1719+ bucket_object_path.c_str (), local_file.c_str (),
1720+ std::ios_base::in | std::ios_base::out);
17691721
1770- auto transferHandle = awsTransferManager_->UploadFile (
1771- ToAwsString (local_file), s3_bucket, ToAwsString (bucket_object_path),
1772- Aws::DEFAULT_CONTENT_TYPE, Aws::Map<Aws::String, Aws::String>());
1773-
1774- transferHandle->WaitUntilFinished ();
1722+ // Copy entire file into S3.
1723+ // Writes to an S3 object are atomic.
1724+ Aws::S3::Model::PutObjectRequest put_request;
1725+ put_request.SetBucket (GetAwsBucket (bucket_name_prefix));
1726+ put_request.SetKey (
1727+ Aws::String (bucket_object_path.data (), bucket_object_path.size ()));
1728+ put_request.SetBody (input_data);
1729+ SetEncryptionParameters (put_request);
17751730
1776- if (transferHandle->GetStatus () != Aws::Transfer::TransferStatus::COMPLETED) {
1777- auto error = transferHandle->GetLastError ();
1731+ Aws::S3::Model::PutObjectOutcome put_outcome =
1732+ s3client_->PutObject (put_request, fsize);
1733+ bool isSuccess = put_outcome.IsSuccess ();
1734+ if (!isSuccess) {
1735+ const Aws::Client::AWSError<Aws::S3::S3Errors>& error =
1736+ put_outcome.GetError ();
17781737 std::string errmsg (error.GetMessage ().c_str (), error.GetMessage ().size ());
17791738 st = Status::IOError (local_file, errmsg);
17801739 }
17811740
1782- guard.SetSuccess (st.ok ());
1783-
17841741 Log (InfoLogLevel::INFO_LEVEL, info_log_,
1785- " [s3] PutObject %s/%s, size %zu, status %s" , s3_bucket.c_str (),
1786- bucket_object_path.c_str (), fsize, st.ToString ().c_str ());
1742+ " [s3] PutObject %s/%s, size %zu, status %s" ,
1743+ put_request.GetBucket ().c_str (), bucket_object_path.c_str (), fsize,
1744+ st.ToString ().c_str ());
17871745
17881746 return st;
17891747}
17901748
1749+ void AwsEnv::SetEncryptionParameters (
1750+ Aws::S3::Model::PutObjectRequest& put_request) const {
1751+ if (cloud_env_options.server_side_encryption ) {
1752+ if (cloud_env_options.encryption_key_id .empty ()) {
1753+ put_request.SetServerSideEncryption (
1754+ Aws::S3::Model::ServerSideEncryption::AES256);
1755+ } else {
1756+ put_request.SetServerSideEncryption (
1757+ Aws::S3::Model::ServerSideEncryption::aws_kms);
1758+ put_request.SetSSEKMSKeyId (cloud_env_options.encryption_key_id .c_str ());
1759+ }
1760+ }
1761+ }
1762+
17911763//
17921764// prepends the configured src object path name
17931765//
0 commit comments