33
44#include " include/random.h"
55#include " include/Context.h"
6+ #include " common/async/spawn_throttle.h"
67#include " common/errno.h"
8+ #include " common/error_code.h"
79
810#include " rgw_cache.h"
911#include " svc_notify.h"
@@ -33,9 +35,7 @@ class RGWWatcher : public DoutPrefixProvider , public librados::WatchCtx2 {
3335 int index;
3436 rgw_rados_ref obj;
3537 uint64_t watch_handle;
36- int register_ret{0 };
3738 bool unregister_done{false };
38- librados::AioCompletion *register_completion{nullptr };
3939 uint64_t retries = 0 ;
4040
4141 class C_ReinitWatch : public Context {
@@ -99,7 +99,7 @@ class RGWWatcher : public DoutPrefixProvider , public librados::WatchCtx2 {
9999 abort ();
100100 }
101101 if (!unregister_done) {
102- int ret = unregister_watch ();
102+ int ret = unregister_watch (null_yield );
103103 if (ret < 0 ) {
104104 ldout (cct, 0 ) << " ERROR: unregister_watch() returned ret=" << ret << dendl;
105105 if (-2 == ret) {
@@ -111,7 +111,7 @@ class RGWWatcher : public DoutPrefixProvider , public librados::WatchCtx2 {
111111 }
112112 }
113113 }
114- int ret = register_watch ();
114+ int ret = register_watch (null_yield );
115115 if (ret < 0 ) {
116116 ldout (cct, 0 ) << " ERROR: register_watch() returned ret=" << ret << dendl;
117117 ++retries;
@@ -120,8 +120,8 @@ class RGWWatcher : public DoutPrefixProvider , public librados::WatchCtx2 {
120120 }
121121 }
122122
123- int unregister_watch () {
124- int r = svc->unwatch (obj, watch_handle);
123+ int unregister_watch (optional_yield y ) {
124+ int r = svc->unwatch (this , obj, watch_handle, y );
125125 unregister_done = true ;
126126 if (r < 0 ) {
127127 return r;
@@ -130,41 +130,8 @@ class RGWWatcher : public DoutPrefixProvider , public librados::WatchCtx2 {
130130 return 0 ;
131131 }
132132
133- int register_watch_async () {
134- if (register_completion) {
135- register_completion->release ();
136- register_completion = nullptr ;
137- }
138- register_completion = librados::Rados::aio_create_completion (nullptr , nullptr );
139- register_ret = obj.aio_watch (register_completion, &watch_handle, this );
140- if (register_ret < 0 ) {
141- register_completion->release ();
142- return register_ret;
143- }
144- return 0 ;
145- }
146-
147- int register_watch_finish () {
148- if (register_ret < 0 ) {
149- return register_ret;
150- }
151- if (!register_completion) {
152- return -EINVAL;
153- }
154- register_completion->wait_for_complete ();
155- int r = register_completion->get_return_value ();
156- register_completion->release ();
157- register_completion = nullptr ;
158- if (r < 0 ) {
159- return r;
160- }
161- svc->add_watcher (index);
162- unregister_done = false ;
163- return 0 ;
164- }
165-
166- int register_watch () {
167- int r = obj.watch (&watch_handle, this );
133+ int register_watch (optional_yield y) {
134+ int r = obj.watch (this , &watch_handle, this , y);
168135 if (r < 0 ) {
169136 return r;
170137 }
@@ -211,8 +178,9 @@ int RGWSI_Notify::init_watch(const DoutPrefixProvider *dpp, optional_yield y)
211178 if (num_watchers <= 0 )
212179 num_watchers = 1 ;
213180
214- int error = 0 ;
215-
181+ const size_t max_aio = cct->_conf .get_val <int64_t >(" rgw_max_control_aio" );
182+ auto throttle = ceph::async::spawn_throttle{
183+ y, max_aio, ceph::async::cancel_on_error::all};
216184 watchers.reserve (num_watchers);
217185
218186 for (int i=0 ; i < num_watchers; i++) {
@@ -231,47 +199,54 @@ int RGWSI_Notify::init_watch(const DoutPrefixProvider *dpp, optional_yield y)
231199 ldpp_dout (dpp, 0 ) << " ERROR: notify_obj.open() returned r=" << r << dendl;
232200 return r;
233201 }
234-
235- librados::ObjectWriteOperation op;
236- op.create (false );
237-
238- r = notify_obj.operate (dpp, std::move (op), y);
239- if (r < 0 && r != -EEXIST) {
240- ldpp_dout (dpp, 0 ) << " ERROR: notify_obj.operate() returned r=" << r << dendl;
241- return r;
242- }
243-
244202 auto & watcher = watchers.emplace_back (cct, this , i, std::move (notify_obj));
245203
246- r = watcher.register_watch_async ();
247- if (r < 0 ) {
248- ldpp_dout (dpp, 0 ) << " WARNING: register_watch_aio() returned " << r << dendl;
249- error = r;
250- continue ;
251- }
252- }
253-
254- for (auto & watcher : watchers) {
255- int r = watcher.register_watch_finish ();
256- if (r < 0 ) {
257- ldpp_dout (dpp, 0 ) << " WARNING: async watch returned " << r << dendl;
258- error = r;
204+ try {
205+ throttle.spawn ([dpp, &watcher] (boost::asio::yield_context yield) {
206+ // create the object if it doesn't exist
207+ librados::ObjectWriteOperation op;
208+ op.create (false );
209+
210+ int r = watcher.get_obj ().operate (dpp, std::move (op), yield);
211+ if (r < 0 && r != -EEXIST) {
212+ ldpp_dout (dpp, 0 ) << " ERROR: notify_obj.operate() returned r=" << r << dendl;
213+ throw boost::system::system_error (ceph::to_error_code (r));
214+ }
215+
216+ r = watcher.register_watch (yield);
217+ if (r < 0 ) {
218+ throw boost::system::system_error (ceph::to_error_code (r));
219+ }
220+ });
221+ } catch (const boost::system::system_error& e) {
222+ return ceph::from_error_code (e.code ());
259223 }
260- }
224+ } // for num_watchers
261225
262- if (error < 0 ) {
263- return error;
226+ try {
227+ throttle.wait ();
228+ } catch (const boost::system::system_error& e) {
229+ return ceph::from_error_code (e.code ());
264230 }
265231
266232 return 0 ;
267233}
268234
269235void RGWSI_Notify::finalize_watch ()
270236{
237+ const size_t max_aio = cct->_conf .get_val <int64_t >(" rgw_max_control_aio" );
238+ auto throttle = ceph::async::spawn_throttle{
239+ null_yield, max_aio, ceph::async::cancel_on_error::all};
271240 for (int i = 0 ; i < num_watchers; i++) {
272- if (watchers_set.find (i) != watchers_set.end ())
273- watchers[i].unregister_watch ();
241+ if (!watchers_set.contains (i)) {
242+ continue ;
243+ }
244+ throttle.spawn ([&watcher = watchers[i]] (boost::asio::yield_context yield) {
245+ std::ignore = watcher.unregister_watch (yield);
246+ });
274247 }
248+ throttle.wait ();
249+
275250 watchers.clear ();
276251}
277252
@@ -325,9 +300,10 @@ void RGWSI_Notify::shutdown()
325300 finalized = true ;
326301}
327302
328- int RGWSI_Notify::unwatch (rgw_rados_ref& obj, uint64_t watch_handle)
303+ int RGWSI_Notify::unwatch (const DoutPrefixProvider* dpp, rgw_rados_ref& obj,
304+ uint64_t handle, optional_yield y)
329305{
330- int r = obj.unwatch (watch_handle );
306+ int r = obj.unwatch (dpp, handle, y );
331307 if (r < 0 ) {
332308 ldout (cct, 0 ) << " ERROR: rados->unwatch2() returned r=" << r << dendl;
333309 return r;
0 commit comments