diff --git a/pulsar/client/amqp_exchange.py b/pulsar/client/amqp_exchange.py index 1921131c..9ff29fa7 100644 --- a/pulsar/client/amqp_exchange.py +++ b/pulsar/client/amqp_exchange.py @@ -233,15 +233,20 @@ def publish(self, name, payload): with pools.producers[connection].acquire() as producer: log.debug("%sHave producer for publishing to key %s", publish_log_prefix, key) publish_kwds = self.__prepare_publish_kwds(publish_log_prefix) - producer.publish( - payload, - serializer='json', - exchange=self.__exchange, - declare=[self.__exchange], - routing_key=key, - **publish_kwds - ) - log.debug("%sPublished to key %s", publish_log_prefix, key) + try: + producer.publish( + payload, + serializer='json', + exchange=self.__exchange, + declare=[self.__exchange], + routing_key=key, + **publish_kwds + ) + log.debug("%sPublished to key %s", publish_log_prefix, key) + return True + except Exception as e: + log.error("%sFailed to publish to key %s: %s", publish_log_prefix, key, str(e)) + self.__fail_publish(name, payload, e) def ack_manager(self): log.debug('Acknowledgement manager thread alive') @@ -261,15 +266,37 @@ def ack_manager(self): 'republishing original message on queue %s', unack_uuid, resubmit_queue) try: - self.publish(resubmit_queue, payload) - self.publish_uuid_store.set_time(unack_uuid) + if self.publish(resubmit_queue, payload): + self.publish_uuid_store.set_time(unack_uuid) + else: + # If we fail to publish, we need to remove the uuid from the store + # so it doesn't get republished again. + self.__discard_publish_uuid(unack_uuid, failed) except self.recoverable_exceptions as e: self.__handle_io_error(e) continue except Exception: - log.exception("Problem with acknowledgement manager, leaving ack_manager method in problematic state!") + log.exception("Problem with acknowledgement manager, leaving ack manager in problematic state!") raise - log.debug('Acknowledgement manager thread exiting') + + def __fail_publish(self, name, payload, exception): + # Send just a few safe keys if we have them: + keys_to_send = [ + "job_id", + "returncode", + "stdout", + "stderr", + "job_stdout", + "job_stderr", + ] + new_payload = {} + for key in keys_to_send: + if key in payload: + new_payload[key] = payload[key] + # Add the original payload to the new payload + new_payload["exception"] = str(exception) + new_payload["status"] = "failed" + self.publish(name, new_payload) def __get_payload(self, uuid, failed): """Retry reading a message from the publish_uuid_store once, delete on the second failure."""