Skip to content

Commit 1e0e713

Browse files
samarahupritha-srivastava
authored andcommitted
rgw/redis: Implement RedisDriver::get_async and RedisDriver::put_async
Signed-off-by: Samarah <[email protected]>
1 parent 9397358 commit 1e0e713

File tree

4 files changed

+144
-11
lines changed

4 files changed

+144
-11
lines changed

src/rgw/driver/d4n/rgw_sal_d4n.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ class D4NFilterObject : public FilterObject {
140140
source(_source)
141141
{
142142
cb = std::make_unique<D4NFilterGetCB>(source->driver, source);
143-
}
143+
}
144144
virtual ~D4NFilterReadOp() = default;
145145

146146
virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override;

src/rgw/rgw_redis_driver.cc

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ int RedisDriver::initialize(const DoutPrefixProvider* dpp)
7373
config cfg;
7474
cfg.addr.host = address.substr(0, address.find(":"));
7575
cfg.addr.port = address.substr(address.find(":") + 1, address.length());
76+
cfg.clientname = "RedisDriver";
7677

7778
if (!cfg.addr.host.length() || !cfg.addr.port.length()) {
7879
ldpp_dout(dpp, 10) << "RedisDriver::" << __func__ << "(): Endpoint was not configured correctly." << dendl;
@@ -552,18 +553,43 @@ int RedisDriver::set_attr(const DoutPrefixProvider* dpp, const std::string& key,
552553
Aio::OpFunc RedisDriver::redis_read_op(optional_yield y, std::shared_ptr<connection> conn,
553554
off_t read_ofs, off_t read_len, const std::string& key)
554555
{
555-
return [y, conn, key] (Aio* aio, AioResult& r) mutable {
556+
return [y, conn, &key] (Aio* aio, AioResult& r) mutable {
556557
using namespace boost::asio;
557558
spawn::yield_context yield = y.get_yield_context();
558559
async_completion<spawn::yield_context, void()> init(yield);
559560
auto ex = get_associated_executor(init.completion_handler);
560561

561-
boost::redis::request req;
562+
// TODO: Make unique pointer once support is added
563+
auto s = std::make_shared<RedisDriver::redis_response>();
564+
auto& resp = s->resp;
565+
auto& req = s->req;
562566
req.push("HGET", key, "data");
563567

568+
conn->async_exec(req, resp, bind_executor(ex, RedisDriver::redis_aio_handler{aio, r, s}));
569+
};
570+
}
571+
572+
Aio::OpFunc RedisDriver::redis_write_op(optional_yield y, std::shared_ptr<connection> conn,
573+
const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key)
574+
{
575+
return [y, conn, &bl, &len, &attrs, &key] (Aio* aio, AioResult& r) mutable {
576+
using namespace boost::asio;
577+
spawn::yield_context yield = y.get_yield_context();
578+
async_completion<spawn::yield_context, void()> init(yield);
579+
auto ex = get_associated_executor(init.completion_handler);
580+
581+
auto redisAttrs = build_attrs(attrs);
582+
583+
if (bl.length()) {
584+
redisAttrs.push_back("data");
585+
redisAttrs.push_back(bl.to_str());
586+
}
587+
564588
// TODO: Make unique pointer once support is added
565589
auto s = std::make_shared<RedisDriver::redis_response>();
566590
auto& resp = s->resp;
591+
auto& req = s->req;
592+
req.push_range("HMSET", key, redisAttrs);
567593

568594
conn->async_exec(req, resp, bind_executor(ex, RedisDriver::redis_aio_handler{aio, r, s}));
569595
};
@@ -579,9 +605,11 @@ rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optiona
579605
}
580606

581607
rgw::AioResultList RedisDriver::put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) {
582-
// TODO: implement
583-
rgw::AioResultList aio_result_list;
584-
return aio_result_list;
608+
std::string entry = partition_info.location + key;
609+
rgw_raw_obj r_obj;
610+
r_obj.oid = key;
611+
612+
return aio->get(r_obj, redis_write_op(y, conn, bl, len, attrs, entry), cost, id);
585613
}
586614

587615
void RedisDriver::shutdown()

src/rgw/rgw_redis_driver.h

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ class RedisDriver : public CacheDriver {
3131

3232
virtual int initialize(const DoutPrefixProvider* dpp) override;
3333
virtual int put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y) override;
34-
virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override;
34+
virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len,
35+
const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override;
3536
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override;
3637
virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
3738
virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
@@ -52,6 +53,7 @@ class RedisDriver : public CacheDriver {
5253
uint64_t outstanding_write_size;
5354

5455
struct redis_response {
56+
boost::redis::request req;
5557
boost::redis::response<std::string> resp;
5658
};
5759

@@ -61,14 +63,24 @@ class RedisDriver : public CacheDriver {
6163
std::shared_ptr<redis_response> s;
6264

6365
/* Read Callback */
64-
void operator()(boost::system::error_code ec, auto) const {
65-
r.result = -ec.value();
66-
r.data.append(std::get<0>(s->resp).value().c_str());
66+
void operator()(auto ec, auto) const {
67+
if (ec.failed()) {
68+
r.result = -ec.value();
69+
} else {
70+
r.result = 0;
71+
}
72+
73+
/* Only append data for GET call */
74+
if (s->req.payload().find("HGET") != std::string::npos) {
75+
r.data.append(std::get<0>(s->resp).value());
76+
}
77+
6778
throttle->put(r);
6879
}
6980
};
7081

71-
static Aio::OpFunc redis_read_op(optional_yield y, std::shared_ptr<connection> conn, off_t read_ofs, off_t read_len, const std::string& key);
82+
Aio::OpFunc redis_read_op(optional_yield y, std::shared_ptr<connection> conn, off_t read_ofs, off_t read_len, const std::string& key);
83+
Aio::OpFunc redis_write_op(optional_yield y, std::shared_ptr<connection> conn, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key);
7284
};
7385

7486
} } // namespace rgw::cache

src/test/rgw/test_redis_driver.cc

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,45 @@ using boost::redis::request;
1717
using boost::redis::response;
1818

1919
class Environment* env;
20+
rgw::AioResultList completed;
21+
uint64_t offset = 0;
22+
23+
int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results) {
24+
int r = rgw::check_for_errors(results);
25+
26+
if (r < 0) {
27+
return r;
28+
}
29+
30+
auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; };
31+
results.sort(cmp); // merge() requires results to be sorted first
32+
completed.merge(results, cmp); // merge results in sorted order
33+
34+
while (!completed.empty() && completed.front().id == offset) {
35+
auto ret = std::move(completed.front().result);
36+
37+
EXPECT_EQ(0, ret);
38+
completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
39+
}
40+
return 0;
41+
}
42+
43+
void cancel(rgw::Aio* aio) {
44+
aio->drain();
45+
}
46+
47+
int drain(const DoutPrefixProvider* dpp, rgw::Aio* aio) {
48+
auto c = aio->wait();
49+
while (!c.empty()) {
50+
int r = flush(dpp, std::move(c));
51+
if (r < 0) {
52+
cancel(aio);
53+
return r;
54+
}
55+
c = aio->wait();
56+
}
57+
return flush(dpp, std::move(c));
58+
}
2059

2160
class Environment : public ::testing::Environment {
2261
public:
@@ -148,6 +187,60 @@ TEST_F(RedisDriverFixture, GetYield)
148187
io.run();
149188
}
150189

190+
TEST_F(RedisDriverFixture, PutAsyncYield)
191+
{
192+
spawn::spawn(io, [this] (spawn::yield_context yield) {
193+
std::unique_ptr<rgw::Aio> aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, optional_yield{io, yield});
194+
auto completed = cacheDriver->put_async(env->dpp, optional_yield{io, yield}, aio.get(), "testName", bl, bl.length(), attrs, 0, 0);
195+
drain(env->dpp, aio.get());
196+
197+
cacheDriver->shutdown();
198+
199+
boost::system::error_code ec;
200+
request req;
201+
req.push("HMGET", "RedisCache/testName", "attr", "data");
202+
req.push("FLUSHALL");
203+
response<std::vector<std::string>, boost::redis::ignore_t> resp;
204+
205+
conn->async_exec(req, resp, yield[ec]);
206+
207+
ASSERT_EQ((bool)ec, false);
208+
EXPECT_EQ(std::get<0>(resp).value()[0], "attrVal");
209+
EXPECT_EQ(std::get<0>(resp).value()[1], "test data");
210+
conn->cancel();
211+
});
212+
213+
io.run();
214+
}
215+
216+
TEST_F(RedisDriverFixture, GetAsyncYield)
217+
{
218+
spawn::spawn(io, [this] (spawn::yield_context yield) {
219+
ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield}));
220+
221+
std::unique_ptr<rgw::Aio> aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, optional_yield{io, yield});
222+
auto completed = cacheDriver->get_async(env->dpp, optional_yield{io, yield}, aio.get(), "testName", 0, bl.length(), 0, 0);
223+
drain(env->dpp, aio.get());
224+
225+
cacheDriver->shutdown();
226+
227+
boost::system::error_code ec;
228+
request req;
229+
req.push("HMGET", "RedisCache/testName", "attr", "data");
230+
req.push("FLUSHALL");
231+
response<std::vector<std::string>, boost::redis::ignore_t> resp;
232+
233+
conn->async_exec(req, resp, yield[ec]);
234+
235+
ASSERT_EQ((bool)ec, false);
236+
EXPECT_EQ(std::get<0>(resp).value()[0], "attrVal");
237+
EXPECT_EQ(std::get<0>(resp).value()[1], "test data");
238+
conn->cancel();
239+
});
240+
241+
io.run();
242+
}
243+
151244
TEST_F(RedisDriverFixture, DelYield)
152245
{
153246
spawn::spawn(io, [this] (spawn::yield_context yield) {

0 commit comments

Comments
 (0)