Skip to content

Commit 7088561

Browse files
authored
Add mutexes for threaded events, remove arcswap for normal events, move eventcenter to source (#1817)
* changing events again yay * nice one
1 parent 73d61b9 commit 7088561

File tree

2 files changed

+320
-21
lines changed

2 files changed

+320
-21
lines changed

loader/include/Geode/loader/Event.hpp

Lines changed: 100 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ namespace geode::comm {
7777
}
7878
};
7979

80+
class EventCenterThreadLocal;
81+
class EventCenterGlobal;
82+
8083
// Okay so even though the Event system is fully header only,
8184
// we can still version it. One caveat/hackiness is that
8285
// Ports should be backwards ABI compatible, meaning no member
@@ -94,6 +97,7 @@ namespace geode::comm {
9497
size_t m_sending = 0;
9598
public:
9699
using CallableType = Callable;
100+
using EventCenterType = EventCenterThreadLocal;
97101

98102
void migrateFromV1(Port&& other) noexcept {
99103
m_receivers = std::move(other.m_receivers);
@@ -185,6 +189,7 @@ namespace geode::comm {
185189
asp::PtrSwap<VectorType> m_receivers;
186190
public:
187191
using CallableType = Callable;
192+
using EventCenterType = EventCenterGlobal;
188193

189194
Port() : m_receivers(asp::make_shared<VectorType>()) {}
190195

@@ -382,6 +387,8 @@ namespace geode::comm {
382387
PortTemplate<geode::CopyableFunction<bool(PArgs...)>> m_port;
383388

384389
public:
390+
using EventCenterType = typename PortTemplate<geode::CopyableFunction<bool(PArgs...)>>::EventCenterType;
391+
385392
OpaqueEventPort() {}
386393

387394
~OpaqueEventPort() noexcept override {}
@@ -477,6 +484,8 @@ namespace geode::comm {
477484
: m_filter(std::move(filter)), m_handle(handle), m_remover(remover), m_active(active) {}
478485

479486
friend class EventCenter;
487+
friend class EventCenterThreadLocal;
488+
friend class EventCenterGlobal;
480489

481490
public:
482491
ListenerHandle() noexcept {}
@@ -544,19 +553,22 @@ namespace geode::comm {
544553
using ValueType = std::shared_ptr<OpaquePortBase>;
545554
using MapType = std::unordered_map<KeyType, ValueType, BaseFilterHash, BaseFilterEqual>;
546555
using IteratorType = typename MapType::iterator;
556+
using OpaqueEventType = OpaqueEventPort<PortTemplate, PArgs...>;
557+
using OpaqueEventV2Type = OpaqueEventPortV2<PortTemplate, PArgs...>;
558+
using LatestOpaqueEventType = OpaqueEventV2Type;
559+
using EventCenterType = LatestOpaqueEventType::EventCenterType;
547560

548561
// Here we migrate the port version if needed. This is what I meant by versioning,
549562
// we need to check for previous versions and move them into the current version.
550563
// Go to getPort definition.
551-
static bool migratePort(IteratorType iter) {
552-
auto port = iter->second.get();
553-
if (!geode::cast::typeinfo_cast<OpaqueEventPortV2<PortTemplate, PArgs...>*>(port)) {
554-
auto oldPort = static_cast<OpaqueEventPort<PortTemplate, PArgs...>*>(port);
555-
auto newPort = new OpaqueEventPortV2<PortTemplate, PArgs...>();
564+
static OpaquePortBase* migratePort(OpaquePortBase* port) {
565+
if (!geode::cast::typeinfo_cast<OpaqueEventV2Type*>(port)) {
566+
auto oldPort = static_cast<OpaqueEventType*>(port);
567+
auto newPort = new OpaqueEventV2Type();
556568
newPort->migrateFromV1(oldPort);
557-
iter->second.reset(newPort);
569+
return newPort;
558570
}
559-
return true;
571+
return nullptr;
560572
}
561573

562574
using Self = BasicEvent<Marker, PortTemplate, PReturn(PArgs...), FArgs...>;
@@ -583,7 +595,7 @@ namespace geode::comm {
583595
// All of the normal functions do static cast version, but that is not strictly needed,
584596
// what is needed however is updating this getPort function.
585597
OpaquePortBase* getPort() const noexcept override {
586-
return new (std::nothrow) OpaqueEventPortV2<PortTemplate, PArgs...>();
598+
return new (std::nothrow) OpaqueEventV2Type();
587599
}
588600

589601
size_t hash() const noexcept override {
@@ -599,8 +611,9 @@ namespace geode::comm {
599611

600612
static void removeReceiverStatic(BaseFilter const* filter, ReceiverHandle handle) noexcept {
601613
auto* self = static_cast<BasicEvent const*>(filter);
614+
// geode::console::log(fmt::format("Static removing receiver from BasicEvent {}, {}", (void*)self, typeid(Marker).name()), Severity::Debug);
602615
if (self) {
603-
// geode::console::log(fmt::format("Static removing receiver from BasicEvent {}, {}", (void*)self, typeid(Marker).name()), Severity::Debug);
616+
// geode::console::log(fmt::format("afssfd from BasicEvent {}, {}", (void*)self, typeid(Marker).name()), Severity::Debug);
604617
self->removeReceiver(handle);
605618
}
606619
}
@@ -641,6 +654,52 @@ namespace geode::comm {
641654
}
642655
};
643656

657+
class GEODE_DLL EventCenterThreadLocal {
658+
private:
659+
class Impl;
660+
std::unique_ptr<Impl> m_impl;
661+
662+
EventCenterThreadLocal();
663+
~EventCenterThreadLocal();
664+
665+
public:
666+
static EventCenterThreadLocal* get();
667+
668+
using SendFuncType = geode::Function<bool(OpaquePortBase*)>;
669+
using AddFuncType = geode::Function<ReceiverHandle(OpaquePortBase*)>;
670+
using SizeFuncType = geode::Function<size_t(OpaquePortBase*)>;
671+
using RemoveFuncType = geode::Function<size_t(OpaquePortBase*)>;
672+
using MigrateFuncType = geode::Function<OpaquePortBase*(OpaquePortBase*)>;
673+
674+
bool send(BaseFilter const* filter, SendFuncType func, MigrateFuncType migratePort) noexcept;
675+
ListenerHandle addReceiver(BaseFilter const* filter, AddFuncType func, MigrateFuncType migratePort) noexcept;
676+
size_t getReceiverCount(BaseFilter const* filter, SizeFuncType func, MigrateFuncType migratePort) noexcept;
677+
size_t removeReceiver(BaseFilter const* filter, RemoveFuncType func, MigrateFuncType migratePort) noexcept;
678+
};
679+
680+
class GEODE_DLL EventCenterGlobal {
681+
private:
682+
class Impl;
683+
std::unique_ptr<Impl> m_impl;
684+
685+
EventCenterGlobal();
686+
~EventCenterGlobal();
687+
688+
public:
689+
static EventCenterGlobal* get();
690+
691+
using SendFuncType = geode::Function<bool(OpaquePortBase*)>;
692+
using AddFuncType = geode::Function<ReceiverHandle(OpaquePortBase*)>;
693+
using SizeFuncType = geode::Function<size_t(OpaquePortBase*)>;
694+
using RemoveFuncType = geode::Function<size_t(OpaquePortBase*)>;
695+
using MigrateFuncType = geode::Function<OpaquePortBase*(OpaquePortBase*)>;
696+
697+
bool send(BaseFilter const* filter, SendFuncType func, MigrateFuncType migratePort) noexcept;
698+
ListenerHandle addReceiver(BaseFilter const* filter, AddFuncType func, MigrateFuncType migratePort) noexcept;
699+
size_t getReceiverCount(BaseFilter const* filter, SizeFuncType func, MigrateFuncType migratePort) noexcept;
700+
size_t removeReceiver(BaseFilter const* filter, RemoveFuncType func, MigrateFuncType migratePort) noexcept;
701+
};
702+
644703
class EventCenter {
645704
using KeyType = std::shared_ptr<BaseFilter>;
646705
using ValueType = std::shared_ptr<OpaquePortBase>;
@@ -657,7 +716,10 @@ namespace geode::comm {
657716
// geode::console::log(fmt::format("EventCenter sending event for filter {}, {}", (void*)filter, cast::getRuntimeTypeName(filter)), Severity::Debug);
658717
auto p = m_ports.load();
659718
auto it = p->find(filter);
660-
if (it != p->end() && std::invoke(migratePort, it)) {
719+
if (it != p->end()) {
720+
if (auto newFilter = std::invoke(migratePort, it->second.get())) {
721+
it->second.reset(newFilter);
722+
}
661723
auto newFilter = it->first.get();
662724
return std::invoke(func, it->second.get());
663725
}
@@ -670,7 +732,10 @@ namespace geode::comm {
670732
// geode::console::log(fmt::format("EventCenter adding receiver for filter {}, {}", (void*)filter, cast::getRuntimeTypeName(filter)), Severity::Debug);
671733
auto p = m_ports.load();
672734
auto it = p->find(filter);
673-
if (it != p->end() && std::invoke(migratePort, it)) {
735+
if (it != p->end()) {
736+
if (auto newFilter = std::invoke(migratePort, it->second.get())) {
737+
it->second.reset(newFilter);
738+
}
674739
return ListenerHandle(it->first, std::invoke(func, it->second.get()), nullptr);
675740
}
676741
else {
@@ -696,10 +761,13 @@ namespace geode::comm {
696761

697762
template <class Callable, class Callable2>
698763
requires std::is_invocable_v<Callable, OpaquePortBase*>
699-
size_t getReceiverCount(BaseFilter const* filter, Callable func, Callable2 migratePort) const noexcept {
764+
size_t getReceiverCount(BaseFilter const* filter, Callable func, Callable2 migratePort) noexcept {
700765
auto p = m_ports.load();
701766
auto it = p->find(filter);
702-
if (it != p->end() && std::invoke(migratePort, it)) {
767+
if (it != p->end()) {
768+
if (auto newFilter = std::invoke(migratePort, it->second.get())) {
769+
it->second.reset(newFilter);
770+
}
703771
return std::invoke(func, it->second.get());
704772
}
705773
return 0;
@@ -711,7 +779,10 @@ namespace geode::comm {
711779
// geode::console::log(fmt::format("EventCenter removing receiver for filter {}, {}", (void*)filter, cast::getRuntimeTypeName(filter)), Severity::Debug);
712780
auto p = m_ports.load();
713781
auto it = p->find(filter);
714-
if (it != p->end() && std::invoke(migratePort, it)) {
782+
if (it != p->end()) {
783+
if (auto newFilter = std::invoke(migratePort, it->second.get())) {
784+
it->second.reset(newFilter);
785+
}
715786
auto size = std::invoke(func, it->second.get());
716787
if (size == 0) {
717788
// geode::console::log(fmt::format("Removing port for filter type {}", cast::getRuntimeTypeName(filter)), Severity::Debug);
@@ -738,8 +809,16 @@ namespace geode::comm {
738809
std::is_convertible_v<PReturn, bool> || std::is_same_v<PReturn, void>;
739810
}
740811
bool BasicEvent<Marker, PortTemplate, PReturn(PArgs...), FArgs...>::send(PArgs... args) noexcept(std::is_nothrow_invocable_v<geode::CopyableFunction<PReturn(PArgs...)>, PArgs...>) {
812+
auto ret = EventCenterType::get()->send(this, [&](OpaquePortBase* opaquePort) {
813+
auto port = static_cast<LatestOpaqueEventType*>(opaquePort);
814+
return port->send(args...);
815+
}, &BasicEvent::migratePort);
816+
817+
if (ret) return true;
818+
819+
// fallback on the old event center
741820
return EventCenter::get()->send(this, [&](OpaquePortBase* opaquePort) {
742-
auto port = static_cast<OpaqueEventPortV2<PortTemplate, PArgs...>*>(opaquePort);
821+
auto port = static_cast<OpaqueEventType*>(opaquePort);
743822
return port->send(std::forward<PArgs>(args)...);
744823
}, &BasicEvent::migratePort);
745824
}
@@ -750,8 +829,8 @@ namespace geode::comm {
750829
std::is_convertible_v<PReturn, bool> || std::is_same_v<PReturn, void>;
751830
}
752831
ListenerHandle BasicEvent<Marker, PortTemplate, PReturn(PArgs...), FArgs...>::addReceiver(geode::CopyableFunction<PReturn(PArgs...)> rec, int priority) const noexcept {
753-
return EventCenter::get()->addReceiver(this, [&](OpaquePortBase* opaquePort) {
754-
auto port = static_cast<OpaqueEventPortV2<PortTemplate, PArgs...>*>(opaquePort);
832+
return EventCenterType::get()->addReceiver(this, [&](OpaquePortBase* opaquePort) {
833+
auto port = static_cast<LatestOpaqueEventType*>(opaquePort);
755834
return port->addReceiver(std::move(rec), priority);
756835
}, &BasicEvent::migratePort);
757836
}
@@ -762,8 +841,8 @@ namespace geode::comm {
762841
std::is_convertible_v<PReturn, bool> || std::is_same_v<PReturn, void>;
763842
}
764843
size_t BasicEvent<Marker, PortTemplate, PReturn(PArgs...), FArgs...>::getReceiverCount() const noexcept {
765-
return EventCenter::get()->getReceiverCount(this, [&](OpaquePortBase* opaquePort) {
766-
auto port = static_cast<OpaqueEventPortV2<PortTemplate, PArgs...>*>(opaquePort);
844+
return EventCenterType::get()->getReceiverCount(this, [&](OpaquePortBase* opaquePort) {
845+
auto port = static_cast<LatestOpaqueEventType*>(opaquePort);
767846
return port->getReceiverCount();
768847
}, &BasicEvent::migratePort);
769848
}
@@ -774,8 +853,8 @@ namespace geode::comm {
774853
std::is_convertible_v<PReturn, bool> || std::is_same_v<PReturn, void>;
775854
}
776855
size_t BasicEvent<Marker, PortTemplate, PReturn(PArgs...), FArgs...>::removeReceiver(ReceiverHandle handle) const noexcept {
777-
return EventCenter::get()->removeReceiver(this, [&](OpaquePortBase* opaquePort) {
778-
auto port = static_cast<OpaqueEventPortV2<PortTemplate, PArgs...>*>(opaquePort);
856+
return EventCenterType::get()->removeReceiver(this, [&](OpaquePortBase* opaquePort) {
857+
auto port = static_cast<LatestOpaqueEventType*>(opaquePort);
779858
return port->removeReceiver(handle);
780859
}, &BasicEvent::migratePort);
781860
}

0 commit comments

Comments
 (0)