Skip to content

Commit bbaae82

Browse files
authored
Merge pull request #78 from rockset/LogController2
Move CloudLogController into CloudEnvOptions.
2 parents eb5bd08 + 61f8adc commit bbaae82

File tree

10 files changed

+380
-286
lines changed

10 files changed

+380
-286
lines changed

cloud/aws/aws_env.cc

Lines changed: 22 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <iostream>
1111
#include <memory>
1212

13+
#include "cloud/cloud_log_controller_impl.h"
14+
#include "rocksdb/cloud/cloud_log_controller.h"
1315
#include "rocksdb/env.h"
1416
#include "rocksdb/status.h"
1517
#include "util/stderr_logger.h"
@@ -20,7 +22,6 @@
2022
#endif
2123

2224
#include "cloud/aws/aws_file.h"
23-
#include "cloud/cloud_log_controller.h"
2425
#include "cloud/db_cloud_impl.h"
2526

2627
namespace rocksdb {
@@ -516,10 +517,12 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
516517
// create cloud log client for storing/reading logs
517518
if (create_bucket_status_.ok() && !cloud_env_options.keep_local_log_files) {
518519
if (cloud_env_options.log_type == kLogKinesis) {
519-
create_bucket_status_ = CreateKinesisController(this, &cloud_log_controller_);
520+
create_bucket_status_ = CloudLogControllerImpl::CreateKinesisController(
521+
this, &cloud_env_options.cloud_log_controller);
520522
} else if (cloud_env_options.log_type == kLogKafka) {
521523
#ifdef USE_KAFKA
522-
create_bucket_status_ = CreateKafkaController(this, &cloud_log_controller_);
524+
create_bucket_status_ = CloudLogControllerImpl::CreateKafkaController(
525+
this, &cloud_env_options.cloud_log_controller);
523526
#else
524527
create_bucket_status_ = Status::NotSupported(
525528
"In order to use Kafka, make sure you're compiling with "
@@ -542,7 +545,8 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
542545
// Create Kinesis stream and wait for it to be ready
543546
if (create_bucket_status_.ok()) {
544547
create_bucket_status_ =
545-
cloud_log_controller_->StartTailingStream(GetSrcBucketName());
548+
cloud_env_options.cloud_log_controller->StartTailingStream(
549+
GetSrcBucketName());
546550
if (!create_bucket_status_.ok()) {
547551
Log(InfoLogLevel::ERROR_LEVEL, info_log,
548552
"[aws] NewAwsEnv Unable to create stream %s",
@@ -656,19 +660,8 @@ Status AwsEnv::NewSequentialFile(const std::string& logical_fname,
656660
return st;
657661

658662
} else if (logfile && !cloud_env_options.keep_local_log_files) {
659-
// read from Kinesis
660-
st = cloud_log_controller_->status();
661-
if (st.ok()) {
662-
// map pathname to cache dir
663-
std::string pathname = cloud_log_controller_->GetCachePath(Slice(fname));
664-
Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
665-
"[Kinesis] NewSequentialFile logfile %s %s", pathname.c_str(), "ok");
666-
667-
auto lambda = [this, pathname, &result, options]() -> Status {
668-
return base_env_->NewSequentialFile(pathname, result, options);
669-
};
670-
return cloud_log_controller_->Retry(lambda);
671-
}
663+
return cloud_env_options.cloud_log_controller->NewSequentialFile(
664+
fname, result, options);
672665
}
673666

674667
// This is neither a sst file or a log file. Read from default env.
@@ -769,19 +762,8 @@ Status AwsEnv::NewRandomAccessFile(const std::string& logical_fname,
769762

770763
} else if (logfile && !cloud_env_options.keep_local_log_files) {
771764
// read from Kinesis
772-
st = cloud_log_controller_->status();
773-
if (st.ok()) {
774-
// map pathname to cache dir
775-
std::string pathname = cloud_log_controller_->GetCachePath(Slice(fname));
776-
Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
777-
"[kinesis] NewRandomAccessFile logfile %s %s", pathname.c_str(),
778-
"ok");
779-
780-
auto lambda = [this, pathname, &result, options]() -> Status {
781-
return base_env_->NewRandomAccessFile(pathname, result, options);
782-
};
783-
return cloud_log_controller_->Retry(lambda);
784-
}
765+
return cloud_env_options.cloud_log_controller->NewRandomAccessFile(
766+
fname, result, options);
785767
}
786768

787769
// This is neither a sst file or a log file. Read from default env.
@@ -818,7 +800,8 @@ Status AwsEnv::NewWritableFile(const std::string& logical_fname,
818800
result->reset(dynamic_cast<WritableFile*>(f.release()));
819801
} else if (logfile && !cloud_env_options.keep_local_log_files) {
820802
std::unique_ptr<CloudLogWritableFile> f(
821-
cloud_log_controller_->CreateWritableFile(fname, options));
803+
cloud_env_options.cloud_log_controller->CreateWritableFile(fname,
804+
options));
822805
if (!f || !f->status().ok()) {
823806
s = Status::IOError("[aws] NewWritableFile", fname.c_str());
824807
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
@@ -922,18 +905,7 @@ Status AwsEnv::FileExists(const std::string& logical_fname) {
922905
}
923906
} else if (logfile && !cloud_env_options.keep_local_log_files) {
924907
// read from Kinesis
925-
st = cloud_log_controller_->status();
926-
if (st.ok()) {
927-
// map pathname to cache dir
928-
std::string pathname = cloud_log_controller_->GetCachePath(Slice(fname));
929-
Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
930-
"[kinesis] FileExists logfile %s %s", pathname.c_str(), "ok");
931-
932-
auto lambda = [this, pathname]() -> Status {
933-
return base_env_->FileExists(pathname);
934-
};
935-
st = cloud_log_controller_->Retry(lambda);
936-
}
908+
st = cloud_env_options.cloud_log_controller->FileExists(fname);
937909
} else {
938910
st = base_env_->FileExists(fname);
939911
}
@@ -1224,11 +1196,12 @@ Status AwsEnv::DeleteFile(const std::string& logical_fname) {
12241196
base_env_->DeleteFile(fname);
12251197
} else if (logfile && !cloud_env_options.keep_local_log_files) {
12261198
// read from Kinesis
1227-
st = cloud_log_controller_->status();
1199+
st = cloud_env_options.cloud_log_controller->status();
12281200
if (st.ok()) {
12291201
// Log a Delete record to kinesis stream
12301202
std::unique_ptr<CloudLogWritableFile> f(
1231-
cloud_log_controller_->CreateWritableFile(fname, EnvOptions()));
1203+
cloud_env_options.cloud_log_controller->CreateWritableFile(
1204+
fname, EnvOptions()));
12321205
if (!f || !f->status().ok()) {
12331206
st = Status::IOError("[Kinesis] DeleteFile", fname.c_str());
12341207
} else {
@@ -1392,18 +1365,7 @@ Status AwsEnv::GetFileSize(const std::string& logical_fname, uint64_t* size) {
13921365
}
13931366
}
13941367
} else if (logfile && !cloud_env_options.keep_local_log_files) {
1395-
st = cloud_log_controller_->status();
1396-
if (st.ok()) {
1397-
// map pathname to cache dir
1398-
std::string pathname = cloud_log_controller_->GetCachePath(Slice(fname));
1399-
Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
1400-
"[kinesis] GetFileSize logfile %s %s", pathname.c_str(), "ok");
1401-
1402-
auto lambda = [this, pathname, size]() -> Status {
1403-
return base_env_->GetFileSize(pathname, size);
1404-
};
1405-
st = cloud_log_controller_->Retry(lambda);
1406-
}
1368+
st = cloud_env_options.cloud_log_controller->GetFileSize(fname, size);
14071369
} else {
14081370
st = base_env_->GetFileSize(fname, size);
14091371
}
@@ -1439,19 +1401,8 @@ Status AwsEnv::GetFileModificationTime(const std::string& logical_fname,
14391401
}
14401402
}
14411403
} else if (logfile && !cloud_env_options.keep_local_log_files) {
1442-
st = cloud_log_controller_->status();
1443-
if (st.ok()) {
1444-
// map pathname to cache dir
1445-
std::string pathname = cloud_log_controller_->GetCachePath(Slice(fname));
1446-
Log(InfoLogLevel::DEBUG_LEVEL, info_log_,
1447-
"[kinesis] GetFileModificationTime logfile %s %s", pathname.c_str(),
1448-
"ok");
1449-
1450-
auto lambda = [this, pathname, time]() -> Status {
1451-
return base_env_->GetFileModificationTime(pathname, time);
1452-
};
1453-
st = cloud_log_controller_->Retry(lambda);
1454-
}
1404+
st = cloud_env_options.cloud_log_controller->GetFileModificationTime(fname,
1405+
time);
14551406
} else {
14561407
st = base_env_->GetFileModificationTime(fname, time);
14571408
}
@@ -1992,7 +1943,7 @@ Status AwsEnv::NewAwsEnv(Env* base_env,
19921943
}
19931944

19941945
std::string AwsEnv::GetWALCacheDir() {
1995-
return cloud_log_controller_->GetCacheDir();
1946+
return cloud_env_options.cloud_log_controller->GetCacheDir();
19961947
}
19971948

19981949

cloud/aws/aws_kafka.cc

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
#include <fstream>
99
#include <iostream>
1010

11-
#include "cloud/cloud_log_controller.h"
11+
#include "cloud/cloud_log_controller_impl.h"
1212
#include "rocksdb/cloud/cloud_env_options.h"
1313
#include "rocksdb/status.h"
1414
#include "util/coding.h"
@@ -99,8 +99,8 @@ Status KafkaWritableFile::ProduceRaw(const std::string& operation_name,
9999

100100
Status KafkaWritableFile::Append(const Slice& data) {
101101
std::string serialized_data;
102-
CloudLogController::SerializeLogRecordAppend(fname_, data, current_offset_,
103-
&serialized_data);
102+
CloudLogControllerImpl::SerializeLogRecordAppend(
103+
fname_, data, current_offset_, &serialized_data);
104104

105105
return ProduceRaw("Append", serialized_data);
106106
}
@@ -110,8 +110,8 @@ Status KafkaWritableFile::Close() {
110110
"[kafka] S3WritableFile closing %s", fname_.c_str());
111111

112112
std::string serialized_data;
113-
CloudLogController::SerializeLogRecordClosed(fname_, current_offset_,
114-
&serialized_data);
113+
CloudLogControllerImpl::SerializeLogRecordClosed(fname_, current_offset_,
114+
&serialized_data);
115115

116116
return ProduceRaw("Close", serialized_data);
117117
}
@@ -161,7 +161,7 @@ Status KafkaWritableFile::LogDelete() {
161161
fname_.c_str());
162162

163163
std::string serialized_data;
164-
CloudLogController::SerializeLogRecordDelete(fname_, &serialized_data);
164+
CloudLogControllerImpl::SerializeLogRecordDelete(fname_, &serialized_data);
165165

166166
return ProduceRaw("Delete", serialized_data);
167167
}
@@ -173,14 +173,13 @@ Status KafkaWritableFile::LogDelete() {
173173
//
174174
// Intricacies of reading a Kafka stream
175175
//
176-
class KafkaController : public CloudLogController {
176+
class KafkaController : public CloudLogControllerImpl {
177177
public:
178-
KafkaController(CloudEnv* env,
179-
std::unique_ptr<RdKafka::Producer> producer,
178+
KafkaController(CloudEnv* env, std::unique_ptr<RdKafka::Producer> producer,
180179
std::unique_ptr<RdKafka::Consumer> consumer)
181-
: CloudLogController(env),
182-
producer_(std::move(producer)),
183-
consumer_(std::move(consumer)) {
180+
: CloudLogControllerImpl(env),
181+
producer_(std::move(producer)),
182+
consumer_(std::move(consumer)) {
184183
const std::string topic_name = env_->GetSrcBucketName();
185184

186185
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
@@ -393,14 +392,14 @@ CloudLogWritableFile* KafkaController::CreateWritableFile(
393392

394393
namespace rocksdb {
395394
#ifndef USE_KAFKA
396-
Status CreateKafkaController(CloudEnv *,
397-
std::unique_ptr<CloudLogController> *) {
395+
Status CloudLogControllerImpl::CreateKafkaController(
396+
CloudEnv*, std::shared_ptr<CloudLogController>*) {
398397
return Status::NotSupported("In order to use Kafka, make sure you're compiling with "
399398
"USE_KAFKA=1");
400399
}
401400
#else
402-
Status CreateKafkaController(CloudEnv *env,
403-
std::unique_ptr<CloudLogController> *output) {
401+
Status CloudLogControllerImpl::CreateKafkaController(
402+
CloudEnv* env, std::shared_ptr<CloudLogController>* output) {
404403
Status st = Status::OK();
405404
std::string conf_errstr, producer_errstr, consumer_errstr;
406405
const auto& kconf = env->GetCloudEnvOptions().kafka_log_options.client_config_params;

cloud/aws/aws_kinesis.cc

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
#include <fstream>
88
#include <iostream>
99

10-
#include "cloud/cloud_log_controller.h"
10+
#include "cloud/cloud_log_controller_impl.h"
1111
#include "rocksdb/cloud/cloud_env_options.h"
1212
#include "rocksdb/status.h"
1313
#include "util/coding.h"
@@ -74,8 +74,8 @@ Status KinesisWritableFile::Append(const Slice& data) {
7474

7575
// serialize write record
7676
std::string buffer;
77-
CloudLogController::SerializeLogRecordAppend(fname_, data, current_offset_,
78-
&buffer);
77+
CloudLogControllerImpl::SerializeLogRecordAppend(fname_, data,
78+
current_offset_, &buffer);
7979
request.SetData(Aws::Utils::ByteBuffer((const unsigned char*)buffer.c_str(),
8080
buffer.size()));
8181

@@ -109,7 +109,8 @@ Status KinesisWritableFile::Close() {
109109

110110
// serialize write record
111111
std::string buffer;
112-
CloudLogController::SerializeLogRecordClosed(fname_, current_offset_, &buffer);
112+
CloudLogControllerImpl::SerializeLogRecordClosed(fname_, current_offset_,
113+
&buffer);
113114
request.SetData(Aws::Utils::ByteBuffer((const unsigned char*)buffer.c_str(),
114115
buffer.size()));
115116

@@ -146,7 +147,7 @@ Status KinesisWritableFile::LogDelete() {
146147

147148
// serialize write record
148149
std::string buffer;
149-
CloudLogController::SerializeLogRecordDelete(fname_, &buffer);
150+
CloudLogControllerImpl::SerializeLogRecordDelete(fname_, &buffer);
150151
request.SetData(Aws::Utils::ByteBuffer((const unsigned char*)buffer.c_str(),
151152
buffer.size()));
152153

@@ -173,12 +174,13 @@ Status KinesisWritableFile::LogDelete() {
173174
//
174175
// Intricacies of reading a Kinesis stream
175176
//
176-
class KinesisController : public CloudLogController {
177+
class KinesisController : public CloudLogControllerImpl {
177178
public:
178-
KinesisController(CloudEnv* env,
179-
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & provider,
180-
const Aws::Client::ClientConfiguration & config)
181-
: CloudLogController(env) {
179+
KinesisController(
180+
CloudEnv* env,
181+
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider>& provider,
182+
const Aws::Client::ClientConfiguration& config)
183+
: CloudLogControllerImpl(env) {
182184
kinesis_client_.reset(provider
183185
? new Aws::Kinesis::KinesisClient(provider, config)
184186
: new Aws::Kinesis::KinesisClient(config));
@@ -452,14 +454,14 @@ CloudLogWritableFile* KinesisController::CreateWritableFile(
452454

453455
namespace rocksdb {
454456
#ifndef USE_AWS
455-
Status CreateKinesisController(CloudEnv *,
456-
std::unique_ptr<CloudLogController> *) {
457+
Status CloudLogControllerImpl::CreateKinesisController(
458+
CloudEnv*, std::shared_ptr<CloudLogController>*) {
457459
return Status::NotSupported("In order to use Kinesis, make sure you're compiling with "
458460
"USE_AWS=1");
459461
}
460462
#else
461-
Status CreateKinesisController(CloudEnv *env,
462-
std::unique_ptr<CloudLogController> *output) {
463+
Status CloudLogControllerImpl::CreateKinesisController(
464+
CloudEnv *env, std::shared_ptr<CloudLogController> *output) {
463465
Aws::Client::ClientConfiguration config;
464466
const auto & options = env->GetCloudEnvOptions();
465467
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> provider;

cloud/cloud_env.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
#include "cloud/aws/aws_env.h"
77
#include "cloud/cloud_env_impl.h"
88
#include "cloud/cloud_env_wrapper.h"
9-
#include "cloud/cloud_log_controller.h"
109
#include "cloud/db_cloud_impl.h"
1110
#include "cloud/filename.h"
1211
#include "port/likely.h"
12+
#include "rocksdb/cloud/cloud_log_controller.h"
1313
#include "rocksdb/db.h"
1414
#include "rocksdb/env.h"
1515
#include "rocksdb/options.h"

cloud/cloud_env_impl.cc

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
// Copyright (c) 2017 Rockset.
22
#ifndef ROCKSDB_LITE
33

4+
#include "cloud/cloud_env_impl.h"
5+
46
#include <cinttypes>
57

6-
#include "cloud/cloud_env_impl.h"
78
#include "cloud/cloud_env_wrapper.h"
8-
#include "cloud/cloud_log_controller.h"
99
#include "cloud/filename.h"
1010
#include "cloud/manifest_reader.h"
11-
#include "file/filename.h"
1211
#include "file/file_util.h"
12+
#include "file/filename.h"
1313
#include "port/likely.h"
14+
#include "rocksdb/cloud/cloud_log_controller.h"
1415
#include "rocksdb/db.h"
1516
#include "rocksdb/env.h"
1617
#include "rocksdb/options.h"
@@ -24,8 +25,8 @@ namespace rocksdb {
2425
: CloudEnv(opts, base, l), purger_is_running_(true) {}
2526

2627
CloudEnvImpl::~CloudEnvImpl() {
27-
if (cloud_log_controller_) {
28-
cloud_log_controller_->StopTailingStream();
28+
if (cloud_env_options.cloud_log_controller) {
29+
cloud_env_options.cloud_log_controller->StopTailingStream();
2930
}
3031
StopPurger();
3132
}

0 commit comments

Comments (0)