Skip to content

Commit 04d117c

Browse files
committed
Implement generic async task runner
1 parent f3379b7 commit 04d117c

File tree

8 files changed

+191
-0
lines changed

8 files changed

+191
-0
lines changed

Client/mods/deathmatch/logic/CClientGame.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,9 @@ CClientGame::CClientGame ( bool bLocalPlay )
301301
CLuaFunctionDefs::Initialize ( m_pLuaManager, m_pScriptDebugging, this );
302302
CLuaDefs::Initialize ( this, m_pLuaManager, m_pScriptDebugging );
303303

304+
// Start async task scheduler
305+
m_pAsyncTaskScheduler = new SharedUtil::CAsyncTaskScheduler(2);
306+
304307
// Disable the enter/exit vehicle key button (we want to handle this button ourselves)
305308
g_pMultiplayer->DisableEnterExitVehicleKey ( true );
306309

@@ -391,6 +394,9 @@ CClientGame::~CClientGame ( void )
391394
m_pTransferBox->Hide();
392395
m_pBigPacketTransferBox->Hide();
393396

397+
// Stop async task scheduler
398+
SAFE_DELETE(m_pAsyncTaskScheduler);
399+
394400
SAFE_DELETE( m_pVoiceRecorder );
395401

396402
// Singular file download manager
@@ -1305,6 +1311,9 @@ void CClientGame::DoPulses ( void )
13051311
// Send screen shot data
13061312
ProcessDelayedSendList ();
13071313

1314+
// Collect async task scheduler results
1315+
m_pAsyncTaskScheduler->CollectResults();
1316+
13081317
TIMING_CHECKPOINT( "-CClientGame::DoPulses" );
13091318
}
13101319

Client/mods/deathmatch/logic/CClientGame.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,8 @@ class CClientGame
306306
CRemoteCalls* GetRemoteCalls ( void ) { return m_pRemoteCalls; }
307307
CResourceFileDownloadManager* GetResourceFileDownloadManager ( void ) { return m_pResourceFileDownloadManager; }
308308

309+
inline SharedUtil::CAsyncTaskScheduler* GetAsyncTaskScheduler ( void ) { return m_pAsyncTaskScheduler; }
310+
309311
// Status toggles
310312
void ShowNetstat ( int iCmd );
311313
void ShowEaeg ( bool bShow );
@@ -793,6 +795,8 @@ class CClientGame
793795
std::set < SString > m_AllowKeyUpMap;
794796
uint m_uiPrecisionCallDepth;
795797
SString m_strFileCacheRoot;
798+
799+
SharedUtil::CAsyncTaskScheduler* m_pAsyncTaskScheduler;
796800
};
797801

798802
extern CClientGame* g_pClientGame;

Server/mods/deathmatch/logic/CGame.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,9 @@ CGame::~CGame ( void )
274274
// Stop networking
275275
Stop ();
276276

277+
// Stop async task scheduler
278+
SAFE_DELETE(m_pAsyncTaskScheduler);
279+
277280
// Destroy our stuff
278281
SAFE_DELETE( m_pResourceManager );
279282

@@ -483,6 +486,8 @@ void CGame::DoPulse ( void )
483486

484487
CLOCK_CALL1( m_pLatentTransferManager->DoPulse (); );
485488

489+
CLOCK_CALL1(m_pAsyncTaskScheduler->CollectResults());
490+
486491
PrintLogOutputFromNetModule();
487492
m_pScriptDebugging->UpdateLogOutput();
488493

@@ -597,6 +602,9 @@ bool CGame::Start ( int iArgumentCount, char* szArguments [] )
597602
unsigned short usServerPort = m_pMainConfig->GetServerPort ();
598603
unsigned int uiMaxPlayers = m_pMainConfig->GetMaxPlayers ();
599604

605+
// Start async task scheduler
606+
m_pAsyncTaskScheduler = new SharedUtil::CAsyncTaskScheduler(2);
607+
600608
// Create the account manager
601609
strBuffer = g_pServerInterface->GetModManager ()->GetAbsolutePath ( "internal.db" );
602610
m_pDatabaseManager = NewDatabaseManager ();

Server/mods/deathmatch/logic/CGame.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ class CGame
266266
inline CCustomWeaponManager* GetCustomWeaponManager ( void ) { return m_pCustomWeaponManager; }
267267
inline CFunctionUseLogger* GetFunctionUseLogger ( void ) { return m_pFunctionUseLogger; }
268268
inline CMasterServerAnnouncer* GetMasterServerAnnouncer ( void ) { return m_pMasterServerAnnouncer; }
269+
inline SharedUtil::CAsyncTaskScheduler* GetAsyncTaskScheduler(void ) { return m_pAsyncTaskScheduler; }
269270

270271
void JoinPlayer ( CPlayer& Player );
271272
void InitialDataStream ( CPlayer& Player );
@@ -588,6 +589,8 @@ class CGame
588589
SString m_strPrevMinClientKickRequirement;
589590
SString m_strPrevMinClientConnectRequirement;
590591
SString m_strPrevLowestConnectedPlayerVersion;
592+
593+
SharedUtil::CAsyncTaskScheduler* m_pAsyncTaskScheduler;
591594
};
592595

593596
#endif
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
#pragma once
2+
#include <queue>
3+
#include <functional>
4+
#include <memory>
5+
#include <thread>
6+
#include <mutex>
7+
8+
namespace SharedUtil
9+
{
10+
///////////////////////////////////////////////////////////////
11+
//
12+
// CAsyncTaskScheduler class
13+
//
14+
// Asynchronously executes tasks in secondary worker threads
15+
// and returns the result back to the main thread
16+
//
17+
///////////////////////////////////////////////////////////////
18+
class CAsyncTaskScheduler
19+
{
20+
struct SBaseTask
21+
{
22+
virtual void Execute() = 0;
23+
virtual void ProcessResult() = 0;
24+
};
25+
26+
template<typename ResultType>
27+
struct STask : public SBaseTask
28+
{
29+
using TaskFunction_t = std::function<ResultType()>;
30+
using ReadyFunction_t = std::function<void(const ResultType&)>;
31+
32+
TaskFunction_t m_TaskFunction;
33+
ReadyFunction_t m_ReadyFunction;
34+
ResultType m_Result;
35+
36+
STask(const TaskFunction_t& taskFunc, const ReadyFunction_t& readyFunc) : m_TaskFunction(taskFunc), m_ReadyFunction(readyFunc)
37+
{}
38+
39+
void Execute() override
40+
{
41+
m_Result = std::move(m_TaskFunction());
42+
}
43+
44+
void ProcessResult() override
45+
{
46+
m_ReadyFunction(m_Result);
47+
}
48+
};
49+
50+
public:
51+
//
52+
// Creates a new async task scheduler
53+
// with a fixed number of worker threads
54+
//
55+
CAsyncTaskScheduler(std::size_t numWorkers);
56+
57+
//
58+
// Ends all worker threads (waits for the last task to finish)
59+
//
60+
~CAsyncTaskScheduler();
61+
62+
//
63+
// Pushes a new task for execution once a worker is free
64+
// (Template Parameter) ResultType: The type of the result
65+
//
66+
// taskFunc: Time-consuming function that is executed on the secondary thread (be aware of thread safety!)
67+
// readyFunc: Function that is called once the result is ready (called on the main thread)
68+
//
69+
template<typename ResultType>
70+
void PushTask(const std::function<ResultType()>& taskFunc, const std::function<void(const ResultType&)>& readyFunc)
71+
{
72+
std::unique_ptr<SBaseTask> pTask{ new STask<ResultType>{ taskFunc, readyFunc } };
73+
74+
std::lock_guard<std::mutex>{ m_TasksMutex };
75+
m_Tasks.push(std::move(pTask));
76+
}
77+
78+
//
79+
// Collects (polls) the results of the finished tasks
80+
// and invokes its ready-functions on the main thread
81+
// THIS FUNCTION MUST BE CALLED ON THE MAIN THREAD
82+
//
83+
void CollectResults();
84+
85+
protected:
86+
void DoWork();
87+
88+
private:
89+
std::vector<std::thread> m_Workers;
90+
bool m_Running = true;
91+
92+
std::queue<std::unique_ptr<SBaseTask>> m_Tasks;
93+
std::mutex m_TasksMutex;
94+
95+
std::vector<std::unique_ptr<SBaseTask>> m_TaskResults;
96+
std::mutex m_TaskResultsMutex;
97+
};
98+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#include "SharedUtil.AsyncTaskScheduler.h"
2+
3+
namespace SharedUtil
4+
{
5+
CAsyncTaskScheduler::CAsyncTaskScheduler(std::size_t numWorkers)
6+
{
7+
for (std::size_t i = 0; i < numWorkers; ++i)
8+
{
9+
m_Workers.emplace_back(&CAsyncTaskScheduler::DoWork, this);
10+
}
11+
}
12+
13+
CAsyncTaskScheduler::~CAsyncTaskScheduler()
14+
{
15+
m_Running = false;
16+
17+
// Wait for all threads to end
18+
for (auto& thread : m_Workers)
19+
{
20+
if (thread.joinable())
21+
thread.join();
22+
}
23+
}
24+
25+
void CAsyncTaskScheduler::CollectResults()
26+
{
27+
std::lock_guard<std::mutex>{ m_TaskResultsMutex };
28+
29+
for (auto& pTask : m_TaskResults)
30+
{
31+
pTask->ProcessResult();
32+
}
33+
m_TaskResults.clear();
34+
}
35+
36+
void CAsyncTaskScheduler::DoWork()
37+
{
38+
while (m_Running)
39+
{
40+
m_TasksMutex.lock();
41+
42+
// Sleep a bit if there are no tasks
43+
if (m_Tasks.empty())
44+
{
45+
m_TasksMutex.unlock();
46+
std::this_thread::sleep_for(std::chrono::milliseconds(4));
47+
continue;
48+
}
49+
50+
// Get task and remove from front
51+
std::unique_ptr<SBaseTask> pTask = std::move(m_Tasks.front());
52+
m_Tasks.pop();
53+
54+
m_TasksMutex.unlock();
55+
56+
// Execute time-consuming task
57+
pTask->Execute();
58+
59+
// Put into result queue
60+
{
61+
std::lock_guard<std::mutex>{ m_TaskResultsMutex };
62+
63+
m_TaskResults.push_back(std::move(pTask));
64+
}
65+
}
66+
}
67+
}

Shared/sdk/SharedUtil.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ typedef float FLOAT; // 32
9191
#endif
9292
#include "SharedUtil.Profiling.h"
9393
#include "SharedUtil.Logging.h"
94+
#include "SharedUtil.AsyncTaskScheduler.h"
9495
#include "CFastList.h"
9596
#include "CDuplicateLineFilter.h"
9697

Shared/sdk/SharedUtil.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,4 @@
2626
#endif
2727
#include "SharedUtil.Profiling.hpp"
2828
#include "SharedUtil.Logging.hpp"
29+
#include "SharedUtil.AsyncTaskScheduler.hpp"

0 commit comments

Comments
 (0)