Skip to content

Commit cc9e821

Browse files
committed
murdock: drop fasttrack_queue, use PriorityQueue
1 parent ca5476f commit cc9e821

File tree

1 file changed

+26
-18
lines changed

1 file changed

+26
-18
lines changed

murdock/murdock.py

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1+
from dataclasses import dataclass, field
12
import asyncio
23
import json
34
import os
45
import re
56
from datetime import datetime, timezone
67

7-
from typing import List, Optional, Union
8+
from typing import Any, List, Optional, Union
89

910
import websockets
1011
from fastapi import WebSocket
@@ -46,6 +47,14 @@
4647
"reopened",
4748
]
4849

50+
JOB_FASTTRACK_BONUS = 100
51+
52+
53+
@dataclass(order=True)
54+
class PrioritizedItem:
55+
priority: int
56+
item: Any = field(compare=False)
57+
4958

5059
class Murdock:
5160
def __init__(
@@ -70,8 +79,7 @@ def __init__(
7079
self.num_workers: int = num_workers
7180
self.queued: MurdockJobList = MurdockJobList()
7281
self.running: MurdockJobPool = MurdockJobPool(num_workers)
73-
self.queue: asyncio.Queue = asyncio.Queue()
74-
self.fasttrack_queue: asyncio.Queue = asyncio.Queue()
82+
self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
7583
self.db = Database()
7684
self.notifier = Notifier()
7785
self.instrumentator = Instrumentator()
@@ -128,18 +136,14 @@ async def _process_job(self, job: MurdockJob):
128136
async def job_processing_task(self):
129137
current_task = asyncio.current_task().get_name()
130138
while True:
131-
if self.fasttrack_queue.qsize():
132-
job = self.fasttrack_queue.get_nowait()
139+
try:
140+
prioritized_job = await self.queue.get()
141+
job = prioritized_job.item
133142
await self._process_job(job)
134-
self.fasttrack_queue.task_done()
135-
else:
136-
try:
137-
job = await self.queue.get()
138-
await self._process_job(job)
139-
self.queue.task_done()
140-
except RuntimeError as exc:
141-
LOGGER.info(f"Exiting worker {current_task}: {exc}")
142-
break
143+
self.queue.task_done()
144+
except RuntimeError as exc:
145+
LOGGER.info(f"Exiting worker {current_task}: {exc}")
146+
break
143147

144148
async def job_prepare(self, job: MurdockJob):
145149
self.queued.remove(job)
@@ -210,13 +214,17 @@ async def add_job_to_queue(self, job: MurdockJob):
210214
"target_url": self.base_url,
211215
},
212216
)
213-
all_busy = all(running is not None for running in self.running.jobs)
214217
self.queued.add(job)
215218
job.state = "queued"
216-
if all_busy and job.fasttracked:
217-
self.fasttrack_queue.put_nowait(job)
219+
if job.fasttracked:
220+
prio = JOB_FASTTRACK_BONUS
218221
else:
219-
self.queue.put_nowait(job)
222+
prio = 0
223+
224+
# PriorityQueue is a min_queue, but "higher priority equals higher value"
225+
# makes more sense. So, invert the priority by multiplying with `-1`
226+
self.queue.put_nowait(PrioritizedItem(prio * -1, job))
227+
220228
LOGGER.info(f"{job} added to queued jobs")
221229
await self.reload_jobs()
222230

0 commit comments

Comments
 (0)