Skip to content

Commit 77e0d58

Browse files
authored
Merge pull request #88 from rockset/CloudEnv
Move common code from AwsEnv to CloudEnvImpl
2 parents e6fb333 + 9d2981e commit 77e0d58

27 files changed

+1088
-1111
lines changed

cloud/aws/aws_env.cc

Lines changed: 12 additions & 675 deletions
Large diffs are not rendered by default.

cloud/aws/aws_env.h

Lines changed: 8 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ namespace rocksdb {
2222

2323
class S3ReadableFile;
2424

25-
2625
namespace detail {
2726
struct JobHandle;
2827
} // namespace detail
@@ -53,9 +52,9 @@ struct JobHandle;
5352
class AwsEnv : public CloudEnvImpl {
5453
public:
5554
// A factory method for creating S3 envs
56-
static Status NewAwsEnv(Env* env,
57-
const CloudEnvOptions& env_options,
58-
const std::shared_ptr<Logger> & info_log, CloudEnv** cenv);
55+
static Status NewAwsEnv(Env* env, const CloudEnvOptions& env_options,
56+
const std::shared_ptr<Logger>& info_log,
57+
CloudEnv** cenv);
5958

6059
virtual ~AwsEnv();
6160

@@ -72,136 +71,20 @@ class AwsEnv : public CloudEnvImpl {
7271
// explicitly make the default region be us-west-2.
7372
static constexpr const char* default_region = "us-west-2";
7473

75-
virtual Status NewSequentialFile(const std::string& fname,
76-
std::unique_ptr<SequentialFile>* result,
77-
const EnvOptions& options) override;
78-
79-
virtual Status NewSequentialFileCloud(const std::string& bucket,
80-
const std::string& fname,
81-
std::unique_ptr<SequentialFile>* result,
82-
const EnvOptions& options) override;
83-
84-
virtual Status NewRandomAccessFile(const std::string& fname,
85-
std::unique_ptr<RandomAccessFile>* result,
86-
const EnvOptions& options) override;
87-
88-
virtual Status NewWritableFile(const std::string& fname,
89-
std::unique_ptr<WritableFile>* result,
90-
const EnvOptions& options) override;
91-
92-
virtual Status ReopenWritableFile(const std::string& /*fname*/,
93-
std::unique_ptr<WritableFile>* /*result*/,
94-
const EnvOptions& /*options*/) override;
95-
96-
virtual Status NewDirectory(const std::string& name,
97-
std::unique_ptr<Directory>* result) override;
98-
99-
virtual Status FileExists(const std::string& fname) override;
100-
101-
virtual Status GetChildren(const std::string& path,
102-
std::vector<std::string>* result) override;
103-
10474
virtual Status DeleteFile(const std::string& fname) override;
10575

106-
virtual Status CreateDir(const std::string& name) override;
107-
108-
virtual Status CreateDirIfMissing(const std::string& name) override;
109-
110-
virtual Status DeleteDir(const std::string& name) override;
111-
112-
virtual Status GetFileSize(const std::string& fname, uint64_t* size) override;
113-
114-
virtual Status GetFileModificationTime(const std::string& fname,
115-
uint64_t* file_mtime) override;
116-
117-
virtual Status RenameFile(const std::string& src,
118-
const std::string& target) override;
119-
120-
virtual Status LinkFile(const std::string& src,
121-
const std::string& target) override;
122-
12376
virtual Status LockFile(const std::string& fname, FileLock** lock) override;
12477

12578
virtual Status UnlockFile(FileLock* lock) override;
12679

127-
virtual Status NewLogger(const std::string& fname,
128-
std::shared_ptr<Logger>* result) override;
129-
130-
virtual void Schedule(void (*function)(void* arg), void* arg,
131-
Priority pri = LOW, void* tag = nullptr,
132-
void (*unschedFunction)(void* arg) = 0) override {
133-
base_env_->Schedule(function, arg, pri, tag, unschedFunction);
134-
}
135-
136-
virtual int UnSchedule(void* tag, Priority pri) override {
137-
return base_env_->UnSchedule(tag, pri);
138-
}
139-
140-
virtual void StartThread(void (*function)(void* arg), void* arg) override {
141-
base_env_->StartThread(function, arg);
142-
}
143-
144-
virtual void WaitForJoin() override { base_env_->WaitForJoin(); }
145-
146-
virtual unsigned int GetThreadPoolQueueLen(
147-
Priority pri = LOW) const override {
148-
return base_env_->GetThreadPoolQueueLen(pri);
149-
}
150-
151-
virtual Status GetTestDirectory(std::string* path) override {
152-
return base_env_->GetTestDirectory(path);
153-
}
154-
155-
virtual uint64_t NowMicros() override { return base_env_->NowMicros(); }
156-
157-
virtual void SleepForMicroseconds(int micros) override {
158-
base_env_->SleepForMicroseconds(micros);
159-
}
160-
161-
virtual Status GetHostName(char* name, uint64_t len) override {
162-
return base_env_->GetHostName(name, len);
163-
}
164-
165-
virtual Status GetCurrentTime(int64_t* unix_time) override {
166-
return base_env_->GetCurrentTime(unix_time);
167-
}
168-
169-
virtual Status GetAbsolutePath(const std::string& db_path,
170-
std::string* output_path) override {
171-
return base_env_->GetAbsolutePath(db_path, output_path);
172-
}
173-
174-
virtual void SetBackgroundThreads(int number, Priority pri = LOW) override {
175-
base_env_->SetBackgroundThreads(number, pri);
176-
}
177-
int GetBackgroundThreads(Priority pri) override {
178-
return base_env_->GetBackgroundThreads(pri);
179-
}
180-
181-
virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
182-
base_env_->IncBackgroundThreadsIfNeeded(number, pri);
183-
}
184-
185-
virtual std::string TimeToString(uint64_t number) override {
186-
return base_env_->TimeToString(number);
187-
}
188-
189-
static uint64_t gettid() {
190-
assert(sizeof(pthread_t) <= sizeof(uint64_t));
191-
return (uint64_t)pthread_self();
192-
}
193-
194-
virtual uint64_t GetThreadID() const override { return AwsEnv::gettid(); }
195-
19680
std::string GetWALCacheDir();
19781

19882
// Saves and retrieves the dbid->dirname mapping in S3
19983
Status SaveDbid(const std::string& bucket_name, const std::string& dbid,
20084
const std::string& dirname) override;
201-
Status GetPathForDbid(const std::string& bucket,
202-
const std::string& dbid, std::string* dirname) override;
203-
Status GetDbidList(const std::string& bucket,
204-
DbidList* dblist) override;
85+
Status GetPathForDbid(const std::string& bucket, const std::string& dbid,
86+
std::string* dirname) override;
87+
Status GetDbidList(const std::string& bucket, DbidList* dblist) override;
20588
Status DeleteDbid(const std::string& bucket,
20689
const std::string& dbid) override;
20790
Status DeleteCloudFileFromDest(const std::string& fname) override;
@@ -220,41 +103,17 @@ class AwsEnv : public CloudEnvImpl {
220103
// The AWS credentials are specified to the constructor via
221104
// access_key_id and secret_key.
222105
//
223-
explicit AwsEnv(Env* underlying_env,
224-
const CloudEnvOptions& cloud_options,
225-
const std::shared_ptr<Logger> & info_log = nullptr);
226-
227-
106+
explicit AwsEnv(Env* underlying_env, const CloudEnvOptions& cloud_options,
107+
const std::shared_ptr<Logger>& info_log = nullptr);
228108

229109
// The pathname that contains a list of all db's inside a bucket.
230110
static constexpr const char* dbid_registry_ = "/.rockset/dbid/";
231111

232-
Status create_bucket_status_;
233-
234112
std::mutex files_to_delete_mutex_;
235113
std::chrono::seconds file_deletion_delay_ = std::chrono::hours(1);
236114
std::unordered_map<std::string, std::shared_ptr<detail::JobHandle>>
237115
files_to_delete_;
238116
Random64 rng_;
239-
240-
Status status();
241-
242-
// Validate options
243-
Status CheckOption(const EnvOptions& options);
244-
245-
246-
Status NewS3ReadableFile(const std::string& bucket, const std::string& fname,
247-
std::unique_ptr<S3ReadableFile>* result);
248-
249-
// Save IDENTITY file to S3. Update dbid registry.
250-
Status SaveIdentitytoS3(const std::string& localfile,
251-
const std::string& target_idfile);
252-
253-
// Converts a local pathname to an object name in the src bucket
254-
std::string srcname(const std::string& localname);
255-
256-
// Converts a local pathname to an object name in the dest bucket
257-
std::string destname(const std::string& localname);
258117
};
259118

260119
} // namespace rocksdb

cloud/aws/aws_file.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@ inline Aws::String ToAwsString(const std::string& s) {
1010
return Aws::String(s.data(), s.size());
1111
}
1212

13-
} // namepace rocksdb
13+
} // namespace rocksdb
1414

1515
#endif /* USE_AWS */

cloud/aws/aws_kafka.cc

Lines changed: 30 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,22 @@
2020
namespace rocksdb {
2121
namespace cloud {
2222
namespace kafka {
23-
23+
2424
/***************************************************/
2525
/* KafkaWritableFile */
2626
/***************************************************/
2727
class KafkaWritableFile : public CloudLogWritableFile {
2828
public:
2929
static const std::chrono::microseconds kFlushTimeout;
3030

31-
KafkaWritableFile(CloudEnv* env, const std::string& fname, const EnvOptions& options,
31+
KafkaWritableFile(CloudEnv* env, const std::string& fname,
32+
const EnvOptions& options,
3233
std::shared_ptr<RdKafka::Producer> producer,
3334
std::shared_ptr<RdKafka::Topic> topic)
34-
: CloudLogWritableFile(env, fname, options),
35-
producer_(producer),
36-
topic_(topic),
37-
current_offset_(0) {
35+
: CloudLogWritableFile(env, fname, options),
36+
producer_(producer),
37+
topic_(topic),
38+
current_offset_(0) {
3839
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
3940
"[kafka] WritableFile opened file %s", fname_.c_str());
4041
}
@@ -58,11 +59,10 @@ class KafkaWritableFile : public CloudLogWritableFile {
5859
const std::chrono::microseconds KafkaWritableFile::kFlushTimeout =
5960
std::chrono::seconds(10);
6061

61-
6262
Status KafkaWritableFile::ProduceRaw(const std::string& operation_name,
6363
const Slice& message) {
64-
if (!status_.ok()){
65-
return status_;
64+
if (!status_.ok()) {
65+
return status_;
6666
}
6767

6868
RdKafka::ErrorCode resp;
@@ -116,13 +116,9 @@ Status KafkaWritableFile::Close() {
116116
return ProduceRaw("Close", serialized_data);
117117
}
118118

119-
bool KafkaWritableFile::IsSyncThreadSafe() const {
120-
return true;
121-
}
119+
bool KafkaWritableFile::IsSyncThreadSafe() const { return true; }
122120

123-
Status KafkaWritableFile::Sync() {
124-
return Flush();
125-
}
121+
Status KafkaWritableFile::Sync() { return Flush(); }
126122

127123
Status KafkaWritableFile::Flush() {
128124
std::chrono::microseconds start(env_->NowMicros());
@@ -175,33 +171,33 @@ Status KafkaWritableFile::LogDelete() {
175171
//
176172
class KafkaController : public CloudLogControllerImpl {
177173
public:
178-
179174
~KafkaController() {
180175
for (size_t i = 0; i < partitions_.size(); i++) {
181176
consumer_->stop(consumer_topic_.get(), partitions_[i]->partition());
182177
}
183-
178+
184179
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
185180
"[%s] KafkaController closed.", Name());
186181
}
187-
188-
const char *Name() const override { return "kafka"; }
182+
183+
const char* Name() const override { return "kafka"; }
189184

190185
virtual Status CreateStream(const std::string& /* bucket_prefix */) override {
191186
// Kafka client cannot create a topic. Topics are either manually created
192187
// or implicitly created on first write if auto.create.topics.enable is
193188
// true.
194189
return status_;
195190
}
196-
virtual Status WaitForStreamReady(const std::string& /* bucket_prefix */) override {
191+
virtual Status WaitForStreamReady(
192+
const std::string& /* bucket_prefix */) override {
197193
// Kafka topics don't need to be waited on.
198194
return status_;
199195
}
200196

201197
virtual Status TailStream() override;
202198

203-
virtual CloudLogWritableFile* CreateWritableFile(const std::string& fname,
204-
const EnvOptions& options) override;
199+
virtual CloudLogWritableFile* CreateWritableFile(
200+
const std::string& fname, const EnvOptions& options) override;
205201

206202
protected:
207203
Status Initialize(CloudEnv* env) override;
@@ -293,8 +289,7 @@ Status KafkaController::TailStream() {
293289
}
294290

295291
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_, "[%s] TailStream topic %s %s",
296-
Name(), consumer_topic_->name().c_str(),
297-
status_.ToString().c_str());
292+
Name(), consumer_topic_->name().c_str(), status_.ToString().c_str());
298293

299294
Status lastErrorStatus;
300295
int retryAttempt = 0;
@@ -319,14 +314,14 @@ Status KafkaController::TailStream() {
319314
Log(InfoLogLevel::ERROR_LEVEL, env_->info_log_,
320315
"[%s] error processing message size %ld "
321316
"extracted from stream %s %s",
322-
Name(), message->len(),
323-
consumer_topic_->name().c_str(), status_.ToString().c_str());
317+
Name(), message->len(), consumer_topic_->name().c_str(),
318+
status_.ToString().c_str());
324319
} else {
325320
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
326321
"[%s] successfully processed message size %ld "
327322
"extracted from stream %s %s",
328-
Name(), message->len(),
329-
consumer_topic_->name().c_str(), status_.ToString().c_str());
323+
Name(), message->len(), consumer_topic_->name().c_str(),
324+
status_.ToString().c_str());
330325
}
331326

332327
// Remember last read offset from topic (currently unused).
@@ -344,8 +339,7 @@ Status KafkaController::TailStream() {
344339
RdKafka::err2str(message->err()).c_str());
345340

346341
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
347-
"[%s] error reading %s %s", Name(),
348-
consumer_topic_->name().c_str(),
342+
"[%s] error reading %s %s", Name(), consumer_topic_->name().c_str(),
349343
RdKafka::err2str(message->err()).c_str());
350344

351345
++retryAttempt;
@@ -376,9 +370,8 @@ Status KafkaController::InitializePartitions() {
376370
RdKafka::err2str(err).c_str());
377371

378372
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
379-
"[%s] S3ReadableFile file %s Unable to find shards %s",
380-
Name(), consumer_topic_->name().c_str(),
381-
status_.ToString().c_str());
373+
"[%s] S3ReadableFile file %s Unable to find shards %s", Name(),
374+
consumer_topic_->name().c_str(), status_.ToString().c_str());
382375

383376
return status_;
384377
}
@@ -432,11 +425,12 @@ Status CloudLogControllerImpl::CreateKafkaController(
432425
std::shared_ptr<CloudLogController>* output) {
433426
#ifndef USE_KAFKA
434427
output->reset();
435-
return Status::NotSupported("In order to use Kafka, make sure you're compiling with "
436-
"USE_KAFKA=1");
428+
return Status::NotSupported(
429+
"In order to use Kafka, make sure you're compiling with "
430+
"USE_KAFKA=1");
437431
#else
438432
output->reset(new rocksdb::cloud::kafka::KafkaController());
439433
return Status::OK();
440-
#endif // USE_KAFKA
434+
#endif // USE_KAFKA
441435
}
442436
} // namespace rocksdb

0 commit comments

Comments
 (0)