|
17 | 17 | ValidationError,
|
18 | 18 | )
|
19 | 19 |
|
| 20 | +from p2p.exceptions import PeerConnectionLost |
20 | 21 | from p2p.peer import BasePeer, PeerSubscriber
|
21 | 22 | from p2p.protocol import (
|
22 | 23 | BaseRequest,
|
@@ -134,7 +135,7 @@ async def _run(self) -> None:
|
134 | 135 | async def _handle_msg(self, msg: TResponsePayload) -> None:
|
135 | 136 | if self.pending_request is None:
|
136 | 137 | self.logger.debug(
|
137 |
| - "Got unexpected %s payload from %", self.response_msg_name, self._peer |
| 138 | + "Got unexpected %s payload from %s", self.response_msg_name, self._peer |
138 | 139 | )
|
139 | 140 | return
|
140 | 141 |
|
@@ -179,15 +180,24 @@ def _request(self, request: BaseRequest[TRequestPayload]) -> None:
|
179 | 180 | def _is_pending(self) -> bool:
|
180 | 181 | return self.pending_request is not None
|
181 | 182 |
|
| 183 | + async def _cleanup(self) -> None: |
| 184 | + if self.pending_request is not None: |
| 185 | + self.logger.debug("Stream shutting down, raising an exception on the pending request") |
| 186 | + _, future = self.pending_request |
| 187 | + future.set_exception(PeerConnectionLost("Pending request can't complete: peer is gone")) |
| 188 | + |
182 | 189 | def deregister_peer(self, peer: BasePeer) -> None:
|
183 | 190 | if self.pending_request is not None:
|
184 |
| - self.logger.debug("Peer disconnected, trigger a timeout on the pending request") |
| 191 | + self.logger.debug("Peer disconnected, raising an exception on the pending request") |
185 | 192 | _, future = self.pending_request
|
186 |
| - future.set_exception(TimeoutError("Peer disconnected, simulating inevitable timeout")) |
| 193 | + future.set_exception(PeerConnectionLost("Pending request can't complete: peer is gone")) |
187 | 194 |
|
188 | 195 | def get_stats(self) -> Tuple[str, str]:
|
189 | 196 | return (self.response_msg_name, self.response_times.get_stats())
|
190 | 197 |
|
| 198 | + def __repr__(self) -> str: |
| 199 | + return f'<ResponseCandidateStream({self._peer!s}, {self.response_msg_type!r})>' |
| 200 | + |
191 | 201 |
|
192 | 202 | class ExchangeManager(Generic[TRequestPayload, TResponsePayload, TResult]):
|
193 | 203 | _response_stream: ResponseCandidateStream[TRequestPayload, TResponsePayload] = None
|
|
0 commit comments