Skip to content

Commit 3e2de16

Browse files
PubSub (#19)
* Disable debug messages in release build * PubSub draft * PubSub done * on_message signature with topic
1 parent 81d719b commit 3e2de16

File tree

5 files changed

+154
-5
lines changed

5 files changed

+154
-5
lines changed

client/lib/main.dart

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ import 'web_socket_client.dart';
2020
const bool isProduction = bool.fromEnvironment('dart.vm.product');
2121

2222
void main([List<String>? args]) async {
23-
// if (isProduction) {
24-
// // ignore: avoid_returning_null_for_void
25-
// debugPrint = (String? message, {int? wrapWidth}) => null;
26-
// }
23+
if (isProduction) {
24+
// ignore: avoid_returning_null_for_void
25+
debugPrint = (String? message, {int? wrapWidth}) => null;
26+
}
2727

2828
await setupDesktop();
2929

sdk/python/flet/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from flet.popup_menu_button import PopupMenuButton, PopupMenuItem
2727
from flet.progress_bar import ProgressBar
2828
from flet.progress_ring import ProgressRing
29+
from flet.pubsub import PubSub
2930
from flet.radio import Radio
3031
from flet.radio_group import RadioGroup
3132
from flet.ref import Ref

sdk/python/flet/connection.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import uuid
55

66
from flet.protocol import *
7+
from flet.pubsub import PubSubHub
78
from flet.reconnecting_websocket import ReconnectingWebSocket
89

910

@@ -18,6 +19,7 @@ def __init__(self, ws: ReconnectingWebSocket):
1819
self.page_name = None
1920
self.page_url = None
2021
self.sessions = {}
22+
self.pubsubhub = PubSubHub()
2123

2224
@property
2325
def on_event(self):

sdk/python/flet/page.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from flet.control_event import ControlEvent
2323
from flet.floating_action_button import FloatingActionButton
2424
from flet.protocol import Command
25+
from flet.pubsub import PubSub
2526
from flet.snack_bar import SnackBar
2627
from flet.theme import Theme
2728

@@ -49,12 +50,12 @@ def __init__(self, conn: Connection, session_id):
4950
self._last_event = None
5051
self._event_available = threading.Event()
5152
self._fetch_page_details()
52-
self.lock = threading.Lock()
5353

5454
self.__offstage = Offstage()
5555
self.__appbar = None
5656
self.__theme = None
5757
self.__dark_theme = None
58+
self.__pubsub = PubSub(conn.pubsubhub, session_id)
5859

5960
def __enter__(self):
6061
return self
@@ -243,6 +244,11 @@ def index(self):
243244
def session_id(self):
244245
return self._session_id
245246

247+
# pubsub
248+
@property
249+
def pubsub(self):
250+
return self.__pubsub
251+
246252
# controls
247253
@property
248254
def controls(self):

sdk/python/flet/pubsub.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
import logging
2+
import threading
3+
from typing import Callable, Dict, Iterable
4+
5+
6+
class PubSubHub:
7+
def __init__(self):
8+
self.__lock = threading.Lock()
9+
self.__subscribers: Dict[str, Callable] = {} # key: session_id, value: handler
10+
self.__topic_subscribers: Dict[
11+
str, Dict[str, Callable]
12+
] = {} # key: topic, value: dict[session_id, handler]
13+
self.__subscriber_topics: Dict[
14+
str, Dict[str, Callable]
15+
] = {} # key: session_id, value: dict[topic, handler]
16+
17+
def send_all(self, message: any):
18+
logging.debug(f"pubsub.send_all({message})")
19+
with self.__lock:
20+
for handler in self.__subscribers.values():
21+
self.__send(handler, [message])
22+
23+
def send_all_on_topic(self, topic: str, message: any):
24+
logging.debug(f"pubsub.send_all_on_topic({topic}, {message})")
25+
with self.__lock:
26+
if topic in self.__topic_subscribers:
27+
for handler in self.__topic_subscribers[topic].values():
28+
self.__send(handler, [topic, message])
29+
30+
def send_others(self, except_session_id: str, message: any):
31+
logging.debug(f"pubsub.send_others({except_session_id}, {message})")
32+
with self.__lock:
33+
for session_id, handler in self.__subscribers.items():
34+
if except_session_id != session_id:
35+
self.__send(handler, [message])
36+
37+
def send_others_on_topic(self, except_session_id: str, topic: str, message: any):
38+
logging.debug(
39+
f"pubsub.send_others_on_topic({except_session_id}, {topic}, {message})"
40+
)
41+
with self.__lock:
42+
if topic in self.__topic_subscribers:
43+
for session_id, handler in self.__topic_subscribers[topic].values():
44+
if except_session_id != session_id:
45+
self.__send(handler, [topic, message])
46+
47+
def subscribe(self, session_id: str, handler: Callable):
48+
logging.debug(f"pubsub.subscribe({session_id})")
49+
with self.__lock:
50+
self.__subscribers[session_id] = handler
51+
52+
def subscribe_topic(self, session_id: str, topic: str, handler: Callable):
53+
logging.debug(f"pubsub.subscribe_topic({session_id}, {topic})")
54+
with self.__lock:
55+
topic_subscribers = self.__topic_subscribers.get(topic)
56+
if topic_subscribers == None:
57+
topic_subscribers = {}
58+
self.__topic_subscribers[topic] = topic_subscribers
59+
topic_subscribers[session_id] = handler
60+
subscriber_topics = self.__subscriber_topics.get(session_id)
61+
if subscriber_topics == None:
62+
subscriber_topics = {}
63+
self.__subscriber_topics[session_id] = subscriber_topics
64+
subscriber_topics[topic] = handler
65+
66+
def unsubscribe(self, session_id: str):
67+
logging.debug(f"pubsub.unsubscribe({session_id})")
68+
with self.__lock:
69+
self.__unsubscribe(session_id)
70+
71+
def unsubscribe_topic(self, session_id: str, topic: str):
72+
logging.debug(f"pubsub.unsubscribe({session_id}, {topic})")
73+
with self.__lock:
74+
self.__unsubscribe_topic(session_id, topic)
75+
76+
def unsubscribe_all(self, session_id: str):
77+
logging.debug(f"pubsub.unsubscribe_all({session_id})")
78+
with self.__lock:
79+
self.__unsubscribe(session_id)
80+
if session_id in self.__subscriber_topics:
81+
for topic in self.__subscriber_topics[session_id].keys():
82+
self.__unsubscribe_topic(session_id, topic)
83+
84+
def __unsubscribe(self, session_id: str):
85+
logging.debug(f"pubsub.__unsubscribe({session_id})")
86+
self.__subscribers.pop(session_id)
87+
88+
def __unsubscribe_topic(self, session_id: str, topic: str):
89+
logging.debug(f"pubsub.__unsubscribe_topic({session_id}, {topic})")
90+
topic_subscribers = self.__topic_subscribers.get(topic)
91+
if topic_subscribers != None:
92+
topic_subscribers.pop(session_id)
93+
if len(topic_subscribers) == 0:
94+
self.__topic_subscribers.pop(topic)
95+
subscriber_topics = self.__subscriber_topics.get(session_id)
96+
if subscriber_topics != None:
97+
subscriber_topics.pop(topic)
98+
if len(subscriber_topics) == 0:
99+
self.__subscriber_topics.pop(session_id)
100+
101+
def __send(self, handler: Callable, args: Iterable):
102+
th = threading.Thread(
103+
target=handler,
104+
args=args,
105+
daemon=True,
106+
)
107+
th.start()
108+
109+
110+
class PubSub:
111+
def __init__(self, pubsub: PubSubHub, session_id: str):
112+
self.__pubsub = pubsub
113+
self.__session_id = session_id
114+
115+
def send_all(self, message: any):
116+
self.__pubsub.send_all(message)
117+
118+
def send_all_on_topic(self, topic: str, message: any):
119+
self.__pubsub.send_all_on_topic(topic, message)
120+
121+
def send_others(self, message: any):
122+
self.__pubsub.send_others(self.__session_id, message)
123+
124+
def send_others_on_topic(self, topic: str, message: any):
125+
self.__pubsub.send_others_on_topic(self.__session_id, topic, message)
126+
127+
def subscribe(self, handler: Callable):
128+
self.__pubsub.subscribe(self.__session_id, handler)
129+
130+
def subscribe_topic(self, topic: str, handler: Callable):
131+
self.__pubsub.subscribe_topic(self.__session_id, topic, handler)
132+
133+
def unsubscribe(self):
134+
self.__pubsub.unsubscribe(self.__session_id)
135+
136+
def unsubscribe_topic(self, topic: str):
137+
self.__pubsub.unsubscribe_topic(self.__session_id, topic)
138+
139+
def unsubscribe_all(self):
140+
self.__pubsub.unsubscribe_all(self.__session_id)

0 commit comments

Comments
 (0)