/*
	DiscordCoreAPI, A bot library for Discord, written in C++, and featuring explicit multithreading through the usage of custom, asynchronous C++ CoRoutines.

	Copyright 2021, 2022 Chris M. (RealTimeChris)

	This library is free software; you can redistribute it and/or
	modify it under the terms of the GNU Lesser General Public
	License as published by the Free Software Foundation; either
	version 2.1 of the License, or (at your option) any later version.

	This library is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
	Lesser General Public License for more details.

	You should have received a copy of the GNU Lesser General Public
	License along with this library; if not, write to the Free Software
	Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
	USA
*/
/// ThreadPool.hpp - Header for the "Thread-Pool" related stuff.
/// Dec 18, 2021
/// https://discordcoreapi.com
/// \file ThreadPool.hpp

#pragma once

#include <discordcoreapi/FoundationEntities.hpp>
#include <discordcoreapi/Https.hpp>
#include <coroutine>
namespace DiscordCoreAPI {
	/**
	 * \addtogroup utilities
	 * @{
	 */

	template<typename... ArgTypes> using TimeElapsedHandler = std::function<void(ArgTypes...)>;

	using TimeElapsedHandlerNoArgs = std::function<void(void)>;

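	/// Fraction of a requested delay that the timed threads below sleep away in one coarse
	/// chunk before switching to 1ms polling of their StopWatch.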
	inline static constexpr double percentage{ 10.0f / 100.0f };

	class ThreadPool {
	public:
		inline ThreadPool& operator=(const ThreadPool&) = delete;

		inline ThreadPool(const ThreadPool&) = delete;

		inline ThreadPool() noexcept = default;

		/// Spawns a detached thread that invokes timeElapsedHandler roughly every timeInterval
		/// milliseconds, and returns a key that can later be passed to stopThread().
		inline static std::string storeThread(TimeElapsedHandlerNoArgs timeElapsedHandler, int64_t timeInterval) {
			std::string threadId = std::to_string(std::chrono::duration_cast<Nanoseconds>(HRClock::now().time_since_epoch()).count());

			auto thread = std::jthread([=](std::stop_token token) {
				StopWatch stopWatch{ Milliseconds{ timeInterval } };
				while (true) {
					stopWatch.resetTimer();
					std::this_thread::sleep_for(Milliseconds{ static_cast<int64_t>(std::ceil(static_cast<double>(timeInterval) * percentage)) });
					while (!stopWatch.hasTimePassed() && !token.stop_requested()) {
						std::this_thread::sleep_for(1ms);
					}
					if (token.stop_requested()) {
						return;
					}
					timeElapsedHandler();
					if (token.stop_requested()) {
						return;
					}
					std::this_thread::sleep_for(1ms);
				}
			});
			// The thread is detached, but the stored jthread keeps its stop_source,
			// so stopThread() can still signal it via request_stop().
			thread.detach();
			threads[threadId] = std::move(thread);
			return threadId;
		}

		/// Runs timeElapsedHandler(args...) once, roughly timeDelay milliseconds from now, on a new
		/// thread; if blockForCompletion is true the caller joins that thread, otherwise it is detached.
		template<typename... ArgTypes> inline static void executeFunctionAfterTimePeriod(TimeElapsedHandler<ArgTypes...> timeElapsedHandler,
			int64_t timeDelay, bool blockForCompletion, ArgTypes... args) {
			std::jthread thread = std::jthread([=](std::stop_token token) {
				StopWatch stopWatch{ Milliseconds{ timeDelay } };
				stopWatch.resetTimer();
				if (static_cast<int64_t>(std::ceil(static_cast<double>(timeDelay) * percentage)) <= timeDelay &&
					static_cast<int64_t>(std::ceil(static_cast<double>(timeDelay) * percentage)) > 0) {
					std::this_thread::sleep_for(Milliseconds{ static_cast<int64_t>(std::ceil(static_cast<double>(timeDelay) * percentage)) });
				}
				while (!stopWatch.hasTimePassed() && !token.stop_requested()) {
					std::this_thread::sleep_for(1ms);
				}
				if (token.stop_requested()) {
					return;
				}
				timeElapsedHandler(args...);
				if (token.stop_requested()) {
					return;
				}
			});
			if (blockForCompletion) {
				if (thread.joinable()) {
					thread.join();
				}
			} else if (thread.joinable()) {
				thread.detach();
			}
		}

		/// Requests a stop on the thread stored under key and removes it from the pool.
		inline void stopThread(const std::string& key) {
			if (ThreadPool::threads.contains(key)) {
				ThreadPool::threads[key].request_stop();
				ThreadPool::threads.erase(key);
			}
		}

		inline ~ThreadPool() noexcept = default;

	protected:
		inline static std::unordered_map<std::string, std::jthread> threads{};
	};
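	/// A brief usage sketch (illustrative only; the handler, interval, and key names below are
	/// hypothetical, not part of the library): storeThread() returns a key for a recurring callback,
	/// which stopThread() can later cancel, while executeFunctionAfterTimePeriod() runs a callback
	/// once after a delay.
	/// \code
	/// TimeElapsedHandlerNoArgs onTick = []() {
	///		// Do periodic work here.
	/// };
	/// std::string threadKey = ThreadPool::storeThread(onTick, 1000);// Fires roughly every 1000ms.
	///
	/// TimeElapsedHandler<int32_t> onceHandler = [](int32_t value) {
	///		// Runs a single time, ~500ms from now, with value == 42.
	/// };
	/// ThreadPool::executeFunctionAfterTimePeriod<int32_t>(onceHandler, 500, false, 42);
	///
	/// ThreadPool pool{};
	/// pool.stopThread(threadKey);// Cancels the recurring callback stored above.
	/// \endcode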
}

namespace DiscordCoreInternal {

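	/// Pairs a std::jthread with a busy flag so that CoRoutineThreadPool can see which of its
	/// workers are currently executing a coroutine.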
	struct WorkerThread {
		inline WorkerThread& operator=(WorkerThread&& other) noexcept {
			if (this != &other) {
				areWeCurrentlyWorking.store(other.areWeCurrentlyWorking.load());
				thread.swap(other.thread);
			}
			return *this;
		}

		inline WorkerThread() noexcept = default;

		inline ~WorkerThread() noexcept = default;

		std::atomic_bool areWeCurrentlyWorking{};
		std::jthread thread{};
	};

	class CoRoutineThreadPool {
	public:
		friend class DiscordCoreAPI::DiscordCoreClient;

		/// Starts with half of the reported hardware threads (at least one) as workers.
		inline CoRoutineThreadPool() : threadCount(std::thread::hardware_concurrency() / 2 > 0 ? std::thread::hardware_concurrency() / 2 : 1) {
			for (uint32_t x = 0; x < threadCount.load(); ++x) {
				WorkerThread workerThread{};
				currentIndex.store(currentIndex.load() + 1);
				currentCount.store(currentCount.load() + 1);
				int64_t indexNew = currentIndex.load();
				workerThread.thread = std::jthread([=, this](std::stop_token stopToken) {
					threadFunction(stopToken, indexNew);
				});
				workerThreads[currentIndex.load()] = std::move(workerThread);
			}
		}

		/// Enqueues a coroutine handle for resumption on one of the worker threads,
		/// spawning an additional worker when all current workers are busy.
		inline void submitTask(std::coroutine_handle<> coro) noexcept {
			std::shared_lock lock01{ workerAccessMutex };
			bool areWeAllBusy{ true };
			for (const auto& [key, value]: workerThreads) {
				if (!value.areWeCurrentlyWorking.load()) {
					areWeAllBusy = false;
					break;
				}
			}
			if (areWeAllBusy) {
				WorkerThread workerThread{};
				currentIndex.store(currentIndex.load() + 1);
				currentCount.store(currentCount.load() + 1);
				int64_t indexNew = currentIndex.load();
				workerThread.thread = std::jthread([=, this](std::stop_token stopToken) {
					threadFunction(stopToken, indexNew);
				});
				lock01.unlock();
				std::unique_lock lock{ workerAccessMutex };
				workerThreads[currentIndex.load()] = std::move(workerThread);
			}
			std::unique_lock lock{ coroHandleAccessMutex };
			coroutineHandles.emplace_back(coro);
			lock.unlock();
			coroHandleCount.store(coroHandleCount.load() + 1);
		}

	protected:
		std::unordered_map<int64_t, WorkerThread> workerThreads{};
		std::deque<std::coroutine_handle<>> coroutineHandles{};
		const std::atomic_int64_t threadCount{};
		std::atomic_int64_t coroHandleCount{};
		std::shared_mutex workerAccessMutex{};
		std::atomic_int64_t currentCount{};
		std::atomic_int64_t currentIndex{};
		std::mutex coroHandleAccessMutex{};

		inline void threadFunction(std::stop_token stopToken, int64_t index) {
			while (!stopToken.stop_requested()) {
				if (coroHandleCount.load() > 0) {
					std::unique_lock lock{ coroHandleAccessMutex, std::defer_lock_t{} };
					if (lock.try_lock() && coroutineHandles.size() > 0) {
						std::coroutine_handle<> coroHandle = coroutineHandles.front();
						coroHandleCount.store(coroHandleCount.load() - 1);
						coroutineHandles.pop_front();
						lock.unlock();
						// Mark this worker as busy while it resumes the coroutine.
						workerThreads[index].areWeCurrentlyWorking.store(true);
						coroHandle();
						workerThreads[index].areWeCurrentlyWorking.store(false);
					}
				}
				// Shrink the pool when more workers exist than the configured thread count.
				if (currentCount.load() > threadCount.load()) {
					int64_t extraWorkers{ currentCount.load() - threadCount.load() };
					while (extraWorkers > 0) {
						--extraWorkers;
						std::unique_lock lock{ workerAccessMutex };
						const auto oldThread = workerThreads.begin();
						if (oldThread->second.thread.joinable() && oldThread->second.areWeCurrentlyWorking.load()) {
							oldThread->second.thread.detach();
							currentCount.store(currentCount.load() - 1);
							workerThreads.erase(oldThread->first);
						}
					}
				}
				std::this_thread::sleep_for(100000ns);
			}
		}
	};
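	/// A minimal sketch (not part of the library) of how a coroutine handle might be handed to
	/// this pool from an awaiter: await_suspend() submits the handle, and the coroutine is later
	/// resumed on one of the worker threads. The awaiter type and the pool pointer shown here are
	/// hypothetical.
	/// \code
	/// struct ResumeOnPoolAwaiter {
	///		DiscordCoreInternal::CoRoutineThreadPool* pool{};
	///		bool await_ready() const noexcept {
	///			return false;// Always suspend, so the handle gets queued on the pool.
	///		}
	///		void await_suspend(std::coroutine_handle<> handle) noexcept {
	///			pool->submitTask(handle);// A worker thread will invoke the handle later.
	///		}
	///		void await_resume() const noexcept {
	///		}
	/// };
	/// \endcode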
	/**@}*/
}