29
29
30
30
from .normalizers import BaseNormalizer
31
31
from .types import (
32
- TMsg ,
32
+ TResponsePayload ,
33
33
TResult ,
34
34
)
35
35
@@ -59,7 +59,10 @@ def add(self, time: float, size: int) -> None:
59
59
self .total_response_time += time
60
60
61
61
62
- class MessageManager (PeerSubscriber , BaseService , Generic [TRequestPayload , TMsg ]):
62
+ class ResponseCandidateStream (
63
+ PeerSubscriber ,
64
+ BaseService ,
65
+ Generic [TRequestPayload , TResponsePayload ]):
63
66
64
67
#
65
68
# PeerSubscriber
@@ -72,7 +75,7 @@ def subscription_msg_types(self) -> Set[Type[Command]]:
72
75
73
76
response_timout : int = 60
74
77
75
- pending_request : Tuple [float , 'asyncio.Future[TMsg ]' ] = None
78
+ pending_request : Tuple [float , 'asyncio.Future[TResponsePayload ]' ] = None
76
79
77
80
_peer : BasePeer
78
81
@@ -86,19 +89,19 @@ def __init__(
86
89
self .response_times = ResponseTimeTracker ()
87
90
self ._response_msg_type = response_msg_type
88
91
89
- async def message_candidates (
92
+ async def payload_candidates (
90
93
self ,
91
94
request : BaseRequest [TRequestPayload ],
92
- timeout : int ) -> 'AsyncGenerator[TMsg , None]' :
95
+ timeout : int ) -> 'AsyncGenerator[TResponsePayload , None]' :
93
96
"""
94
97
Make a request and iterate through candidates for a valid response.
95
98
96
- To mark a response as valid, use `complete_request`. After that call, message
99
+ To mark a response as valid, use `complete_request`. After that call, payload
97
100
candidates will stop arriving.
98
101
"""
99
102
self ._request (request )
100
103
while self ._is_pending ():
101
- yield await self ._get_message (timeout )
104
+ yield await self ._get_payload (timeout )
102
105
103
106
@property
104
107
def response_msg_name (self ) -> str :
@@ -121,35 +124,35 @@ async def _run(self) -> None:
121
124
self .logger .error ("Unexpected peer: %s expected: %s" , peer , self ._peer )
122
125
continue
123
126
elif isinstance (cmd , self ._response_msg_type ):
124
- await self ._handle_msg (cast (TMsg , msg ))
127
+ await self ._handle_msg (cast (TResponsePayload , msg ))
125
128
else :
126
- self .logger .warning ("Unexpected message type: %s" , cmd .__class__ .__name__ )
129
+ self .logger .warning ("Unexpected payload type: %s" , cmd .__class__ .__name__ )
127
130
128
- async def _handle_msg (self , msg : TMsg ) -> None :
131
+ async def _handle_msg (self , msg : TResponsePayload ) -> None :
129
132
if self .pending_request is None :
130
133
self .logger .debug (
131
- "Got unexpected %s message from %" , self .response_msg_name , self ._peer
134
+ "Got unexpected %s payload from %" , self .response_msg_name , self ._peer
132
135
)
133
136
return
134
137
135
138
send_time , future = self .pending_request
136
139
self .last_response_time = time .perf_counter () - send_time
137
140
future .set_result (msg )
138
141
139
- async def _get_message (self , timeout : int ) -> TMsg :
142
+ async def _get_payload (self , timeout : int ) -> TResponsePayload :
140
143
send_time , future = self .pending_request
141
144
try :
142
- message = await self .wait (future , timeout = timeout )
145
+ payload = await self .wait (future , timeout = timeout )
143
146
except TimeoutError :
144
147
self .response_times .total_timeouts += 1
145
148
raise
146
149
finally :
147
150
self .pending_request = None
148
151
149
- # message might be invalid, so prepare for another call to _get_message ()
152
+ # payload might be invalid, so prepare for another call to _get_payload ()
150
153
self .pending_request = (send_time , asyncio .Future ())
151
154
152
- return message
155
+ return payload
153
156
154
157
def _request (self , request : BaseRequest [TRequestPayload ]) -> None :
155
158
if self .pending_request is not None :
@@ -167,7 +170,7 @@ def _request(self, request: BaseRequest[TRequestPayload]) -> None:
167
170
168
171
self ._peer .sub_proto .send_request (request )
169
172
170
- future : 'asyncio.Future[TMsg ]' = asyncio .Future ()
173
+ future : 'asyncio.Future[TResponsePayload ]' = asyncio .Future ()
171
174
self .pending_request = (time .perf_counter (), future )
172
175
173
176
def _is_pending (self ) -> bool :
@@ -177,8 +180,8 @@ def get_stats(self) -> str:
177
180
return '%s: %s' % (self .response_msg_name , self .response_times .get_stats ())
178
181
179
182
180
- class ExchangeManager (Generic [TRequestPayload , TMsg , TResult ]):
181
- _message_manager : MessageManager [TRequestPayload , TMsg ] = None
183
+ class ExchangeManager (Generic [TRequestPayload , TResponsePayload , TResult ]):
184
+ _response_stream : ResponseCandidateStream [TRequestPayload , TResponsePayload ] = None
182
185
183
186
def __init__ (
184
187
self ,
@@ -188,13 +191,13 @@ def __init__(
188
191
self ._cancel_token = cancel_token
189
192
190
193
async def launch_service (self , listening_for : Type [Command ]) -> None :
191
- self ._message_manager = MessageManager (
194
+ self ._response_stream = ResponseCandidateStream (
192
195
self ._peer ,
193
196
listening_for ,
194
197
self ._cancel_token ,
195
198
)
196
- self ._peer .run_daemon (self ._message_manager )
197
- await self ._message_manager .events .started .wait ()
199
+ self ._peer .run_daemon (self ._response_stream )
200
+ await self ._response_stream .events .started .wait ()
198
201
199
202
@property
200
203
def is_running (self ) -> bool :
@@ -203,38 +206,37 @@ def is_running(self) -> bool:
203
206
async def get_result (
204
207
self ,
205
208
request : BaseRequest [TRequestPayload ],
206
- normalizer : BaseNormalizer [TMsg , TResult ],
209
+ normalizer : BaseNormalizer [TResponsePayload , TResult ],
207
210
validate_result : Callable [[TResult ], None ],
208
- message_validator : Callable [[TMsg ], None ] = None ,
211
+ payload_validator : Callable [[TResponsePayload ], None ],
209
212
timeout : int = None ) -> TResult :
210
213
211
214
if not self .is_running :
212
215
raise ValidationError ("You must call `launch_service` before initiating a peer request" )
213
216
214
- manager = self ._message_manager
217
+ stream = self ._response_stream
215
218
216
- async for message in manager . message_candidates (request , timeout ):
219
+ async for payload in stream . payload_candidates (request , timeout ):
217
220
try :
218
- if message_validator is not None :
219
- message_validator (message )
221
+ payload_validator (payload )
220
222
221
223
if normalizer .is_normalization_slow :
222
- result = await manager ._run_in_executor (normalizer .normalize_result , message )
224
+ result = await stream ._run_in_executor (normalizer .normalize_result , payload )
223
225
else :
224
- result = normalizer .normalize_result (message )
226
+ result = normalizer .normalize_result (payload )
225
227
226
228
validate_result (result )
227
229
except ValidationError as err :
228
230
self .service .logger .debug (
229
231
"Response validation failed for pending %s request from peer %s: %s" ,
230
- manager .response_msg_name ,
232
+ stream .response_msg_name ,
231
233
self ._peer ,
232
234
err ,
233
235
)
234
236
continue
235
237
else :
236
238
num_items = normalizer .get_num_results (result )
237
- manager .complete_request (num_items )
239
+ stream .complete_request (num_items )
238
240
return result
239
241
240
242
raise ValidationError ("Manager is not pending a response, but no valid response received" )
@@ -244,7 +246,7 @@ def service(self) -> BaseService:
244
246
"""
245
247
This service that needs to be running for calls to execute properly
246
248
"""
247
- return self ._message_manager
249
+ return self ._response_stream
248
250
249
251
def get_stats (self ) -> str :
250
- return self ._message_manager .get_stats ()
252
+ return self ._response_stream .get_stats ()
0 commit comments