Skip to content

Commit 0d0d599

Browse files
committed
Add CloudScheduler class.
Class starts a thread and allows jobs to be executed in the future. Add ability to do recurring jobs. Move structs into CloudEnvImpl
1 parent 67ef949 commit 0d0d599

File tree

9 files changed

+563
-272
lines changed

9 files changed

+563
-272
lines changed

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ TESTS = \
447447
remote_compaction_test \
448448
db_cloud_test \
449449
cloud_manifest_test \
450+
cloud_scheduler_test \
450451
db_basic_test \
451452
db_encryption_test \
452453
db_test2 \
@@ -1685,6 +1686,9 @@ db_cloud_test: cloud/db_cloud_test.o $(LIBOBJECTS) $(TESTHARNESS)
16851686
cloud_manifest_test: cloud/cloud_manifest_test.o $(LIBOBJECTS) $(TESTHARNESS)
16861687
$(AM_LINK)
16871688

1689+
cloud_scheduler_test: cloud/cloud_scheduler_test.o $(LIBOBJECTS) $(TESTHARNESS)
1690+
$(AM_LINK)
1691+
16881692
iostats_context_test: monitoring/iostats_context_test.o $(LIBOBJECTS) $(TESTHARNESS)
16891693
$(AM_V_CCLD)$(CXX) $^ $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)
16901694

cloud/aws/aws_env.cc

Lines changed: 1 addition & 237 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <set>
1313

1414
#include "cloud/cloud_log_controller_impl.h"
15+
#include "cloud/cloud_scheduler.h"
1516
#include "cloud/cloud_storage_provider_impl.h"
1617
#include "cloud/filename.h"
1718
#include "port/port_posix.h"
@@ -164,107 +165,6 @@ Status AwsCloudAccessCredentials::GetCredentialsProvider(
164165
}
165166

166167
#ifdef USE_AWS
167-
namespace detail {
168-
169-
using ScheduledJob =
170-
std::pair<std::chrono::steady_clock::time_point, std::function<void(void)>>;
171-
struct Comp {
172-
bool operator()(const ScheduledJob& a, const ScheduledJob& b) const {
173-
return a.first < b.first;
174-
}
175-
};
176-
struct JobHandle {
177-
std::multiset<ScheduledJob, Comp>::iterator itr;
178-
JobHandle(std::multiset<ScheduledJob, Comp>::iterator i)
179-
: itr(std::move(i)) {}
180-
};
181-
182-
class JobExecutor {
183-
public:
184-
std::shared_ptr<JobHandle> ScheduleJob(
185-
std::chrono::steady_clock::time_point time,
186-
std::function<void(void)> callback);
187-
void CancelJob(JobHandle* handle);
188-
189-
JobExecutor();
190-
~JobExecutor();
191-
192-
private:
193-
void DoWork();
194-
195-
std::mutex mutex_;
196-
// Notified when the earliest job to be scheduled has changed.
197-
std::condition_variable jobs_changed_cv_;
198-
std::multiset<ScheduledJob, Comp> scheduled_jobs_;
199-
bool shutting_down_{false};
200-
201-
std::thread thread_;
202-
};
203-
204-
JobExecutor::JobExecutor() {
205-
thread_ = std::thread([this]() { DoWork(); });
206-
}
207-
208-
JobExecutor::~JobExecutor() {
209-
{
210-
std::lock_guard<std::mutex> lk(mutex_);
211-
shutting_down_ = true;
212-
jobs_changed_cv_.notify_all();
213-
}
214-
if (thread_.joinable()) {
215-
thread_.join();
216-
}
217-
}
218-
219-
std::shared_ptr<JobHandle> JobExecutor::ScheduleJob(
220-
std::chrono::steady_clock::time_point time,
221-
std::function<void(void)> callback) {
222-
std::lock_guard<std::mutex> lk(mutex_);
223-
auto itr = scheduled_jobs_.emplace(time, std::move(callback));
224-
if (itr == scheduled_jobs_.begin()) {
225-
jobs_changed_cv_.notify_all();
226-
}
227-
return std::make_shared<JobHandle>(itr);
228-
}
229-
230-
void JobExecutor::CancelJob(JobHandle* handle) {
231-
std::lock_guard<std::mutex> lk(mutex_);
232-
if (scheduled_jobs_.begin() == handle->itr) {
233-
jobs_changed_cv_.notify_all();
234-
}
235-
scheduled_jobs_.erase(handle->itr);
236-
}
237-
238-
void JobExecutor::DoWork() {
239-
while (true) {
240-
std::unique_lock<std::mutex> lk(mutex_);
241-
if (shutting_down_) {
242-
break;
243-
}
244-
if (scheduled_jobs_.empty()) {
245-
jobs_changed_cv_.wait(lk);
246-
continue;
247-
}
248-
auto earliest_job = scheduled_jobs_.begin();
249-
auto earliest_job_time = earliest_job->first;
250-
if (earliest_job_time >= std::chrono::steady_clock::now()) {
251-
jobs_changed_cv_.wait_until(lk, earliest_job_time);
252-
continue;
253-
}
254-
// invoke the function
255-
lk.unlock();
256-
earliest_job->second();
257-
lk.lock();
258-
scheduled_jobs_.erase(earliest_job);
259-
}
260-
}
261-
262-
} // namespace detail
263-
264-
detail::JobExecutor* GetJobExecutor() {
265-
static detail::JobExecutor executor;
266-
return &executor;
267-
}
268168

269169
//
270170
// The AWS credentials are specified to the constructor via
@@ -292,144 +192,8 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
292192
base_env_ = underlying_env;
293193
}
294194

295-
AwsEnv::~AwsEnv() {
296-
{
297-
std::lock_guard<std::mutex> lk(files_to_delete_mutex_);
298-
using std::swap;
299-
for (auto& e : files_to_delete_) {
300-
GetJobExecutor()->CancelJob(e.second.get());
301-
}
302-
files_to_delete_.clear();
303-
}
304-
305-
StopPurger();
306-
}
307-
308195
void AwsEnv::Shutdown() { Aws::ShutdownAPI(Aws::SDKOptions()); }
309196

310-
void AwsEnv::RemoveFileFromDeletionQueue(const std::string& filename) {
311-
std::lock_guard<std::mutex> lk(files_to_delete_mutex_);
312-
auto itr = files_to_delete_.find(filename);
313-
if (itr != files_to_delete_.end()) {
314-
GetJobExecutor()->CancelJob(itr->second.get());
315-
files_to_delete_.erase(itr);
316-
}
317-
}
318-
319-
Status AwsEnv::DeleteFile(const std::string& logical_fname) {
320-
auto fname = RemapFilename(logical_fname);
321-
auto file_type = GetFileType(fname);
322-
bool sstfile = (file_type == RocksDBFileType::kSstFile),
323-
manifest = (file_type == RocksDBFileType::kManifestFile),
324-
identity = (file_type == RocksDBFileType::kIdentityFile),
325-
logfile = (file_type == RocksDBFileType::kLogFile);
326-
327-
if (manifest) {
328-
// We don't delete manifest files. The reason for this is that even though
329-
// RocksDB creates manifest with different names (like MANIFEST-00001,
330-
// MANIFEST-00008) we actually map all of them to the same filename
331-
// MANIFEST-[epoch].
332-
// When RocksDB wants to roll the MANIFEST (let's say from 1 to 8) it does
333-
// the following:
334-
// 1. Create a new MANIFEST-8
335-
// 2. Write everything into MANIFEST-8
336-
// 3. Sync MANIFEST-8
337-
// 4. Store "MANIFEST-8" in CURRENT file
338-
// 5. Delete MANIFEST-1
339-
//
340-
// What RocksDB cloud does behind the scenes (the numbers match the list
341-
// above):
342-
// 1. Create manifest file MANIFEST-[epoch].tmp
343-
// 2. Forward RocksDB writes to the file created in the first step
344-
// 3. Atomic rename from MANIFEST-[epoch].tmp to MANIFEST-[epoch]. The old
345-
// file with the same file name is overwritten.
346-
// 4. Nothing. Whatever the contents of CURRENT file, we don't care, we
347-
// always remap MANIFEST files to the correct with the latest epoch.
348-
// 5. Also nothing. There is no file to delete, because we have overwritten
349-
// it in the third step.
350-
return Status::OK();
351-
}
352-
353-
Status st;
354-
// Delete from destination bucket and local dir
355-
if (sstfile || manifest || identity) {
356-
if (HasDestBucket()) {
357-
// add the remote file deletion to the queue
358-
st = DeleteCloudFileFromDest(basename(fname));
359-
}
360-
// delete from local, too. Ignore the result, though. The file might not be
361-
// there locally.
362-
base_env_->DeleteFile(fname);
363-
} else if (logfile && !cloud_env_options.keep_local_log_files) {
364-
// read from Kinesis
365-
st = cloud_env_options.cloud_log_controller->status();
366-
if (st.ok()) {
367-
// Log a Delete record to kinesis stream
368-
std::unique_ptr<CloudLogWritableFile> f(
369-
cloud_env_options.cloud_log_controller->CreateWritableFile(
370-
fname, EnvOptions()));
371-
if (!f || !f->status().ok()) {
372-
st = Status::IOError("[Kinesis] DeleteFile", fname.c_str());
373-
} else {
374-
st = f->LogDelete();
375-
}
376-
}
377-
} else {
378-
st = base_env_->DeleteFile(fname);
379-
}
380-
Log(InfoLogLevel::DEBUG_LEVEL, info_log_, "[s3] DeleteFile file %s %s",
381-
fname.c_str(), st.ToString().c_str());
382-
return st;
383-
}
384-
385-
Status AwsEnv::CopyLocalFileToDest(const std::string& local_name,
386-
const std::string& dest_name) {
387-
RemoveFileFromDeletionQueue(basename(local_name));
388-
return cloud_env_options.storage_provider->PutCloudObject(
389-
local_name, GetDestBucketName(), dest_name);
390-
}
391-
392-
Status AwsEnv::DeleteCloudFileFromDest(const std::string& fname) {
393-
assert(HasDestBucket());
394-
auto base = basename(fname);
395-
// add the job to delete the file in 1 hour
396-
auto doDeleteFile = [this, base]() {
397-
{
398-
std::lock_guard<std::mutex> lk(files_to_delete_mutex_);
399-
auto itr = files_to_delete_.find(base);
400-
if (itr == files_to_delete_.end()) {
401-
// File was removed from files_to_delete_, do not delete!
402-
return;
403-
}
404-
files_to_delete_.erase(itr);
405-
}
406-
auto path = GetDestObjectPath() + "/" + base;
407-
// we are ready to delete the file!
408-
auto st = cloud_env_options.storage_provider->DeleteCloudObject(
409-
GetDestBucketName(), path);
410-
if (!st.ok() && !st.IsNotFound()) {
411-
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
412-
"[aws] DeleteFile DeletePathInS3 file %s error %s", path.c_str(),
413-
st.ToString().c_str());
414-
}
415-
};
416-
{
417-
std::lock_guard<std::mutex> lk(files_to_delete_mutex_);
418-
if (files_to_delete_.find(base) != files_to_delete_.end()) {
419-
// already in the queue
420-
return Status::OK();
421-
}
422-
}
423-
{
424-
std::lock_guard<std::mutex> lk(files_to_delete_mutex_);
425-
auto handle = GetJobExecutor()->ScheduleJob(
426-
std::chrono::steady_clock::now() + file_deletion_delay_,
427-
std::move(doDeleteFile));
428-
files_to_delete_.emplace(base, std::move(handle));
429-
}
430-
return Status::OK();
431-
}
432-
433197
//
434198
// All db in a bucket are stored in path /.rockset/dbid/<dbid>
435199
// The value of the object is the pathname where the db resides.

cloud/aws/aws_env.h

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,6 @@
2121
namespace ROCKSDB_NAMESPACE {
2222

2323
class S3ReadableFile;
24-
25-
namespace detail {
26-
struct JobHandle;
27-
} // namespace detail
28-
2924
//
3025
// The S3 environment for rocksdb. This class overrides all the
3126
// file/dir access methods and delegates all other methods to the
@@ -56,7 +51,7 @@ class AwsEnv : public CloudEnvImpl {
5651
const std::shared_ptr<Logger>& info_log,
5752
CloudEnv** cenv);
5853

59-
virtual ~AwsEnv();
54+
virtual ~AwsEnv() {}
6055

6156
const char* Name() const override { return "aws"; }
6257

@@ -71,8 +66,6 @@ class AwsEnv : public CloudEnvImpl {
7166
// explicitly make the default region be us-west-2.
7267
static constexpr const char* default_region = "us-west-2";
7368

74-
virtual Status DeleteFile(const std::string& fname) override;
75-
7669
virtual Status LockFile(const std::string& fname, FileLock** lock) override;
7770

7871
virtual Status UnlockFile(FileLock* lock) override;
@@ -87,16 +80,6 @@ class AwsEnv : public CloudEnvImpl {
8780
Status GetDbidList(const std::string& bucket, DbidList* dblist) override;
8881
Status DeleteDbid(const std::string& bucket,
8982
const std::string& dbid) override;
90-
Status DeleteCloudFileFromDest(const std::string& fname) override;
91-
Status CopyLocalFileToDest(const std::string& local_name,
92-
const std::string& cloud_name) override;
93-
94-
void RemoveFileFromDeletionQueue(const std::string& filename);
95-
96-
void TEST_SetFileDeletionDelay(std::chrono::seconds delay) {
97-
std::lock_guard<std::mutex> lk(files_to_delete_mutex_);
98-
file_deletion_delay_ = delay;
99-
}
10083

10184
private:
10285
//
@@ -109,10 +92,6 @@ class AwsEnv : public CloudEnvImpl {
10992
// The pathname that contains a list of all db's inside a bucket.
11093
static constexpr const char* dbid_registry_ = "/.rockset/dbid/";
11194

112-
std::mutex files_to_delete_mutex_;
113-
std::chrono::seconds file_deletion_delay_ = std::chrono::hours(1);
114-
std::unordered_map<std::string, std::shared_ptr<detail::JobHandle>>
115-
files_to_delete_;
11695
Random64 rng_;
11796
};
11897

0 commit comments

Comments
 (0)