Skip to content

Commit dad5560

Browse files
committed
large-ish commit. added map in global omq instance to track client connections, implementation of OMQ request handling underway, re-writing subrequest and onion-request handling to process omq requests
1 parent 689c2d1 commit dad5560

File tree

13 files changed

+302
-48
lines changed

13 files changed

+302
-48
lines changed

sogs/model/clientmanager.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import annotations
22

3+
import oxenmq
4+
35
from .. import crypto, db, config
46
from ..db import query
57
from ..web import app
@@ -92,7 +94,7 @@ def bqempty(self):
9294
return not self.bqueue._empty()
9395

9496

95-
def register_client(self, cid, authlevel, bot: bool = False, priority: int = None):
97+
def register_client(self, conn_id, cid, authlevel, bot, priority):
9698
if not bot:
9799
# add client to self.clients
98100
return
@@ -107,7 +109,7 @@ def register_client(self, cid, authlevel, bot: bool = False, priority: int = Non
107109
self.bqueue._put(PriorityTuple(priority, bot))
108110

109111

110-
def deregister_client(self, cid, bot: bool = False):
112+
def deregister_client(self, cid, bot):
111113
if not bot:
112114
# remove client from clients list
113115
return

sogs/mule.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,21 @@ def setup_omq(self, omq: OMQ):
8686
self._omq.add_timer(cleanup.cleanup, timedelta(seconds=cleanup.INTERVAL))
8787

8888
# Commands other workers can send to us, e.g. for notifications of activity for us to know about
89-
worker = self._omq.add_category("worker", access_level=oxenmq.AuthLevel.admin)
89+
worker = omq._omq.add_category("worker", access_level=oxenmq.AuthLevel.admin)
9090
worker.add_command("message_posted", self.message_posted)
9191
worker.add_command("messages_deleted", self.messages_deleted)
9292
worker.add_command("message_edited", self.message_edited)
9393

94-
# new client code
95-
# TOFIX: use add_request_command to handle a response value
96-
handler = self._omq.add_category("handler", access_level=oxenmq.AuthLevel.admin)
97-
handler.add_command("register_client", omq.register_client)
98-
handler.add_command("deregister_client", omq.deregister_client)
99-
handler.add_command("send_to_handler", omq.manager.receive_message)
94+
# client code
95+
handler = omq._omq.add_category("handler", access_level=oxenmq.AuthLevel.admin)
96+
handler.add_request_command("register_client", omq.register_client)
97+
handler.add_request_command("deregister_client", omq.deregister_client)
98+
handler.add_request_command("send_to_handler", omq.manager.receive_message)
99+
100+
# proxy handler for subrequest queue
101+
internal = omq._omq.add_category("internal", access_level=oxenmq.AuthLevel.admin)
102+
internal.add_request_command("get_next_request", omq.get_next_request)
103+
internal.add_request_command("subreq_response", omq.subreq_response)
100104

101105
app.logger.debug("Mule starting omq")
102106
self._omq.start()

sogs/omq.py

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
# Common oxenmq object; this is used by workers and the oxenmq mule. We create, but do not start,
22
# this pre-forking.
33

4-
import oxenmq
5-
from oxenc import bt_serialize
4+
import oxenmq, queue
5+
from oxenc import bt_serialize, bt_deserialize
66

7+
from mule import log_exceptions
78
from routes import omq_auth
89
from . import crypto, config
910
from .postfork import postfork
1011
from .model.clientmanager import ClientManager
1112

1213

1314
omq_global = None
15+
global blueprints_global
16+
blueprints_global = {}
1417

1518

1619
class OMQ:
@@ -25,13 +28,16 @@ def __init__(self):
2528
privkey=crypto._privkey.encode(), pubkey=crypto.server_pubkey.encode()
2629
)
2730
self._omq.ephemeral_routing_id = True
28-
31+
self.client_map = {}
2932
self.manager = ClientManager()
3033
self.test_suite = False
34+
self.subreq_queue = queue.SimpleQueue()
3135

3236
if uwsgi.mule_id() != 0:
3337
uwsgi.opt['mule'].setup_omq(self)
3438
return
39+
40+
uwsgi.register_signal(123, 'internal', self.handle_proxied_omq_req)
3541

3642
from .web import app # Imported here to avoid circular import
3743

@@ -46,15 +52,47 @@ def __init__(self):
4652
global omq_global
4753
omq_global = self
4854

55+
56+
@log_exceptions
57+
def subreq_response(self):
58+
pass
59+
60+
61+
@log_exceptions
62+
def handle_proxied_omq_req(self):
63+
id, subreq_body = self.send_mule(
64+
command='get_next_request',
65+
prefix='internal'
66+
)
67+
68+
'''
69+
70+
Handle omq subrequest
71+
72+
'''
73+
74+
return
75+
76+
@log_exceptions
77+
def get_next_request(self):
78+
subreq_body = self.subreq_queue.get()
79+
id = list(subreq_body.keys())[0]
80+
return id, subreq_body[id]
81+
4982

50-
def register_client(self, cid, authlevel, bot: bool = False, priority: int = None):
51-
self.manager.register_client(cid, authlevel, bot, priority)
52-
# TODO: add omq logic
83+
@log_exceptions
84+
def register_client(self, msg: oxenmq.Message):
85+
cid, authlevel, bot, priority = bt_deserialize(msg.data()[0])
86+
conn_id = msg.conn()
87+
self.client_map[conn_id] = cid
88+
self.manager.register_client(msg)
5389

5490

55-
def deregister_client(self, cid, bot: bool = False):
56-
self.manager.register_client()
57-
# TODO: add omq logic
91+
@log_exceptions
92+
def deregister_client(self, msg: oxenmq.Message):
93+
cid, bot = bt_deserialize(msg.data()[0])
94+
self.client_map.pop(cid)
95+
self.manager.deregister_client(cid, bot)
5896

5997

6098
def send_mule(self, command, *args, prefix="worker."):

sogs/routes/clients.py

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from ..model import room as mroom
33
from ..model.user import User
44
from ..web import app
5-
from ..omq import omq_global
5+
from ..omq import omq_global, blueprints_global
66
from . import omq_auth
77

88
from flask import abort, jsonify, g, Blueprint, request
@@ -15,8 +15,8 @@
1515
1616
"""
1717

18-
1918
clients = Blueprint('clients', __name__)
19+
blueprints_global['clients'] = clients
2020

2121

2222
@omq_auth.first_request
@@ -28,7 +28,7 @@ def register(cid):
2828
2929
## URL Parameters
3030
31-
- 'cid': the client ID of the given client to be registered with the SOGS instance
31+
- 'cid': the client ID (session ID) of the given client to be registered with the SOGS instance
3232
3333
## Query Parameters
3434
@@ -61,7 +61,7 @@ def register_bot(cid, authlevel, priority):
6161
6262
## URL Parameters
6363
64-
- 'cid': the client ID of the given client to be registered with the SOGS instance
64+
- 'cid': the client ID (session ID) of the given client to be registered with the SOGS instance
6565
6666
## Body Parameters
6767
@@ -73,11 +73,12 @@ def register_bot(cid, authlevel, priority):
7373
"""
7474

7575
client = omq_global.send_mule(
76-
command="register_client",
76+
command='register_client',
7777
cid=cid,
7878
authlevel=authlevel,
79-
bot=True,
80-
priority=priority
79+
bot=1,
80+
priority=priority,
81+
prefix='handler'
8182
)
8283

8384
return client
@@ -90,7 +91,7 @@ def register_client(cid, authlevel):
9091
9192
## URL Parameters
9293
93-
- 'cid': the client ID of the given client to be registered with the SOGS instance
94+
- 'cid': the client ID (session ID) of the given client to be registered with the SOGS instance
9495
9596
## Body Parameters
9697
@@ -100,11 +101,12 @@ def register_client(cid, authlevel):
100101
"""
101102

102103
client = omq_global.send_mule(
103-
command="register_client",
104+
command='register_client',
104105
cid=cid,
105106
authlevel=authlevel,
106-
bot=False,
107-
priority=None
107+
bot=0,
108+
priority=None,
109+
prefix='handler'
108110
)
109111

110112
return client
@@ -117,7 +119,7 @@ def unregister(cid):
117119
118120
## URL Parameters
119121
120-
- 'cid': the client ID of the given client to be registered with the SOGS instance
122+
- 'cid': the client ID (session ID) of the given client to be registered with the SOGS instance
121123
122124
## Query Parameters
123125
@@ -136,9 +138,10 @@ def unregister(cid):
136138
def unregister_client(cid):
137139

138140
client = omq_global.send_mule(
139-
command="register_client",
141+
command='unregister_client',
140142
cid=cid,
141-
bot=False
143+
bot=0,
144+
prefix='handler'
142145
)
143146

144147
return client
@@ -149,9 +152,10 @@ def unregister_client(cid):
149152
def unregister_bot(cid):
150153

151154
client = omq_global.send_mule(
152-
command="register_client",
155+
command='unregister_bot',
153156
cid=cid,
154-
bot=True
157+
bot=1,
158+
prefix='handler'
155159
)
156160

157161
return client

sogs/routes/messages.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
from .. import http, utils
22
from . import auth
33
from model.room import Room
4-
from ..omq import omq_global
4+
from ..omq import omq_global, blueprints_global
55

66
from flask import abort, jsonify, g, Blueprint, request
77

88
# Room message retrieving/submitting endpoints
99

10-
1110
messages = Blueprint('messages', __name__)
12-
11+
blueprints_global['messages'] = messages
1312

1413
def qs_reactors():
1514
return utils.get_int_param('reactors', 4, min=0, max=20, truncate=True)

sogs/routes/omq_auth.py

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
from ..web import app
44
from ..db import query
55
from .. import config, crypto, http, utils
6+
from ..omq import omq_global, blueprints_global
67
from ..model.user import User
78
from ..hashing import blake2b
89

9-
from flask import request, abort, Response, g
10+
from flask import request, abort, Response, g, Blueprint
1011
import time
1112
import nacl
1213
from nacl.signing import VerifyKey
@@ -17,6 +18,30 @@
1718

1819
# Authentication for handling OMQ requests
1920

21+
omq = Blueprint('endpoint', __name__)
22+
23+
def endpoint(f):
24+
"""
25+
Default endpoint for omq routes to pass requests to; constructs flask HTTP request
26+
27+
Message (request) components:
28+
29+
"blueprint" - the flask blueprint
30+
"query" - the request query
31+
"pubkey" - pk of client making request
32+
"params" - a json value to dump as the the query parameters
33+
34+
Example:
35+
full request: `@omq.endpoint("messages", "room.<Room>.messages.since.<seqno>", {'Room:room', 'int:seqno'})`
36+
blueprint: 'messages'
37+
query: 'room.<Room>.messages.since.<seqno>'
38+
params: {'Room:room', 'int:seqno'}
39+
"""
40+
41+
@wraps(f)
42+
def endpoint_wrapper(*args, blueprint, query, pubkey, params, **kwargs):
43+
bp = blueprints_global['messages']
44+
2045

2146
def abort_request(code, msg, warn=True):
2247
if warn:
@@ -27,16 +52,20 @@ def abort_request(code, msg, warn=True):
2752

2853

2954
def require_client():
30-
""" Requires that an authenticated client was found in the OMQ instance; aborts with
31-
UNAUTHORIZED if the request has no client """
55+
"""
56+
Requires that an authenticated client was found in the OMQ instance; aborts with
57+
UNAUTHORIZED if the request has no client
58+
"""
3259
if g.client_id is None:
3360
abort_request(http.UNAUTHORIZED, 'OMQ client authentication required')
3461

3562

3663
def client_required(f):
37-
""" Decorator for an endpoint that requires a client; this calls require_client() at the
64+
"""
65+
Decorator for an endpoint that requires a client; this calls require_client() at the
3866
beginning of the request to abort the request as UNAUTHORIZED if the client has not been
39-
previously authenticated"""
67+
previously authenticated
68+
"""
4069

4170
@wraps(f)
4271
def required_client_wrapper(*args, **kwargs):
@@ -126,14 +155,10 @@ def verify_omq_auth():
126155
return
127156

128157

129-
130158
"""
131159
TOFIX:
132160
- add some type of dict in omq_global to map conn_ID (onenmq conn ID) to session_ID/other info
133161
- do not persist:
134162
- room specific access: check every time it makes a request because it can change
135163
- values that admin level can change
136-
137-
138-
139164
"""

0 commit comments

Comments
 (0)