Skip to content

Commit cbaaece

Browse files
committed
squash this most likely
1 parent 2cd38c7 commit cbaaece

File tree

6 files changed

+110
-68
lines changed

6 files changed

+110
-68
lines changed

contrib/uwsgi-sogs-standalone.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ processes = 2
2727
enable-threads = true
2828
http = :80
2929
mount = /=sogs.web:app
30-
mule = @(call://sogs.mule.Mule.Mule)
30+
mule = @(call://sogs.mule.Mule)
3131
log-4xx = true
3232
log-5xx = true
3333
disable-logging = true

sogs/model/bothandler.py renamed to sogs/model/manager.py

Lines changed: 67 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@
77
from .. import utils
88
from ..omq import send_mule
99
from .user import User
10+
from .bot import Bot
1011
from .room import Room
1112
from .message import Message
1213
from .filter import SimpleFilter
1314
from .exc import InvalidData
15+
import heapq
1416

15-
from typing import Optional, List, Union
17+
from dataclasses import dataclass, field
18+
from typing import Optional, List, Union, Any
1619
import time
1720

1821
"""
@@ -23,10 +26,8 @@
2326
- what does the bot do with the message they tried to send?
2427
- can store locally
2528
- user sends reply
26-
- bot inserts it into room
27-
"""
29+
- bot inserts it into room (?)
2830
29-
"""
3031
Control Flow:
3132
1) message comes in HTTP request
3233
2) unpacked/parsed/verified/permissions checked
@@ -38,12 +39,44 @@
3839
"""
3940

4041

41-
class BotHandler:
42+
@dataclass(order=True)
43+
class PriorityTuple(tuple):
44+
priority: int
45+
item: Any = field(compare=False)
46+
47+
48+
# Simple "priority queue" of bots implemented using a dict with heap
49+
# invariance maintained by qheap algorithm
50+
# TODO: when bots are designed basically, add methods for polling them
51+
# and receiving their judgments
52+
class BotQueue:
53+
def __init__(self) -> None:
54+
self.queue = {}
55+
56+
def _qsize(self) -> int:
57+
return len(self.queue.keys())
58+
59+
def _empty(self) -> bool:
60+
return not self._qsize()
61+
62+
def _peek(self, priority: int):
63+
return self.queue.get(priority)
64+
65+
def _put(self, item: PriorityTuple):
66+
temp = list(self.queue.items())
67+
heapq.heappush(temp, item)
68+
self.queue = dict(temp)
69+
70+
def _get(self):
71+
return heapq.heappop(self.queue)
72+
73+
74+
class Manager:
4275
"""
4376
Class representing an interface that manages active bots
4477
4578
Object Properties:
46-
bots - list of bots attached to room
79+
queue - BotQueue object
4780
"""
4881

4982
def __init__(
@@ -54,8 +87,27 @@ def __init__(
5487
id: Optional[int] = None,
5588
session_id: Optional[int] = None,
5689
) -> None:
57-
# immutable attributes
5890
self.id = id
91+
self.queue = BotQueue()
92+
93+
def qempty(self):
94+
return not self.queue._empty()
95+
96+
def add_bot(self, bot: Bot, priority: int = None):
97+
if not priority:
98+
# if no priority is given, lowest priority is assigned
99+
priority = self.qsize()
100+
else:
101+
# if priority is already taken, find next lowest
102+
while self.queue.get(priority):
103+
priority += 1
104+
self.queue._put(PriorityTuple(priority, bot))
105+
106+
def remove_bot(self):
107+
do_something = 3
108+
109+
def peek(self, priority: int):
110+
return self.queue._peek(priority)
59111

60112
def check_permission_for(
61113
self,
@@ -236,5 +288,13 @@ def receive_message(
236288
if whisper_to:
237289
msg['whisper_to'] = whisper_to.session_id
238290

291+
if room._bot_status():
292+
add_bot_logic = 3
293+
"""
294+
TODO: add logic for bots receiving message and doing
295+
bot things. The bots should be queried in terms of
296+
priority,
297+
"""
298+
239299
send_mule("message_posted", msg["id"])
240300
return msg

sogs/mule.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@
99
from . import cleanup
1010
from . import config
1111
from .omq import OMQ
12-
from .model.bothandler import BotHandler
12+
from .model.manager import Manager
1313

1414

1515
# This is the uwsgi "mule" that handles things not related to serving HTTP requests:
1616
# - it holds the oxenmq instance (with its own interface into sogs)
1717
# - it handles cleanup jobs (e.g. periodic deletions)
1818

19+
1920
def log_exceptions(f):
2021
@functools.wraps(f)
2122
def wrapper(*args, **kwargs):
@@ -27,8 +28,8 @@ def wrapper(*args, **kwargs):
2728

2829
return wrapper
2930

30-
class Mule:
3131

32+
class Mule:
3233
def __init__(self):
3334
self.run()
3435

@@ -57,18 +58,15 @@ def message_posted(self, m: oxenmq.Message):
5758
id = bt_deserialize(m.data()[0])
5859
app.logger.debug(f"FIXME: mule -- message posted stub, id={id}")
5960

60-
6161
@log_exceptions
6262
def messages_deleted(self, m: oxenmq.Message):
6363
ids = bt_deserialize(m.data()[0])
6464
app.logger.debug(f"FIXME: mule -- message delete stub, deleted messages: {ids}")
6565

66-
6766
@log_exceptions
6867
def message_edited(self, m: oxenmq.Message):
6968
app.logger.debug("FIXME: mule -- message edited stub")
7069

71-
7270
def setup_omq(self, omq: OMQ):
7371
app.logger.debug("Mule setting up omq")
7472
if isinstance(config.OMQ_LISTEN, list):
@@ -95,14 +93,14 @@ def setup_omq(self, omq: OMQ):
9593

9694
## NEW CODE FOR BOT
9795
handler = self._omq.add_category("handler", access_level=oxenmq.AuthLevel.admin)
98-
handler.add_command("add_bot", add_bot)
99-
handler.add_command("remove_bot", remove_bot)
100-
handler.add_command("send_to_handler", message_to_handler)
96+
handler.add_command("add_bot", omq.add_bot)
97+
handler.add_command("remove_bot", omq.remove_bot)
98+
handler.add_command("send_to_handler", omq.manager.receive_message)
10199

102100
app.logger.debug("Mule starting omq")
103101
self._omq.start()
104102

105103
# Connect mule to itself so that if something the mule does wants to send something to the mule
106104
# it will work. (And so be careful not to recurse!)
107105
app.logger.debug("Mule connecting to self")
108-
omq.mule_conn = omq.connect_inproc(on_success=None, on_failure=inproc_fail)
106+
omq.mule_conn = omq.connect_inproc(on_success=None, on_failure=self.inproc_fail)

sogs/omq.py

Lines changed: 21 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,30 +5,31 @@
55
from oxenc import bt_serialize
66

77
from . import crypto, config
8-
from .mule import Mule
98
from .postfork import postfork
10-
from .model.bothandler import BotHandler
9+
from .model.manager import Manager
1110

11+
omq_global = None
1212

1313
class OMQ:
14-
1514
@postfork
1615
def __init__(self):
1716
try:
1817
import uwsgi
1918
except ModuleNotFoundError:
2019
return
21-
22-
self._omq = oxenmq.OxenMQ(privkey=crypto._privkey.encode(), pubkey=crypto.server_pubkey.encode())
20+
21+
self._omq = oxenmq.OxenMQ(
22+
privkey=crypto._privkey.encode(), pubkey=crypto.server_pubkey.encode()
23+
)
2324
self._omq.ephemeral_routing_id = True
2425

25-
self.bot_manager = BotHandler()
26+
self.manager = Manager()
2627
self.test_suite = False
2728

2829
if uwsgi.mule_id() != 0:
2930
uwsgi.opt['mule'].setup_omq(self)
3031
return
31-
32+
3233
from .web import app # Imported here to avoid circular import
3334

3435
app.logger.debug(f"Starting oxenmq connection to mule in worker {uwsgi.worker_id()}")
@@ -39,6 +40,16 @@ def __init__(self):
3940

4041
app.logger.debug(f"worker {uwsgi.worker_id()} connected to mule OMQ")
4142

43+
global omq_global
44+
omq_global = self
45+
46+
def add_bot(self):
47+
self.manager.add_bot()
48+
# TODO: add omq logic
49+
50+
def remove_bot(self):
51+
self.manager.remove_bot()
52+
# TODO: add omq logic
4253

4354
def send_mule(self, command, *args, prefix="worker."):
4455
"""
@@ -47,42 +58,11 @@ def send_mule(self, command, *args, prefix="worker."):
4758
4859
Any args will be bt-serialized and send as message parts.
4960
"""
61+
5062
if prefix:
5163
command = prefix + command
5264

53-
if self.test_suite and omq is None:
65+
if self.test_suite and self._omq is None:
5466
pass # TODO: for mule call testing we may want to do something else here?
5567
else:
56-
omq.send(mule_conn, command, *(bt_serialize(data) for data in args))
57-
58-
59-
# Postfork for workers: we start oxenmq and connect to the mule process
60-
@postfork
61-
def start_oxenmq():
62-
try:
63-
import uwsgi
64-
except ModuleNotFoundError:
65-
return
66-
67-
68-
global omq, mule_conn, bot_manager
69-
70-
bot_manager = BotHandler()
71-
72-
omq = make_omq()
73-
74-
if uwsgi.mule_id() != 0:
75-
from . import mule
76-
77-
mule.setup_omq()
78-
return
79-
80-
from .web import app # Imported here to avoid circular import
81-
82-
app.logger.debug(f"Starting oxenmq connection to mule in worker {uwsgi.worker_id()}")
83-
84-
omq.start()
85-
app.logger.debug("Started, connecting to mule")
86-
mule_conn = omq.connect_remote(oxenmq.Address(config.OMQ_INTERNAL))
87-
88-
app.logger.debug(f"worker {uwsgi.worker_id()} connected to mule OMQ")
68+
self._omq.send(self.mule_conn, command, *(bt_serialize(data) for data in args))

sogs/routes/messages.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from .. import http, utils
22
from . import auth
33
from model.room import Room
4-
from ..omq import send_mule
4+
from ..omq import omq_global
55

66
from flask import abort, jsonify, g, Blueprint, request
77

@@ -361,15 +361,17 @@ def post_message(room: Room):
361361
"""
362362
req = request.json
363363

364-
send_mule(command="send_to_handler",
365-
user=g.user,
366-
room=room,
367-
data=utils.decode_base64(req.get('data')),
368-
sig=utils.decode_base64(req.get('signature')),
369-
whisper_to=req.get('whisper_to'),
370-
whisper_mods=bool(req.get('whisper_mods')),
371-
files=[int(x) for x in req.get('files', [])],
372-
prefix="handler.")
364+
msg = omq_global.send_mule(
365+
command="send_to_handler",
366+
user=g.user,
367+
room=room,
368+
data=utils.decode_base64(req.get('data')),
369+
sig=utils.decode_base64(req.get('signature')),
370+
whisper_to=req.get('whisper_to'),
371+
whisper_mods=bool(req.get('whisper_mods')),
372+
files=[int(x) for x in req.get('files', [])],
373+
prefix="handler.",
374+
)
373375

374376
return utils.jsonify_with_base64(msg), http.CREATED
375377

sogs/utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
from . import http
44

55
import base64
6+
7+
68
from flask import request, abort, Response
79
import json
810
from typing import Union, Tuple

0 commit comments

Comments
 (0)