Skip to content

Commit 74719ce

Browse files
committed
base: Move some direct crosstalk to callback system
1 parent 9123879 commit 74719ce

File tree

6 files changed

+73
-76
lines changed

6 files changed

+73
-76
lines changed

src/eventhandler/EventHandler.cpp

Lines changed: 28 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -73,49 +73,37 @@ EventHandler::~EventHandler()
7373
blog_debug("[EventHandler::~EventHandler] Finished.");
7474
}
7575

76-
void EventHandler::SetBroadcastCallback(EventHandler::BroadcastCallback cb)
76+
// Function to increment or decrement refcounts for high volume event subscriptions
77+
void EventHandler::ProcessSubscriptionChange(bool type, uint64_t eventSubscriptions)
7778
{
78-
_broadcastCallback = cb;
79-
}
80-
81-
void EventHandler::SetObsReadyCallback(EventHandler::ObsReadyCallback cb)
82-
{
83-
_obsReadyCallback = cb;
84-
}
85-
86-
// Function to increment refcounts for high volume event subscriptions
87-
void EventHandler::ProcessSubscription(uint64_t eventSubscriptions)
88-
{
89-
if ((eventSubscriptions & EventSubscription::InputVolumeMeters) != 0) {
90-
if (_inputVolumeMetersRef.fetch_add(1) == 0) {
91-
if (_inputVolumeMetersHandler)
92-
blog(LOG_WARNING, "[EventHandler::ProcessSubscription] Input volume meter handler already exists!");
93-
else
94-
_inputVolumeMetersHandler = std::make_unique<Utils::Obs::VolumeMeter::Handler>(
95-
std::bind(&EventHandler::HandleInputVolumeMeters, this, std::placeholders::_1));
79+
if (type) {
80+
if ((eventSubscriptions & EventSubscription::InputVolumeMeters) != 0) {
81+
if (_inputVolumeMetersRef.fetch_add(1) == 0) {
82+
if (_inputVolumeMetersHandler)
83+
blog(LOG_WARNING, "[EventHandler::ProcessSubscription] Input volume meter handler already exists!");
84+
else
85+
_inputVolumeMetersHandler = std::make_unique<Utils::Obs::VolumeMeter::Handler>(
86+
std::bind(&EventHandler::HandleInputVolumeMeters, this, std::placeholders::_1));
87+
}
9688
}
89+
if ((eventSubscriptions & EventSubscription::InputActiveStateChanged) != 0)
90+
_inputActiveStateChangedRef++;
91+
if ((eventSubscriptions & EventSubscription::InputShowStateChanged) != 0)
92+
_inputShowStateChangedRef++;
93+
if ((eventSubscriptions & EventSubscription::SceneItemTransformChanged) != 0)
94+
_sceneItemTransformChangedRef++;
95+
} else {
96+
if ((eventSubscriptions & EventSubscription::InputVolumeMeters) != 0) {
97+
if (_inputVolumeMetersRef.fetch_sub(1) == 1)
98+
_inputVolumeMetersHandler.reset();
99+
}
100+
if ((eventSubscriptions & EventSubscription::InputActiveStateChanged) != 0)
101+
_inputActiveStateChangedRef--;
102+
if ((eventSubscriptions & EventSubscription::InputShowStateChanged) != 0)
103+
_inputShowStateChangedRef--;
104+
if ((eventSubscriptions & EventSubscription::SceneItemTransformChanged) != 0)
105+
_sceneItemTransformChangedRef--;
97106
}
98-
if ((eventSubscriptions & EventSubscription::InputActiveStateChanged) != 0)
99-
_inputActiveStateChangedRef++;
100-
if ((eventSubscriptions & EventSubscription::InputShowStateChanged) != 0)
101-
_inputShowStateChangedRef++;
102-
if ((eventSubscriptions & EventSubscription::SceneItemTransformChanged) != 0)
103-
_sceneItemTransformChangedRef++;
104-
}
105-
106-
// Function to decrement refcounts for high volume event subscriptions
107-
void EventHandler::ProcessUnsubscription(uint64_t eventSubscriptions)
108-
{
109-
if ((eventSubscriptions & EventSubscription::InputVolumeMeters) != 0) {
110-
if (_inputVolumeMetersRef.fetch_sub(1) == 1)
111-
_inputVolumeMetersHandler.reset();
112-
}
113-
if ((eventSubscriptions & EventSubscription::InputActiveStateChanged) != 0)
114-
_inputActiveStateChangedRef--;
115-
if ((eventSubscriptions & EventSubscription::InputShowStateChanged) != 0)
116-
_inputShowStateChangedRef--;
117-
if ((eventSubscriptions & EventSubscription::SceneItemTransformChanged) != 0)
118-
_sceneItemTransformChangedRef--;
119107
}
120108

121109
// Function required in order to use default arguments

src/eventhandler/EventHandler.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,18 @@ class EventHandler {
3636

3737
typedef std::function<void(uint64_t, std::string, json, uint8_t)>
3838
BroadcastCallback; // uint64_t requiredIntent, std::string eventType, json eventData, uint8_t rpcVersion
39-
void SetBroadcastCallback(BroadcastCallback cb);
39+
inline void SetBroadcastCallback(BroadcastCallback cb)
40+
{
41+
_broadcastCallback = cb;
42+
}
43+
4044
typedef std::function<void(bool)> ObsReadyCallback; // bool ready
41-
void SetObsReadyCallback(ObsReadyCallback cb);
45+
inline void SetObsReadyCallback(ObsReadyCallback cb)
46+
{
47+
_obsReadyCallback = cb;
48+
}
4249

43-
void ProcessSubscription(uint64_t eventSubscriptions);
44-
void ProcessUnsubscription(uint64_t eventSubscriptions);
50+
void ProcessSubscriptionChange(bool type, uint64_t eventSubscriptions);
4551

4652
private:
4753
BroadcastCallback _broadcastCallback;

src/obs-websocket.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ bool obs_module_load(void)
8181
// Initialize the WebSocket server
8282
_webSocketServer = std::make_shared<WebSocketServer>();
8383

84+
// Attach event handlers between WebSocket server and event handler
85+
_eventHandler->SetBroadcastCallback(std::bind(&WebSocketServer::BroadcastEvent, _webSocketServer.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
86+
_eventHandler->SetObsReadyCallback(std::bind(&WebSocketServer::SetObsReady, _webSocketServer.get(), std::placeholders::_1));
87+
_webSocketServer->SetClientSubscriptionCallback(std::bind(&EventHandler::ProcessSubscriptionChange, _eventHandler.get(), std::placeholders::_1, std::placeholders::_2));
88+
8489
// Initialize the settings dialog
8590
obs_frontend_push_ui_translation(obs_module_get_string);
8691
QMainWindow *mainWindow = static_cast<QMainWindow *>(obs_frontend_get_main_window());
@@ -123,6 +128,11 @@ void obs_module_unload(void)
123128
_webSocketServer->Stop();
124129
}
125130

131+
// Disconnect event handler from WebSocket server
132+
_eventHandler->SetObsReadyCallback(nullptr);
133+
_eventHandler->SetBroadcastCallback(nullptr);
134+
_webSocketServer->SetClientSubscriptionCallback(nullptr);
135+
126136
// Release the WebSocket server
127137
_webSocketServer = nullptr;
128138

src/websocketserver/WebSocketServer.cpp

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ with this program. If not, see <https://www.gnu.org/licenses/>
2424
#include <obs-frontend-api.h>
2525

2626
#include "WebSocketServer.h"
27-
#include "../eventhandler/EventHandler.h"
2827
#include "../obs-websocket.h"
2928
#include "../Config.h"
3029
#include "../utils/Crypto.h"
@@ -47,23 +46,10 @@ WebSocketServer::WebSocketServer() : QObject(nullptr)
4746
_server.set_close_handler(websocketpp::lib::bind(&WebSocketServer::onClose, this, websocketpp::lib::placeholders::_1));
4847
_server.set_message_handler(websocketpp::lib::bind(&WebSocketServer::onMessage, this, websocketpp::lib::placeholders::_1,
4948
websocketpp::lib::placeholders::_2));
50-
51-
auto eventHandler = GetEventHandler();
52-
if (eventHandler) {
53-
eventHandler->SetBroadcastCallback(std::bind(&WebSocketServer::BroadcastEvent, this, std::placeholders::_1,
54-
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
55-
eventHandler->SetObsReadyCallback(std::bind(&WebSocketServer::onObsReady, this, std::placeholders::_1));
56-
}
5749
}
5850

5951
WebSocketServer::~WebSocketServer()
6052
{
61-
auto eventHandler = GetEventHandler();
62-
if (eventHandler) {
63-
eventHandler->SetObsReadyCallback(nullptr);
64-
eventHandler->SetBroadcastCallback(nullptr);
65-
}
66-
6753
if (_server.is_listening())
6854
Stop();
6955
}
@@ -215,7 +201,7 @@ std::vector<WebSocketServer::WebSocketSessionState> WebSocketServer::GetWebSocke
215201
return webSocketSessions;
216202
}
217203

218-
void WebSocketServer::onObsReady(bool ready)
204+
void WebSocketServer::SetObsReady(bool ready)
219205
{
220206
_obsReady = ready;
221207
}
@@ -327,11 +313,9 @@ void WebSocketServer::onClose(websocketpp::connection_hdl hdl)
327313
_sessions.erase(hdl);
328314
lock.unlock();
329315

330-
// If client was identified, decrement appropriate refs in eventhandler.
331-
if (isIdentified) {
332-
auto eventHandler = GetEventHandler();
333-
eventHandler->ProcessUnsubscription(eventSubscriptions);
334-
}
316+
// If client was identified, announce unsubscription
317+
if (isIdentified && _clientSubscriptionCallback)
318+
_clientSubscriptionCallback(false, eventSubscriptions);
335319

336320
// Build SessionState object for signal
337321
WebSocketSessionState state;

src/websocketserver/WebSocketServer.h

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,20 @@ class WebSocketServer : QObject {
5757
void InvalidateSession(websocketpp::connection_hdl hdl);
5858
void BroadcastEvent(uint64_t requiredIntent, const std::string &eventType, const json &eventData = nullptr,
5959
uint8_t rpcVersion = 0);
60+
void SetObsReady(bool ready);
6061

61-
bool IsListening() { return _server.is_listening(); }
62+
// Callback for when a client subscribes or unsubscribes. `true` for sub, `false` for unsub
63+
typedef std::function<void(bool, uint64_t)> ClientSubscriptionCallback; // bool type, uint64_t eventSubscriptions
64+
inline void SetClientSubscriptionCallback(ClientSubscriptionCallback cb)
65+
{
66+
_clientSubscriptionCallback = cb;
67+
}
68+
69+
inline bool IsListening() { return _server.is_listening(); }
6270

6371
std::vector<WebSocketSessionState> GetWebSocketSessions();
6472

65-
QThreadPool *GetThreadPool() { return &_threadPool; }
73+
inline QThreadPool *GetThreadPool() { return &_threadPool; }
6674

6775
signals:
6876
void ClientConnected(WebSocketSessionState state);
@@ -77,7 +85,6 @@ class WebSocketServer : QObject {
7785

7886
void ServerRunner();
7987

80-
void onObsReady(bool loaded);
8188
bool onValidate(websocketpp::connection_hdl hdl);
8289
void onOpen(websocketpp::connection_hdl hdl);
8390
void onClose(websocketpp::connection_hdl hdl);
@@ -98,4 +105,6 @@ class WebSocketServer : QObject {
98105
std::map<websocketpp::connection_hdl, SessionPtr, std::owner_less<websocketpp::connection_hdl>> _sessions;
99106

100107
std::atomic<bool> _obsReady = false;
108+
109+
ClientSubscriptionCallback _clientSubscriptionCallback;
101110
};

src/websocketserver/WebSocketServer_Protocol.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ with this program. If not, see <https://www.gnu.org/licenses/>
2323
#include "WebSocketServer.h"
2424
#include "../requesthandler/RequestHandler.h"
2525
#include "../requesthandler/RequestBatchHandler.h"
26-
#include "../eventhandler/EventHandler.h"
2726
#include "../obs-websocket.h"
2827
#include "../Config.h"
2928
#include "../utils/Crypto.h"
@@ -149,9 +148,9 @@ void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::Proces
149148
if (ret.closeCode != WebSocketCloseCode::DontClose)
150149
return;
151150

152-
// Increment refs for event subscriptions
153-
auto eventHandler = GetEventHandler();
154-
eventHandler->ProcessSubscription(session->EventSubscriptions());
151+
// Announce subscribe
152+
if (_clientSubscriptionCallback)
153+
_clientSubscriptionCallback(true, session->EventSubscriptions());
155154

156155
// Mark session as identified
157156
session->SetIsIdentified(true);
@@ -172,16 +171,17 @@ void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::Proces
172171
case WebSocketOpCode::Reidentify: { // Reidentify
173172
std::unique_lock<std::mutex> sessionLock(session->OperationMutex);
174173

175-
// Decrement refs for current subscriptions
176-
auto eventHandler = GetEventHandler();
177-
eventHandler->ProcessUnsubscription(session->EventSubscriptions());
174+
// Announce unsubscribe
175+
if (_clientSubscriptionCallback)
176+
_clientSubscriptionCallback(false, session->EventSubscriptions());
178177

179178
SetSessionParameters(session, ret, payloadData);
180179
if (ret.closeCode != WebSocketCloseCode::DontClose)
181180
return;
182181

183-
// Increment refs for new subscriptions
184-
eventHandler->ProcessSubscription(session->EventSubscriptions());
182+
// Announce subscribe
183+
if (_clientSubscriptionCallback)
184+
_clientSubscriptionCallback(true, session->EventSubscriptions());
185185

186186
ret.result["op"] = WebSocketOpCode::Identified;
187187
ret.result["d"]["negotiatedRpcVersion"] = session->RpcVersion();

0 commit comments

Comments
 (0)