Skip to content

Commit f394e08

Browse files
authored
Merge pull request ceph#55199 from igomon-bloomberg/wip-publish-commit-async
rgw/s3-notifications: async commit to persistent notification queue Reviewed-by: Yuval Lifshitz <[email protected]>
2 parents 0151e6a + 39ca724 commit f394e08

File tree

1 file changed

+27
-5
lines changed

1 file changed

+27
-5
lines changed

src/rgw/driver/rados/rgw_notify.cc

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,25 @@ auto make_stack_allocator() {
8484

8585
const std::string Q_LIST_OBJECT_NAME = "queues_list_object";
8686

87+
struct PublishCommitCompleteArg {
88+
89+
PublishCommitCompleteArg(std::string _queue_name, const DoutPrefixProvider *_dpp)
90+
: queue_name{std::move(_queue_name)}, dpp{_dpp} {}
91+
92+
std::string queue_name;
93+
const DoutPrefixProvider *dpp;
94+
};
95+
96+
void publish_commit_completion(rados_completion_t completion, void *arg) {
97+
auto *comp_obj = reinterpret_cast<librados::AioCompletionImpl *>(completion);
98+
std::unique_ptr<PublishCommitCompleteArg> pcc_arg(reinterpret_cast<PublishCommitCompleteArg *>(arg));
99+
if (comp_obj->get_return_value() < 0) {
100+
ldpp_dout(pcc_arg->dpp, 1) << "ERROR: failed to commit reservation to queue: "
101+
<< pcc_arg->queue_name << ". error: " << comp_obj->get_return_value()
102+
<< dendl;
103+
}
104+
};
105+
87106
class Manager : public DoutPrefixProvider {
88107
const size_t max_queue_size;
89108
const uint32_t queues_update_period_ms;
@@ -1087,16 +1106,19 @@ int publish_commit(rgw::sal::Object* obj,
10871106
std::vector<buffer::list> bl_data_vec{std::move(bl)};
10881107
librados::ObjectWriteOperation op;
10891108
cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
1090-
const auto ret = rgw_rados_operate(
1091-
dpp, res.store->getRados()->get_notif_pool_ctx(),
1092-
queue_name, &op, res.yield);
1109+
aio_completion_ptr completion {librados::Rados::aio_create_completion()};
1110+
auto pcc_arg = make_unique<PublishCommitCompleteArg>(queue_name, dpp);
1111+
completion->set_complete_callback(pcc_arg.get(), publish_commit_completion);
1112+
auto &io_ctx = res.store->getRados()->get_notif_pool_ctx();
1113+
int ret = io_ctx.aio_operate(queue_name, completion.get(), &op);
10931114
topic.res_id = cls_2pc_reservation::NO_ID;
10941115
if (ret < 0) {
10951116
ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
1096-
<< queue_name << ". error: " << ret
1097-
<< dendl;
1117+
<< queue_name << ". error: " << ret << dendl;
10981118
return ret;
10991119
}
1120+
pcc_arg.release();
1121+
completion.release();
11001122
} else {
11011123
try {
11021124
// TODO add endpoint LRU cache

0 commit comments

Comments
 (0)