Skip to content
6 changes: 6 additions & 0 deletions murdock/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ class TaskSettings(BaseSettings):
env: dict = Field(default=dict())


class PrioritySettings(BaseSettings):
labels: dict[str, int] = Field(default={})
branches: dict[str, int] = Field(default={})


class MurdockSettings(BaseSettings):
push: PushSettings = Field(default=PushSettings())
pr: PRSettings = Field(default=PRSettings())
Expand All @@ -146,6 +151,7 @@ class MurdockSettings(BaseSettings):
failfast: bool = Field(default=False)
artifacts: List[str] = Field(default=[])
tasks: List[TaskSettings] = Field(default=[TaskSettings()])
priorities: PrioritySettings = Field(default=PrioritySettings())


_ENV_FILE = os.getenv("ENV_FILE", os.path.join(os.path.dirname(__file__), "..", ".env"))
Expand Down
35 changes: 35 additions & 0 deletions murdock/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@


UNSAFE_ENVS = ["CI_JOB_TOKEN", "CI_SCRIPTS_DIR"]
JOB_FASTTRACK_BONUS = 100


class MurdockJob:
Expand Down Expand Up @@ -51,6 +52,9 @@ def __init__(
if self.pr is not None
else False
) # type: ignore[union-attr]
self.priority: int = MurdockJob.calculate_priority(
config, pr, ref, self.fasttracked
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all these function parameters are already instance attributes. Thus there's no need for calculate_priority to be a staticmethod. A regular instance method, private, should be enough def _priority(self): and use self.config, self.pr, self.ref and self.fastracked inside.

Copy link
Contributor

@aabadie aabadie Nov 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that an @property method could also be used but it would recompute the priority each time it's accessed. Maybe that's better I don't know

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure, can safely use a class method within __init__()?
(Also it needs to be public so it can react to label changes.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can safely use a class method within init()?

yes, if the method is accessing already defined/initialized attributes.

it needs to be public so it can react to label changes

then I think a property method is more appropriate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a property method would make e.g., "job.priority" actually call the method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assuming the pr field of a job is a reference to updated PR data.
if that's not the case, it's easier to have this staticmethod and supply all needed data at the call site.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you requeue ? In the current state, a new MurdockJob instance is created and the previous matching ones are marked as cancelled.

I also thought that the display order must also be taken into account. This is something that is in another data structure, a simple list. So a sort on priority must be added there.

https://github.com/murdock-ng/murdock/blob/e62397d29e8ffe1ded2bbb2dd0a7f7a0c49ee853/murdock/murdock.py#L561-569

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add re-queueing here:

if action == "labeled":

(if the job is queued, re-calc its priority and compare to the queued one. if it has changed, don't "return" but fall through to "schedule_job()", which would cancel the old job and create a new, sorted into the queue by priority.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit lost. What is the use case exactly ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR blew up from just replacing two unprioritized queues with one prioritized, to making full use of that change. :( I'll update the description.

)
self.artifacts: Optional[List[str]] = None
self.token: str = secrets.token_urlsafe(32)
self.scripts_dir: str = GLOBAL_CONFIG.scripts_dir
Expand All @@ -70,6 +74,27 @@ def __init__(
self._logger_context["pr"] = str(self.pr.number)
self._logger = LOGGER.bind(**self._logger_context)

@staticmethod
def calculate_priority(
config: MurdockSettings = MurdockSettings(),
pr: Optional[PullRequestInfo] = None,
ref: Optional[str] = None,
fasttracked: bool = False,
):
priority = 0

if fasttracked:
priority += JOB_FASTTRACK_BONUS

if config.priorities:
if pr and config.priorities.labels:
for label in pr.labels:
priority += config.priorities.labels.get(label, 0)
elif ref and config.priorities.branches:
priority += config.priorities.branches.get(ref, 0)

return priority

@staticmethod
def create_dir(work_dir: str) -> None:
logger = LOGGER.bind(dir=str(work_dir))
Expand Down Expand Up @@ -137,6 +162,7 @@ def model(self) -> JobModel:
prinfo=self.pr,
ref=self.ref,
fasttracked=self.fasttracked,
priority=self.priority,
trigger=self.trigger,
triggered_by=self.triggered_by,
env=self.safe_env,
Expand Down Expand Up @@ -228,6 +254,15 @@ def __eq__(self, other) -> bool:
def __hash__(self) -> int:
return hash(self.uid)

def __lt__(self, other):
"""
Helper to enable putting MurdockJob into PriorityQueue.

The logic is turned around here, as PriorityQueue returns *smallest*
value, but we prefer "higher priority value == higher priority"
"""
return self.priority >= other.priority

def set_start_time(self, start_time: datetime):
if start_time.tzinfo is None:
raise ValueError("Incomplete time object, no time zone defined")
Expand Down
5 changes: 5 additions & 0 deletions murdock/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ class JobModel(BaseModel):
None,
title="Whether the job can be fasttracked",
)
priority: int = Field(
0,
title="Job priority",
)
status: Optional[dict] = Field(
None,
title="Status of the job",
Expand Down Expand Up @@ -155,6 +159,7 @@ class ManualJobParamModel(BaseModel):
fasttrack: Optional[bool] = Field(
default=False, title="Run the job in fasttrack mode."
)
priority: Optional[int] = Field(default=0, title="Change job priority.")


class ManualJobBranchParamModel(ManualJobParamModel):
Expand Down
76 changes: 43 additions & 33 deletions murdock/murdock.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ def __init__(
self.queued: MurdockJobList = MurdockJobList()
self.running: MurdockJobPool = MurdockJobPool(num_workers)
self.queue: asyncio.Queue = asyncio.Queue()
self.fasttrack_queue: asyncio.Queue = asyncio.Queue()
self.notifier = Notifier()
self.instrumentator = Instrumentator()
self.db = database(database_type)
Expand Down Expand Up @@ -204,18 +203,13 @@ async def job_processing_task(self):
structlog.contextvars.bind_contextvars(worker=current_task)
LOGGER.debug("Starting worker")
while True:
if self.fasttrack_queue.qsize():
job = self.fasttrack_queue.get_nowait()
try:
job = await self.queue.get()
await self._process_job(job)
self.fasttrack_queue.task_done()
else:
try:
job = await self.queue.get()
await self._process_job(job)
self.queue.task_done()
except RuntimeError as exc:
LOGGER.warning("Exiting worker", exception=str(exc))
break
self.queue.task_done()
except RuntimeError as exc:
LOGGER.warning(f"Exiting worker {current_task}: {exc}")
break

async def job_prepare(self, job: MurdockJob):
logger = LOGGER.bind(**job.logging_context)
Expand Down Expand Up @@ -291,14 +285,13 @@ async def add_job_to_queue(self, job: MurdockJob):
"target_url": self.base_url,
},
)
all_busy = all(running is not None for running in self.running.jobs)
self.queued.add(job)
job.state = "queued"
if all_busy and job.fasttracked:
self.fasttrack_queue.put_nowait(job)
else:
self.queue.put_nowait(job)
logger.info("Job added to queued jobs")

self.queue.put_nowait(job)

LOGGER.info(f"{job} added to queued jobs")

self.job_queue_counter.inc()
await self.reload_jobs()

Expand Down Expand Up @@ -538,23 +531,36 @@ async def handle_pull_request_event(self, event: dict):
await set_commit_status(job.commit.sha, status)
return

if action in ["unlabeled", "edited"]:
if action in ["edited"]:
return

if action in ["unlabeled"]:
label_name = event["label"]["name"]
if label_name not in config.priorities.labels.keys():
return
elif not self.queued.search_by_pr_number(pull_request.number):
# skip re-queing if there's no queued job
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tiny typo:

Suggested change
# skip re-queing if there's no queued job
# skip re-queuing if there's no queued job

return

if action == "opened" and CI_CONFIG.ready_label in pull_request.labels:
# A PR opened with "Ready label" already set will be scheduled via
# the "labeled" action
return

if action == "labeled":
if event["label"]["name"] != CI_CONFIG.ready_label:
# return if the ready label was set for an already queued job,
# or if the label wouldn't change the job's priority (and thus
# queue position).
# otherwise, fall through to re-scheduling the job.
label_name = event["label"]["name"]
if label_name == CI_CONFIG.ready_label:
# Skip already queued jobs for ready label
if self.queued.search_by_pr_number(pull_request.number):
return
elif label_name not in config.priorities.labels.keys():
return
# Skip already queued jobs
if event["label"][
"name"
] == CI_CONFIG.ready_label and self.queued.search_by_pr_number(
pull_request.number
):
elif not self.queued.search_by_pr_number(pull_request.number):
# skip re-queing if there's no queued job
return

await self.schedule_job(job)
Expand Down Expand Up @@ -640,13 +646,17 @@ async def reload_jobs(self):
await self.notify_message_to_clients(json.dumps({"cmd": "reload"}))

def get_queued_jobs(self, query: JobQueryModel = JobQueryModel()) -> List[JobModel]:
return sorted(
[
job.model()
for job in self.queued.search_with_query(query)
if query.states is None or "queued" in query.states
],
key=lambda job: job.fasttracked, # type: ignore[return-value,arg-type]
return list(
map(
lambda job: job.model(),
sorted(
[
job
for job in self.queued.search_with_query(query)
if query.states is None or "queued" in query.states
]
),
)
)

def get_running_jobs(
Expand Down