@@ -1373,6 +1373,7 @@ class Notifier : public async::service_list_base_hook {
13731373 };
13741374
13751375 asio::io_context::executor_type ex;
1376+ Objecter::LingerOp* linger_op;
13761377 // Zero for unbounded. I would not recommend this.
13771378 const uint32_t capacity;
13781379
@@ -1383,14 +1384,18 @@ class Notifier : public async::service_list_base_hook {
13831384 uint64_t next_id = 0 ;
13841385
13851386 void service_shutdown () {
1387+ if (linger_op) {
1388+ linger_op->put ();
1389+ }
13861390 std::unique_lock l (m);
13871391 handlers.clear ();
13881392 }
13891393
13901394public:
13911395
1392- Notifier (asio::io_context::executor_type ex, uint32_t capacity)
1393- : ex(ex), capacity(capacity),
1396+ Notifier (asio::io_context::executor_type ex, Objecter::LingerOp* linger_op,
1397+ uint32_t capacity)
1398+ : ex(ex), linger_op(linger_op), capacity(capacity),
13941399 svc (asio::use_service<async::service<Notifier>>(
13951400 asio::query (ex, boost::asio::execution::context))) {
13961401 // register for service_shutdown() notifications
@@ -1507,7 +1512,11 @@ void RADOS::watch_(Object o, IOContext _ioc,
15071512 linger_op, op, ioc->snapc , ceph::real_clock::now (), bl,
15081513 asio::bind_executor (
15091514 std::move (e),
1510- [c = std::move (c), cookie](bs::error_code e, cb::list) mutable {
1515+ [c = std::move (c), cookie, linger_op](bs::error_code e, cb::list) mutable {
1516+ if (e) {
1517+ linger_op->objecter ->linger_cancel (linger_op);
1518+ cookie = 0 ;
1519+ }
15111520 asio::dispatch (asio::append (std::move (c), e, cookie));
15121521 }), nullptr );
15131522}
@@ -1525,7 +1534,7 @@ void RADOS::watch_(Object o, IOContext _ioc, WatchComp c,
15251534 uint64_t cookie = linger_op->get_cookie ();
15261535 // Shared pointer to avoid a potential race condition
15271536 linger_op->user_data .emplace <std::shared_ptr<Notifier>>(
1528- std::make_shared<Notifier>(get_executor (), queue_size));
1537+ std::make_shared<Notifier>(get_executor (), linger_op, queue_size));
15291538 auto & n = ceph::any_cast<std::shared_ptr<Notifier>&>(
15301539 linger_op->user_data );
15311540 linger_op->handle = std::ref (*n);
@@ -1537,7 +1546,12 @@ void RADOS::watch_(Object o, IOContext _ioc, WatchComp c,
15371546 linger_op, op, ioc->snapc , ceph::real_clock::now (), bl,
15381547 asio::bind_executor (
15391548 std::move (e),
1540- [c = std::move (c), cookie](bs::error_code e, cb::list) mutable {
1549+ [c = std::move (c), cookie, linger_op](bs::error_code e, cb::list) mutable {
1550+ if (e) {
1551+ linger_op->user_data .reset ();
1552+ linger_op->objecter ->linger_cancel (linger_op);
1553+ cookie = 0 ;
1554+ }
15411555 asio::dispatch (asio::append (std::move (c), e, cookie));
15421556 }), nullptr );
15431557}
@@ -1610,9 +1624,7 @@ void RADOS::unwatch_(uint64_t cookie, IOContext _ioc,
16101624 [objecter = impl->objecter ,
16111625 linger_op, c = std::move (c)]
16121626 (bs::error_code ec) mutable {
1613- if (!ec) {
1614- objecter->linger_cancel (linger_op);
1615- }
1627+ objecter->linger_cancel (linger_op);
16161628 asio::dispatch (asio::append (std::move (c), ec));
16171629 }));
16181630}
0 commit comments