Commit dcd6453

Merge pull request #70 from rockset/LogController

Make a CloudLogController class independent of AWS

2 parents 8d96188 + 5035ffc

16 files changed, +585 -646 lines (two of them are shown below: cloud/aws/aws_env.cc and cloud/aws/aws_env.h)
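
For orientation before the diff below: the commit replaces direct use of AWS Kinesis/Kafka clients inside AwsEnv with a provider-neutral CloudLogController obtained through factory functions. The following is a minimal sketch of the surface that the call sites in this diff appear to program against; the real declarations live in cloud/cloud_log_controller.h, which this page does not show, so the exact alias for RetryType, pointer types, and which members are virtual are assumptions (rocksdb's Status, Slice, and CloudEnv types are assumed available).

// Sketch only -- approximates the CloudLogController surface implied by the
// call sites in the diff below; not the actual header.
class CloudLogController {
 public:
  // Retryable operation, matching the `[...]() -> Status` lambdas in the diff
  // (the exact alias is an assumption).
  using RetryType = std::function<Status()>;

  virtual ~CloudLogController();

  // Creates the log stream (Kinesis stream / Kafka topic) and starts the
  // background tailer; replaces the separate CreateStream() +
  // AwsEnv::StartTailingStream() steps removed in this commit.
  Status StartTailingStream(const std::string& topic);

  // Maps a log file name into the local WAL cache directory
  // (previously the static GetCachePath(cache_dir, fname) helper).
  std::string GetCachePath(const Slice& fname) const;

  // Retries func until it succeeds or a retry period expires
  // (previously the static Retry(Env*, RetryType) helper).
  Status Retry(RetryType func);

  Status status();
  std::string GetCacheDir() const;
};

// Provider-specific factories used by the AwsEnv constructor below. Parameter
// types are assumptions inferred from the call sites
// CreateKinesisController(this, &cloud_log_controller_).
Status CreateKinesisController(CloudEnv* env,
                               std::unique_ptr<CloudLogController>* result);
Status CreateKafkaController(CloudEnv* env,
                             std::unique_ptr<CloudLogController>* result);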

cloud/aws/aws_env.cc

Lines changed: 22 additions & 106 deletions
@@ -16,10 +16,7 @@
 #endif

 #include "cloud/aws/aws_file.h"
-#include "cloud/aws/aws_kafka.h"
-#include "cloud/aws/aws_kinesis.h"
-#include "cloud/aws/aws_log.h"
-#include "cloud/aws/aws_retry.h"
+#include "cloud/cloud_log_controller.h"
 #include "cloud/db_cloud_impl.h"

 namespace rocksdb {
@@ -391,8 +388,7 @@ Aws::S3::Model::HeadObjectOutcome AwsS3ClientWrapper::HeadObject(
 //
 AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
                const std::shared_ptr<Logger>& info_log)
-    : CloudEnvImpl(_cloud_env_options, underlying_env, info_log),
-      running_(true) {
+    : CloudEnvImpl(_cloud_env_options, underlying_env, info_log) {
   Aws::InitAPI(Aws::SDKOptions());
   if (cloud_env_options.src_bucket.GetRegion().empty() ||
       cloud_env_options.dest_bucket.GetRegion().empty()) {
@@ -432,18 +428,7 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
       creds ? "[given]" : "[not given]");

   base_env_ = underlying_env;
-  // create AWS S3 client with appropriate timeouts
-  Aws::Client::ClientConfiguration config;
-  config.connectTimeoutMs = 30000;
-  config.requestTimeoutMs = 600000;
-
-  // Setup how retries need to be done
-  config.retryStrategy =
-      std::make_shared<AwsRetryStrategy>(cloud_env_options, info_log_);
-  if (cloud_env_options.request_timeout_ms != 0) {
-    config.requestTimeoutMs = cloud_env_options.request_timeout_ms;
-  }
-
+
   // TODO: support buckets being in different regions
   if (!SrcMatchesDest() && HasSrcBucket() && HasDestBucket()) {
     if (cloud_env_options.src_bucket.GetRegion() == cloud_env_options.dest_bucket.GetRegion()) {
@@ -461,9 +446,13 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
       return;
     }
   }
+  // create AWS S3 client with appropriate timeouts
+  Aws::Client::ClientConfiguration config;
+  create_bucket_status_ =
+      AwsCloudOptions::GetClientConfiguration(this,
+                                              cloud_env_options.src_bucket.GetRegion(),
+                                              &config);

-  // Use specified region if any
-  config.region = ToAwsString(cloud_env_options.src_bucket.GetRegion());
   Header(info_log_, "AwsEnv connection to endpoint in region: %s",
          config.region.c_str());
   bucket_location_ = Aws::S3::Model::BucketLocationConstraintMapper::
@@ -518,33 +507,10 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
   // create cloud log client for storing/reading logs
   if (create_bucket_status_.ok() && !cloud_env_options.keep_local_log_files) {
     if (cloud_env_options.log_type == kLogKinesis) {
-      std::unique_ptr<Aws::Kinesis::KinesisClient> kinesis_client;
-      kinesis_client.reset(creds
-                               ? new Aws::Kinesis::KinesisClient(creds, config)
-                               : new Aws::Kinesis::KinesisClient(config));
-
-      if (!kinesis_client) {
-        create_bucket_status_ =
-            Status::IOError("Error in creating Kinesis client");
-      }
-
-      if (create_bucket_status_.ok()) {
-        cloud_log_controller_.reset(
-            new KinesisController(this, info_log_, std::move(kinesis_client)));
-
-        if (!cloud_log_controller_) {
-          create_bucket_status_ =
-              Status::IOError("Error in creating Kinesis controller");
-        }
-      }
+      create_bucket_status_ = CreateKinesisController(this, &cloud_log_controller_);
     } else if (cloud_env_options.log_type == kLogKafka) {
 #ifdef USE_KAFKA
-      KafkaController* kafka_controller = nullptr;
-
-      create_bucket_status_ = KafkaController::create(
-          this, info_log_, cloud_env_options, &kafka_controller);
-
-      cloud_log_controller_.reset(kafka_controller);
+      create_bucket_status_ = CreateKafkaController(this, &cloud_log_controller_);
 #else
       create_bucket_status_ = Status::NotSupported(
           "In order to use Kafka, make sure you're compiling with "
@@ -567,17 +533,13 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
     // Create Kinesis stream and wait for it to be ready
     if (create_bucket_status_.ok()) {
       create_bucket_status_ =
-          cloud_log_controller_->CreateStream(GetSrcBucketName());
+          cloud_log_controller_->StartTailingStream(GetSrcBucketName());
       if (!create_bucket_status_.ok()) {
         Log(InfoLogLevel::ERROR_LEVEL, info_log,
             "[aws] NewAwsEnv Unable to create stream %s",
             create_bucket_status_.ToString().c_str());
       }
     }
-
-    if (create_bucket_status_.ok()) {
-      create_bucket_status_ = StartTailingStream();
-    }
   }
   if (!create_bucket_status_.ok()) {
     Log(InfoLogLevel::ERROR_LEVEL, info_log,
@@ -587,8 +549,6 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
 }

 AwsEnv::~AwsEnv() {
-  running_ = false;
-
   {
     std::lock_guard<std::mutex> lk(files_to_delete_mutex_);
     using std::swap;
@@ -599,22 +559,8 @@ AwsEnv::~AwsEnv() {
   }

   StopPurger();
-  if (tid_ && tid_->joinable()) {
-    tid_->join();
-  }
 }

-Status AwsEnv::StartTailingStream() {
-  if (tid_) {
-    return Status::Busy("Tailer already started");
-  }
-
-  // create tailer thread
-  auto lambda = [this]() { cloud_log_controller_->TailStream(); };
-  tid_.reset(new std::thread(lambda));
-
-  return Status::OK();
-}

 Status AwsEnv::status() { return create_bucket_status_; }

@@ -705,15 +651,14 @@ Status AwsEnv::NewSequentialFile(const std::string& logical_fname,
     st = cloud_log_controller_->status();
     if (st.ok()) {
       // map pathname to cache dir
-      std::string pathname = CloudLogController::GetCachePath(
-          cloud_log_controller_->GetCacheDir(), Slice(fname));
+      std::string pathname = cloud_log_controller_->GetCachePath(Slice(fname));
       Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
           "[Kinesis] NewSequentialFile logfile %s %s", pathname.c_str(), "ok");

       auto lambda = [this, pathname, &result, options]() -> Status {
         return base_env_->NewSequentialFile(pathname, result, options);
       };
-      return CloudLogController::Retry(this, lambda);
+      return cloud_log_controller_->Retry(lambda);
     }
   }

@@ -818,16 +763,15 @@ Status AwsEnv::NewRandomAccessFile(const std::string& logical_fname,
     st = cloud_log_controller_->status();
     if (st.ok()) {
       // map pathname to cache dir
-      std::string pathname = CloudLogController::GetCachePath(
-          cloud_log_controller_->GetCacheDir(), Slice(fname));
+      std::string pathname = cloud_log_controller_->GetCachePath(Slice(fname));
       Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
           "[kinesis] NewRandomAccessFile logfile %s %s", pathname.c_str(),
           "ok");

       auto lambda = [this, pathname, &result, options]() -> Status {
         return base_env_->NewRandomAccessFile(pathname, result, options);
       };
-      return CloudLogController::Retry(this, lambda);
+      return cloud_log_controller_->Retry(lambda);
     }
   }

@@ -962,15 +906,14 @@ Status AwsEnv::FileExists(const std::string& logical_fname) {
     st = cloud_log_controller_->status();
     if (st.ok()) {
       // map pathname to cache dir
-      std::string pathname = CloudLogController::GetCachePath(
-          cloud_log_controller_->GetCacheDir(), Slice(fname));
+      std::string pathname = cloud_log_controller_->GetCachePath(Slice(fname));
       Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
           "[kinesis] FileExists logfile %s %s", pathname.c_str(), "ok");

       auto lambda = [this, pathname]() -> Status {
         return base_env_->FileExists(pathname);
       };
-      st = CloudLogController::Retry(this, lambda);
+      st = cloud_log_controller_->Retry(lambda);
     }
   } else {
     st = base_env_->FileExists(fname);
@@ -1432,15 +1375,14 @@ Status AwsEnv::GetFileSize(const std::string& logical_fname, uint64_t* size) {
     st = cloud_log_controller_->status();
     if (st.ok()) {
       // map pathname to cache dir
-      std::string pathname = CloudLogController::GetCachePath(
-          cloud_log_controller_->GetCacheDir(), Slice(fname));
+      std::string pathname = cloud_log_controller_->GetCachePath(Slice(fname));
       Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
           "[kinesis] GetFileSize logfile %s %s", pathname.c_str(), "ok");

       auto lambda = [this, pathname, size]() -> Status {
         return base_env_->GetFileSize(pathname, size);
       };
-      st = CloudLogController::Retry(this, lambda);
+      st = cloud_log_controller_->Retry(lambda);
     }
   } else {
     st = base_env_->GetFileSize(fname, size);
@@ -1479,16 +1421,15 @@ Status AwsEnv::GetFileModificationTime(const std::string& logical_fname,
     st = cloud_log_controller_->status();
     if (st.ok()) {
       // map pathname to cache dir
-      std::string pathname = CloudLogController::GetCachePath(
-          cloud_log_controller_->GetCacheDir(), Slice(fname));
+      std::string pathname = cloud_log_controller_->GetCachePath(Slice(fname));
       Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
           "[kinesis] GetFileModificationTime logfile %s %s", pathname.c_str(),
           "ok");

       auto lambda = [this, pathname, time]() -> Status {
         return base_env_->GetFileModificationTime(pathname, time);
       };
-      st = CloudLogController::Retry(this, lambda);
+      st = cloud_log_controller_->Retry(lambda);
     }
   } else {
     st = base_env_->GetFileModificationTime(fname, time);
@@ -2029,31 +1970,6 @@ std::string AwsEnv::GetWALCacheDir() {
   return cloud_log_controller_->GetCacheDir();
 }

-//
-// Keep retrying the command until it is successful or the timeout has expired
-//
-Status CloudLogController::Retry(Env* env, RetryType func) {
-  Status stat;
-  std::chrono::microseconds start(env->NowMicros());
-
-  while (true) {
-    // If command is successful, return immediately
-    stat = func();
-    if (stat.ok()) {
-      break;
-    }
-    // sleep for some time
-    std::this_thread::sleep_for(std::chrono::milliseconds(100));
-
-    // If timeout has expired, return error
-    std::chrono::microseconds now(env->NowMicros());
-    if (start + CloudLogController::kRetryPeriod < now) {
-      stat = Status::TimedOut();
-      break;
-    }
-  }
-  return stat;
-}

 #pragma GCC diagnostic pop
 #endif // USE_AWS
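
The static CloudLogController::Retry(Env*, RetryType) helper deleted at the end of this file presumably moves into cloud/cloud_log_controller.cc as the instance method that the new cloud_log_controller_->Retry(lambda) call sites expect. Below is a sketch of that instance method, reusing the deleted loop verbatim except that it reads the Env from a member; the env_ member name and the location of kRetryPeriod are assumptions, since the new file is not part of this diff.

// Sketch only: the deleted retry loop recast as an instance method.
// Assumes the controller keeps an Env* member named env_ and a
// kRetryPeriod constant (neither is shown on this page).
Status CloudLogController::Retry(RetryType func) {
  Status stat;
  std::chrono::microseconds start(env_->NowMicros());

  while (true) {
    // If the command is successful, return immediately.
    stat = func();
    if (stat.ok()) {
      break;
    }
    // Sleep for a short interval before retrying.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    // If the timeout has expired, give up with TimedOut.
    std::chrono::microseconds now(env_->NowMicros());
    if (start + kRetryPeriod < now) {
      stat = Status::TimedOut();
      break;
    }
  }
  return stat;
}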

cloud/aws/aws_env.h

Lines changed: 0 additions & 11 deletions
@@ -25,7 +25,6 @@

 namespace rocksdb {

-class CloudLogController;
 class S3ReadableFile;

 class AwsS3ClientWrapper {
@@ -234,8 +233,6 @@ class AwsEnv : public CloudEnvImpl {
   virtual Status EmptyBucket(const std::string& bucket,
                              const std::string& path) override;

-  bool IsRunning() const { return running_; }
-
   std::string GetWALCacheDir();

   // The S3 client
@@ -244,8 +241,6 @@ class AwsEnv : public CloudEnvImpl {
   // AWS's utility to help out with uploading and downloading S3 file
   std::shared_ptr<Aws::Transfer::TransferManager> awsTransferManager_;

-  Status StartTailingStream();
-
   // Saves and retrieves the dbid->dirname mapping in S3
   Status SaveDbid(const std::string& bucket_name, const std::string& dbid,
                   const std::string& dirname) override;
@@ -325,12 +320,6 @@ class AwsEnv : public CloudEnvImpl {

   Status create_bucket_status_;

-  // Background thread to tail stream
-  std::unique_ptr<std::thread> tid_;
-  std::atomic<bool> running_;
-
-  std::unique_ptr<CloudLogController> cloud_log_controller_;
-
   std::mutex files_to_delete_mutex_;
   std::chrono::seconds file_deletion_delay_ = std::chrono::hours(1);
   std::unordered_map<std::string, std::shared_ptr<detail::JobHandle>>
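
The members removed here (tid_, running_, cloud_log_controller_) no longer live in AwsEnv. Since the .cc diff above still references cloud_log_controller_, that member presumably moves up into CloudEnvImpl, while the tailer-thread state plausibly moves into the controller itself, reusing the logic deleted from AwsEnv::StartTailingStream() and ~AwsEnv(). The following sketch shows one way the controller could own that thread; the class name, member layout, and CreateStream/TailStream signatures are assumptions, not part of this commit.

#include <atomic>
#include <memory>
#include <string>
#include <thread>

#include "rocksdb/status.h"

// Sketch only: one plausible home for the deleted AwsEnv tailer state.
class CloudLogControllerSketch {
 public:
  // Mirrors the deleted AwsEnv::StartTailingStream(), but creates the stream
  // first so callers need a single call, as the constructor diff above shows.
  rocksdb::Status StartTailingStream(const std::string& topic) {
    if (tid_) {
      return rocksdb::Status::Busy("Tailer already started");
    }
    rocksdb::Status st = CreateStream(topic);  // provider-specific (Kinesis or Kafka)
    if (!st.ok()) {
      return st;
    }
    running_ = true;
    tid_.reset(new std::thread([this]() { TailStream(); }));
    return rocksdb::Status::OK();
  }

  // Mirrors the thread teardown deleted from AwsEnv::~AwsEnv().
  ~CloudLogControllerSketch() {
    running_ = false;
    if (tid_ && tid_->joinable()) {
      tid_->join();
    }
  }

 private:
  rocksdb::Status CreateStream(const std::string& topic);  // was cloud_log_controller_->CreateStream()
  void TailStream();                                       // loops while running_ is true

  std::unique_ptr<std::thread> tid_;  // background thread to tail the stream
  std::atomic<bool> running_{false};
};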

0 commit comments
