Skip to content

SO 5.7 InDepth Custom Worker Threads

eao197 edited this page Dec 28, 2021 · 18 revisions

What Is It?

Until v.5.7.3 there wasn't a possibility to specify a custom thread to be used by SObjectizer dispatchers. But sometimes such a feature could be useful. For example, a specific thread stack size has to be set by pthread_attr_setstacksize in POSIX Thread, or some signal handlers have to be set for a new worker thread on Unix. But SObjectizer created worked thread by itself by using std::thread and didn't provide a way to tune new worker threads.

Since v.5.7.3 it is possible to instruct SObjectizer to use custom worker threads. This feature is based on two interfaces added in v.5.7.3: so_5::disp::abstract_work_thread_t and so_5::disp::abstract_work_thread_factory_t.

Since v.5.7.3 a user can define his/her own worker thread type by implementing so_5::disp::abstract_work_thread_t interface. The user also has to define own thread factory by implementing so_5::disp::abstract_work_thread_factory_t interface. An instance of such a factory has to be created and specified in the params to a SObjectizer dispatcher or to the whole SObjectizer Environment.

abstract_work_thread_t Interface

A user has to define his/her own class that inherits so_5::disp::abstract_work_thread_t type and implements its pure virtual methods.

At the moment so_5::disp::abstract_work_thread_t defines just two virtual methods that have to be implemented in a derived class:

virtual void start( body_func_t thread_body ) = 0;
virtual void join() = 0;

where body_func_t is defined that way:

using body_func_t = std::function< void() >;

Method start has to start the execution of thread_body in the context of a separate thread. There are no limitations: it can be a newly created thread or preallocated one. There is no need to guarantee that thread_body is already started its execution before the return from start, but it is required to guarantee that all required resources are allocated and thread_body has been passed to a separate execution context.

It should also be guaranteed that join can safely be called just after the return from start.

The join method is an analog of std::thread::join: once called it should return only when the separate execution context finished the execution of thread_body passed to the previous call to start.

SObjectizer guarantees that join is called for a thread only once after the successful call to start. If start throws then join isn't called.

Implementations Of abstract_work_thread_t And Exceptions

At the moment SObjectizer can't guarantee that thread_func passed to start won't throw.

There is no requirement that a custom thread should intercept such an exception. If an exception from thread_body is intercepted then the call to join should be completed successfully.

Method start can throw if it can't start execution of thread_body in a separate worker context. That exception has to be inherited from std::exception. That exception will be handled by SObjectizer. The join method won't be called if start throws.

Please note that join isn't marked as noexcept. It's because std::thread::join is also not noexcept. Thus, join can throw. But the current version of SObjectizer can't cope with such an exception in most cases. It means that if join throws then the application will probably be terminated because of an exception from a noexcept method (like destructors of SObjectizer's dispatchers and so on).

A Very Simple Example Of Custom Thread

A very simple implementation of a custom thread can look like this:

class simple_custom_thread final : public so_5::disp::abstract_work_thread_t
{
  // Actual thread.
  std::thread thread_;
public:
  simple_custom_thread() = default;

  void start(body_func_t thread_body) override
  {
    // Start a new thread and run thread_body on it.
    thread_ = std::thread{ [tb = std::move(thread_body)] () {
        ... // Do some tuning of the new thread.
        // Run thread_body without intercepting exceptions.
        // It mean a crash of the whole application if thread_body throws,
        // but we don't care in such a simple implementation.
        tb();
      }
    };
  }

  void join() override
  {
    thread_.join();
  }
};

abstract_work_thread_factory_t Interface

A user has to define his/her own class that inherits so_5::disp::abstract_work_thread_factory_t type and implements its pure virtual methods.

At the moment so_5::disp::abstract_work_thread_factory_t defines just two virtual methods that have to be implemented in a derived class:

virtual so_5::disp::abstract_work_thread_t &
acquire( so_5::environment_t & env ) = 0;

virtual void release( so_5::disp::abstract_work_thread_t & thread ) = 0;

The acquire method should return a valid reference (not a pointer!) to an instance that implements the abstract_work_thread_t interface. It can be a reference to a dynamically allocated object, or to member of some aggregate, or something else. SObjectizer doesn't care about the nature of that instance.

SObjectizer will call start method for a returned thread instance.

The acquire method has to throw an exception derived from std::exception if a new thread can't be acquired.

The release method receives a reference acquired by a previous call to acquire method. The release method has to take all necessary actions for utilizing the returned thread object. For example, if acquire allocates a new thread object on every call then release has to deallocate the object passed to release.

If acquire successes then SObjectizer guarantees that obtained thread object will be returned to the factory by calling release at the moment when SObjectizer no more needs that thread. The return occurs even if the thread object throws in its start method.

A Very Simple Implementation Of Custom Thread Factory

A very simple example of a custom thread factory can look like this:

class simple_custom_factory final : public so_5::disp::abstract_work_thread_factory_t
{
public:
  simple_custom_factory() = default;

  so_5::disp::abstract_work_thread_t & acquire(so_5::environment_t & /*env*/) override
  {
    // Just allocate a new object on every call.
    return *(new simple_custom_thread{});
  }

  void release(so_5::disp::abstract_work_thread_t & thread) override
  {
    // It's a reference to an object created in a previous call to acquire.
    // So just deallocate it.
    delete &thread;
  }
};

Clone this wiki locally