Skip to content

Commit b6a94e9

Browse files
authored
feat(server): Support dynamic cloud storage SAVE path (#4729)
Add functionality to use SAVE and BGSAVE commands with dynamic CLOUD storage path. New syntax is: SAVE [RDB|DF] [CLOUD_URI] [BASENAME] where CLOUD_URI should start with S3 or GCS prefix. For example, now it should work to have working directory pointing to some local folder and executing command `SAVE DF s3://bucket/snapshots my_snapshot` would save snapshots to `s3://bucket/snapshots` with basename `my_snapshot`. Resolves #4660 --------- Signed-off-by: mkaruza <[email protected]>
1 parent 51dd6d1 commit b6a94e9

File tree

7 files changed

+140
-85
lines changed

7 files changed

+140
-85
lines changed

src/server/detail/save_stages_controller.cc

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@ namespace fs = std::filesystem;
3535

3636
namespace {
3737

38-
bool IsCloudPath(string_view path) {
39-
return absl::StartsWith(path, kS3Prefix) || absl::StartsWith(path, kGCSPrefix);
40-
}
41-
4238
// Create a directory and all its parents if they don't exist.
4339
error_code CreateDirs(fs::path dir_path) {
4440
error_code ec;
@@ -387,8 +383,8 @@ GenericError SaveStagesController::FinalizeFileMovement() {
387383

388384
// Build full path: get dir, try creating dirs, get filename with placeholder
389385
GenericError SaveStagesController::BuildFullPath() {
390-
fs::path dir_path = GetFlag(FLAGS_dir);
391-
if (!dir_path.empty() && !IsCloudPath(GetFlag(FLAGS_dir))) {
386+
fs::path dir_path = cloud_uri_.empty() ? GetFlag(FLAGS_dir) : cloud_uri_;
387+
if (!dir_path.empty() && cloud_uri_.empty() && !IsCloudPath(GetFlag(FLAGS_dir))) {
392388
if (auto ec = CreateDirs(dir_path); ec)
393389
return {ec, "Failed to create directories"};
394390
}

src/server/detail/save_stages_controller.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ struct SaveInfo {
2929

3030
struct SaveStagesInputs {
3131
bool use_dfs_format_;
32+
std::string_view cloud_uri_;
3233
std::string_view basename_;
3334
Transaction* trans_;
3435
Service* service_;

src/server/detail/snapshot_storage.cc

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ using namespace util;
3434
using namespace std;
3535

3636
namespace {
37-
inline bool IsGcsPath(string_view path) {
38-
return absl::StartsWith(path, kGCSPrefix);
39-
}
4037

4138
constexpr string_view kSummarySuffix = "summary.dfs"sv;
4239

@@ -270,7 +267,7 @@ io::Result<std::pair<io::Sink*, uint8_t>, GenericError> GcsSnapshotStorage::Open
270267
}
271268

272269
io::ReadonlyFileOrError GcsSnapshotStorage::OpenReadFile(const std::string& path) {
273-
if (!IsGcsPath(path))
270+
if (!IsGCSPath(path))
274271
return nonstd::make_unexpected(GenericError("Invalid GCS path"));
275272

276273
auto [bucket, key] = GetBucketPath(path);
@@ -321,7 +318,7 @@ io::Result<std::string, GenericError> GcsSnapshotStorage::LoadPath(string_view d
321318

322319
io::Result<vector<string>, GenericError> GcsSnapshotStorage::ExpandFromPath(
323320
const string& load_path) {
324-
if (!IsGcsPath(load_path))
321+
if (!IsGCSPath(load_path))
325322
return nonstd::make_unexpected(
326323
GenericError(make_error_code(errc::invalid_argument), "Invalid GCS path"));
327324

src/server/detail/snapshot_storage.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
#include <aws/s3/S3Client.h>
88
#endif
99

10+
#include <absl/strings/match.h>
11+
1012
#include <filesystem>
1113
#include <string>
1214
#include <string_view>
@@ -186,5 +188,17 @@ struct FilenameSubstitutions {
186188

187189
void SubstituteFilenamePlaceholders(fs::path* filename, const FilenameSubstitutions& fns);
188190

191+
inline bool IsS3Path(std::string_view path) {
192+
return absl::StartsWith(path, detail::kS3Prefix);
193+
}
194+
195+
inline bool IsGCSPath(std::string_view path) {
196+
return absl::StartsWith(path, detail::kGCSPrefix);
197+
}
198+
199+
inline bool IsCloudPath(std::string_view path) {
200+
return IsS3Path(path) || IsGCSPath(path);
201+
}
202+
189203
} // namespace detail
190204
} // namespace dfly

src/server/server_family.cc

Lines changed: 69 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -251,12 +251,29 @@ string UnknownCmd(string cmd, CmdArgList args) {
251251
absl::StrJoin(args.begin(), args.end(), ", ", CmdArgListFormatter()));
252252
}
253253

254-
bool IsS3Path(string_view path) {
255-
return absl::StartsWith(path, detail::kS3Prefix);
256-
}
257-
258-
bool IsGCSPath(string_view path) {
259-
return absl::StartsWith(path, detail::kGCSPrefix);
254+
std::shared_ptr<detail::SnapshotStorage> CreateCloudSnapshotStorage(std::string_view uri) {
255+
if (detail::IsS3Path(uri)) {
256+
#ifdef WITH_AWS
257+
shard_set->pool()->GetNextProactor()->Await([&] { util::aws::Init(); });
258+
return std::make_shared<detail::AwsS3SnapshotStorage>(
259+
absl::GetFlag(FLAGS_s3_endpoint), absl::GetFlag(FLAGS_s3_use_https),
260+
absl::GetFlag(FLAGS_s3_ec2_metadata), absl::GetFlag(FLAGS_s3_sign_payload));
261+
#else
262+
LOG(ERROR) << "Compiled without AWS support";
263+
exit(1);
264+
#endif
265+
} else if (detail::IsGCSPath(uri)) {
266+
auto gcs = std::make_shared<detail::GcsSnapshotStorage>();
267+
auto ec = shard_set->pool()->GetNextProactor()->Await([&] { return gcs->Init(3000); });
268+
if (ec) {
269+
LOG(ERROR) << "Failed to initialize GCS snapshot storage: " << ec.message();
270+
exit(1);
271+
}
272+
return gcs;
273+
} else {
274+
LOG(ERROR) << "Uknown cloud storage " << uri;
275+
exit(1);
276+
}
260277
}
261278

262279
// Check that if TLS is used at least one form of client authentication is
@@ -854,24 +871,9 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
854871
}
855872

856873
string flag_dir = GetFlag(FLAGS_dir);
857-
if (IsS3Path(flag_dir)) {
858-
#ifdef WITH_AWS
859-
shard_set->pool()->GetNextProactor()->Await([&] { util::aws::Init(); });
860-
snapshot_storage_ = std::make_shared<detail::AwsS3SnapshotStorage>(
861-
absl::GetFlag(FLAGS_s3_endpoint), absl::GetFlag(FLAGS_s3_use_https),
862-
absl::GetFlag(FLAGS_s3_ec2_metadata), absl::GetFlag(FLAGS_s3_sign_payload));
863-
#else
864-
LOG(ERROR) << "Compiled without AWS support";
865-
exit(1);
866-
#endif
867-
} else if (IsGCSPath(flag_dir)) {
868-
auto gcs = std::make_shared<detail::GcsSnapshotStorage>();
869-
auto ec = shard_set->pool()->GetNextProactor()->Await([&] { return gcs->Init(3000); });
870-
if (ec) {
871-
LOG(ERROR) << "Failed to initialize GCS snapshot storage: " << ec.message();
872-
exit(1);
873-
}
874-
snapshot_storage_ = std::move(gcs);
874+
875+
if (detail::IsCloudPath(flag_dir)) {
876+
snapshot_storage_ = CreateCloudSnapshotStorage(flag_dir);
875877
} else if (fq_threadpool_) {
876878
snapshot_storage_ = std::make_shared<detail::FileSnapshotStorage>(fq_threadpool_.get());
877879
} else {
@@ -1655,10 +1657,11 @@ GenericError ServerFamily::DoSave(bool ignore_state) {
16551657
CHECK_NOTNULL(cid);
16561658
boost::intrusive_ptr<Transaction> trans(new Transaction{cid});
16571659
trans->InitByArgs(&namespaces->GetDefaultNamespace(), 0, {});
1658-
return DoSave(absl::GetFlag(FLAGS_df_snapshot_format), {}, trans.get(), ignore_state);
1660+
return DoSave(SaveCmdOptions{absl::GetFlag(FLAGS_df_snapshot_format), {}, {}}, trans.get(),
1661+
ignore_state);
16591662
}
16601663

1661-
GenericError ServerFamily::DoSaveCheckAndStart(bool new_version, string_view basename,
1664+
GenericError ServerFamily::DoSaveCheckAndStart(const SaveCmdOptions& save_cmd_opts,
16621665
Transaction* trans, bool ignore_state) {
16631666
auto state = ServerState::tlocal()->gstate();
16641667

@@ -1674,10 +1677,13 @@ GenericError ServerFamily::DoSaveCheckAndStart(bool new_version, string_view bas
16741677
"SAVING - can not save database"};
16751678
}
16761679

1677-
VLOG(1) << "Saving snapshot to " << basename;
1680+
auto snapshot_storage = save_cmd_opts.cloud_uri.empty()
1681+
? snapshot_storage_
1682+
: CreateCloudSnapshotStorage(save_cmd_opts.cloud_uri);
16781683

16791684
save_controller_ = make_unique<SaveStagesController>(detail::SaveStagesInputs{
1680-
new_version, basename, trans, &service_, fq_threadpool_.get(), snapshot_storage_});
1685+
save_cmd_opts.new_version, save_cmd_opts.cloud_uri, save_cmd_opts.basename, trans,
1686+
&service_, fq_threadpool_.get(), snapshot_storage});
16811687

16821688
auto res = save_controller_->InitResourcesAndStart();
16831689

@@ -1714,9 +1720,9 @@ GenericError ServerFamily::WaitUntilSaveFinished(Transaction* trans, bool ignore
17141720
return save_info.error;
17151721
}
17161722

1717-
GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transaction* trans,
1723+
GenericError ServerFamily::DoSave(const SaveCmdOptions& save_cmd_opts, Transaction* trans,
17181724
bool ignore_state) {
1719-
if (auto ec = DoSaveCheckAndStart(new_version, basename, trans, ignore_state); ec) {
1725+
if (auto ec = DoSaveCheckAndStart(save_cmd_opts, trans, ignore_state); ec) {
17201726
return ec;
17211727
}
17221728

@@ -2078,46 +2084,61 @@ void ServerFamily::BgSaveFb(boost::intrusive_ptr<Transaction> trans) {
20782084
}
20792085
}
20802086

2081-
std::optional<ServerFamily::VersionBasename> ServerFamily::GetVersionAndBasename(
2082-
CmdArgList args, SinkReplyBuilder* builder) {
2083-
if (args.size() > 2) {
2087+
std::optional<SaveCmdOptions> ServerFamily::GetSaveCmdOpts(CmdArgList args,
2088+
SinkReplyBuilder* builder) {
2089+
if (args.size() > 3) {
20842090
builder->SendError(kSyntaxErr);
20852091
return {};
20862092
}
20872093

2088-
bool new_version = absl::GetFlag(FLAGS_df_snapshot_format);
2094+
SaveCmdOptions save_cmd_opts;
2095+
save_cmd_opts.new_version = absl::GetFlag(FLAGS_df_snapshot_format);
20892096

20902097
if (args.size() >= 1) {
20912098
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
20922099
if (sub_cmd == "DF") {
2093-
new_version = true;
2100+
save_cmd_opts.new_version = true;
20942101
} else if (sub_cmd == "RDB") {
2095-
new_version = false;
2102+
save_cmd_opts.new_version = false;
20962103
} else {
20972104
builder->SendError(UnknownSubCmd(sub_cmd, "SAVE"), kSyntaxErrType);
20982105
return {};
20992106
}
21002107
}
21012108

2102-
string_view basename;
2103-
if (args.size() == 2) {
2104-
basename = ArgS(args, 1);
2109+
if (args.size() >= 2) {
2110+
if (detail::IsS3Path(ArgS(args, 1))) {
2111+
#ifdef WITH_AWS
2112+
save_cmd_opts.cloud_uri = ArgS(args, 1);
2113+
#else
2114+
LOG(ERROR) << "Compiled without AWS support";
2115+
exit(1);
2116+
#endif
2117+
} else if (detail::IsGCSPath(ArgS(args, 1))) {
2118+
save_cmd_opts.cloud_uri = ArgS(args, 1);
2119+
} else {
2120+
// no cloud_uri get basename and return
2121+
save_cmd_opts.basename = ArgS(args, 1);
2122+
return save_cmd_opts;
2123+
}
2124+
// cloud_uri is set so get basename if provided
2125+
if (args.size() == 3) {
2126+
save_cmd_opts.basename = ArgS(args, 2);
2127+
}
21052128
}
21062129

2107-
return ServerFamily::VersionBasename{new_version, basename};
2130+
return save_cmd_opts;
21082131
}
21092132

2110-
// BGSAVE [DF|RDB] [basename]
2133+
// SAVE [DF|RDB] [CLOUD_URI] [BASENAME]
21112134
// TODO add missing [SCHEDULE]
21122135
void ServerFamily::BgSave(CmdArgList args, const CommandContext& cmd_cntx) {
2113-
auto maybe_res = GetVersionAndBasename(args, cmd_cntx.rb);
2136+
auto maybe_res = GetSaveCmdOpts(args, cmd_cntx.rb);
21142137
if (!maybe_res) {
21152138
return;
21162139
}
21172140

2118-
const auto [version, basename] = *maybe_res;
2119-
2120-
if (auto ec = DoSaveCheckAndStart(version, basename, cmd_cntx.tx); ec) {
2141+
if (auto ec = DoSaveCheckAndStart(*maybe_res, cmd_cntx.tx); ec) {
21212142
cmd_cntx.rb->SendError(ec.Format());
21222143
return;
21232144
}
@@ -2127,18 +2148,16 @@ void ServerFamily::BgSave(CmdArgList args, const CommandContext& cmd_cntx) {
21272148
cmd_cntx.rb->SendOk();
21282149
}
21292150

2130-
// SAVE [DF|RDB] [basename]
2151+
// SAVE [DF|RDB] [CLOUD_URI] [BASENAME]
21312152
// Allows saving the snapshot of the dataset on disk, potentially overriding the format
21322153
// and the snapshot name.
21332154
void ServerFamily::Save(CmdArgList args, const CommandContext& cmd_cntx) {
2134-
auto maybe_res = GetVersionAndBasename(args, cmd_cntx.rb);
2155+
auto maybe_res = GetSaveCmdOpts(args, cmd_cntx.rb);
21352156
if (!maybe_res) {
21362157
return;
21372158
}
21382159

2139-
const auto [version, basename] = *maybe_res;
2140-
2141-
GenericError ec = DoSave(version, basename, cmd_cntx.tx);
2160+
GenericError ec = DoSave(*maybe_res, cmd_cntx.tx);
21422161
if (ec) {
21432162
cmd_cntx.rb->SendError(ec.Format());
21442163
} else {

src/server/server_family.h

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,15 @@ struct ReplicaOffsetInfo {
158158
std::vector<uint64_t> flow_offsets;
159159
};
160160

161+
struct SaveCmdOptions {
162+
// if new_version is true, saves DF specific, non redis compatible snapshot.
163+
bool new_version;
164+
// cloud storage URI
165+
std::string_view cloud_uri;
166+
// if basename is not empty it will override dbfilename flag
167+
std::string_view basename;
168+
};
169+
161170
class ServerFamily {
162171
using SinkReplyBuilder = facade::SinkReplyBuilder;
163172

@@ -193,9 +202,7 @@ class ServerFamily {
193202

194203
void StatsMC(std::string_view section, SinkReplyBuilder* builder);
195204

196-
// if new_version is true, saves DF specific, non redis compatible snapshot.
197-
// if basename is not empty it will override dbfilename flag.
198-
GenericError DoSave(bool new_version, std::string_view basename, Transaction* transaction,
205+
GenericError DoSave(const SaveCmdOptions& save_cmd_opts, Transaction* transaction,
199206
bool ignore_state = false);
200207

201208
// Calls DoSave with a default generated transaction and with the format
@@ -313,14 +320,11 @@ class ServerFamily {
313320

314321
void SendInvalidationMessages() const;
315322

316-
// Helper function to retrieve version(true if format is dfs rdb), and basename from args.
317-
// In case of an error an empty optional is returned.
318-
using VersionBasename = std::pair<bool, std::string_view>;
319-
std::optional<VersionBasename> GetVersionAndBasename(CmdArgList args, SinkReplyBuilder* builder);
323+
std::optional<SaveCmdOptions> GetSaveCmdOpts(CmdArgList args, SinkReplyBuilder* builder);
320324

321325
void BgSaveFb(boost::intrusive_ptr<Transaction> trans);
322326

323-
GenericError DoSaveCheckAndStart(bool new_version, string_view basename, Transaction* trans,
327+
GenericError DoSaveCheckAndStart(const SaveCmdOptions& save_cmd_opts, Transaction* trans,
324328
bool ignore_state = false) ABSL_LOCKS_EXCLUDED(save_mu_);
325329

326330
GenericError WaitUntilSaveFinished(Transaction* trans,

0 commit comments

Comments
 (0)