Skip to content

Commit 08e4cce

Browse files
authored
Merge pull request #17 from jagerman/oxenmq-mule
Add oxenmq as a uwsgi mule
2 parents 4541bf8 + ea5e708 commit 08e4cce

File tree

9 files changed

+195
-35
lines changed

9 files changed

+195
-35
lines changed

contrib/uwsgi-vassal-sogs.ini

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,8 @@ processes = 4
77
enable-threads = true
88
manage-script-name = true
99
mount = /=sogs.web:app
10+
mule = sogs.mule:run
1011

11-
logger = file:logfile=/home/USER/session-pysogs/sogs.log
12+
# By default logging goes to stderr (which is good; systemd redirects that to system logs), but for
13+
# debugging it can be useful to send it to a file:
14+
#logger = file:logfile=/home/USER/session-pysogs/sogs.log

sogs/base_config.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,16 @@
1616
# a free Let's Encrypt certificate.
1717
URL_BASE = 'http://example.net'
1818

19+
# Address (IP+port) where oxenmq listens for incoming connections. Can also be specified as a list
20+
# to listen on multiple addresses. Note that the connection is encrypted, using the same keys as
21+
# the main server. Clients must use the key corresponding to their session_id (i.e. pubkey should
22+
# match, without the leading 05 byte).
23+
OMQ_LISTEN = 'tcp://*:22028'
24+
25+
# Address (usually a unix socket) where the main oxenmq process listens for internal connections
26+
# from other uwsgi workers (to deliver event notifications). This must not be publicly accessible!
27+
OMQ_INTERNAL = 'ipc://./omq.sock'
28+
1929
# The log level.
2030
LOG_LEVEL = logging.WARNING
2131

sogs/cleanup.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22
import os
33
import time
44

5-
from .timer import timer
65
from .web import app
76
from . import db
87
from . import config
98

9+
# Cleanup interval, in seconds.
10+
INTERVAL = 10
1011

11-
@timer(10)
12-
def test_timer(signal):
12+
13+
def cleanup():
1314
with app.app_context():
1415
app.logger.debug("Pruning expired items")
1516
files = prune_files()

sogs/filtration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44

55

66
def should_drop_message_with_body(body):
7-
body = message_body(body)
87
"""return true if we should drop a message given its body"""
98
if os.path.exists(config.BAD_WORDS_FILE):
9+
body = message_body(body)
1010
with open(config.BAD_WORDS_FILE, 'r') as f:
1111
for line in f:
1212
word = line.rstrip()

sogs/legacy_routes.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from . import config
99
from . import http
1010
from . import filtration
11+
from .omq import send_mule
1112

1213
import os
1314
import time
@@ -402,6 +403,8 @@ def handle_legacy_delete_messages(ids=None):
402403
[room.id, *ids],
403404
)
404405

406+
send_mule("messages_deleted", ids)
407+
405408
return jsonify({'status_code': 200})
406409

407410

sogs/model.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from . import config
44
from . import db
55
from . import utils
6+
from .omq import send_mule
67

78
import time
89

@@ -530,7 +531,10 @@ def add_post_to_room(user_id, room_id, data, sig, rate_limit_size=5, rate_limit_
530531
result = conn.execute("SELECT posted, id FROM messages WHERE id = ?", [lastid])
531532
row = result.fetchone()
532533
msg = {'timestamp': utils.convert_time(row['posted']), 'server_id': row['id']}
533-
return msg
534+
535+
send_mule("message_posted", msg["server_id"])
536+
537+
return msg
534538

535539

536540
def get_deletions_deprecated(room_id, since):

sogs/mule.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import uwsgi
2+
import traceback
3+
import oxenmq
4+
from oxenc import bt_deserialize
5+
import time
6+
from datetime import timedelta
7+
import functools
8+
9+
from .web import app
10+
from . import cleanup
11+
from . import config
12+
from . import crypto
13+
from .omq import omq
14+
15+
# This is the uwsgi "mule" that handles things not related to serving HTTP requests:
16+
# - it holds the oxenmq instance (with its own interface into sogs)
17+
# - it handles cleanup jobs (e.g. periodic deletions)
18+
19+
20+
def run():
21+
try:
22+
app.logger.info("OxenMQ mule started.")
23+
24+
while True:
25+
time.sleep(1)
26+
27+
except Exception:
28+
app.logger.error("mule died via exception:\n{}".format(traceback.format_exc()))
29+
30+
31+
def allow_conn(addr, pk, sn):
32+
# TODO: user recognition auth
33+
return oxenmq.AuthLevel.basic
34+
35+
36+
def admin_conn(addr, pk, sn):
37+
return oxenmq.AuthLevel.admin
38+
39+
40+
def inproc_fail(connid, reason):
41+
raise RuntimeError(f"Couldn't connect mule to itself: {reason}")
42+
43+
44+
def setup_omq():
45+
global omq, mule_conn
46+
47+
app.logger.debug("Mule setting up omq")
48+
if isinstance(config.OMQ_LISTEN, list):
49+
listen = config.OMQ_LISTEN
50+
elif config.OMQ_LISTEN is None:
51+
listen = []
52+
else:
53+
listen = [config.OMQ_LISTEN]
54+
for addr in listen:
55+
omq.listen(addr, curve=True, allow_connection=allow_conn)
56+
app.logger.info(f"OxenMQ listening on {addr}")
57+
58+
# Internal socket for workers to talk to us:
59+
omq.listen(config.OMQ_INTERNAL, curve=False, allow_connection=admin_conn)
60+
61+
# Periodic database cleanup timer:
62+
omq.add_timer(cleanup.cleanup, timedelta(seconds=cleanup.INTERVAL))
63+
64+
# Commands other workers can send to us, e.g. for notifications of activity for us to know about
65+
worker = omq.add_category("worker", access_level=oxenmq.AuthLevel.admin)
66+
worker.add_command("message_posted", message_posted)
67+
worker.add_command("messages_deleted", messages_deleted)
68+
worker.add_command("message_edited", message_edited)
69+
70+
app.logger.debug("Mule starting omq")
71+
omq.start()
72+
73+
# Connect mule to itself so that if something the mule does wants to send something to the mule
74+
# it will work. (And so be careful not to recurse!)
75+
app.logger.debug("Mule connecting to self")
76+
mule_conn = omq.connect_inproc(on_success=None, on_failure=inproc_fail)
77+
78+
79+
def log_exceptions(f):
80+
@functools.wraps(f)
81+
def wrapper(*args, **kwargs):
82+
try:
83+
return f(*args, **kwargs)
84+
except Exception as e:
85+
app.logger.error(f"{f.__name__} raised exception: {e}")
86+
raise
87+
88+
return wrapper
89+
90+
91+
@log_exceptions
92+
def message_posted(m: oxenmq.Message):
93+
id = bt_deserialize(m.data()[0])
94+
app.logger.warning(f"FIXME: mule -- message posted stub, id={id}")
95+
96+
97+
@log_exceptions
98+
def messages_deleted(m: oxenmq.Message):
99+
ids = bt_deserialize(m.data()[0])
100+
app.logger.warning(f"FIXME: mule -- message delete stub, deleted messages: {ids}")
101+
102+
103+
@log_exceptions
104+
def message_edited(m: oxenmq.Message):
105+
app.logger.warning("FIXME: mule -- message edited stub")

sogs/omq.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Common oxenmq object; this is used by workers and the oxenmq mule. We create, but do not start,
2+
# this pre-forking.
3+
4+
import oxenmq
5+
from oxenc import bt_serialize
6+
7+
from . import crypto, config
8+
from .postfork import postfork
9+
10+
omq = None
11+
mule_conn = None
12+
13+
14+
def make_omq():
15+
omq = oxenmq.OxenMQ(privkey=crypto._privkey.encode(), pubkey=crypto.server_pubkey.encode())
16+
17+
# We have multiple workers talking to the mule, so we *must* use ephemeral ids to not replace each
18+
# others' connections.
19+
omq.ephemeral_routing_id = True
20+
21+
return omq
22+
23+
24+
# Postfork for workers: we start oxenmq and connect to the mule process
25+
@postfork
26+
def start_oxenmq():
27+
try:
28+
import uwsgi
29+
except ModuleNotFoundError:
30+
return
31+
32+
global omq, mule_conn
33+
34+
omq = make_omq()
35+
36+
if uwsgi.mule_id() != 0:
37+
from . import mule
38+
39+
mule.setup_omq()
40+
return
41+
42+
from .web import app # Imported here to avoid circular import
43+
44+
app.logger.debug(f"Starting oxenmq connection to mule in worker {uwsgi.worker_id()}")
45+
46+
omq.start()
47+
app.logger.debug(f"Started, connecting to mule")
48+
mule_conn = omq.connect_remote(oxenmq.Address(config.OMQ_INTERNAL))
49+
50+
app.logger.debug(f"worker {uwsgi.worker_id()} connected to mule OMQ")
51+
52+
53+
def send_mule(command, *args, prefix="worker."):
54+
"""
55+
Sends a command to the mule from a worker (or possibly from the mule itself). The command will
56+
be prefixed with "worker." (unless overridden).
57+
58+
Any args will be bt-serialized and send as message parts.
59+
"""
60+
if prefix:
61+
command = prefix + command
62+
63+
omq.send(mule_conn, command, *(bt_serialize(data) for data in args))

sogs/timer.py

Lines changed: 0 additions & 29 deletions
This file was deleted.

0 commit comments

Comments
 (0)