Skip to content

Commit ea5e708

Browse files
committed
Replace uwsgi messaging with oxenmq messages
The mule starts the main OMQ instance, but each worker also has one on hand that is connected to the main one and can be used to send arbitrary data (via bencoding) to the mule.
1 parent 77591d8 commit ea5e708

File tree

8 files changed

+130
-87
lines changed

8 files changed

+130
-87
lines changed

sogs/base_config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
# match, without the leading 05 byte).
2323
OMQ_LISTEN = 'tcp://*:22028'
2424

25+
# Address (usually a unix socket) where the main oxenmq process listens for internal connections
26+
# from other uwsgi workers (to deliver event notifications). This must not be publicly accessible!
27+
OMQ_INTERNAL = 'ipc://./omq.sock'
28+
2529
# The log level.
2630
LOG_LEVEL = logging.WARNING
2731

sogs/cleanup.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from .web import app
66
from . import db
77
from . import config
8-
from .signal import Signal
98

109
# Cleanup interval, in seconds.
1110
INTERVAL = 10

sogs/filtration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44

55

66
def should_drop_message_with_body(body):
7-
body = message_body(body)
87
"""return true if we should drop a message given its body"""
98
if os.path.exists(config.BAD_WORDS_FILE):
9+
body = message_body(body)
1010
with open(config.BAD_WORDS_FILE, 'r') as f:
1111
for line in f:
1212
word = line.rstrip()

sogs/legacy_routes.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from . import config
99
from . import http
1010
from . import filtration
11-
from .signal import send_signal, Signal
11+
from .omq import send_mule
1212

1313
import os
1414
import time
@@ -403,7 +403,7 @@ def handle_legacy_delete_messages(ids=None):
403403
[room.id, *ids],
404404
)
405405

406-
send_signal(Signal.MESSAGE_DELETED)
406+
send_mule("messages_deleted", ids)
407407

408408
return jsonify({'status_code': 200})
409409

sogs/model.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from . import config
44
from . import db
55
from . import utils
6-
from .signal import send_signal, Signal
6+
from .omq import send_mule
77

88
import time
99

@@ -532,7 +532,7 @@ def add_post_to_room(user_id, room_id, data, sig, rate_limit_size=5, rate_limit_
532532
row = result.fetchone()
533533
msg = {'timestamp': utils.convert_time(row['posted']), 'server_id': row['id']}
534534

535-
send_signal(Signal.MESSAGE_POSTED)
535+
send_mule("message_posted", msg["server_id"])
536536

537537
return msg
538538

sogs/mule.py

Lines changed: 58 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,30 @@
11
import uwsgi
22
import traceback
33
import oxenmq
4+
from oxenc import bt_deserialize
5+
import time
46
from datetime import timedelta
7+
import functools
58

69
from .web import app
7-
from .signal import Signal
810
from . import cleanup
911
from . import config
1012
from . import crypto
13+
from .omq import omq
1114

1215
# This is the uwsgi "mule" that handles things not related to serving HTTP requests:
1316
# - it holds the oxenmq instance (with its own interface into sogs)
1417
# - it handles cleanup jobs (e.g. periodic deletions)
1518

16-
omq = None
17-
1819

1920
def run():
20-
if uwsgi.mule_id() != 1:
21-
app.logger.critical(
22-
"Error: sogs.mule must be the first uwsgi mule (mule1), not mule{}".format(
23-
uwsgi.mule_id()
24-
)
25-
)
26-
raise RuntimeError("Invalid uwsgi configuration")
27-
2821
try:
29-
setup_omq()
22+
app.logger.info("OxenMQ mule started.")
3023

31-
run_loop()
32-
except:
24+
while True:
25+
time.sleep(1)
26+
27+
except Exception:
3328
app.logger.error("mule died via exception:\n{}".format(traceback.format_exc()))
3429

3530

@@ -38,13 +33,18 @@ def allow_conn(addr, pk, sn):
3833
return oxenmq.AuthLevel.basic
3934

4035

36+
def admin_conn(addr, pk, sn):
37+
return oxenmq.AuthLevel.admin
38+
39+
40+
def inproc_fail(connid, reason):
41+
raise RuntimeError(f"Couldn't connect mule to itself: {reason}")
42+
43+
4144
def setup_omq():
42-
global omq
43-
omq = oxenmq.OxenMQ(
44-
privkey=crypto._privkey.encode(),
45-
pubkey=crypto.server_pubkey.encode(),
46-
log_level=oxenmq.LogLevel.fatal,
47-
)
45+
global omq, mule_conn
46+
47+
app.logger.debug("Mule setting up omq")
4848
if isinstance(config.OMQ_LISTEN, list):
4949
listen = config.OMQ_LISTEN
5050
elif config.OMQ_LISTEN is None:
@@ -54,41 +54,52 @@ def setup_omq():
5454
for addr in listen:
5555
omq.listen(addr, curve=True, allow_connection=allow_conn)
5656
app.logger.info(f"OxenMQ listening on {addr}")
57+
58+
# Internal socket for workers to talk to us:
59+
omq.listen(config.OMQ_INTERNAL, curve=False, allow_connection=admin_conn)
60+
61+
# Periodic database cleanup timer:
5762
omq.add_timer(cleanup.cleanup, timedelta(seconds=cleanup.INTERVAL))
58-
omq.start()
5963

64+
# Commands other workers can send to us, e.g. for notifications of activity for us to know about
65+
worker = omq.add_category("worker", access_level=oxenmq.AuthLevel.admin)
66+
worker.add_command("message_posted", message_posted)
67+
worker.add_command("messages_deleted", messages_deleted)
68+
worker.add_command("message_edited", message_edited)
6069

61-
def run_loop():
62-
app.logger.info("mule started!")
70+
app.logger.debug("Mule starting omq")
71+
omq.start()
6372

64-
callbacks = {Signal.MESSAGE_POSTED: message_posted, Signal.MESSAGE_DELETED: message_deleted}
73+
# Connect mule to itself so that if something the mule does wants to send something to the mule
74+
# it will work. (And so be careful not to recurse!)
75+
app.logger.debug("Mule connecting to self")
76+
mule_conn = omq.connect_inproc(on_success=None, on_failure=inproc_fail)
6577

66-
app.logger.info("mule started 2!")
67-
while True:
68-
app.logger.info("mule started looping")
69-
msg = uwsgi.mule_get_msg()
70-
app.logger.info("mule ax {}".format(msg))
78+
79+
def log_exceptions(f):
80+
@functools.wraps(f)
81+
def wrapper(*args, **kwargs):
7182
try:
72-
sig = Signal(int(msg.decode()))
73-
except ValueError:
74-
app.logger.error(f"mule received unregistered uwsgi mule message {msg}")
75-
continue
83+
return f(*args, **kwargs)
84+
except Exception as e:
85+
app.logger.error(f"{f.__name__} raised exception: {e}")
86+
raise
87+
88+
return wrapper
89+
7690

77-
if sig in callbacks:
78-
try:
79-
callbacks[sig]()
80-
except Exception as e:
81-
app.logger.error(
82-
f"An exception occured while mule was processing uwsgi signal {sig}:\n{e}"
83-
)
84-
else:
85-
app.logger.error(f"mule received uwsgi signal {sig} but that signal has no handler!")
86-
app.logger.info("mule done")
91+
@log_exceptions
92+
def message_posted(m: oxenmq.Message):
93+
id = bt_deserialize(m.data()[0])
94+
app.logger.warning(f"FIXME: mule -- message posted stub, id={id}")
8795

8896

89-
def message_posted():
90-
app.logger.warning("FIXME: mule -- message posted stub")
97+
@log_exceptions
98+
def messages_deleted(m: oxenmq.Message):
99+
ids = bt_deserialize(m.data()[0])
100+
app.logger.warning(f"FIXME: mule -- message delete stub, deleted messages: {ids}")
91101

92102

93-
def message_deleted():
94-
app.logger.warning("FIXME: mule -- message delete stub")
103+
@log_exceptions
104+
def message_edited(m: oxenmq.Message):
105+
app.logger.warning("FIXME: mule -- message edited stub")

sogs/omq.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Common oxenmq object; this is used by workers and the oxenmq mule. We create, but do not start,
2+
# this pre-forking.
3+
4+
import oxenmq
5+
from oxenc import bt_serialize
6+
7+
from . import crypto, config
8+
from .postfork import postfork
9+
10+
omq = None
11+
mule_conn = None
12+
13+
14+
def make_omq():
15+
omq = oxenmq.OxenMQ(privkey=crypto._privkey.encode(), pubkey=crypto.server_pubkey.encode())
16+
17+
# We have multiple workers talking to the mule, so we *must* use ephemeral ids to not replace each
18+
# others' connections.
19+
omq.ephemeral_routing_id = True
20+
21+
return omq
22+
23+
24+
# Postfork for workers: we start oxenmq and connect to the mule process
25+
@postfork
26+
def start_oxenmq():
27+
try:
28+
import uwsgi
29+
except ModuleNotFoundError:
30+
return
31+
32+
global omq, mule_conn
33+
34+
omq = make_omq()
35+
36+
if uwsgi.mule_id() != 0:
37+
from . import mule
38+
39+
mule.setup_omq()
40+
return
41+
42+
from .web import app # Imported here to avoid circular import
43+
44+
app.logger.debug(f"Starting oxenmq connection to mule in worker {uwsgi.worker_id()}")
45+
46+
omq.start()
47+
app.logger.debug(f"Started, connecting to mule")
48+
mule_conn = omq.connect_remote(oxenmq.Address(config.OMQ_INTERNAL))
49+
50+
app.logger.debug(f"worker {uwsgi.worker_id()} connected to mule OMQ")
51+
52+
53+
def send_mule(command, *args, prefix="worker."):
54+
"""
55+
Sends a command to the mule from a worker (or possibly from the mule itself). The command will
56+
be prefixed with "worker." (unless overridden).
57+
58+
Any args will be bt-serialized and send as message parts.
59+
"""
60+
if prefix:
61+
command = prefix + command
62+
63+
omq.send(mule_conn, command, *(bt_serialize(data) for data in args))

sogs/signal.py

Lines changed: 0 additions & 34 deletions
This file was deleted.

0 commit comments

Comments
 (0)