Skip to content

Commit 0ecd80e

Browse files
committed
Change the Cloud files for 5.6.2 release
-> Fixed code so it no longer requires `using std` -> Fixed compiler-upgrade issues with obsolete code and print-format specifiers -> Updated header file locations
1 parent b965a29 commit 0ecd80e

19 files changed

+138
-132
lines changed

cloud/aws/aws_env.cc

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
// Copyright (c) 2016-present, Rockset, Inc. All rights reserved.
22
//
33
#include "cloud/aws/aws_env.h"
4+
45
#include <unistd.h>
6+
57
#include <chrono>
8+
#include <cinttypes>
69
#include <fstream>
710
#include <iostream>
811
#include <memory>
12+
913
#include "rocksdb/env.h"
1014
#include "rocksdb/status.h"
1115
#include "util/stderr_logger.h"
@@ -171,8 +175,9 @@ struct JobHandle {
171175

172176
class JobExecutor {
173177
public:
174-
shared_ptr<JobHandle> ScheduleJob(std::chrono::steady_clock::time_point time,
175-
std::function<void(void)> callback);
178+
std::shared_ptr<JobHandle> ScheduleJob(
179+
std::chrono::steady_clock::time_point time,
180+
std::function<void(void)> callback);
176181
void CancelJob(JobHandle* handle);
177182

178183
JobExecutor();
@@ -205,7 +210,7 @@ JobExecutor::~JobExecutor() {
205210
}
206211
}
207212

208-
shared_ptr<JobHandle> JobExecutor::ScheduleJob(
213+
std::shared_ptr<JobHandle> JobExecutor::ScheduleJob(
209214
std::chrono::steady_clock::time_point time,
210215
std::function<void(void)> callback) {
211216
std::lock_guard<std::mutex> lk(mutex_);
@@ -404,7 +409,7 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
404409
}
405410
}
406411

407-
shared_ptr<Aws::Auth::AWSCredentialsProvider> creds;
412+
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> creds;
408413
create_bucket_status_ =
409414
cloud_env_options.credentials.GetCredentialsProvider(&creds);
410415
if (!create_bucket_status_.ok()) {
@@ -582,10 +587,10 @@ Status AwsEnv::CheckOption(const EnvOptions& options) {
582587
// Ability to read a file directly from cloud storage
583588
Status AwsEnv::NewSequentialFileCloud(const std::string& bucket,
584589
const std::string& fname,
585-
unique_ptr<SequentialFile>* result,
590+
std::unique_ptr<SequentialFile>* result,
586591
const EnvOptions& options) {
587592
assert(status().ok());
588-
unique_ptr<S3ReadableFile> file;
593+
std::unique_ptr<S3ReadableFile> file;
589594
Status st = NewS3ReadableFile(bucket, fname, &file);
590595
if (!st.ok()) {
591596
return st;
@@ -597,7 +602,7 @@ Status AwsEnv::NewSequentialFileCloud(const std::string& bucket,
597602

598603
// open a file for sequential reading
599604
Status AwsEnv::NewSequentialFile(const std::string& logical_fname,
600-
unique_ptr<SequentialFile>* result,
605+
std::unique_ptr<SequentialFile>* result,
601606
const EnvOptions& options) {
602607
assert(status().ok());
603608
result->reset();
@@ -632,7 +637,7 @@ Status AwsEnv::NewSequentialFile(const std::string& logical_fname,
632637
st = base_env_->NewSequentialFile(fname, result, options);
633638
}
634639
} else {
635-
unique_ptr<S3ReadableFile> file;
640+
std::unique_ptr<S3ReadableFile> file;
636641
if (!st.ok() && HasDestBucket()) { // read from destination S3
637642
st = NewS3ReadableFile(GetDestBucketName(), destname(fname), &file);
638643
}
@@ -671,7 +676,7 @@ Status AwsEnv::NewSequentialFile(const std::string& logical_fname,
671676

672677
// open a file for random reading
673678
Status AwsEnv::NewRandomAccessFile(const std::string& logical_fname,
674-
unique_ptr<RandomAccessFile>* result,
679+
std::unique_ptr<RandomAccessFile>* result,
675680
const EnvOptions& options) {
676681
assert(status().ok());
677682
result->reset();
@@ -745,7 +750,7 @@ Status AwsEnv::NewRandomAccessFile(const std::string& logical_fname,
745750
// Only execute this code path if keep_local_sst_files == false. If it's
746751
// true, we will never use S3ReadableFile to read; we copy the file
747752
// locally and read using base_env.
748-
unique_ptr<S3ReadableFile> file;
753+
std::unique_ptr<S3ReadableFile> file;
749754
if (!st.ok() && HasDestBucket()) {
750755
st = NewS3ReadableFile(GetDestBucketName(), destname(fname), &file);
751756
}
@@ -784,7 +789,7 @@ Status AwsEnv::NewRandomAccessFile(const std::string& logical_fname,
784789

785790
// create a new file for writing
786791
Status AwsEnv::NewWritableFile(const std::string& logical_fname,
787-
unique_ptr<WritableFile>* result,
792+
std::unique_ptr<WritableFile>* result,
788793
const EnvOptions& options) {
789794
assert(status().ok());
790795
result->reset();
@@ -799,7 +804,7 @@ Status AwsEnv::NewWritableFile(const std::string& logical_fname,
799804
Status s;
800805

801806
if (HasDestBucket() && (sstfile || identity || manifest)) {
802-
unique_ptr<S3WritableFile> f(
807+
std::unique_ptr<S3WritableFile> f(
803808
new S3WritableFile(this, fname, GetDestBucketName(), destname(fname),
804809
options, cloud_env_options));
805810
s = f->status();
@@ -851,23 +856,23 @@ class S3Directory : public Directory {
851856
AwsEnv* env_;
852857
std::string name_;
853858
Status status_;
854-
unique_ptr<Directory> posixDir;
859+
std::unique_ptr<Directory> posixDir;
855860
};
856861

857862
//
858863
// Returns success only if the directory-bucket exists in the
859864
// AWS S3 service and the posixEnv local directory exists as well.
860865
//
861866
Status AwsEnv::NewDirectory(const std::string& name,
862-
unique_ptr<Directory>* result) {
867+
std::unique_ptr<Directory>* result) {
863868
assert(status().ok());
864869
result->reset(nullptr);
865870

866871
Log(InfoLogLevel::DEBUG_LEVEL, info_log_, "[aws] NewDirectory name '%s'",
867872
name.c_str());
868873

869874
// create new object.
870-
unique_ptr<S3Directory> d(new S3Directory(this, name));
875+
std::unique_ptr<S3Directory> d(new S3Directory(this, name));
871876

872877
// Check if the path exists in local dir
873878
if (!d->status().ok()) {
@@ -964,7 +969,7 @@ Status AwsEnv::GetChildrenFromS3(const std::string& path,
964969
s3err == Aws::S3::S3Errors::NO_SUCH_KEY ||
965970
s3err == Aws::S3::S3Errors::RESOURCE_NOT_FOUND) {
966971
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
967-
"[s3] GetChildren dir %s does not exist", path.c_str(),
972+
"[s3] GetChildren dir %s does not exist: %s", path.c_str(),
968973
errmsg.c_str());
969974
return Status::NotFound(path, errmsg.c_str());
970975
}
@@ -1037,7 +1042,7 @@ Status AwsEnv::HeadObject(const std::string& bucket,
10371042

10381043
Status AwsEnv::NewS3ReadableFile(const std::string& bucket,
10391044
const std::string& fname,
1040-
unique_ptr<S3ReadableFile>* result) {
1045+
std::unique_ptr<S3ReadableFile>* result) {
10411046
// First, check if the file exists and also find its size. We use size in
10421047
// S3ReadableFile to make sure we always read the valid ranges of the file
10431048
uint64_t size;
@@ -1065,7 +1070,8 @@ Status AwsEnv::EmptyBucket(const std::string& bucket,
10651070
return st;
10661071
}
10671072
Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
1068-
"[s3] EmptyBucket going to delete %d objects in bucket %s",
1073+
"[s3] EmptyBucket going to delete %" ROCKSDB_PRIszt
1074+
" objects in bucket %s",
10691075
results.size(), bucket.c_str());
10701076

10711077
// Delete all objects from bucket
@@ -1145,8 +1151,8 @@ Status AwsEnv::GetChildren(const std::string& path,
11451151
result->erase(std::unique(result->begin(), result->end()), result->end());
11461152

11471153
Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
1148-
"[s3] GetChildren %s successfully returned %d files", path.c_str(),
1149-
result->size());
1154+
"[s3] GetChildren %s successfully returned %" ROCKSDB_PRIszt " files",
1155+
path.c_str(), result->size());
11501156
return Status::OK();
11511157
}
11521158

@@ -1390,8 +1396,9 @@ Status AwsEnv::GetFileSize(const std::string& logical_fname, uint64_t* size) {
13901396
} else {
13911397
st = base_env_->GetFileSize(fname, size);
13921398
}
1393-
Log(InfoLogLevel::DEBUG_LEVEL, info_log_, "[aws] GetFileSize src '%s' %s %ld",
1394-
fname.c_str(), st.ToString().c_str(), *size);
1399+
Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
1400+
"[aws] GetFileSize src '%s' %s %" PRIu64, fname.c_str(),
1401+
st.ToString().c_str(), *size);
13951402
return st;
13961403
}
13971404

@@ -1817,17 +1824,18 @@ Status AwsEnv::GetObject(const std::string& bucket_name,
18171824
localenv->DeleteFile(tmp_destination);
18181825
s = Status::IOError("Partial download of a file " + local_destination);
18191826
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
1820-
"[s3] GetObject %s/%s local size %ld != cloud size "
1827+
"[s3] GetObject %s/%s local size %" PRIu64
1828+
" != cloud size "
18211829
"%ld. %s",
1822-
bucket_name.c_str(), object_path.c_str(), file_size,
1823-
result.objectSize, s.ToString().c_str());
1830+
bucket_name.c_str(), object_path.c_str(), file_size, result.objectSize,
1831+
s.ToString().c_str());
18241832
}
18251833

18261834
if (s.ok()) {
18271835
s = localenv->RenameFile(tmp_destination, local_destination);
18281836
}
18291837
Log(InfoLogLevel::INFO_LEVEL, info_log_,
1830-
"[s3] GetObject %s/%s size %ld. %s", bucket_name.c_str(),
1838+
"[s3] GetObject %s/%s size %" PRIu64 ". %s", bucket_name.c_str(),
18311839
object_path.c_str(), file_size, s.ToString().c_str());
18321840
return s;
18331841
}
@@ -1910,11 +1918,11 @@ Status AwsEnv::PutObject(const std::string& local_file,
19101918
std::string errmsg(error.GetMessage().c_str(), error.GetMessage().size());
19111919
st = Status::IOError(local_file, errmsg);
19121920
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
1913-
"[s3] PutObject %s/%s, size %zu, ERROR %s", s3_bucket.c_str(),
1921+
"[s3] PutObject %s/%s, size %" PRIu64 ", ERROR %s", s3_bucket.c_str(),
19141922
object_path.c_str(), fsize, errmsg.c_str());
19151923
} else {
19161924
Log(InfoLogLevel::INFO_LEVEL, info_log_,
1917-
"[s3] PutObject %s/%s, size %zu, OK", s3_bucket.c_str(),
1925+
"[s3] PutObject %s/%s, size %" PRIu64 ", OK", s3_bucket.c_str(),
19181926
object_path.c_str(), fsize);
19191927
}
19201928

@@ -1946,7 +1954,8 @@ Status AwsEnv::LockFile(const std::string& fname, FileLock** lock) {
19461954

19471955
Status AwsEnv::UnlockFile(FileLock* lock) { return Status::OK(); }
19481956

1949-
Status AwsEnv::NewLogger(const std::string& fname, shared_ptr<Logger>* result) {
1957+
Status AwsEnv::NewLogger(const std::string& fname,
1958+
std::shared_ptr<Logger>* result) {
19501959
return base_env_->NewLogger(fname, result);
19511960
}
19521961

@@ -1960,7 +1969,7 @@ Status AwsEnv::NewAwsEnv(Env* base_env,
19601969
if (!base_env) {
19611970
base_env = Env::Default();
19621971
}
1963-
unique_ptr<AwsEnv> aenv(new AwsEnv(base_env, cloud_options, info_log));
1972+
std::unique_ptr<AwsEnv> aenv(new AwsEnv(base_env, cloud_options, info_log));
19641973
if (!aenv->status().ok()) {
19651974
status = aenv->status();
19661975
} else {

cloud/aws/aws_env.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -346,9 +346,8 @@ class AwsEnv : public CloudEnvImpl {
346346
Aws::Map<Aws::String, Aws::String>* metadata = nullptr,
347347
uint64_t* size = nullptr, uint64_t* modtime = nullptr);
348348

349-
Status NewS3ReadableFile(const std::string& bucket,
350-
const std::string& fname,
351-
unique_ptr<S3ReadableFile>* result);
349+
Status NewS3ReadableFile(const std::string& bucket, const std::string& fname,
350+
std::unique_ptr<S3ReadableFile>* result);
352351

353352
// Save IDENTITY file to S3. Update dbid registry.
354353
Status SaveIdentitytoS3(const std::string& localfile,

cloud/aws/aws_file.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
#include <iostream>
99
#include "cloud/aws/aws_env.h"
1010
#include "cloud/filename.h"
11+
#include "file/filename.h"
1112
#include "rocksdb/env.h"
1213
#include "rocksdb/status.h"
13-
#include "util/filename.h"
1414

1515
#include <aws/core/Aws.h>
1616
#include <aws/core/utils/DateTime.h>
@@ -79,7 +79,7 @@ class S3WritableFile : public WritableFile {
7979
std::string fname_;
8080
std::string tmp_file_;
8181
Status status_;
82-
unique_ptr<WritableFile> local_file_;
82+
std::unique_ptr<WritableFile> local_file_;
8383
std::string bucket_prefix_;
8484
std::string cloud_fname_;
8585
bool is_manifest_;

cloud/aws/aws_kafka.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
// A log file maps to a stream in Kinesis.
55
//
66

7+
#include <cinttypes>
78
#include <fstream>
89
#include <iostream>
910

@@ -144,8 +145,8 @@ Status KafkaWritableFile::Flush() {
144145
"[kafka] WritableFile src %s Flushed", fname_.c_str());
145146
} else if (timeout) {
146147
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
147-
"[kafka] WritableFile src %s Flushing timed out after %lldus",
148-
fname_.c_str(), kFlushTimeout);
148+
"[kafka] WritableFile src %s Flushing timed out after %" PRId64 "us",
149+
fname_.c_str(), kFlushTimeout.count());
149150
status_ = Status::TimedOut();
150151
} else {
151152
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,

cloud/aws/aws_kinesis.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,8 @@ Status KinesisController::TailStream() {
249249
Aws::Kinesis::KinesisErrors err = error.GetErrorType();
250250
if (err == Aws::Kinesis::KinesisErrors::EXPIRED_ITERATOR) {
251251
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
252-
"[%s] expired shard iterator for %s. Reseeking...",
253-
Name(), topic_.c_str(), error.GetMessage().c_str());
252+
"[%s] expired shard iterator for %s. Reseeking...", Name(),
253+
topic_.c_str());
254254
shards_iterator_[0] = "";
255255
SeekShards(); // read position at last seqno
256256
} else {

cloud/aws/aws_retry.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
//
33
//
44

5-
#include "rocksdb/cloud/cloud_env_options.h"
5+
#include <cinttypes>
6+
67
#include "cloud/aws/aws_file.h"
8+
#include "rocksdb/cloud/cloud_env_options.h"
79
#ifdef USE_AWS
810
#include <aws/core/client/AWSError.h>
911
#include <aws/core/client/ClientConfiguration.h>
@@ -68,7 +70,7 @@ bool AwsRetryStrategy::ShouldRetry(const Aws::Client::AWSError<Aws::Client::Core
6870
if (attemptedRetries <= internal_failure_num_retries_) {
6971
Log(InfoLogLevel::INFO_LEVEL, env_->info_log_,
7072
"[aws] Encountered retriable failure: %s (code %d, http %d). "
71-
"Exception %s. retry attempt %d is lesser than max retries %d. "
73+
"Exception %s. retry attempt %ld is lesser than max retries %d. "
7274
"Retrying...",
7375
err.c_str(), static_cast<int>(ce),
7476
static_cast<int>(error.GetResponseCode()), emsg.c_str(),
@@ -77,15 +79,15 @@ bool AwsRetryStrategy::ShouldRetry(const Aws::Client::AWSError<Aws::Client::Core
7779
}
7880
Log(InfoLogLevel::INFO_LEVEL, env_->info_log_,
7981
"[aws] Encountered retriable failure: %s (code %d, http %d). Exception "
80-
"%s. retry attempt %d exceeds max retries %d. Aborting...",
82+
"%s. retry attempt %ld exceeds max retries %d. Aborting...",
8183
err.c_str(), static_cast<int>(ce),
8284
static_cast<int>(error.GetResponseCode()), emsg.c_str(),
8385
attemptedRetries, internal_failure_num_retries_);
8486
return false;
8587
}
8688
Log(InfoLogLevel::WARN_LEVEL, env_->info_log_,
8789
"[aws] Encountered S3 failure %s (code %d, http %d). Exception %s."
88-
" retry attempt %d max retries %d. Using default retry policy...",
90+
" retry attempt %ld max retries %d. Using default retry policy...",
8991
err.c_str(), static_cast<int>(ce),
9092
static_cast<int>(error.GetResponseCode()), emsg.c_str(), attemptedRetries,
9193
internal_failure_num_retries_);

0 commit comments

Comments (0)