Skip to content

Commit 45b660c

Browse files
committed
fix(queue): use queue as ctx, always close connection
1 parent 2a521e5 commit 45b660c

File tree

3 files changed

+24
-9
lines changed

3 files changed

+24
-9
lines changed

polytope_server/broker/broker.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
class Broker:
3131
def __init__(self, config: dict):
3232

33-
queue_config = config.get("queue")
34-
self.queue = queue.create_queue(queue_config)
33+
self.queue_config = config.get("queue")
34+
self.queue = None
3535

3636
self.max_queue_size = config.get("deployment", {}).get("worker", {}).get("replicas", 40)
3737

@@ -47,8 +47,14 @@ def run(self):
4747
logging.info("Starting broker...")
4848
logging.info("Maximum Queue Size: {}".format(self.max_queue_size))
4949

50-
while not time.sleep(self.scheduling_interval):
51-
self.check_requests()
50+
with queue.create_queue(self.queue_config) as q:
51+
self.queue = q
52+
try:
53+
while not time.sleep(self.scheduling_interval):
54+
self.check_requests()
55+
except KeyboardInterrupt:
56+
logging.info("Broker shutdown requested")
57+
raise
5258

5359
def check_requests(self):
5460

polytope_server/common/queue/queue.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,15 @@ def count(self) -> int:
7171
def get_type(self) -> str:
7272
"""Get the implementation type"""
7373

74+
def __enter__(self):
75+
"""Enter context manager"""
76+
return self
77+
78+
def __exit__(self, exc_type, exc_val, exc_tb):
79+
"""Exit context manager and close connection"""
80+
self.close_connection()
81+
return False
82+
7483

7584
queue_types = {"rabbitmq": RabbitmqQueue, "sqs": SQSQueue}
7685

polytope_server/worker/worker.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -205,12 +205,12 @@ def handle_termination(group: aio.TaskGroup) -> None:
205205
loop.remove_signal_handler(signal.SIGTERM)
206206

207207
def run(self):
208-
self.queue = polytope_queue.create_queue(self.config.get("queue"))
208+
with polytope_queue.create_queue(self.config.get("queue")) as queue:
209+
self.queue = queue
210+
self.update_status("idle", time_spent=0)
209211

210-
self.update_status("idle", time_spent=0)
211-
212-
with ThreadPoolExecutor(max_workers=1) as executor:
213-
aio.run(self.schedule(executor))
212+
with ThreadPoolExecutor(max_workers=1) as executor:
213+
aio.run(self.schedule(executor))
214214

215215
def process_request(
216216
self,

0 commit comments

Comments
 (0)