File tree Expand file tree Collapse file tree 5 files changed +21
-11
lines changed
rabbitmq_amqp_python_client Expand file tree Collapse file tree 5 files changed +21
-11
lines changed Original file line number Diff line number Diff line change @@ -9,5 +9,8 @@ From the RabbitMQ UI you can break a connection to see the automatic reconnectio
99
1010Same for Consumers.
1111
12- In case of streams we connection will restart consuming from the last consumed offset.
12+ In case of streams the connection will restart consuming from the last consumed offset.
13+
14+ You can control some reconnection parameters with the RecoveryConfiguration dataclass, where you can specify
15+ the backoff interval and the maximum_retries before the client gives up.
1316
Original file line number Diff line number Diff line change @@ -278,6 +278,7 @@ def consumer(
278278
279279 def _on_disconnection (self ) -> None :
280280
281+ logger .debug ("_on_disconnection: disconnection detected" )
281282 if self in self ._connections :
282283 self ._connections .remove (self )
283284
@@ -286,6 +287,7 @@ def _on_disconnection(self) -> None:
286287
287288 for attempt in range (self ._recovery_configuration .MaxReconnectAttempts ): # type: ignore
288289
290+ logger .debug ("attempting a reconnection" )
289291 jitter = timedelta (milliseconds = random .randint (0 , 500 ))
290292 delay = base_delay + jitter
291293
@@ -318,16 +320,24 @@ def _on_disconnection(self) -> None:
318320
319321 except ConnectionException as e :
320322 base_delay *= 2
321- logger .error (
323+ logger .debug (
322324 "Reconnection attempt failed" ,
323325 "attempt" ,
324326 attempt ,
325327 "error" ,
326328 str (e ),
327329 )
328- continue
330+ # maximum attempts reached without establishing a connection
331+ if attempt == self ._recovery_configuration .MaxReconnectAttempts - 1 : # type: ignore
332+ logger .debug ("Not able to reconnect" )
333+ raise ConnectionException
334+ else :
335+ continue
329336
330- break
337+ # connection established
338+ else :
339+ logger .debug ("reconnected successful" )
340+ return
331341
332342 @property
333343 def active_producers (self ) -> int :
Original file line number Diff line number Diff line change @@ -231,5 +231,5 @@ class RecoveryConfiguration:
231231 """
232232
233233 active_recovery : bool = True
234- back_off_reconnect_interval : timedelta = timedelta (0. 5 )
234+ back_off_reconnect_interval : timedelta = timedelta (seconds = 5 )
235235 MaxReconnectAttempts : int = 5
Original file line number Diff line number Diff line change 3232
3333import proton
3434from proton import Sender as ProtonSender
35- from proton .handlers import \
36- IncomingMessageHandler as ProtonIncomingMessageHandler
37- from proton .handlers import \
38- OutgoingMessageHandler as ProtonOutgoingMessageHandler
35+ from proton .handlers import IncomingMessageHandler as ProtonIncomingMessageHandler
36+ from proton .handlers import OutgoingMessageHandler as ProtonOutgoingMessageHandler
3937
4038_tracer = None
4139_trace_key = proton .symbol ("x-opt-qpid-tracestate" )
Original file line number Diff line number Diff line change 139139
140140if TYPE_CHECKING :
141141 from ._condition import Condition
142- from ._endpoints import \
143- Connection # would produce circular import
142+ from ._endpoints import Connection # would produce circular import
144143
145144
146145class TraceAdapter :
You can’t perform that action at this time.
0 commit comments