Skip to content

Commit 5b10cd5

Browse files
committed
Clickhouse - introduce Worker
1 parent 2958121 commit 5b10cd5

File tree

1 file changed

+122
-0
lines changed
  • extra_plugins/output/clickhouse/src

1 file changed

+122
-0
lines changed
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/**
2+
* @file
3+
* @author Michal Sedlak <[email protected]>
4+
* @brief Worker class for running tasks in a separate thread
5+
* @date 2025
6+
*
7+
* Copyright(c) 2025 CESNET z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#pragma once
12+
13+
#include "common.h"
14+
#include <thread>
15+
16+
/**
17+
* @class Worker
18+
* @brief A base class for creating worker threads that perform specific tasks.
19+
*
20+
* The Worker class provides a framework for running tasks in a separate thread,
21+
* with support for stopping the task, checking for errors, and joining the thread.
22+
* Derived classes must implement the `run` method to define the task logic.
23+
*/
24+
class Worker : Nonmoveable, Noncopyable {
25+
public:
26+
/**
27+
* @brief Starts the worker thread and executes the task.
28+
*/
29+
void start()
30+
{
31+
m_thread = std::thread([this]() { this->do_run(); });
32+
}
33+
34+
/**
35+
* @brief Requests the worker to stop by setting a stop signal.
36+
*/
37+
void request_stop()
38+
{
39+
if (!m_stop_signal) {
40+
m_stop_requested_at = std::time(nullptr);
41+
}
42+
m_stop_signal = true;
43+
}
44+
45+
/**
46+
* @brief Checks if an error occurred during the task execution.
47+
*
48+
* @throws The exception captured during task execution, if any.
49+
*/
50+
void check_error() const
51+
{
52+
if (m_errored) {
53+
std::rethrow_exception(m_exception);
54+
}
55+
}
56+
57+
/**
58+
* @brief Joins the worker thread if it is joinable.
59+
*/
60+
void join()
61+
{
62+
if (m_thread.joinable()) {
63+
m_thread.join();
64+
}
65+
}
66+
67+
/**
68+
* @brief Destructor that ensures the worker thread is joined.
69+
*/
70+
~Worker()
71+
{
72+
join();
73+
}
74+
75+
protected:
76+
/**
77+
* @brief Pure virtual method to define the task logic.
78+
*
79+
* Derived classes must implement this method to specify the task to be executed.
80+
*/
81+
virtual void run() = 0;
82+
83+
/**
84+
* @brief Checks if a stop request has been made.
85+
*
86+
* @return True if a stop request has been made, otherwise false.
87+
*/
88+
bool stop_requested() const
89+
{
90+
return m_stop_signal;
91+
}
92+
93+
/**
94+
* Calculates the number of seconds that have passed since a stop was requested.
95+
*
96+
* @return The number of seconds elapsed since the stop request.
97+
*/
98+
unsigned int secs_since_stop_requested() const
99+
{
100+
return std::time(nullptr) - m_stop_requested_at;
101+
}
102+
103+
private:
104+
std::thread m_thread; ///< The thread in which the task is executed.
105+
std::atomic_bool m_stop_signal = false; ///< Signal to request the task to stop.
106+
std::atomic_bool m_errored = false; ///< Flag indicating if an error occurred.
107+
std::exception_ptr m_exception = nullptr; ///< Pointer to the captured exception, if any.
108+
std::time_t m_stop_requested_at = 0; ///< When was stop requested.
109+
110+
/**
111+
* @brief Internal method to execute the task and handle exceptions.
112+
*/
113+
void do_run()
114+
{
115+
try {
116+
run();
117+
} catch (...) {
118+
m_exception = std::current_exception();
119+
m_errored = true;
120+
}
121+
}
122+
};

0 commit comments

Comments
 (0)