@@ -71,7 +71,7 @@ def __init__(
71
71
def setup_stream ():
72
72
assert self .socket is not None
73
73
self .stream = zmqstream .ZMQStream (self .socket , self .ioloop )
74
- self .stream .on_recv (self ._handle_recv ) # type:ignore[arg-type]
74
+ self .stream .on_recv (self ._handle_recv )
75
75
evt .set ()
76
76
77
77
assert self .ioloop is not None
@@ -115,21 +115,24 @@ def thread_send():
115
115
assert self .ioloop is not None
116
116
self .ioloop .add_callback (thread_send )
117
117
118
- def _handle_recv (self , future_msg : Awaitable ) -> None :
118
+ def _handle_recv (self , msg : Union [ List [ bytes ], Awaitable ] ) -> None :
119
119
"""Callback for stream.on_recv.
120
120
121
121
Unpacks message, and calls handlers with it.
122
122
"""
123
- assert self .ioloop is not None
124
- loop = self .ioloop ._asyncio_event_loop # type:ignore[attr-defined]
125
- msg_list = loop .run_until_complete (get_msg (future_msg ))
123
+ if asyncio .isfuture (msg ):
124
+ assert self .ioloop is not None
125
+ loop = self .ioloop ._asyncio_event_loop # type:ignore[attr-defined]
126
+ msg_list = loop .run_until_complete (get_msg (msg ))
127
+ else :
128
+ msg_list = msg
126
129
assert self .session is not None
127
130
ident , smsg = self .session .feed_identities (msg_list )
128
- msg = self .session .deserialize (smsg )
131
+ new_msg = self .session .deserialize (smsg )
129
132
# let client inspect messages
130
133
if self ._inspect :
131
- self ._inspect (msg )
132
- self .call_handlers (msg )
134
+ self ._inspect (new_msg )
135
+ self .call_handlers (new_msg )
133
136
134
137
def call_handlers (self , msg : Dict [str , Any ]) -> None :
135
138
"""This method is called in the ioloop thread when a message arrives.
0 commit comments