
Commit cad0556

mrambacher authored and igorcanadi committed
Multiple Code Cleanups
- Remove the use of AwsEnv from most of the public API. This will make it easier to use alternative implementations in a later PR.
- Change the use of CloudEnv::info_log_. This change will allow the value to be removed/changed from the public API (it should be part of the Options, not the CloudEnv itself); see the accessor sketch below.
- Add a GetStorageProvider() API to simplify calls.
- Move the methods for DbId from AwsEnv to CloudEnvImpl. This makes CloudEnvImpl a concrete class and moves all of the standard functions into it; AwsEnv now deals only with AWS-specific code (see the CloudEnvImpl sketch after the aws_env.h diff below).
1 parent eb7120e commit cad0556

19 files changed: +393 / -393 lines changed
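Two of the cleanups above are mechanical at the call sites: reads of CloudEnv::info_log_ become env->GetLogger() (visible throughout the aws_kafka.cc diff below), and the new GetStorageProvider() accessor is meant to shorten cloud_env_options.storage_provider chains like the ones in the code deleted from aws_env.cc. The following is a minimal, hypothetical sketch of such accessors using simplified stand-in types; it is not the actual rocksdb-cloud header.

// Hypothetical, self-contained sketch; the real CloudEnv lives in the
// rocksdb-cloud headers and its accessors may be shaped differently.
#include <memory>

namespace cloud_sketch {

class Logger {};                // stand-in for rocksdb::Logger
class CloudStorageProvider {};  // stand-in for the storage provider interface

struct CloudEnvOptions {
  std::shared_ptr<CloudStorageProvider> storage_provider;
};

class CloudEnv {
 public:
  // Call sites (e.g. the Kafka log controller below) switch from reading
  // info_log_ directly to GetLogger(), so the member can later move into
  // the Options without touching every caller again.
  const std::shared_ptr<Logger>& GetLogger() const { return info_log_; }

  // GetStorageProvider() shortens cloud_env_options.storage_provider chains
  // like the ones in the dbid-registry code deleted from aws_env.cc below.
  const std::shared_ptr<CloudStorageProvider>& GetStorageProvider() const {
    return cloud_env_options.storage_provider;
  }

  CloudEnvOptions cloud_env_options;

 protected:
  std::shared_ptr<Logger> info_log_;
};

}  // namespace cloud_sketch

With an accessor like this, a call such as Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(), "...") keeps working even if the logger is later moved from the env into the Options.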

cloud/aws/aws_env.cc

Lines changed: 0 additions & 125 deletions
@@ -189,126 +189,6 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
 
 void AwsEnv::Shutdown() { Aws::ShutdownAPI(Aws::SDKOptions()); }
 
-//
-// All db in a bucket are stored in path /.rockset/dbid/<dbid>
-// The value of the object is the pathname where the db resides.
-//
-Status AwsEnv::SaveDbid(const std::string& bucket_name, const std::string& dbid,
-                        const std::string& dirname) {
-  Log(InfoLogLevel::DEBUG_LEVEL, info_log_, "[s3] SaveDbid dbid %s dir '%s'",
-      dbid.c_str(), dirname.c_str());
-
-  std::string dbidkey = dbid_registry_ + dbid;
-  std::unordered_map<std::string, std::string> metadata;
-  metadata["dirname"] = dirname;
-
-  Status st = cloud_env_options.storage_provider->PutCloudObjectMetadata(
-      bucket_name, dbidkey, metadata);
-
-  if (!st.ok()) {
-    Log(InfoLogLevel::ERROR_LEVEL, info_log_,
-        "[aws] Bucket %s SaveDbid error in saving dbid %s dirname %s %s",
-        bucket_name.c_str(), dbid.c_str(), dirname.c_str(),
-        st.ToString().c_str());
-  } else {
-    Log(InfoLogLevel::INFO_LEVEL, info_log_,
-        "[aws] Bucket %s SaveDbid dbid %s dirname %s %s", bucket_name.c_str(),
-        dbid.c_str(), dirname.c_str(), "ok");
-  }
-  return st;
-};
-
-//
-// Given a dbid, retrieves its pathname.
-//
-Status AwsEnv::GetPathForDbid(const std::string& bucket,
-                              const std::string& dbid, std::string* dirname) {
-  std::string dbidkey = dbid_registry_ + dbid;
-
-  Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
-      "[s3] Bucket %s GetPathForDbid dbid %s", bucket.c_str(), dbid.c_str());
-
-  CloudObjectInformation info;
-  Status st = cloud_env_options.storage_provider->GetCloudObjectMetadata(
-      bucket, dbidkey, &info);
-  if (!st.ok()) {
-    if (st.IsNotFound()) {
-      Log(InfoLogLevel::ERROR_LEVEL, info_log_,
-          "[aws] %s GetPathForDbid error non-existent dbid %s %s",
-          bucket.c_str(), dbid.c_str(), st.ToString().c_str());
-    } else {
-      Log(InfoLogLevel::ERROR_LEVEL, info_log_,
-          "[aws] %s GetPathForDbid error dbid %s %s", bucket.c_str(),
-          dbid.c_str(), st.ToString().c_str());
-    }
-    return st;
-  }
-
-  // Find "dirname" metadata that stores the pathname of the db
-  const char* kDirnameTag = "dirname";
-  auto it = info.metadata.find(kDirnameTag);
-  if (it != info.metadata.end()) {
-    *dirname = it->second;
-  } else {
-    st = Status::NotFound("GetPathForDbid");
-  }
-  Log(InfoLogLevel::INFO_LEVEL, info_log_, "[aws] %s GetPathForDbid dbid %s %s",
-      bucket.c_str(), dbid.c_str(), st.ToString().c_str());
-  return st;
-}
-
-//
-// Retrieves the list of all registered dbids and their paths
-//
-Status AwsEnv::GetDbidList(const std::string& bucket, DbidList* dblist) {
-  // fetch the list all all dbids
-  std::vector<std::string> dbid_list;
-  Status st = cloud_env_options.storage_provider->ListCloudObjects(
-      bucket, dbid_registry_, &dbid_list);
-  if (!st.ok()) {
-    Log(InfoLogLevel::ERROR_LEVEL, info_log_,
-        "[aws] %s GetDbidList error in GetChildrenFromS3 %s", bucket.c_str(),
-        st.ToString().c_str());
-    return st;
-  }
-  // for each dbid, fetch the db directory where the db data should reside
-  for (auto dbid : dbid_list) {
-    std::string dirname;
-    st = GetPathForDbid(bucket, dbid, &dirname);
-    if (!st.ok()) {
-      Log(InfoLogLevel::ERROR_LEVEL, info_log_,
-          "[aws] %s GetDbidList error in GetPathForDbid(%s) %s", bucket.c_str(),
-          dbid.c_str(), st.ToString().c_str());
-      return st;
-    }
-    // insert item into result set
-    (*dblist)[dbid] = dirname;
-  }
-  return st;
-}
-
-//
-// Deletes the specified dbid from the registry
-//
-Status AwsEnv::DeleteDbid(const std::string& bucket, const std::string& dbid) {
-  // fetch the list all all dbids
-  std::string dbidkey = dbid_registry_ + dbid;
-  Status st =
-      cloud_env_options.storage_provider->DeleteCloudObject(bucket, dbidkey);
-  Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
-      "[aws] %s DeleteDbid DeleteDbid(%s) %s", bucket.c_str(), dbid.c_str(),
-      st.ToString().c_str());
-  return st;
-}
-
-Status AwsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
-  // there isn's a very good way to atomically check and create
-  // a file via libs3
-  *lock = nullptr;
-  return Status::OK();
-}
-
-Status AwsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }
 
 // The factory method for creating an S3 Env
 Status AwsEnv::NewAwsEnv(Env* base_env, const CloudEnvOptions& cloud_options,
@@ -353,11 +233,6 @@ Status AwsEnv::NewAwsEnv(Env* base_env, const CloudEnvOptions& cloud_options,
   }
   return status;
 }
-
-std::string AwsEnv::GetWALCacheDir() {
-  return cloud_env_options.cloud_log_controller->GetCacheDir();
-}
-
 #endif  // USE_AWS
 }  // namespace ROCKSDB_NAMESPACE
 #endif  // ROCKSDB_LITE

cloud/aws/aws_env.h

Lines changed: 0 additions & 20 deletions
@@ -19,8 +19,6 @@
 #include <unordered_map>
 
 namespace ROCKSDB_NAMESPACE {
-class S3ReadableFile;
-
 //
 // The S3 environment for rocksdb. This class overrides all the
 // file/dir access methods and delegates all other methods to the
@@ -66,31 +64,13 @@ class AwsEnv : public CloudEnvImpl {
   // explicitly make the default region be us-west-2.
   static constexpr const char* default_region = "us-west-2";
 
-  virtual Status LockFile(const std::string& fname, FileLock** lock) override;
-
-  virtual Status UnlockFile(FileLock* lock) override;
-
-  std::string GetWALCacheDir();
-
-  // Saves and retrieves the dbid->dirname mapping in S3
-  Status SaveDbid(const std::string& bucket_name, const std::string& dbid,
-                  const std::string& dirname) override;
-  Status GetPathForDbid(const std::string& bucket, const std::string& dbid,
-                        std::string* dirname) override;
-  Status GetDbidList(const std::string& bucket, DbidList* dblist) override;
-  Status DeleteDbid(const std::string& bucket,
-                    const std::string& dbid) override;
-
  private:
   //
   // The AWS credentials are specified to the constructor via
   // access_key_id and secret_key.
   //
   explicit AwsEnv(Env* underlying_env, const CloudEnvOptions& cloud_options,
                   const std::shared_ptr<Logger>& info_log = nullptr);
-
-  // The pathname that contains a list of all db's inside a bucket.
-  static constexpr const char* dbid_registry_ = "/.rockset/dbid/";
 };
 
 }  // namespace ROCKSDB_NAMESPACE
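The dbid-registry declarations removed above (SaveDbid, GetPathForDbid, GetDbidList, DeleteDbid, and the dbid_registry_ constant) are, per the commit message, now hosted by CloudEnvImpl rather than AwsEnv. Below is a hedged, stand-alone sketch of roughly how the relocated interface could look; Status and DbidList are simplified stand-ins, and the real cloud_env_impl.h in the repository is authoritative.

// Hypothetical sketch only: simplified stand-ins for Status/DbidList, and a
// guess at how CloudEnvImpl hosts the dbid registry after this commit.
#include <map>
#include <string>

namespace cloud_sketch {

struct Status {               // stand-in for rocksdb::Status
  bool ok() const { return ok_; }
  bool ok_ = true;
};

// dbid -> directory name, a stand-in for rocksdb-cloud's DbidList.
using DbidList = std::map<std::string, std::string>;

class CloudEnvImpl {
 public:
  // dbid registry methods, previously declared on AwsEnv (see diff above).
  Status SaveDbid(const std::string& bucket_name, const std::string& dbid,
                  const std::string& dirname);
  Status GetPathForDbid(const std::string& bucket, const std::string& dbid,
                        std::string* dirname);
  Status GetDbidList(const std::string& bucket, DbidList* dblist);
  Status DeleteDbid(const std::string& bucket, const std::string& dbid);

 protected:
  // Every db in a bucket registers itself under /.rockset/dbid/<dbid>.
  static constexpr const char* dbid_registry_ = "/.rockset/dbid/";
};

}  // namespace cloud_sketch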

cloud/aws/aws_kafka.cc

Lines changed: 23 additions & 22 deletions
@@ -36,7 +36,7 @@ class KafkaWritableFile : public CloudLogWritableFile {
         producer_(producer),
         topic_(topic),
         current_offset_(0) {
-    Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
+    Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
         "[kafka] WritableFile opened file %s", fname_.c_str());
   }
 
@@ -72,20 +72,20 @@ Status KafkaWritableFile::ProduceRaw(const std::string& operation_name,
       message.size(), &fname_ /* Partitioning key */, nullptr);
 
   if (resp == RdKafka::ERR_NO_ERROR) {
-    Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
+    Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
         "[kafka] WritableFile %s file %s %ld", fname_.c_str(),
         operation_name.c_str(), message.size());
     return Status::OK();
   } else if (resp == RdKafka::ERR__QUEUE_FULL) {
     const std::string formatted_err = RdKafka::err2str(resp);
-    Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
+    Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
         "[kafka] WritableFile src %s %s error %s", fname_.c_str(),
         operation_name.c_str(), formatted_err.c_str());
 
     return Status::Busy(topic_->name().c_str(), RdKafka::err2str(resp).c_str());
   } else {
     const std::string formatted_err = RdKafka::err2str(resp);
-    Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
+    Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
         "[kafka] WritableFile src %s %s error %s", fname_.c_str(),
         operation_name.c_str(), formatted_err.c_str());
 
@@ -106,7 +106,7 @@ Status KafkaWritableFile::Append(const Slice& data) {
 }
 
 Status KafkaWritableFile::Close() {
-  Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
+  Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
       "[kafka] S3WritableFile closing %s", fname_.c_str());
 
   std::string serialized_data;
@@ -128,7 +128,7 @@ Status KafkaWritableFile::Flush() {
   while (status_.ok() && !(done = (producer_->outq_len() == 0)) &&
          !(timeout = (std::chrono::microseconds(env_->NowMicros()) - start >
                       kFlushTimeout))) {
-    Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
+    Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
         "[kafka] WritableFile src %s "
        "Waiting on flush: Output queue length: %d",
        fname_.c_str(), producer_->outq_len());
@@ -137,23 +137,23 @@
   }
 
   if (done) {
-    Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
+    Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
         "[kafka] WritableFile src %s Flushed", fname_.c_str());
   } else if (timeout) {
-    Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
+    Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
         "[kafka] WritableFile src %s Flushing timed out after %" PRId64 "us",
        fname_.c_str(), kFlushTimeout.count());
     status_ = Status::TimedOut();
   } else {
-    Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
+    Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
         "[kafka] WritableFile src %s Flush interrupted", fname_.c_str());
   }
 
   return status_;
 }
 
 Status KafkaWritableFile::LogDelete() {
-  Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_, "[kafka] LogDelete %s",
+  Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(), "[kafka] LogDelete %s",
       fname_.c_str());
 
   std::string serialized_data;
@@ -176,7 +176,7 @@ class KafkaController : public CloudLogControllerImpl {
       consumer_->stop(consumer_topic_.get(), partitions_[i]->partition());
     }
 
-    Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
+    Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
         "[%s] KafkaController closed.", Name());
   }
 
@@ -237,7 +237,7 @@ Status KafkaController::Initialize(CloudEnv* env) {
     s = Status::InvalidArgument("Failed adding specified conf to Kafka conf",
                                 conf_errstr.c_str());
 
-    Log(InfoLogLevel::ERROR_LEVEL, env->info_log_,
+    Log(InfoLogLevel::ERROR_LEVEL, env->GetLogger(),
         "[aws] NewAwsEnv Kafka conf set error: %s", s.ToString().c_str());
     return s;
   }
@@ -250,18 +250,18 @@ Status KafkaController::Initialize(CloudEnv* env) {
     s = Status::InvalidArgument("Failed creating Kafka producer",
                                 producer_errstr.c_str());
 
-    Log(InfoLogLevel::ERROR_LEVEL, env->info_log_,
+    Log(InfoLogLevel::ERROR_LEVEL, env->GetLogger(),
         "[%s] Kafka producer error: %s", Name(), s.ToString().c_str());
   } else if (!consumer_) {
     s = Status::InvalidArgument("Failed creating Kafka consumer",
                                 consumer_errstr.c_str());
 
-    Log(InfoLogLevel::ERROR_LEVEL, env->info_log_,
+    Log(InfoLogLevel::ERROR_LEVEL, env->GetLogger(),
         "[%s] Kafka consumer error: %s", Name(), s.ToString().c_str());
   } else {
     const std::string topic_name = env->GetSrcBucketName();
 
-    Log(InfoLogLevel::DEBUG_LEVEL, env->info_log_,
+    Log(InfoLogLevel::DEBUG_LEVEL, env->GetLogger(),
         "[%s] KafkaController opening stream %s using cachedir '%s'", Name(),
        topic_name.c_str(), cache_dir_.c_str());
 
@@ -288,8 +288,9 @@ Status KafkaController::TailStream() {
     return status_;
   }
 
-  Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_, "[%s] TailStream topic %s %s",
-      Name(), consumer_topic_->name().c_str(), status_.ToString().c_str());
+  Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
+      "[%s] TailStream topic %s %s", Name(), consumer_topic_->name().c_str(),
+      status_.ToString().c_str());
 
   Status lastErrorStatus;
   int retryAttempt = 0;
@@ -311,13 +312,13 @@ Status KafkaController::TailStream() {
         // Apply the payload to local filesystem
         status_ = Apply(sl);
         if (!status_.ok()) {
-          Log(InfoLogLevel::ERROR_LEVEL, env_->info_log_,
+          Log(InfoLogLevel::ERROR_LEVEL, env_->GetLogger(),
               "[%s] error processing message size %ld "
              "extracted from stream %s %s",
              Name(), message->len(), consumer_topic_->name().c_str(),
              status_.ToString().c_str());
         } else {
-          Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
+          Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
               "[%s] successfully processed message size %ld "
              "extracted from stream %s %s",
              Name(), message->len(), consumer_topic_->name().c_str(),
@@ -338,7 +339,7 @@ Status KafkaController::TailStream() {
             Status::IOError(consumer_topic_->name().c_str(),
                             RdKafka::err2str(message->err()).c_str());
 
-        Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
+        Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
             "[%s] error reading %s %s", Name(), consumer_topic_->name().c_str(),
            RdKafka::err2str(message->err()).c_str());
 
@@ -347,7 +348,7 @@
       }
     }
   }
-  Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
+  Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
      "[%s] TailStream topic %s finished: %s", Name(),
      consumer_topic_->name().c_str(), status_.ToString().c_str());
 
@@ -369,7 +370,7 @@ Status KafkaController::InitializePartitions() {
     status_ = Status::IOError(consumer_topic_->name().c_str(),
                               RdKafka::err2str(err).c_str());
 
-    Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
+    Log(InfoLogLevel::DEBUG_LEVEL, env_->GetLogger(),
        "[%s] S3ReadableFile file %s Unable to find shards %s", Name(),
        consumer_topic_->name().c_str(), status_.ToString().c_str());
 
