Skip to content

Commit 7645dfb

Browse files
authored
Merge pull request #118 from rockset/AsanUnity
Make Cloud work for LITE and Unity tests
2 parents 5bee117 + b32e9cb commit 7645dfb

12 files changed

+131
-45
lines changed

cloud/aws/aws_env.cc

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
// Copyright (c) 2016-present, Rockset, Inc. All rights reserved.
22
//
3+
#ifndef ROCKSDB_LITE
34
#include "cloud/aws/aws_env.h"
45

56
#include <chrono>
@@ -18,7 +19,7 @@
1819
#include "rocksdb/cloud/cloud_storage_provider.h"
1920
#include "rocksdb/env.h"
2021
#include "rocksdb/status.h"
21-
#include "util/stderr_logger.h"
22+
#include "rocksdb/utilities/options_type.h"
2223
#include "util/string_util.h"
2324

2425
#ifdef USE_AWS
@@ -41,16 +42,6 @@ static const std::unordered_map<std::string, AwsAccessType> AwsAccessTypeMap = {
4142
{"anonymous", AwsAccessType::kAnonymous},
4243
};
4344

44-
template <typename T>
45-
bool ParseEnum(const std::unordered_map<std::string, T>& type_map,
46-
const std::string& type, T* value) {
47-
auto iter = type_map.find(type);
48-
if (iter != type_map.end()) {
49-
*value = iter->second;
50-
return true;
51-
}
52-
return false;
53-
}
5445

5546
AwsAccessType AwsCloudAccessCredentials::GetAccessType() const {
5647
if (type != AwsAccessType::kUndefined) {
@@ -177,8 +168,7 @@ Status AwsCloudAccessCredentials::GetCredentialsProvider(
177168
//
178169
AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
179170
const std::shared_ptr<Logger>& info_log)
180-
: CloudEnvImpl(_cloud_env_options, underlying_env, info_log),
181-
rng_(time(nullptr)) {
171+
: CloudEnvImpl(_cloud_env_options, underlying_env, info_log) {
182172
Aws::InitAPI(Aws::SDKOptions());
183173
if (cloud_env_options.src_bucket.GetRegion().empty() ||
184174
cloud_env_options.dest_bucket.GetRegion().empty()) {
@@ -370,3 +360,4 @@ std::string AwsEnv::GetWALCacheDir() {
370360

371361
#endif // USE_AWS
372362
} // namespace ROCKSDB_NAMESPACE
363+
#endif // ROCKSDB_LITE

cloud/aws/aws_env.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
#include <iostream>
1010

1111
#include "cloud/cloud_env_impl.h"
12-
#include "port/sys_time.h"
13-
#include "util/random.h"
1412

1513
#ifdef USE_AWS
1614

@@ -93,8 +91,6 @@ class AwsEnv : public CloudEnvImpl {
9391

9492
// The pathname that contains a list of all db's inside a bucket.
9593
static constexpr const char* dbid_registry_ = "/.rockset/dbid/";
96-
97-
Random64 rng_;
9894
};
9995

10096
} // namespace ROCKSDB_NAMESPACE

cloud/aws/aws_s3.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
// A directory maps to an an zero-size object in an S3 bucket
55
// A sst file maps to an object in that S3 bucket.
66
//
7+
#ifndef ROCKSDB_LITE
78
#ifdef USE_AWS
89
#include <aws/core/Aws.h>
910
#include <aws/core/utils/Outcome.h>
@@ -903,7 +904,7 @@ Status S3StorageProvider::DoPutCloudObject(const std::string& local_file,
903904
}
904905

905906
#endif /* USE_AWS */
906-
907+
907908
Status CloudStorageProviderImpl::CreateS3Provider(
908909
std::shared_ptr<CloudStorageProvider>* provider) {
909910
#ifndef USE_AWS
@@ -916,3 +917,4 @@ Status CloudStorageProviderImpl::CreateS3Provider(
916917
#endif /* USE_AWS */
917918
}
918919
} // namespace ROCKSDB_NAMESPACE
920+
#endif // ROCKSDB_LITE

cloud/cloud_scheduler.cc

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class CloudSchedulerImpl : public CloudScheduler {
4646
std::function<void(void*)> callback,
4747
void* arg) override;
4848
bool CancelJob(long handle) override;
49+
bool IsScheduled(long handle) override;
4950

5051
private:
5152
void DoWork();
@@ -70,16 +71,25 @@ class LocalCloudScheduler : public CloudScheduler {
7071
public:
7172
LocalCloudScheduler(const std::shared_ptr<CloudScheduler>& scheduler,
7273
long local_id)
73-
: scheduler_(scheduler), next_local_id_(local_id) {}
74+
: scheduler_(scheduler),
75+
next_local_id_(local_id),
76+
shutting_down_(false) {}
7477
~LocalCloudScheduler() override {
75-
for (const auto job : jobs_) {
78+
{
79+
std::lock_guard<std::mutex> lk(job_mutex_);
80+
shutting_down_ = true;
81+
}
82+
for (const auto& job : jobs_) {
7683
scheduler_->CancelJob(job.second);
7784
}
7885
jobs_.clear();
7986
}
8087

8188
long ScheduleJob(std::chrono::microseconds when,
8289
std::function<void(void*)> callback, void* arg) override {
90+
if (shutting_down_) {
91+
return -1;
92+
}
8393
std::lock_guard<std::mutex> lk(job_mutex_);
8494
long local_id = next_local_id_++;
8595
auto job = [this, local_id, callback](void* a) {
@@ -95,12 +105,37 @@ class LocalCloudScheduler : public CloudScheduler {
95105
std::chrono::microseconds frequency,
96106
std::function<void(void*)> callback,
97107
void* arg) override {
108+
if (shutting_down_) {
109+
return -1;
110+
}
98111
auto job = scheduler_->ScheduleRecurringJob(when, frequency, callback, arg);
99112
std::lock_guard<std::mutex> lk(job_mutex_);
100113
long local_id = next_local_id_++;
101114
jobs_[local_id] = job;
102115
return local_id;
103116
}
117+
118+
bool IsScheduled(long handle) override {
119+
if (shutting_down_) {
120+
return false;
121+
} else {
122+
std::lock_guard<std::mutex> lk(job_mutex_);
123+
const auto& it = jobs_.find(handle);
124+
if (it == jobs_.end()) {
125+
// We do not have the job in our queue. Return false
126+
return false;
127+
} else if (scheduler_->IsScheduled(it->second)) {
128+
// The job is still scheduled. Return false
129+
return true;
130+
} else {
131+
// We have the job in our queue but it has already
132+
// completed. Erase from our queue and return false
133+
jobs_.erase(it);
134+
return false;
135+
}
136+
}
137+
}
138+
104139
// Cancels the job referred to by handle if it is active and associated with
105140
// this scheduler
106141
bool CancelJob(long handle) override {
@@ -123,6 +158,7 @@ class LocalCloudScheduler : public CloudScheduler {
123158
std::mutex job_mutex_;
124159
std::shared_ptr<CloudScheduler> scheduler_;
125160
long next_local_id_;
161+
bool shutting_down_;
126162
std::unordered_map<long, long> jobs_;
127163
};
128164

@@ -187,6 +223,22 @@ long CloudSchedulerImpl::ScheduleRecurringJob(
187223
return id;
188224
}
189225

226+
bool CloudSchedulerImpl::IsScheduled(long id) {
227+
if (id < 0) {
228+
return false;
229+
} else {
230+
std::unique_lock<std::mutex> lk(mutex_);
231+
if (!scheduled_jobs_.empty()) {
232+
for (const auto& job : scheduled_jobs_) {
233+
if (job.id == id) {
234+
return true;
235+
}
236+
}
237+
}
238+
return false;
239+
}
240+
}
241+
190242
bool CloudSchedulerImpl::CancelJob(long id) {
191243
if (id < 0) {
192244
return false;

cloud/cloud_scheduler.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ class CloudScheduler {
3737
// was canceled, false otherwise.
3838
virtual bool CancelJob(long handle) = 0;
3939

40+
// Returns true if the job represented by handle has not completed,
41+
// meaning that it is either currently schedule or actively running
42+
virtual bool IsScheduled(long handle) = 0;
4043
// Returns a new instance of a cloud scheduler. The caller is responsible
4144
// for freeing the scheduler when it is no longer required.
4245
static std::shared_ptr<CloudScheduler> Get();

cloud/cloud_scheduler_test.cc

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
// Copyright (c) 2017 Rockset
22

3+
#ifndef ROCKSDB_LITE
34
#include "cloud/cloud_scheduler.h"
45

56
#include <gtest/gtest.h>
@@ -22,6 +23,21 @@ class CloudSchedulerTest : public testing::Test {
2223
~CloudSchedulerTest() {}
2324

2425
std::shared_ptr<CloudScheduler> scheduler_;
26+
void WaitForJobs(const std::vector<long> &jobs, uint32_t delay) {
27+
bool running = true;
28+
while (running) {
29+
running = false;
30+
for (const auto &job : jobs) {
31+
if (scheduler_->IsScheduled(job)) {
32+
running = true;
33+
break;
34+
}
35+
}
36+
if (running) {
37+
usleep(delay);
38+
}
39+
}
40+
}
2541
};
2642

2743
// This test tests basic scheduling function. There are 2 jobs, job2 is
@@ -33,10 +49,13 @@ TEST_F(CloudSchedulerTest, TestSchedule) {
3349
auto job1 = [&p1](void *) { p1 = std::chrono::steady_clock::now(); };
3450
auto job2 = [&p2](void *) { p2 = std::chrono::steady_clock::now(); };
3551

36-
scheduler_->ScheduleJob(std::chrono::milliseconds(300), job1, nullptr);
37-
scheduler_->ScheduleJob(std::chrono::milliseconds(100), job2, nullptr);
38-
39-
std::this_thread::sleep_for(std::chrono::milliseconds(400));
52+
auto h1 =
53+
scheduler_->ScheduleJob(std::chrono::milliseconds(300), job1, nullptr);
54+
auto h2 =
55+
scheduler_->ScheduleJob(std::chrono::milliseconds(100), job2, nullptr);
56+
while (scheduler_->IsScheduled(h1) && scheduler_->IsScheduled(h2)) {
57+
std::this_thread::sleep_for(std::chrono::milliseconds(400));
58+
}
4059
ASSERT_LT(p2, p1);
4160
}
4261

@@ -50,16 +69,16 @@ TEST_F(CloudSchedulerTest, TestCancel) {
5069
auto handle2 =
5170
scheduler_->ScheduleJob(std::chrono::microseconds(200), doJob, &job2);
5271
ASSERT_TRUE(scheduler_->CancelJob(handle2));
53-
usleep(300);
72+
WaitForJobs({handle1, handle2}, 300);
5473
ASSERT_EQ(job1, 2);
5574
ASSERT_EQ(job2, 0);
5675
ASSERT_FALSE(scheduler_->CancelJob(handle1));
5776
ASSERT_FALSE(scheduler_->CancelJob(handle2));
5877
}
5978

6079
TEST_F(CloudSchedulerTest, TestRecurring) {
61-
int job1 = 1;
62-
int job2 = 1;
80+
std::atomic<int> job1(1);
81+
std::atomic<int> job2(1);
6382
auto doJob1 = [&job1](void *) { job1++; };
6483
auto doJob2 = [&job2](void *) { job2++; };
6584

@@ -69,22 +88,24 @@ TEST_F(CloudSchedulerTest, TestRecurring) {
6988
scheduler_->ScheduleRecurringJob(std::chrono::microseconds(120),
7089
std::chrono::microseconds(100), doJob2,
7190
nullptr);
72-
usleep(700);
73-
ASSERT_GE(job2, 4);
74-
ASSERT_GT(job1, job2);
91+
while (job2 <= 4) {
92+
usleep(100);
93+
}
94+
ASSERT_GE(job2.load(), 4);
95+
ASSERT_GT(job1.load(), job2);
7596
ASSERT_TRUE(scheduler_->CancelJob(handle1));
76-
auto old1 = job1;
77-
auto old2 = job2;
97+
auto old1 = job1.load();
98+
auto old2 = job2.load();
7899
usleep(200);
79-
ASSERT_EQ(job1, old1);
80-
ASSERT_GT(job2, old2);
100+
ASSERT_EQ(job1.load(), old1);
101+
ASSERT_GT(job2.load(), old2);
81102
}
82103

83104
TEST_F(CloudSchedulerTest, TestMultipleSchedulers) {
84105
auto scheduler2 = CloudScheduler::Get();
85106

86-
int job1 = 1;
87-
int job2 = 1;
107+
std::atomic<int> job1(1);
108+
std::atomic<int> job2(1);
88109
auto doJob1 = [&job1](void *) { job1++; };
89110
auto doJob2 = [&job2](void *) { job2++; };
90111

@@ -107,8 +128,8 @@ TEST_F(CloudSchedulerTest, TestMultipleSchedulers) {
107128
std::chrono::microseconds(20), doJob2,
108129
nullptr);
109130
scheduler2.reset();
110-
auto old1 = job1;
111-
auto old2 = job2;
131+
auto old1 = job1.load();
132+
auto old2 = job2.load();
112133
usleep(200);
113134
ASSERT_EQ(job2, old2);
114135
ASSERT_GT(job1, old1);
@@ -159,3 +180,12 @@ int main(int argc, char **argv) {
159180
::testing::InitGoogleTest(&argc, argv);
160181
return RUN_ALL_TESTS();
161182
}
183+
#else
184+
#include <stdio.h>
185+
186+
int main(int /*argc*/, char** /*argv*/) {
187+
fprintf(stderr, "SKIPPED as CloudSchedulerTest is not supported in ROCKSDB_LITE\n");
188+
return 0;
189+
}
190+
191+
#endif // ROCKSDB_LITE

cloud/cloud_storage_provider.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
#include "util/string_util.h"
2121

2222
namespace ROCKSDB_NAMESPACE {
23-
23+
#ifndef ROCKSDB_LITE
2424
/******************** Readablefile ******************/
2525
CloudStorageReadableFileImpl::CloudStorageReadableFileImpl(
2626
const std::shared_ptr<Logger>& info_log, const std::string& bucket,
@@ -361,4 +361,5 @@ Status CloudStorageProviderImpl::PutCloudObject(
361361
return DoPutCloudObject(local_file, bucket_name, object_path, fsize);
362362
}
363363

364+
#endif //ROCKSDB_LITE
364365
} // namespace ROCKSDB_NAMESPACE

cloud/db_cloud_test.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,9 @@ class CloudTest : public testing::Test {
6464

6565
CloudEnv* aenv;
6666
// create a dummy aws env
67-
CloudEnv::NewAwsEnv(base_env_, cloud_env_options_, options_.info_log,
68-
&aenv);
67+
ASSERT_OK(CloudEnv::NewAwsEnv(base_env_, cloud_env_options_,
68+
options_.info_log, &aenv));
69+
ASSERT_NE(aenv, nullptr);
6970
aenv_.reset(aenv);
7071
// delete all pre-existing contents from the bucket
7172
Status st = aenv_->GetCloudEnvOptions().storage_provider->EmptyBucket(
@@ -1559,7 +1560,7 @@ int main(int, char**) {
15591560

15601561
#include <stdio.h>
15611562

1562-
int main(int argc, char** argv) {
1563+
int main(int, char** ) {
15631564
fprintf(stderr, "SKIPPED as DBCloud is not supported in ROCKSDB_LITE\n");
15641565
return 0;
15651566
}

cloud/remote_compaction_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ int main(int, char**) {
517517

518518
#include <stdio.h>
519519

520-
int main(int argc, char** argv) {
520+
int main(int, char**) {
521521
fprintf(stderr, "SKIPPED as DBCloud is not supported in ROCKSDB_LITE\n");
522522
return 0;
523523
}

db/db_impl/db_impl_remote_compaction.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,13 +151,15 @@ Status DBImpl::doCompact(const CompactionOptions& compact_options,
151151
// remove this compaction job from the list of compactions
152152
c->ReleaseCompactionFiles(s);
153153

154+
#ifndef ROCKSDB_LITE
154155
// Make sure SstFileManager does its bookkeeping
155156
auto sfm = static_cast<SstFileManagerImpl*>(
156157
immutable_db_options_.sst_file_manager.get());
157158
if (sfm) {
158159
sfm->OnCompactionCompletion(c.get());
159160
}
160-
161+
#endif // ROCKSDB_LITE
162+
161163
// remove our iterator
162164
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
163165

0 commit comments

Comments
 (0)