Skip to content

Commit 77591d8

Browse files
committed
Add oxenmq as a uwsgi mule
This adds an oxenmq as a mule that listens on :22028 by default (using curve encryption with the same x25519 key as the server). This oxenmq now handles db cleanup as well (instead of scheduling it through a regular front-end worker). You can kick the mule by sending it signals through the `signal.send_signal` function so that it can be triggered when things happen from model; a couple stubs are included here for message creation and message deletion. (NB: requires pyoxenmq built against the last oxenmq (1.2.9), such as the updated python3-oxenmq 1.0.0-3 package, or installed with pip3 with an updated liboxenmq-dev installed).
1 parent 4541bf8 commit 77591d8

File tree

8 files changed

+151
-34
lines changed

8 files changed

+151
-34
lines changed

contrib/uwsgi-vassal-sogs.ini

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,8 @@ processes = 4
77
enable-threads = true
88
manage-script-name = true
99
mount = /=sogs.web:app
10+
mule = sogs.mule:run
1011

11-
logger = file:logfile=/home/USER/session-pysogs/sogs.log
12+
# By default logging goes to stderr (which is good; systemd redirects that to system logs), but for
13+
# debugging it can be useful to send it to a file:
14+
#logger = file:logfile=/home/USER/session-pysogs/sogs.log

sogs/base_config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616
# a free Let's Encrypt certificate.
1717
URL_BASE = 'http://example.net'
1818

19+
# Address (IP+port) where oxenmq listens for incoming connections. Can also be specified as a list
20+
# to listen on multiple addresses. Note that the connection is encrypted, using the same keys as
21+
# the main server. Clients must use the key corresponding to their session_id (i.e. pubkey should
22+
# match, without the leading 05 byte).
23+
OMQ_LISTEN = 'tcp://*:22028'
24+
1925
# The log level.
2026
LOG_LEVEL = logging.WARNING
2127

sogs/cleanup.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@
22
import os
33
import time
44

5-
from .timer import timer
65
from .web import app
76
from . import db
87
from . import config
8+
from .signal import Signal
99

10+
# Cleanup interval, in seconds.
11+
INTERVAL = 10
1012

11-
@timer(10)
12-
def test_timer(signal):
13+
14+
def cleanup():
1315
with app.app_context():
1416
app.logger.debug("Pruning expired items")
1517
files = prune_files()

sogs/legacy_routes.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from . import config
99
from . import http
1010
from . import filtration
11+
from .signal import send_signal, Signal
1112

1213
import os
1314
import time
@@ -402,6 +403,8 @@ def handle_legacy_delete_messages(ids=None):
402403
[room.id, *ids],
403404
)
404405

406+
send_signal(Signal.MESSAGE_DELETED)
407+
405408
return jsonify({'status_code': 200})
406409

407410

sogs/model.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from . import config
44
from . import db
55
from . import utils
6+
from .signal import send_signal, Signal
67

78
import time
89

@@ -530,7 +531,10 @@ def add_post_to_room(user_id, room_id, data, sig, rate_limit_size=5, rate_limit_
530531
result = conn.execute("SELECT posted, id FROM messages WHERE id = ?", [lastid])
531532
row = result.fetchone()
532533
msg = {'timestamp': utils.convert_time(row['posted']), 'server_id': row['id']}
533-
return msg
534+
535+
send_signal(Signal.MESSAGE_POSTED)
536+
537+
return msg
534538

535539

536540
def get_deletions_deprecated(room_id, since):

sogs/mule.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import uwsgi
2+
import traceback
3+
import oxenmq
4+
from datetime import timedelta
5+
6+
from .web import app
7+
from .signal import Signal
8+
from . import cleanup
9+
from . import config
10+
from . import crypto
11+
12+
# This is the uwsgi "mule" that handles things not related to serving HTTP requests:
13+
# - it holds the oxenmq instance (with its own interface into sogs)
14+
# - it handles cleanup jobs (e.g. periodic deletions)
15+
16+
omq = None
17+
18+
19+
def run():
20+
if uwsgi.mule_id() != 1:
21+
app.logger.critical(
22+
"Error: sogs.mule must be the first uwsgi mule (mule1), not mule{}".format(
23+
uwsgi.mule_id()
24+
)
25+
)
26+
raise RuntimeError("Invalid uwsgi configuration")
27+
28+
try:
29+
setup_omq()
30+
31+
run_loop()
32+
except:
33+
app.logger.error("mule died via exception:\n{}".format(traceback.format_exc()))
34+
35+
36+
def allow_conn(addr, pk, sn):
37+
# TODO: user recognition auth
38+
return oxenmq.AuthLevel.basic
39+
40+
41+
def setup_omq():
42+
global omq
43+
omq = oxenmq.OxenMQ(
44+
privkey=crypto._privkey.encode(),
45+
pubkey=crypto.server_pubkey.encode(),
46+
log_level=oxenmq.LogLevel.fatal,
47+
)
48+
if isinstance(config.OMQ_LISTEN, list):
49+
listen = config.OMQ_LISTEN
50+
elif config.OMQ_LISTEN is None:
51+
listen = []
52+
else:
53+
listen = [config.OMQ_LISTEN]
54+
for addr in listen:
55+
omq.listen(addr, curve=True, allow_connection=allow_conn)
56+
app.logger.info(f"OxenMQ listening on {addr}")
57+
omq.add_timer(cleanup.cleanup, timedelta(seconds=cleanup.INTERVAL))
58+
omq.start()
59+
60+
61+
def run_loop():
62+
app.logger.info("mule started!")
63+
64+
callbacks = {Signal.MESSAGE_POSTED: message_posted, Signal.MESSAGE_DELETED: message_deleted}
65+
66+
app.logger.info("mule started 2!")
67+
while True:
68+
app.logger.info("mule started looping")
69+
msg = uwsgi.mule_get_msg()
70+
app.logger.info("mule ax {}".format(msg))
71+
try:
72+
sig = Signal(int(msg.decode()))
73+
except ValueError:
74+
app.logger.error(f"mule received unregistered uwsgi mule message {msg}")
75+
continue
76+
77+
if sig in callbacks:
78+
try:
79+
callbacks[sig]()
80+
except Exception as e:
81+
app.logger.error(
82+
f"An exception occured while mule was processing uwsgi signal {sig}:\n{e}"
83+
)
84+
else:
85+
app.logger.error(f"mule received uwsgi signal {sig} but that signal has no handler!")
86+
app.logger.info("mule done")
87+
88+
89+
def message_posted():
90+
app.logger.warning("FIXME: mule -- message posted stub")
91+
92+
93+
def message_deleted():
94+
app.logger.warning("FIXME: mule -- message delete stub")

sogs/signal.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from enum import Enum
2+
import logging
3+
4+
5+
class Signal(Enum):
6+
"""
7+
UWSGI control signals we use to control the mule process. These are used to wake up the mule to
8+
respond to updates in the database that may need further action, for example to notify connected
9+
oxenmq clients of an event.
10+
"""
11+
12+
# Signals for the mule. The values don't matter, as long as they are unique integers
13+
14+
MESSAGE_POSTED = 1 # A new message has been inserted in the database
15+
MESSAGE_DELETED = 2 # A message has been deleted from the database
16+
MESSAGE_EDITED = 3 # A message has been edited
17+
18+
19+
try:
20+
import uwsgi
21+
22+
_send_msg = uwsgi.mule_msg
23+
except ModuleNotFoundError:
24+
logging.error("uwsgi not available; signals disabled")
25+
_send_msg = None
26+
27+
28+
def send_signal(sig: Signal):
29+
"""
30+
Sends the given signal to the mule. If uwsgi is not available then this does nothing.
31+
"""
32+
33+
if _send_msg is not None:
34+
_send_msg("{}".format(sig.value).encode(), 1)

sogs/timer.py

Lines changed: 0 additions & 29 deletions
This file was deleted.

0 commit comments

Comments
 (0)