|
| 1 | +import json |
1 | 2 | import threading |
2 | 3 | from collections import defaultdict |
3 | 4 | from typing import Dict, List, Optional, Union |
4 | 5 |
|
5 | | -from bidict import ON_DUP_DROP_OLD, bidict |
| 6 | +from bidict import bidict |
6 | 7 |
|
7 | 8 | from fundus.logging import create_logger |
8 | 9 |
|
@@ -84,7 +85,8 @@ def __init__(self, default_events: Optional[List[str]] = None): |
84 | 85 | default_events: A list of event names that are automatically available |
85 | 86 | for all threads (e.g., ["stop"]). |
86 | 87 | """ |
87 | | - self._events: Dict[int, ThreadEventDict] = defaultdict(lambda: ThreadEventDict(default_events)) |
| 88 | + self.default_events = default_events |
| 89 | + self._events: Dict[int, ThreadEventDict] = defaultdict(lambda: ThreadEventDict(self.default_events)) |
88 | 90 | self._aliases: bidict[str, int] = bidict() |
89 | 91 | self._lock = threading.RLock() |
90 | 92 |
|
@@ -142,7 +144,7 @@ def _alias(self, alias: str, key: Optional[int] = None): |
142 | 144 | key: The thread identifier to associate with this alias. |
143 | 145 | If None, the current thread's identifier is used. |
144 | 146 | """ |
145 | | - self._aliases.put(alias, key if key else self._get_identifier(), ON_DUP_DROP_OLD) |
| 147 | + self._aliases[alias] = key if key else self._get_identifier() |
146 | 148 | if (ident := self._resolve(alias)) not in self._events: |
147 | 149 | # noinspection PyStatementEffect |
148 | 150 | # Since defaultdict doesn't provide a direct way to create defaults, |
@@ -287,5 +289,19 @@ def get(self, event: str, key: Optional[Union[int, str, None]] = None) -> thread |
287 | 289 | with self._lock: |
288 | 290 | return self._events[self._resolve(key)][event] |
289 | 291 |
|
| 292 | + def reset(self): |
| 293 | + with self._lock: |
| 294 | + self._events = defaultdict(lambda: ThreadEventDict(self.default_events)) |
| 295 | + self._aliases = bidict() |
| 296 | + |
| 297 | + def __str__(self): |
| 298 | + def _entry(thread: int) -> str: |
| 299 | + alias = self._aliases.inv.get(thread, None) |
| 300 | + serialized = json.dumps(self._events[thread], indent=2, ensure_ascii=False, default=lambda o: o.set()) |
| 301 | + return f"{alias if alias else 'None'} --> {thread}: \n" + serialized |
| 302 | + |
| 303 | + events = [_entry(ident) for ident in self._events] |
| 304 | + return "\n".join(events) if events else "Empty Event Dictionary" |
| 305 | + |
290 | 306 |
|
291 | 307 | __EVENTS__: EventDict = EventDict(default_events=__DEFAULT_EVENTS__) |
0 commit comments