Skip to content

Commit aa3bd25

Browse files
rgw/d4n: changes to integrate pipelining with connection pool.
Signed-off-by: Pritha Srivastava <[email protected]>
1 parent 85c04af commit aa3bd25

File tree

4 files changed

+14
-27
lines changed

4 files changed

+14
-27
lines changed

src/rgw/driver/d4n/d4n_directory.cc

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -616,13 +616,7 @@ int ObjectDirectory::zadd(const DoutPrefixProvider* dpp, CacheObj* object, doubl
616616
req.push("ZADD", key, "CH", std::to_string(score), member);
617617

618618
response<std::string> resp;
619-
if(!redis_pool)[[unlikely]]
620-
{
621-
redis_exec(conn, ec, req, resp, y);
622-
ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl;
623-
}
624-
else[[likely]]
625-
redis_exec_cp(redis_pool, ec, req, resp, y);
619+
redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
626620

627621
if (ec) {
628622
ldpp_dout(dpp, 0) << "ObjectDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
@@ -926,13 +920,7 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, CacheBlock* block, option
926920
request req;
927921
req.push_range("HSET", key, redisValues);
928922

929-
if(!redis_pool)[[unlikely]]
930-
{
931-
redis_exec(conn, ec, req, resp, y);
932-
ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl;
933-
}
934-
else[[likely]]
935-
redis_exec_cp(redis_pool, ec, req, resp, y);
923+
redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
936924
if (ec) {
937925
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
938926
return -ec.value();
@@ -965,7 +953,7 @@ int BlockDirectory::set(const DoutPrefixProvider* dpp, std::vector<CacheBlock>&
965953
try {
966954
boost::system::error_code ec;
967955
boost::redis::generic_response resp;
968-
redis_exec(conn, ec, req, resp, y);
956+
redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
969957
if (ec) {
970958
ldpp_dout(dpp, 0) << "BlockDirectory::" << __func__ << "() ERROR: " << ec.what() << dendl;
971959
return -ec.value();
@@ -1346,13 +1334,7 @@ int BlockDirectory::del(const DoutPrefixProvider* dpp, CacheBlock* block, option
13461334
request req;
13471335
req.push("DEL", key);
13481336
response<int> resp;
1349-
if(!redis_pool)[[unlikely]]
1350-
{
1351-
redis_exec(conn, ec, req, resp, y);
1352-
ldpp_dout(dpp, 0) << "BucketDirectory::" << __func__ << "() Using connection: " << conn.get() << dendl;
1353-
}
1354-
else[[likely]]
1355-
redis_exec_cp(redis_pool, ec, req, resp, y);
1337+
redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
13561338
if (!std::get<0>(resp).value()) {
13571339
ldpp_dout(dpp, 10) << "BlockDirectory::" << __func__ << "(): No values deleted for key=" << key << dendl;
13581340
return -ENOENT;
@@ -1626,7 +1608,7 @@ int Pipeline::execute(const DoutPrefixProvider* dpp, optional_yield y)
16261608
try {
16271609
boost::system::error_code ec;
16281610
pipeline_mode = false;
1629-
redis_exec(conn, ec, req, resp, y);
1611+
redis_exec_connection_pool(dpp, redis_pool, conn, ec, req, resp, y);
16301612

16311613
if (ec) {
16321614
ldpp_dout(dpp, 0) << "Directory::" << __func__ << "() ERROR: " << ec.what() << dendl;

src/rgw/driver/d4n/d4n_directory.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ class Directory {
155155

156156
class Pipeline {
157157
public:
158-
Pipeline(std::shared_ptr<connection>& conn) : conn(conn) {}
158+
Pipeline(std::shared_ptr<connection>& conn, std::shared_ptr<RedisPool> redis_pool) : conn(conn), redis_pool(redis_pool) {}
159159
void start() { pipeline_mode = true; }
160160
//executes all commands and sets pipeline mode to false
161161
int execute(const DoutPrefixProvider* dpp, optional_yield y);
@@ -164,6 +164,7 @@ class Pipeline {
164164

165165
private:
166166
std::shared_ptr<connection> conn;
167+
std::shared_ptr<RedisPool> redis_pool{nullptr};
167168
request req;
168169
bool pipeline_mode{false};
169170
};

src/rgw/driver/d4n/rgw_sal_d4n.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1201,7 +1201,8 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::
12011201
//dirty objects
12021202
if (dirty) {
12031203
auto redis_conn = this->driver->get_conn();
1204-
rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn);
1204+
auto redis_pool = this->driver->get_redis_pool();
1205+
rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn, redis_pool);
12051206
p.start();
12061207
auto ret = blockDir->set(dpp, &block, y, &p);
12071208
if (ret < 0) {
@@ -1249,7 +1250,8 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::
12491250
if (ret == -ENOENT) {
12501251
if (!(this->get_bucket()->versioned())) {
12511252
auto redis_conn = this->driver->get_conn();
1252-
rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn);
1253+
auto redis_pool = this->driver->get_redis_pool();
1254+
rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn, redis_pool);
12531255
p.start();
12541256
//we can explore pipelining to send the two 'HSET' commands together
12551257
ret = blockDir->set(dpp, &block, y, &p);
@@ -1282,7 +1284,8 @@ int D4NFilterObject::set_head_obj_dir_entry(const DoutPrefixProvider* dpp, std::
12821284
and versioned and non-versioned buckets dirty objects */
12831285
if (!(this->get_bucket()->versioned())) {
12841286
auto redis_conn = this->driver->get_conn();
1285-
rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn);
1287+
auto redis_pool = this->driver->get_redis_pool();
1288+
rgw::d4n::Pipeline p = rgw::d4n::Pipeline(redis_conn, redis_pool);
12861289
p.start();
12871290
ret = blockDir->set(dpp, &block, y, &p);
12881291
if (ret < 0) {

src/rgw/driver/d4n/rgw_sal_d4n.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ class D4NFilterDriver : public FilterDriver {
9090
rgw::d4n::PolicyDriver* get_policy_driver() { return policyDriver.get(); }
9191
void save_y(optional_yield y) { this->y = y; }
9292
std::shared_ptr<connection> get_conn() { return conn; }
93+
std::shared_ptr<rgw::d4n::RedisPool> get_redis_pool() { return redis_pool; }
9394
void shutdown() override;
9495
};
9596

0 commit comments

Comments
 (0)