Skip to content

Commit 9d2981e

Browse files
committed
Change CreateNewAwsEnv to use the created base. The env_ that was created earlier is now used as the "default" env for the AwsEnv. This fixes an occasional timing-related failure of the DbTest.Snapshot test.
Move methods that are not AWS-specific to the CloudEnvImpl. This change moves many of the methods from the AWS Env into CloudEnvImpl. With the addition of the CloudStorageProvider and CloudLogController classes, many methods in the AWS Env no longer need to be AWS-specific. The methods that are moved in this change do not use any methods or code that was specific to the AWS Environment. Note that DeleteFile was left for a later change, as it requires moving/changing much more code. Run "clang-format -i cloud/*.{cc,h} cloud/aws/*.{cc,h} include/rocksdb/cloud/*.{cc,h}".
1 parent e6fb333 commit 9d2981e

27 files changed

+1088
-1111
lines changed

cloud/aws/aws_env.cc

Lines changed: 12 additions & 675 deletions
Large diffs are not rendered by default.

cloud/aws/aws_env.h

Lines changed: 8 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ namespace rocksdb {
2222

2323
class S3ReadableFile;
2424

25-
2625
namespace detail {
2726
struct JobHandle;
2827
} // namespace detail
@@ -53,9 +52,9 @@ struct JobHandle;
5352
class AwsEnv : public CloudEnvImpl {
5453
public:
5554
// A factory method for creating S3 envs
56-
static Status NewAwsEnv(Env* env,
57-
const CloudEnvOptions& env_options,
58-
const std::shared_ptr<Logger> & info_log, CloudEnv** cenv);
55+
static Status NewAwsEnv(Env* env, const CloudEnvOptions& env_options,
56+
const std::shared_ptr<Logger>& info_log,
57+
CloudEnv** cenv);
5958

6059
virtual ~AwsEnv();
6160

@@ -72,136 +71,20 @@ class AwsEnv : public CloudEnvImpl {
7271
// explicitly make the default region be us-west-2.
7372
static constexpr const char* default_region = "us-west-2";
7473

75-
virtual Status NewSequentialFile(const std::string& fname,
76-
std::unique_ptr<SequentialFile>* result,
77-
const EnvOptions& options) override;
78-
79-
virtual Status NewSequentialFileCloud(const std::string& bucket,
80-
const std::string& fname,
81-
std::unique_ptr<SequentialFile>* result,
82-
const EnvOptions& options) override;
83-
84-
virtual Status NewRandomAccessFile(const std::string& fname,
85-
std::unique_ptr<RandomAccessFile>* result,
86-
const EnvOptions& options) override;
87-
88-
virtual Status NewWritableFile(const std::string& fname,
89-
std::unique_ptr<WritableFile>* result,
90-
const EnvOptions& options) override;
91-
92-
virtual Status ReopenWritableFile(const std::string& /*fname*/,
93-
std::unique_ptr<WritableFile>* /*result*/,
94-
const EnvOptions& /*options*/) override;
95-
96-
virtual Status NewDirectory(const std::string& name,
97-
std::unique_ptr<Directory>* result) override;
98-
99-
virtual Status FileExists(const std::string& fname) override;
100-
101-
virtual Status GetChildren(const std::string& path,
102-
std::vector<std::string>* result) override;
103-
10474
virtual Status DeleteFile(const std::string& fname) override;
10575

106-
virtual Status CreateDir(const std::string& name) override;
107-
108-
virtual Status CreateDirIfMissing(const std::string& name) override;
109-
110-
virtual Status DeleteDir(const std::string& name) override;
111-
112-
virtual Status GetFileSize(const std::string& fname, uint64_t* size) override;
113-
114-
virtual Status GetFileModificationTime(const std::string& fname,
115-
uint64_t* file_mtime) override;
116-
117-
virtual Status RenameFile(const std::string& src,
118-
const std::string& target) override;
119-
120-
virtual Status LinkFile(const std::string& src,
121-
const std::string& target) override;
122-
12376
virtual Status LockFile(const std::string& fname, FileLock** lock) override;
12477

12578
virtual Status UnlockFile(FileLock* lock) override;
12679

127-
virtual Status NewLogger(const std::string& fname,
128-
std::shared_ptr<Logger>* result) override;
129-
130-
virtual void Schedule(void (*function)(void* arg), void* arg,
131-
Priority pri = LOW, void* tag = nullptr,
132-
void (*unschedFunction)(void* arg) = 0) override {
133-
base_env_->Schedule(function, arg, pri, tag, unschedFunction);
134-
}
135-
136-
virtual int UnSchedule(void* tag, Priority pri) override {
137-
return base_env_->UnSchedule(tag, pri);
138-
}
139-
140-
virtual void StartThread(void (*function)(void* arg), void* arg) override {
141-
base_env_->StartThread(function, arg);
142-
}
143-
144-
virtual void WaitForJoin() override { base_env_->WaitForJoin(); }
145-
146-
virtual unsigned int GetThreadPoolQueueLen(
147-
Priority pri = LOW) const override {
148-
return base_env_->GetThreadPoolQueueLen(pri);
149-
}
150-
151-
virtual Status GetTestDirectory(std::string* path) override {
152-
return base_env_->GetTestDirectory(path);
153-
}
154-
155-
virtual uint64_t NowMicros() override { return base_env_->NowMicros(); }
156-
157-
virtual void SleepForMicroseconds(int micros) override {
158-
base_env_->SleepForMicroseconds(micros);
159-
}
160-
161-
virtual Status GetHostName(char* name, uint64_t len) override {
162-
return base_env_->GetHostName(name, len);
163-
}
164-
165-
virtual Status GetCurrentTime(int64_t* unix_time) override {
166-
return base_env_->GetCurrentTime(unix_time);
167-
}
168-
169-
virtual Status GetAbsolutePath(const std::string& db_path,
170-
std::string* output_path) override {
171-
return base_env_->GetAbsolutePath(db_path, output_path);
172-
}
173-
174-
virtual void SetBackgroundThreads(int number, Priority pri = LOW) override {
175-
base_env_->SetBackgroundThreads(number, pri);
176-
}
177-
int GetBackgroundThreads(Priority pri) override {
178-
return base_env_->GetBackgroundThreads(pri);
179-
}
180-
181-
virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
182-
base_env_->IncBackgroundThreadsIfNeeded(number, pri);
183-
}
184-
185-
virtual std::string TimeToString(uint64_t number) override {
186-
return base_env_->TimeToString(number);
187-
}
188-
189-
static uint64_t gettid() {
190-
assert(sizeof(pthread_t) <= sizeof(uint64_t));
191-
return (uint64_t)pthread_self();
192-
}
193-
194-
virtual uint64_t GetThreadID() const override { return AwsEnv::gettid(); }
195-
19680
std::string GetWALCacheDir();
19781

19882
// Saves and retrieves the dbid->dirname mapping in S3
19983
Status SaveDbid(const std::string& bucket_name, const std::string& dbid,
20084
const std::string& dirname) override;
201-
Status GetPathForDbid(const std::string& bucket,
202-
const std::string& dbid, std::string* dirname) override;
203-
Status GetDbidList(const std::string& bucket,
204-
DbidList* dblist) override;
85+
Status GetPathForDbid(const std::string& bucket, const std::string& dbid,
86+
std::string* dirname) override;
87+
Status GetDbidList(const std::string& bucket, DbidList* dblist) override;
20588
Status DeleteDbid(const std::string& bucket,
20689
const std::string& dbid) override;
20790
Status DeleteCloudFileFromDest(const std::string& fname) override;
@@ -220,41 +103,17 @@ class AwsEnv : public CloudEnvImpl {
220103
// The AWS credentials are specified to the constructor via
221104
// access_key_id and secret_key.
222105
//
223-
explicit AwsEnv(Env* underlying_env,
224-
const CloudEnvOptions& cloud_options,
225-
const std::shared_ptr<Logger> & info_log = nullptr);
226-
227-
106+
explicit AwsEnv(Env* underlying_env, const CloudEnvOptions& cloud_options,
107+
const std::shared_ptr<Logger>& info_log = nullptr);
228108

229109
// The pathname that contains a list of all db's inside a bucket.
230110
static constexpr const char* dbid_registry_ = "/.rockset/dbid/";
231111

232-
Status create_bucket_status_;
233-
234112
std::mutex files_to_delete_mutex_;
235113
std::chrono::seconds file_deletion_delay_ = std::chrono::hours(1);
236114
std::unordered_map<std::string, std::shared_ptr<detail::JobHandle>>
237115
files_to_delete_;
238116
Random64 rng_;
239-
240-
Status status();
241-
242-
// Validate options
243-
Status CheckOption(const EnvOptions& options);
244-
245-
246-
Status NewS3ReadableFile(const std::string& bucket, const std::string& fname,
247-
std::unique_ptr<S3ReadableFile>* result);
248-
249-
// Save IDENTITY file to S3. Update dbid registry.
250-
Status SaveIdentitytoS3(const std::string& localfile,
251-
const std::string& target_idfile);
252-
253-
// Converts a local pathname to an object name in the src bucket
254-
std::string srcname(const std::string& localname);
255-
256-
// Converts a local pathname to an object name in the dest bucket
257-
std::string destname(const std::string& localname);
258117
};
259118

260119
} // namespace rocksdb

cloud/aws/aws_file.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@ inline Aws::String ToAwsString(const std::string& s) {
1010
return Aws::String(s.data(), s.size());
1111
}
1212

13-
} // namepace rocksdb
13+
} // namespace rocksdb
1414

1515
#endif /* USE_AWS */

cloud/aws/aws_kafka.cc

Lines changed: 30 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,22 @@
2020
namespace rocksdb {
2121
namespace cloud {
2222
namespace kafka {
23-
23+
2424
/***************************************************/
2525
/* KafkaWritableFile */
2626
/***************************************************/
2727
class KafkaWritableFile : public CloudLogWritableFile {
2828
public:
2929
static const std::chrono::microseconds kFlushTimeout;
3030

31-
KafkaWritableFile(CloudEnv* env, const std::string& fname, const EnvOptions& options,
31+
KafkaWritableFile(CloudEnv* env, const std::string& fname,
32+
const EnvOptions& options,
3233
std::shared_ptr<RdKafka::Producer> producer,
3334
std::shared_ptr<RdKafka::Topic> topic)
34-
: CloudLogWritableFile(env, fname, options),
35-
producer_(producer),
36-
topic_(topic),
37-
current_offset_(0) {
35+
: CloudLogWritableFile(env, fname, options),
36+
producer_(producer),
37+
topic_(topic),
38+
current_offset_(0) {
3839
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
3940
"[kafka] WritableFile opened file %s", fname_.c_str());
4041
}
@@ -58,11 +59,10 @@ class KafkaWritableFile : public CloudLogWritableFile {
5859
const std::chrono::microseconds KafkaWritableFile::kFlushTimeout =
5960
std::chrono::seconds(10);
6061

61-
6262
Status KafkaWritableFile::ProduceRaw(const std::string& operation_name,
6363
const Slice& message) {
64-
if (!status_.ok()){
65-
return status_;
64+
if (!status_.ok()) {
65+
return status_;
6666
}
6767

6868
RdKafka::ErrorCode resp;
@@ -116,13 +116,9 @@ Status KafkaWritableFile::Close() {
116116
return ProduceRaw("Close", serialized_data);
117117
}
118118

119-
bool KafkaWritableFile::IsSyncThreadSafe() const {
120-
return true;
121-
}
119+
bool KafkaWritableFile::IsSyncThreadSafe() const { return true; }
122120

123-
Status KafkaWritableFile::Sync() {
124-
return Flush();
125-
}
121+
Status KafkaWritableFile::Sync() { return Flush(); }
126122

127123
Status KafkaWritableFile::Flush() {
128124
std::chrono::microseconds start(env_->NowMicros());
@@ -175,33 +171,33 @@ Status KafkaWritableFile::LogDelete() {
175171
//
176172
class KafkaController : public CloudLogControllerImpl {
177173
public:
178-
179174
~KafkaController() {
180175
for (size_t i = 0; i < partitions_.size(); i++) {
181176
consumer_->stop(consumer_topic_.get(), partitions_[i]->partition());
182177
}
183-
178+
184179
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
185180
"[%s] KafkaController closed.", Name());
186181
}
187-
188-
const char *Name() const override { return "kafka"; }
182+
183+
const char* Name() const override { return "kafka"; }
189184

190185
virtual Status CreateStream(const std::string& /* bucket_prefix */) override {
191186
// Kafka client cannot create a topic. Topics are either manually created
192187
// or implicitly created on first write if auto.create.topics.enable is
193188
// true.
194189
return status_;
195190
}
196-
virtual Status WaitForStreamReady(const std::string& /* bucket_prefix */) override {
191+
virtual Status WaitForStreamReady(
192+
const std::string& /* bucket_prefix */) override {
197193
// Kafka topics don't need to be waited on.
198194
return status_;
199195
}
200196

201197
virtual Status TailStream() override;
202198

203-
virtual CloudLogWritableFile* CreateWritableFile(const std::string& fname,
204-
const EnvOptions& options) override;
199+
virtual CloudLogWritableFile* CreateWritableFile(
200+
const std::string& fname, const EnvOptions& options) override;
205201

206202
protected:
207203
Status Initialize(CloudEnv* env) override;
@@ -293,8 +289,7 @@ Status KafkaController::TailStream() {
293289
}
294290

295291
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_, "[%s] TailStream topic %s %s",
296-
Name(), consumer_topic_->name().c_str(),
297-
status_.ToString().c_str());
292+
Name(), consumer_topic_->name().c_str(), status_.ToString().c_str());
298293

299294
Status lastErrorStatus;
300295
int retryAttempt = 0;
@@ -319,14 +314,14 @@ Status KafkaController::TailStream() {
319314
Log(InfoLogLevel::ERROR_LEVEL, env_->info_log_,
320315
"[%s] error processing message size %ld "
321316
"extracted from stream %s %s",
322-
Name(), message->len(),
323-
consumer_topic_->name().c_str(), status_.ToString().c_str());
317+
Name(), message->len(), consumer_topic_->name().c_str(),
318+
status_.ToString().c_str());
324319
} else {
325320
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
326321
"[%s] successfully processed message size %ld "
327322
"extracted from stream %s %s",
328-
Name(), message->len(),
329-
consumer_topic_->name().c_str(), status_.ToString().c_str());
323+
Name(), message->len(), consumer_topic_->name().c_str(),
324+
status_.ToString().c_str());
330325
}
331326

332327
// Remember last read offset from topic (currently unused).
@@ -344,8 +339,7 @@ Status KafkaController::TailStream() {
344339
RdKafka::err2str(message->err()).c_str());
345340

346341
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
347-
"[%s] error reading %s %s", Name(),
348-
consumer_topic_->name().c_str(),
342+
"[%s] error reading %s %s", Name(), consumer_topic_->name().c_str(),
349343
RdKafka::err2str(message->err()).c_str());
350344

351345
++retryAttempt;
@@ -376,9 +370,8 @@ Status KafkaController::InitializePartitions() {
376370
RdKafka::err2str(err).c_str());
377371

378372
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
379-
"[%s] S3ReadableFile file %s Unable to find shards %s",
380-
Name(), consumer_topic_->name().c_str(),
381-
status_.ToString().c_str());
373+
"[%s] S3ReadableFile file %s Unable to find shards %s", Name(),
374+
consumer_topic_->name().c_str(), status_.ToString().c_str());
382375

383376
return status_;
384377
}
@@ -432,11 +425,12 @@ Status CloudLogControllerImpl::CreateKafkaController(
432425
std::shared_ptr<CloudLogController>* output) {
433426
#ifndef USE_KAFKA
434427
output->reset();
435-
return Status::NotSupported("In order to use Kafka, make sure you're compiling with "
436-
"USE_KAFKA=1");
428+
return Status::NotSupported(
429+
"In order to use Kafka, make sure you're compiling with "
430+
"USE_KAFKA=1");
437431
#else
438432
output->reset(new rocksdb::cloud::kafka::KafkaController());
439433
return Status::OK();
440-
#endif // USE_KAFKA
434+
#endif // USE_KAFKA
441435
}
442436
} // namespace rocksdb

0 commit comments

Comments
 (0)