
libboostasio fully refactored #550

Open

paolopas wants to merge 12 commits into CopernicaMarketingSoftware:master from paolopas:boosted

Conversation


@paolopas paolopas commented Dec 2, 2025

The following reasons pushed me to review the implementation of libboostasio:

  • it uses deprecated Boost.Asio code, e.g. deadline_timer and null_buffers
  • the header for strand is now boost/asio/io_context_strand.hpp
  • virtual inheritance is abused for LibBoostAsioHandler and LibBoostAsioHandler::Watcher, an unnecessary complication
  • heartbeat management is lacking and not in line with that offered by libev; in fact, the provided implementation solves issue "LibBoostAsioHandler does not detect connection timeouts when heartbeat is negotiated" #464 by copying everything from LibEvHandler
  • boost::bind and boost::function should be replaced with their standard library equivalents

Most of the work concerns the callback code (i.e. the asio completion handlers):

  • all callbacks now have the same prototype and are generated more consistently; generation is now centralized in the make_handler method
  • callbacks now handle their dependencies on this (the Watcher instance) and on the strand (the io_context::strand instance) uniformly
  • they are no longer invoked with boost::system::errc::operation_canceled if the strand has been destroyed, as this serves no purpose
  • they definitely shouldn't be called with boost::asio::error::would_block; see the boost::asio::posix::stream_descriptor documentation

No changes have been made to the public interface of LibBoostAsioHandler, except for the parameter (uint16_t connection_timeout = 60) added to the constructor (which has a default value anyway).

I just added a couple of asserts to handle dubious cases in the monitor and onNegotiate methods; I think the comments on the relevant lines are fairly self-explanatory. Perhaps the assert in monitor can be useful to intercept early many of the problems related to an unsupported concurrency scheme or other weird issues.

Need more testing, of course...

paolopas commented Dec 4, 2025

As far as heartbeat management is concerned, in my opinion there is still one thing missing.
On #510, speaking of LibEv, @EmielBruijntjes said:

  • We also take into account that no heartbeats have to be sent if we already sent some other kind of data

I haven't been able to find where this feature is actually implemented. If it isn't implemented in the library, I can tell you that LibEvHandler::Wrapper::_next isn't updated when sending data to the broker.
What the Wrapper::onActive method actually does is update LibEvHandler::Wrapper::_expire:

/**
 *  Method that is called when a filedescriptor becomes active
 *  @param  fd      the filedescriptor that is active
 *  @param  events  the events that are active (readable/writable)
 */
virtual void onActive(int fd, int events) override
{
    // if the server is readable, we have some extra time before it expires, the expire time
    // is set to 1.5 * _timeout to close the connection when the third heartbeat is about to be sent
    if (_timeout != 0 && (events & EV_READ)) _expire = ev_now(_loop) + _timeout * 1.5;

    // pass on to the connection
    _connection->process(fd, events);
}

Maybe a line of code like

if (_timeout != 0 && (events & EV_WRITE)) _next = ev_now(_loop) + std::max(_timeout / 2, 1.0);

or a better code layout that avoids duplication (_timeout checking and ev_now(_loop) calls) could have the desired effect.
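As a standalone illustration, here is a hypothetical model of that bookkeeping in which the libev specifics (ev_now(_loop), the EV_READ/EV_WRITE masks) are replaced by plain parameters; the struct and member names are invented for the sketch:

```cpp
#include <algorithm>

// Hypothetical model of the proposed heartbeat bookkeeping. "now" stands
// in for ev_now(_loop); readable/writable stand in for EV_READ/EV_WRITE.
struct HeartbeatClock
{
    double _timeout = 0;   // negotiated heartbeat interval (seconds)
    double _expire  = 0;   // when the connection is considered dead
    double _next    = 0;   // when the next heartbeat should be sent

    // called when the filedescriptor becomes active
    void onActive(double now, bool readable, bool writable)
    {
        // nothing to do when no heartbeat was negotiated
        if (_timeout == 0) return;

        // incoming data proves the server is alive: postpone expiration
        // until the third heartbeat is about to be sent
        if (readable) _expire = now + _timeout * 1.5;

        // outgoing data doubles as a heartbeat: postpone the next one,
        // but never schedule it less than one second away
        if (writable) _next = now + std::max(_timeout / 2, 1.0);
    }
};
```

This only demonstrates the arithmetic; in the real handler the two assignments would live next to the existing EV_READ branch so that _timeout and ev_now(_loop) are evaluated once.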

The same feature can obviously also be implemented in the libboostasio.

paolopas commented Dec 4, 2025

I should also point out that issue #479 is easily fixable, although the reason I'm mentioning it here may not seem obvious.
@devgs is absolutely right and described the scenario well, and the proposed solution should work too.

With my proposed implementation (and even the current one) there is no apparent reason why a Watcher should outlive TcpExtState::cleanup(): there are no shared_ptrs around (in the AMQP-CPP code) keeping it alive. In the absence of evidence, one might assume that the delayed destruction of the Watcher is due to pending boost callbacks (read/write/timeout), which could have a significant impact and needs to be investigated.

Well, releasing the socket certainly prevents the race condition described in the issue, and it also eliminates any callbacks related to pending operations on the socket (see the posix::basic_stream_descriptor::release documentation), but not the callback related to the timer!

So when you want to destroy the Watcher, you might need to release the socket and also cancel the timer, which is essentially equivalent to calling the destructor. Since both of these operations can be repeated without causing problems (subsequent releases do nothing), you can write a method that can also be called again by the destructor.

All this to finally say that clearing the _read and _write flags in the destructor (in addition to the other operations mentioned) is absolutely useless (in fact, it's the only thing I regret leaving as it was), since when the destructor is called, there certainly can't be any boost callback left that could use that data.
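A hypothetical sketch of that idempotent shutdown: the boost socket release and timer cancel are replaced by counters so the "safe to call repeatedly" property can be verified in isolation (all names are invented for the illustration):

```cpp
// Model of an idempotent close() that the destructor can also call.
// In the real Watcher, ++releases would be _socket.release() (dropping
// pending socket callbacks) and ++cancels would be _timer.cancel()
// (aborting the pending timer callback).
struct ClosableWatcher
{
    int  releases = 0;
    int  cancels  = 0;
    bool closed   = false;

    // release the socket and cancel the timer; repeat calls do nothing
    void close()
    {
        if (closed) return;
        closed = true;
        ++releases;
        ++cancels;
    }

    // destruction reuses the same path, so calling close() first is fine
    ~ClosableWatcher() { close(); }
};
```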

paolopas commented Dec 5, 2025

As expected, the problem of the delayed destruction of the Watcher is not due to boost, nor to the pending callbacks (waiting to be executed); it is in fact a bug in libboostasio.

I discuss the details in issue #479, but here I'll simply say that in my tests, the implementation proposed with this PR behaves exactly like the current version (except for being faster), so the proposed version also needs the same fix.

#479 demonstrates how important it is for callbacks to be as simple as possible. My version (which eliminates the need to lock() a weak_ptr) is certainly to be appreciated for its greater simplicity, which I believe is what matters most for maintenance.

paolopas commented Dec 5, 2025

@EmielBruijntjes,
I hope you find the time one day to take a look at all this stuff. I understand no one asked me to do so, but the quality of this component is indeed quite far from the rest of the library. Of course, it can be done even better...
And if I can be of any help, I usually do so voluntarily. But feel free to ask.

paolopas commented Dec 7, 2025

Ok, sorry for the mess with the comments but I wanted the code to be clear. I think that's all for now, it's time to do more testing.

Even though libboostasio is now faster (and above all more robust and clearer), there is still a lot of room for improvement. Here are some ideas (maybe @zerodefect might be interested):

expose strand

Users of this handler may want to run some code in the same io_context::strand used by the handler. This requires adding a method to the public interface of LibBoostAsioHandler.

improve performance

There is really a lot of room for improvement here.

Abandon the use of managed pointers to ensure that the strand or watcher pointer does not dangle

The lifecycle of a Watcher should be guaranteed only by canceling all callbacks in the destructor (as it already is). Everything else can be done with the usual observer pattern, that is, by reusing AMQP::Watchable and AMQP::Monitor.

abandon the use of Watcher::make_handler

Yes, while this method (which I wrote to clean up the current implementation) improves performance quite a bit, I'm convinced that this style of asynchronous programming is inherently inefficient. There should be no need to generate a wrapper around the method that completes an asynchronous operation (e.g. every time the socket becomes readable/writable) when that method does not change.

Why not use a coroutine instead? Here's a sketch of this idea:

#include <boost/asio/coroutine.hpp>
#include <boost/asio/yield.hpp>

class Watcher
{
private:

    boost::asio::posix::stream_descriptor _socket;

    ...

    boost::asio::coroutine _rd_coro;

    bool _read{false};
    bool _read_pending{false};

public:

    // initiating call (to be called by LibBoostAsioHandler::events()/LibBoostAsioHandler::monitor())
    void async_reading(bool want_read)
    {
        bool old_read = _read;
        _read = want_read;
        if (_read && ((_read != old_read) || !_read_pending)) // see #424
        {
            _read_pending = true;
            _rd_coro = boost::asio::coroutine(); // coroutine reset
            _socket.async_wait(
                boost::asio::posix::stream_descriptor::wait_read,
                [this](const boost::system::error_code& ec) { async_reading(ec); }); // strand omitted
        }
    }

private:

    // working call (the coroutine)
    void async_reading(const boost::system::error_code& ec)
    {
        _read_pending = false;
        reenter (_rd_coro) while (!ec && _read)
        {
            // do heartbeat stuff ...

            // do connection->process ...

            if (_socket.is_open())
            {
                _read_pending = true;
                yield _socket.async_wait(
                    boost::asio::posix::stream_descriptor::wait_read,
                    [this](const boost::system::error_code& ec) { async_reading(ec); }); // strand omitted
            }
        }
    }

    ...
};

#include <boost/asio/unyield.hpp>

paolopas commented Dec 7, 2025

I know, this PR is getting too long...
But I saw #424 late, and it seems too easy to fix not to do it.
So I modified Watcher::events and the above example (the coroutine one) to fix the problem.

paolopas commented Dec 7, 2025

I'm still not sure that the condition that should solve #424 is 100% reliable.
Well, I wouldn't call it a thorough fix; the solution proposed by @svyatogor addresses the problem in a more structured way. However, as a workaround, it should work well.

@zerodefect (Contributor) commented:

Thanks for this contribution. This PR covers a significant amount of ground.

Because this project is used in production, we need to be extremely cautious about introducing regressions. Reviewing a change of this magnitude in one go makes it difficult to spot potential edge cases.

Could you please split this into smaller, atomic PRs? I recommend an iterative approach: start with the code cleanup/refactoring first, and then add the functional changes in subsequent PRs. I would be happy to prioritize reviewing those smaller PRs to get them merged quickly.

paolopas commented Dec 7, 2025

I think now #424 is really fixed.

@zerodefect I understand well, but in the meantime here's the whole story. I myself want to do several more tests.

Meanwhile, for those who want to have a comprehensive idea, here's where we are

paolopas commented Dec 8, 2025

But why did you ask me to do this if you wanted to do it yourself?

Could you please split this into smaller, atomic PRs? I recommend an iterative approach: start with the code cleanup/refactoring first, and then add the functional changes in subsequent PRs. I would be happy to prioritize reviewing those smaller PRs to get them merged quickly.

@zerodefect (Contributor) commented:

Your response came across as dismissive. You responded with:

I understand well, but in the meantime here's the whole story. I myself want to do several more tests.

You understand, but you don't indicate that you are going to do what I suggested. It's not my repo. I don't make the decisions around here.

paolopas commented Dec 8, 2025

Then it means I have to thank @EmielBruijntjes for this... I'm still happy that in the end you understood the value of what was already given away.

paolopas commented Dec 9, 2025

Yes, @zerodefect, okay, we didn't understand each other. But it's the reason for all this haste that bothers me, considering that @devgs (#479) waited more than 3 years without a response (despite having understood and explained the problem well, as well as having served up the solution on a silver platter). And let's not talk about @svyatogor (#424), who has been waiting for more than 4 years for someone to pay attention.
I didn't write libboostasio, but that doesn't stop me from looking inside, understanding where the problems are, and maybe even finding solutions (perhaps in less than 5 days). It's not my repository, of course...

But these are the facts:

Now the simple question is @EmielBruijntjes should I continue to produce PR like #553 or have you decided to do it yourself?

@zerodefect (Contributor) commented:

Let me be clear - I understood you. I didn't like your approach. If there's an issue spotted, does Emiel revert all your changes? We're back to square one. Effectively, nothing has then been updated.

I have an interest in this project. If it goes wrong, it costs me.

As I said, there is some good stuff in there like the heartbeat(ing). Can your changes be used? Sure. Let's layer changes. I have other changes waiting in the wings. It's always been difficult to get changes merged into this repo.

paolopas commented Dec 9, 2025

I don't understand what the problem is with the revert. I didn't like your approach either. 1-1.
If you really have an interest in this project, then you shouldn't let that code circulate; it's not a good calling card. The first time I saw the carousel of calls to return a handler, I trembled...
@zerodefect, don't worry, I run all the tests I can, and in fact, if you remember, I told you I wanted to do more. I'm here to help; #553 should prove it to you. But you can see a lot from the code, and taking a look at the diffs is worth more than a lot of chatter; and if you don't understand much from a diff, git lets you see the whole picture very easily. After all, that's why we're here, right? As I already told Emiel, feel free to ask. And even if he doesn't want to have anything to do with boost, it will become more and more widespread and used; that's a fact.


zerodefect commented Dec 9, 2025

Hi Paolo, I'm not here to score points nor to seek public appreciation or approval. I was just sharing my concerns. Go well.

paolopas added a commit to paolopas/AMQP-CPP that referenced this pull request Dec 12, 2025:
slow down the heartbeat emission if the client has already sent data within the negotiated timeout; a similar improvement was suggested for LibEvHandler in CopernicaMarketingSoftware#550
paolopas commented Dec 14, 2025

With #558, I've finished factoring this work, at least up to this point. The code is basically the same (except for one condition I improved), but it looks significantly better. I'm also adding the current version here for anyone who wants to try it.

In the meantime, I've done all the testing I could, and this has led me to the following conclusions, some of which I've partially expressed in a note I left at the bottom of the header. But I have to update that note to address the elephant in the room; let's save that for last.

  1. A slightly more sophisticated example than the current one should be provided, which could also be used for testing purposes. I've written several, but something similar to libev's could already be considered a major improvement. I reserve the right to dedicate a PR to this purpose in the future.

  2. The handler lacks a mechanism that allows the user to force the removal of Watchers. There will definitely be a need for a new method in the handler.

I have often encountered processes that would not terminate because zombie watchers remained in the reactor queue, attached to an unresponsive filedescriptor. And judging by the issues from several users, it is a frequent case, one that (wrongly) pushes many towards a multi-threaded solution (the elephant).

This can happen under different conditions, and ensuring that the reactor (the boost io_context, so to speak) is flushed along each path is non-trivial.

For example, in one of my tests I use a publisher that does not implement onBlocked, and while it is running I set RabbitMQ's memory watermark to zero (rabbitmqctl set_vm_memory_high_watermark absolute 0). The server blocks the publisher, but the client does not understand what is happening, and when I try to stop it manually I have to kill it. The problem is that even if I had implemented onBlocked, I would not have been able to empty the reactor's queue without specific handler support, or at least not in a clean way. At the moment it's hard to know what other use cases this might be needed for, so I propose adding it as a protected method. Perhaps I can also modify the example to demonstrate its use in the most common case.

I have some doubts about what the interface of the single method to be added could be; considering that the user of the TcpHandler thinks in terms of TcpConnection while the handler thinks in terms of filedescriptors, it might look like this:

protected:
    /**
     *  Stop handling connection(s).
     *
     *  Note that you cannot continue using the connection(s) after calling this method.
     *  @param  pconn  The TcpConnection* to release (all by default).
     *  @return bool   If there was a release.
     */
    bool release(const TcpConnection *pconn = nullptr)
    {
        if (pconn == nullptr)
        {
            // anything to release at all?
            bool any = !_watchers.empty();

            // close and drop all watchers
            for (auto &wp : _watchers) wp.second->close();
            _watchers.clear();
            return any;
        }
        else
        {
            int fd = pconn->fileno();

            // is the connection already closed?
            if (fd != -1)
            {
                // close and drop the matching watcher
                auto iter = _watchers.find(fd);
                if (iter != _watchers.end())
                {
                    iter->second->close();
                    _watchers.erase(iter);
                    return true;
                }
            }
            return false;
        }
    }

This method should obviously also be called during the destruction phase, always with the aim of preventing unwanted deadlocks.
I did some tests and it works, except that I have to remove the assert that I had put in the monitor method: in case of forced release of a connection, only the handler forgets about it; the library does not, and sooner or later it tries to unregister the connection. Actually, that assert was the only thing that worried me a little.
I look forward to your feedback on this @EmielBruijntjes.

We finally got to the elephant.

about libboostasio and thread safety

AMQP-CPP is not thread safe, we cannot repeat this too many times.

But I think we should definitely give credit to Gavin, the original author of this handler, for the intuition that with the help of the boost reactor it could get pretty close.

The reactor ensures that queued callbacks are executed exclusively by threads that are running the context (io_context::run()) and the io_context::strand ensures that each callback is executed sequentially and every other thread in the pool sees its effects.

So boost magically makes AMQP-CPP thread safe?

If all the threads involved are running the context, then all calls into AMQP-CPP originate from callbacks (at least as far as this handler is concerned), and there are no problems.
BUT
if a thread interacts with the library without going through the reactor (for example, it directly invokes publish), then expect indeterminate behavior.

This is why among the proposed improvements for the future I said that the handler should expose the strand.
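To make the rule concrete, here is a minimal sketch (MiniReactor and its methods are invented; the real machinery is boost's io_context plus the strand) of how a foreign thread should hand work to the reactor instead of calling the library directly:

```cpp
#include <functional>
#include <mutex>
#include <queue>

// A thread-safe work queue standing in for the io_context/strand pair:
// foreign threads only enqueue callbacks; the single thread that calls
// run() executes everything sequentially, so the non-thread-safe
// library state is only ever touched from one place.
struct MiniReactor
{
    std::mutex mtx;
    std::queue<std::function<void()>> work;

    // callable from any thread (boost: posting through the strand)
    void post(std::function<void()> fn)
    {
        std::lock_guard<std::mutex> lock(mtx);
        work.push(std::move(fn));
    }

    // drain the queue on the calling thread (boost: io_context::run())
    void run()
    {
        for (;;)
        {
            std::function<void()> fn;
            {
                std::lock_guard<std::mutex> lock(mtx);
                if (work.empty()) return;
                fn = std::move(work.front());
                work.pop();
            }
            fn();   // library calls (e.g. publish) happen only here
        }
    }
};
```

Under this scheme a producer thread would write `reactor.post([&]{ channel.publish(...); })` rather than calling publish itself (channel being whatever AMQP::TcpChannel the user holds).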

paolopas commented Dec 20, 2025

Why we need to eliminate managed ptrs

In #562 (ddc1662) I removed from AMQP::LibBoostAsioHandler all the managed pointers that pointed to the strand. The first was created during construction of the handler, whose _strand member held a shared_ptr to the dynamically allocated object.
The first time I saw it I wondered why it was written that way, and I still haven't found an answer. It certainly hinders our plan to expose the strand (which was created for the handler's exclusive use) to users who need it. Passing a shared_ptr into user space (pardon the somewhat specious expression) is certainly not the best possible interface; perhaps a simple reference would be better.
But let's move on. The handler doesn't use the strand directly; its Watcher children need it to send callbacks.
Note that all callbacks are executed with dispatch and not deferred with post. This ensures that the queueing order in the reactor is preserved. Practically all the code in libboostasio.h (except the handler constructor) is executed through a callback in the event loop. Since the strand ensures that callbacks cannot overlap, even though they may execute on different threads, the resulting effect is to emulate a single-threaded process.
So, for example, there is no danger that while this virtual thread is executing make_handler (which is where all the handler design errors have accumulated up to now), it could be interrupted by the same virtual thread starting to destroy not only the watcher (which can happen during normal operation) but even the parent!
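The dispatch-vs-post distinction can be modeled in a few lines (ToyStrand is invented for this sketch; the real semantics are those of boost::asio): post always defers, while dispatch runs inline when we are already inside the strand, which is what preserves the queueing order.

```cpp
#include <deque>
#include <functional>

// Toy single-threaded model of dispatch vs post. post() always queues;
// dispatch() runs the callback immediately when invoked from inside a
// strand callback, instead of pushing it behind already-queued work.
struct ToyStrand
{
    std::deque<std::function<void()>> queue;
    bool running = false;   // are we inside a strand callback right now?

    void post(std::function<void()> fn) { queue.push_back(std::move(fn)); }

    void dispatch(std::function<void()> fn)
    {
        if (running) fn();          // already serialized: run inline
        else post(std::move(fn));   // otherwise defer, like post()
    }

    // drain the queue in FIFO order
    void run()
    {
        running = true;
        while (!queue.empty())
        {
            auto fn = std::move(queue.front());
            queue.pop_front();
            fn();
        }
        running = false;
    }
};
```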
Now I understand that this reasoning is difficult to follow, and I'm not good at explaining myself. But let's try to look at it differently.
Think about the order of construction/destruction of objects. When the handler is constructed, the strand attribute comes first, followed by the map with the children (the watchers). Yes, I know, they can keep themselves alive because they maintain a shared_ptr to this, but what matters is that the destruction of the children will certainly be attempted first, and THEN that of the strand.
Now, since we can write the handler however we want, nothing stops us from forcing the destruction of a child whenever we want. And it's simple, because the destructor simply fires all the events needed to make the watcher exit any callbacks that could allow it to survive. The fix for #479 (see 5d03768), for example, did just that.
So rather than filling the code with managed pointers (with significant performance costs and some risk of unwanted effects) out of fear that a pointer might dangle, we should recognize that we're in a scenario where this eventuality can easily be avoided in a cheaper way. I have always tried to make the various handlers that execute callbacks as simple (and responsive) as possible, ensuring that they check as soon as possible whether it's time to abandon execution, perhaps because the socket was closed while they were waiting to be scheduled (or while running connection->process())...
In short, what ensures that in the make_handler method we do not end up with a dying parent is precisely the fact that, by design, the children must die first! So I happily threw away all the shared pointers the children kept on the parent's strand and replaced them with a pointer to the parent: a simple, good old pointer.
And if it's not clear yet, I'll say it another way.
Imagine we're executing the watcher's destructor while there are still some callbacks (from the same watcher) in the reactor's queue. If the destructor requests their deletion, the reactor calls them immediately with an error (abort); if they are written carefully, they immediately return control to the destructor, which can continue. Problems arise when the watcher's destructor isn't called: then the watcher can survive longer than it should, and this can only happen thanks to the damned shared pointers we've put on guard so we could sleep soundly.
So instead of adding more, it would be much safer to remove them all. And if you want to be sure there are no queued callbacks (which, even if they don't keep the watcher alive, can still have other unwanted effects), just be sure to delete them.
This is why I resolve the issue by saying that the guarantee that the parent cannot die before the children must be written into the children's destructor, and into the care taken to complete the work left by the children once it is certain it is no longer needed.
For those wondering what happens to the callbacks that are running when the watcher is destroyed, here is a simple program demonstrating that there is nothing to fear: proof23.zip.

I just started throwing away shared ptr...
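The destruction-order argument above can be demonstrated mechanically: C++ destroys members in reverse declaration order, so a handler that declares the strand before the watcher map is guaranteed to destroy the watchers first. A self-contained sketch (all names invented for the illustration):

```cpp
#include <map>
#include <memory>
#include <string>
#include <vector>

// Records the destruction order so it can be inspected afterwards.
static std::vector<std::string> destroyed;

struct Strand  { ~Strand()  { destroyed.push_back("strand"); } };
struct Watcher { ~Watcher() { destroyed.push_back("watcher"); } };

// Members are destroyed in reverse declaration order: _watchers dies
// before _strand, so a plain reference/pointer to the strand inside a
// watcher cannot dangle while the watcher's own destructor runs.
struct Handler
{
    Strand _strand;                                    // declared first
    std::map<int, std::unique_ptr<Watcher>> _watchers; // declared second

    Handler() { _watchers.emplace(1, std::make_unique<Watcher>()); }
};
```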

paolopas commented Dec 23, 2025

I'm done with #566, I promise. Also because I fixed everything that was wrong, except for one thing about which nothing can be done.
In the meantime, I also did some experiments to explore the feasibility of the coroutine-based implementation, and I realized that it is not practicable (and not even convenient, as long as boost forces us to wrap every callback). The problem with coroutines is that while a callback can easily be canceled, forcing a coroutine to exit is virtually impossible.

The handler and its children now finally have the minimal architecture that allows them to function efficiently. Globally we have gone from this

classDiagram
LibBoostAsioHandler : SharedPtr~Strand~ _strand
LibBoostAsioHandler : Map~int SharedPtr~Watcher~~ _watchers
LibBoostAsioHandler *-- Watcher : _watchers
class enable_shared_from_this~Watcher~
Watcher --|> enable_shared_from_this~Watcher~
Watcher : WeakPtr~Strand~ _wpstrand
Watcher o-- LibBoostAsioHandler : _wpstrand

to this

classDiagram
LibBoostAsioHandler : Strand _strand
LibBoostAsioHandler : Map~int UniquePtr~Watcher~~ _watchers
LibBoostAsioHandler *-- Watcher : _watchers
Watcher : Strand & _strand
Watcher --> LibBoostAsioHandler : _strand

It may seem like a small change, but it has important effects on how things work.
I thank everyone for their patience, and I attach the latest version.
