diff --git a/lithops/serverless/backends/k8s/k8s.py b/lithops/serverless/backends/k8s/k8s.py index ad421a05e..d0f726840 100644 --- a/lithops/serverless/backends/k8s/k8s.py +++ b/lithops/serverless/backends/k8s/k8s.py @@ -100,6 +100,7 @@ def __init__(self, k8s_config, internal_storage): params = pika.URLParameters(self.amqp_url) self.connection = pika.BlockingConnection(params) self.channel = self.connection.channel() + self.channel.queue_declare(queue='task_queue', durable=True) # Define some needed variables self._get_nodes() @@ -276,6 +277,9 @@ def clean(self, all=False): except ApiException: pass + if self.rabbitmq_executor: + self.channel.queue_delete(queue='task_queue') + def clear(self, job_keys=None): """ Delete only completed jobs diff --git a/lithops/serverless/backends/singularity/singularity.py b/lithops/serverless/backends/singularity/singularity.py index 4704da3c5..10c62882b 100644 --- a/lithops/serverless/backends/singularity/singularity.py +++ b/lithops/serverless/backends/singularity/singularity.py @@ -49,6 +49,7 @@ def __init__(self, singularity_config, internal_storage): params = pika.URLParameters(self.amqp_url) self.connection = pika.BlockingConnection(params) self.channel = self.connection.channel() + self.channel.queue_declare(queue='task_queue', durable=True) msg = COMPUTE_CLI_MSG.format('Singularity') logger.info(f"{msg}") @@ -140,9 +141,7 @@ def clean(self, all=False): Deletes all jobs """ logger.debug('Cleaning RabbitMQ queues') - delete_queues = ['task_queue', 'status_queue'] - for queue in delete_queues: - self.channel.queue_delete(queue=queue) + self.channel.queue_delete(queue='task_queue') def list_runtimes(self, singularity_image_name='all'): """