4
4
import oxenmq , queue
5
5
from oxenc import bt_serialize , bt_deserialize
6
6
7
- from mule import log_exceptions
8
- from routes import omq_auth
7
+ from . mule import log_exceptions
8
+ from . routes import omq_auth
9
9
from . import crypto , config
10
10
from .postfork import postfork
11
11
from .model .clientmanager import ClientManager
@@ -36,7 +36,7 @@ def __init__(self):
36
36
if uwsgi .mule_id () != 0 :
37
37
uwsgi .opt ['mule' ].setup_omq (self )
38
38
return
39
-
39
+
40
40
uwsgi .register_signal (123 , 'internal' , self .handle_proxied_omq_req )
41
41
42
42
from .web import app # Imported here to avoid circular import
@@ -52,57 +52,49 @@ def __init__(self):
52
52
global omq_global
53
53
omq_global = self
54
54
55
-
56
55
@log_exceptions
57
56
def subreq_response (self ):
58
57
pass
59
58
60
-
61
59
@log_exceptions
62
60
def handle_proxied_omq_req (self ):
63
- id , subreq_body = self .send_mule (
64
- command = 'get_next_request' ,
65
- prefix = 'internal'
66
- )
61
+ id , subreq_body = self .send_mule (command = 'get_next_request' , prefix = 'internal' )
67
62
68
63
'''
69
64
70
65
Handle omq subrequest
71
66
72
67
'''
73
68
74
- return
69
+ return
75
70
76
71
@log_exceptions
77
72
def get_next_request (self ):
78
73
subreq_body = self .subreq_queue .get ()
79
74
id = list (subreq_body .keys ())[0 ]
80
75
return id , subreq_body [id ]
81
76
82
-
83
77
@log_exceptions
84
78
def register_client (self , msg : oxenmq .Message ):
85
79
cid , authlevel , bot , priority = bt_deserialize (msg .data ()[0 ])
86
80
conn_id = msg .conn ()
87
81
self .client_map [conn_id ] = cid
88
82
self .manager .register_client (msg )
89
83
90
-
91
84
@log_exceptions
92
85
def deregister_client (self , msg : oxenmq .Message ):
93
86
cid , bot = bt_deserialize (msg .data ()[0 ])
94
87
self .client_map .pop (cid )
95
88
self .manager .deregister_client (cid , bot )
96
89
97
-
98
90
def send_mule (self , command , * args , prefix = "worker." ):
99
91
"""
100
92
Sends a command to the mule from a worker (or possibly from the mule itself). The command will
101
93
be prefixed with "worker." (unless overridden).
102
94
103
95
Any args will be bt-serialized and send as message parts.
104
96
"""
105
-
97
+
106
98
if prefix :
107
99
command = prefix + command
108
100
0 commit comments