Skip to content

Commit ceebd0f

Browse files
authored
[K8s] Fixed bug with first execution of K8s and Singularity (#1424)
1 parent b157106 commit ceebd0f

File tree

2 files changed

+6
-3
lines changed

2 files changed

+6
-3
lines changed

lithops/serverless/backends/k8s/k8s.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ def __init__(self, k8s_config, internal_storage):
100100
params = pika.URLParameters(self.amqp_url)
101101
self.connection = pika.BlockingConnection(params)
102102
self.channel = self.connection.channel()
103+
self.channel.queue_declare(queue='task_queue', durable=True)
103104

104105
# Define some needed variables
105106
self._get_nodes()
@@ -276,6 +277,9 @@ def clean(self, all=False):
276277
except ApiException:
277278
pass
278279

280+
if self.rabbitmq_executor:
281+
self.channel.queue_delete(queue='task_queue')
282+
279283
def clear(self, job_keys=None):
280284
"""
281285
Delete only completed jobs

lithops/serverless/backends/singularity/singularity.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def __init__(self, singularity_config, internal_storage):
4949
params = pika.URLParameters(self.amqp_url)
5050
self.connection = pika.BlockingConnection(params)
5151
self.channel = self.connection.channel()
52+
self.channel.queue_declare(queue='task_queue', durable=True)
5253

5354
msg = COMPUTE_CLI_MSG.format('Singularity')
5455
logger.info(f"{msg}")
@@ -140,9 +141,7 @@ def clean(self, all=False):
140141
Deletes all jobs
141142
"""
142143
logger.debug('Cleaning RabbitMQ queues')
143-
delete_queues = ['task_queue', 'status_queue']
144-
for queue in delete_queues:
145-
self.channel.queue_delete(queue=queue)
144+
self.channel.queue_delete(queue='task_queue')
146145

147146
def list_runtimes(self, singularity_image_name='all'):
148147
"""

0 commit comments

Comments
 (0)