Skip to content

Commit 5000f8f

Browse files
committed
squash bot api
1 parent 2f927a6 commit 5000f8f

File tree

2 files changed

+183
-5
lines changed

2 files changed

+183
-5
lines changed

sogs/events.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
from collections import defaultdict
2+
3+
from oxenmq import AuthLevel
4+
5+
from . import config
6+
from . import model
7+
from .omq import omq
8+
9+
from binascii import hexlify
10+
11+
from oxenc import bt_serialize as serialize
12+
from oxenc import bt_deserialize as deserialize
13+
14+
from requests import request as http_request
15+
16+
# pools for event propagation
17+
_pools = defaultdict(list)
18+
19+
status_OK = 'OK'
20+
status_ERR = 'ERROR'
21+
22+
# the events we are able to subscribe to
23+
EVENTS = ('message', 'joined', 'parted', 'banned', 'unbanned', 'deleted', 'uploaded')
24+
25+
26+
def event_name_valid(eventname):
27+
""" return True if this event name is something well formed """
28+
return eventname in EVENTS
29+
30+
31+
def _user_from_conn(conn, automod=True):
32+
"""
33+
make a model.User from a connection using it's curve pubkey as the session id, if automod is
34+
True we auto promote the bot to a moderator if it isn't already
35+
"""
36+
user = model.User(session_id='05' + hexlify(conn.pubkey).decode())
37+
if automod and not user.global_moderator:
38+
user.set_moderator()
39+
return user
40+
41+
42+
def _sub_conn_to_event(name, conn):
43+
""" subscribe a connection to an event type """
44+
if not event_name_valid(name):
45+
raise Exception('invalid event name: {}'.format(name))
46+
global _pools
47+
_pools[name].append(conn)
48+
49+
50+
def _unsub_conn_from_event(name, conn):
51+
if not event_name_valid(name):
52+
raise Exception(f"invalid event: {name}")
53+
global _pool
54+
_pools[name].remove(conn)
55+
56+
57+
def _handle_subscribe_request(msg):
58+
""" handle a request to subscribe to events as a bot"""
59+
parts = msg.data()
60+
if len(parts) == 0:
61+
raise Exception("no events selected")
62+
for name in parts:
63+
_sub_conn_to_event(name.decode(), msg.conn)
64+
65+
66+
def _handle_unsubscribe_request(msg):
67+
""" unusb from events """
68+
parts = msg.data()
69+
if len(parts) == 0:
70+
raise Exception("no events selected")
71+
for name in parts:
72+
_unsub_conn_from_event(name.decode(), msg.conn)
73+
74+
75+
def _propagate_event(eventname, *args):
76+
""" propagate an event to everyone who cares about it """
77+
assert event_name_valid(eventname)
78+
global omq, _pools
79+
ev = (eventname,) + args
80+
for conn in _pools[eventname]:
81+
omq.send(conn, 'sogs.event', *ev)
82+
83+
84+
def _handle_request(handler, msg):
85+
"""safely handle a request handler and catch any exceptions that happen and propagate them to
86+
the remote connection"""
87+
try:
88+
retval = handler(msg)
89+
if retval is None:
90+
msg.reply(status_OK)
91+
elif isinstance(retval, tuple):
92+
msg.reply(status_OK, *retval)
93+
else:
94+
msg.reply(status_OK, serialize(retval))
95+
except Exception as ex:
96+
msg.reply(status_ERR, '{}'.format(ex))
97+
98+
99+
def _handle_api_request(msg):
100+
""" do an http api request """
101+
parts = msg.parts()
102+
if len(parts) < 3:
103+
raise Exception("invalid number of parts {}".format(len(parts)))
104+
kwargs = dict()
105+
if len(parts) >= 4:
106+
kwargs['body'] = parts[3]
107+
108+
headers = deserialize(parts[2])
109+
if not isinstance(headers, dict):
110+
raise ValueError("headers is not a dict")
111+
112+
kwargs['headers'] = headers
113+
114+
path = parts[1].decode()
115+
if not path.startswith("/"):
116+
raise ValueError("invalid request path")
117+
method = parts[0].decode()
118+
resp = http_request(method, config.URL_BASE + path, **kwargs)
119+
120+
headers = dict()
121+
for k, v in resp.headers.items():
122+
headers[k] = v
123+
124+
return serialize(resp.status_code), serialize(headers), resp.content
125+
126+
127+
def setup_rpc():
128+
""" set up bot api using an existing oxenmq instance """
129+
_bot_category = omq.add_category('sogs', AuthLevel.none)
130+
_bot_category.add_request_command(
131+
'sub', lambda msg: _handle_request(_handle_subscribe_request, msg)
132+
)
133+
_bot_category.add_request_command(
134+
'unsub', lambda msg: _handle_request(_handle_unsubscribe_request, msg)
135+
)
136+
_bot_category.add_request_command(
137+
'request', lambda msg: _handle_request(_handle_api_request, msg)
138+
)
139+
140+
141+
class _Notify:
142+
""" Holder type for all event notification functions """
143+
144+
145+
notify = _Notify()
146+
147+
# set up event notifiers
148+
for ev in EVENTS:
149+
setattr(notify, ev, lambda *args: _propagate_event(ev, *args))

sogs/mule.py

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import traceback
22
import oxenmq
3-
from oxenc import bt_deserialize
43
import time
54
from datetime import timedelta
65
import functools
@@ -9,6 +8,7 @@
98
from . import cleanup
109
from . import config
1110
from . import omq as o
11+
from .events import setup_rpc, notify
1212

1313
# This is the uwsgi "mule" that handles things not related to serving HTTP requests:
1414
# - it holds the oxenmq instance (with its own interface into sogs)
@@ -52,6 +52,10 @@ def setup_omq():
5252
for addr in listen:
5353
omq.listen(addr, curve=True, allow_connection=allow_conn)
5454
app.logger.info(f"OxenMQ listening on {addr}")
55+
if not listen:
56+
app.logger.warn(
57+
"OxenMQ did not listen on any curve addresses which means the bot API is not accessable anywhere."
58+
)
5559

5660
# Internal socket for workers to talk to us:
5761
omq.listen(config.OMQ_INTERNAL, curve=False, allow_connection=admin_conn)
@@ -64,6 +68,13 @@ def setup_omq():
6468
worker.add_command("message_posted", message_posted)
6569
worker.add_command("messages_deleted", messages_deleted)
6670
worker.add_command("message_edited", message_edited)
71+
worker.add_command("user_joined", user_joined)
72+
worker.add_command("user_banned", user_banned)
73+
worker.add_command("user_unbanned", user_unbanned)
74+
worker.add_command("file_uploaded", file_uploaded)
75+
76+
# setup bot api endpoints
77+
setup_rpc()
6778

6879
app.logger.debug("Mule starting omq")
6980
omq.start()
@@ -88,14 +99,32 @@ def wrapper(*args, **kwargs):
8899

89100
@log_exceptions
90101
def message_posted(m: oxenmq.Message):
91-
id = bt_deserialize(m.data()[0])
92-
app.logger.warning(f"FIXME: mule -- message posted stub, id={id}")
102+
notify.message(*m.data())
93103

94104

95105
@log_exceptions
96106
def messages_deleted(m: oxenmq.Message):
97-
ids = bt_deserialize(m.data()[0])
98-
app.logger.warning(f"FIXME: mule -- message delete stub, deleted messages: {ids}")
107+
notify.deleted(*m.data())
108+
109+
110+
@log_exceptions
111+
def user_banned(m: oxenmq.Message):
112+
notify.banned(*m.data())
113+
114+
115+
@log_exceptions
116+
def user_unbanned(m: oxenmq.Message):
117+
notify.unbannd(*m.data())
118+
119+
120+
@log_exceptions
121+
def user_joined(m: oxenmq.Message):
122+
notify.joined(*m.data())
123+
124+
125+
@log_exceptions
126+
def file_uploaded(m: oxenmq.Message):
127+
notify.uploaded(*m.data())
99128

100129

101130
@log_exceptions

0 commit comments

Comments
 (0)