-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrelay_manager.py
More file actions
148 lines (125 loc) · 4.78 KB
/
relay_manager.py
File metadata and controls
148 lines (125 loc) · 4.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""
The basis of the code below was taken from https://github.com/jeffthibault/python-nostr with some modifications.
"""
import json
import threading
import time
from dataclasses import dataclass
from threading import Lock
from google.cloud import bigquery
from pydantic import ValidationError
from base.utils import logger
from services import bq
from .event import Event
from .filter import Filters
from .message_pool import MessagePool
from .relay import Relay, RelayPolicy, RelayProxyConnectionConfig
from .request import Request
class RelayException(Exception):
pass
@dataclass
class RelayManager:
def __post_init__(self):
client = bigquery.Client()
self._bq_service = bq.RelayService(client)
self.relays = {}
self.message_pool: MessagePool = MessagePool()
self.lock: Lock = Lock()
self._num_workers = 0
def _get_relay(self, url: str) -> Relay:
return self.relays[url]['relay']
def add_relay(
self,
url: str,
policy: RelayPolicy = RelayPolicy(),
ssl_options=None,
proxy_config: RelayProxyConnectionConfig = None,
):
try:
relay = Relay(
url,
policy=policy,
message_pool=self.message_pool,
)
except ValidationError as e:
logger.debug(f'#kjhkjh8: Invalid relay url {e}')
raise
future_connect = threading.Thread(
target=relay.connect, name=f'{relay.url}-thread'
)
future_queue_worker = threading.Thread(
target=relay.queue_worker, name=f'{relay.url}-queue', daemon=True
)
with self.lock:
self.relays[url] = {
'future_connect': future_connect,
'future_queue_worker': future_queue_worker,
'relay': relay,
}
future_connect.start()
future_queue_worker.start()
time.sleep(1)
def remove_relay(self, url: str):
with self.lock:
if url in self.relays:
relay_future = self.relays.pop(url)
relay: Relay = relay_future['relay']
relay.close()
def remove_all_relays(self):
with self.lock:
for relay in list(self.relays.items()):
relay: Relay = self.relays.pop(relay['url'])['relay']
relay.close()
def add_subscription_on_relay(self, url: str, id: str, filters: Filters):
with self.lock:
if url in self.relays:
relay: Relay = self._get_relay(url)
if not relay.policy.should_read:
raise RelayException(
f'Could not send request: {url} is not configured to read from'
)
relay.add_subscription(id, filters)
request = Request(id, filters)
relay.publish(request.to_message())
else:
raise RelayException(f'Invalid relay url: no connection to {url}')
def add_subscription_on_all_relays(self, id: str, filters: Filters):
with self.lock:
for url in self.relays:
relay = self._get_relay(url)
if relay.policy.should_read:
relay.add_subscription(id, filters)
request = Request(id, filters)
relay.publish(request.to_message())
def close_subscription_on_relay(self, url: str, id: str):
with self.lock:
if url in self.relays:
relay: Relay = self._get_relay(url)
relay.close_subscription(id)
relay.publish(json.dumps(['CLOSE', id]))
else:
raise RelayException(f'Invalid relay url: no connection to {url}')
def close_subscription_on_all_relays(self, id: str):
with self.lock:
for url in self.relays:
relay = self._get_relay(url)
relay.close_subscription(id)
relay.publish(json.dumps(['CLOSE', id]))
def close_all_relay_connections(self):
with self.lock:
for url in self.relays:
relay = self._get_relay(url)
relay.close()
def publish_event(self, event: Event):
"""Verifies that the Event is publishable before submitting it to relays"""
if event.signature is None:
raise RelayException(f'Could not publish {event.id}: must be signed')
if not event.verify():
raise RelayException(
f'Could not publish {event.id}: failed to verify signature {event.signature}'
)
with self.lock:
for url in self.relays:
relay = self._get_relay(url)
if relay.policy.should_write:
relay.publish(event.to_message())