Skip to content

Commit fafdf23

Browse files
authored
Log event queue exception after max attempts (#1263)
* Log event queue exception after max attempts * address comments
1 parent 3a45938 commit fafdf23

File tree

1 file changed

+17
-13
lines changed

1 file changed

+17
-13
lines changed

axlearn/cloud/common/event_queue.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,6 @@ def publish(self, event: Event):
203203
try:
204204
# Ensure connection is established before publishing.
205205
if not self._channel or not self._connection:
206-
logging.error("RabbitMQ publisher channel is closed, reconnecting...")
207206
self.connect()
208207

209208
# Setting durable=True ensures that the queue will survive.
@@ -230,25 +229,30 @@ def publish(self, event: Event):
230229
# Only retry on recoverable exceptions.
231230
# AMQPConnectionError is assumed to be related to network issues,
232231
# or temporary unavailable host.
233-
logging.error(
234-
"Failed to publish event: %s. Error: %s. Attempt: %d",
235-
message,
236-
str(e),
237-
attempt,
238-
)
239232
self._handle_publish_error()
240233
attempt += 1
241-
if attempt <= self._num_tries:
234+
if attempt < self._num_tries:
242235
time.sleep(2**attempt)
236+
else:
237+
logging.error(
238+
"Failed to publish event: %s after %d attempts. Error: %s.",
239+
message,
240+
attempt,
241+
str(e),
242+
)
243243
except Exception as e: # pylint: disable=broad-except
244-
# Unknown errors. Don't retry. Log to avoid crashing clients.
245-
logging.error(
246-
"Unknown error. Failed to publish event: %s. Error: %s.", message, str(e)
247-
)
248244
self._handle_publish_error()
249245
attempt += 1
250-
if attempt <= self._num_tries:
246+
if attempt < self._num_tries:
251247
time.sleep(2**attempt)
248+
else:
249+
# Unknown errors. Don't retry. Log to avoid crashing clients.
250+
logging.error(
251+
"Unknown error. Failed to publish event: %s after %d attempts. Error: %s.",
252+
message,
253+
attempt,
254+
str(e),
255+
)
252256

253257
def _handle_publish_error(self):
254258
"""Handle publish errors with retrying on connection issue."""

0 commit comments

Comments
 (0)