Skip to content

Commit 786bf92

Browse files
committed
- Added ThreadSafeObservable implementation
1 parent 810657c commit 786bf92

File tree

2 files changed

+114
-1
lines changed

2 files changed

+114
-1
lines changed

src/ESPressio_Observable.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace ESPressio {
1414
/// An `Observable` is an object that can be observed by any number of `IObserver` descendant types
1515
/// This is a concrete implementation of `IObservable`, but it is NOT Thread-Safe.
1616
/// Registering or Unregistering Observers while Observers are being notified can lead to undefined behavior.
17-
/// If you need a Thread-Safe Implementation, install the `ESPressio-ObservableTS` package and use its implementation instead.
17+
/// If you need a Thread-Safe Implementation, use the `ThreadSafeObservable` class instead.
1818
class Observable : public IObservable {
1919
private:
2020
std::vector<IObserverHandle*> _observers;
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
#pragma once
2+
3+
#include <functional>
4+
#include <vector>
5+
#include <mutex>
6+
7+
#include "ESPressio_IObservable.hpp"
8+
#include "ESPressio_IObserver.hpp"
9+
#include "ESPressio_ObserverHandle.hpp"
10+
11+
namespace ESPressio {
12+
13+
namespace Observable {
14+
15+
/// An `Observable` is an object that can be observed by any number of `IObserver` descendant types
16+
/// This is a concrete implementation of `IObservable`, and is Thread Safe!
17+
class ThreadSafeObservable : public IObservable {
18+
private:
19+
std::vector<IObserverHandle*> _observers;
20+
std::mutex _mutex;
21+
22+
std::vector<IObserverHandle*>* CopyObservers() {
23+
_mutex.lock();
24+
std::vector<IObserverHandle*>* observers = new std::vector<IObserverHandle*>(_observers);
25+
_mutex.unlock();
26+
return observers;
27+
}
28+
29+
protected:
30+
/// Will call the `callback` for each Observer
31+
virtual void WithObservers(std::function<void(IObserver*)> callback) {
32+
// _mutex.lock();
33+
// std::vector<IObserverHandle*> observers;
34+
// observers.reserve(_observers.size());
35+
// for (auto observer : _observers) {
36+
// observers.push_back(observer);
37+
// }
38+
// _mutex.unlock();
39+
40+
std::vector<IObserverHandle*>* observers = CopyObservers();
41+
for (auto observer : *observers) {
42+
callback(observer->GetObserver());
43+
}
44+
delete observers;
45+
}
46+
47+
/// Will call the `callback` for each Observer that is of type `ObserverType`
48+
template <class ObserverType>
49+
void WithObservers(std::function<void(ObserverType*)> callback) {
50+
std::vector<IObserverHandle*>* observers = CopyObservers();
51+
52+
for (auto observer : *observers) {
53+
ObserverType* observerAsT = dynamic_cast<ObserverType*>(observer->GetObserver());
54+
if (!observerAsT) { continue; }
55+
callback(observerAsT);
56+
}
57+
58+
delete observers;
59+
}
60+
public:
61+
~ThreadSafeObservable() {
62+
_mutex.lock();
63+
for (auto observer : _observers) {
64+
static_cast<ObserverHandle*>(observer)->__invalidate();
65+
}
66+
67+
_observers.clear();
68+
_mutex.unlock();
69+
}
70+
71+
virtual IObserverHandle* RegisterObserver(IObserver* observer) {
72+
_mutex.lock();
73+
for (auto thisObserver : _observers) {
74+
if (thisObserver->GetObserver() == observer) {
75+
_mutex.unlock();
76+
return thisObserver;
77+
}
78+
}
79+
IObserverHandle* handle = new ObserverHandle(this, observer);
80+
_observers.push_back(handle);
81+
_mutex.unlock();
82+
return handle;
83+
}
84+
85+
virtual void UnregisterObserver(IObserver* observer) {
86+
_mutex.lock();
87+
for (auto thisObserver = _observers.begin(); thisObserver != _observers.end(); thisObserver++) {
88+
if ((*thisObserver)->GetObserver() == observer) {
89+
static_cast<ObserverHandle*>((*thisObserver))->__invalidate();
90+
_observers.erase(thisObserver);
91+
_mutex.unlock();
92+
return;
93+
}
94+
}
95+
_mutex.unlock();
96+
}
97+
98+
virtual bool IsObserverRegistered(IObserver* observer) {
99+
_mutex.lock();
100+
for (auto thisObserver : _observers) {
101+
if ((*thisObserver).GetObserver() == observer) {
102+
_mutex.unlock();
103+
return true;
104+
}
105+
}
106+
_mutex.unlock();
107+
return false;
108+
}
109+
};
110+
111+
}
112+
113+
}

0 commit comments

Comments
 (0)