Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ RUN touch /opt/jupyterhub-nomad-spawner/jupyterhub_nomad_spawner/__init__.py /op


WORKDIR /opt/jupyterhub-nomad-spawner
RUN --mount=type=cache,target=/root/.cache/pypoetry --mount=type=cache,target=/root/.cache/pip poetry install --no-dev -n -vv
RUN --mount=type=cache,target=/root/.cache/pypoetry --mount=type=cache,target=/root/.cache/pip poetry install --only main -n -vv
COPY jupyterhub_nomad_spawner /opt/jupyterhub-nomad-spawner/jupyterhub_nomad_spawner
COPY README.md /opt/jupyterhub-nomad-spawner/

Expand Down
14 changes: 14 additions & 0 deletions jupyterhub_nomad_spawner/nomad/nomad_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -1571,6 +1571,20 @@ class AllocatedResources(BaseModel):
Shared: Optional[AllocatedSharedResources] = None
Tasks: Optional[Dict[str, AllocatedTaskResources]] = None

class TaskEvent(BaseModel):
Type: str
Time: int
DisplayMessage: str
Details: Dict[str, Any]
FailsTask: bool
DriverMessage: str

class TaskState(BaseModel):
State: str
Failed: bool
StartedAt: Optional[str]
FinishedAt: Optional[str]
Events: List[TaskEvent]

class AllocationListStub(BaseModel):
AllocatedResources: Optional[AllocatedResources] = None
Expand Down
38 changes: 38 additions & 0 deletions jupyterhub_nomad_spawner/nomad/nomad_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
CSIVolumeCapability,
CSIVolumeCreateRequest,
JobsParseRequest,
TaskState,
)


Expand Down Expand Up @@ -125,6 +126,43 @@ async def job_status(self, job_id) -> str:

job_detail = response.json()
return job_detail.get("Status", "")

async def task_status(self, job_name: str) -> str:
"""Get detailed task status from most recent allocation"""
allocs = await self.client.get(f"/v1/job/{job_name}/allocations")
if not allocs:
return "pending"

allocs = allocs.json()
latest_alloc = max(allocs, key=lambda x: x["CreateTime"])
if not latest_alloc:
return "pending"

task_states = latest_alloc.get("TaskStates", {}) or {}
task_states = {name: TaskState(**state) for name, state in task_states.items()}

if not task_states:
return "pending"

for task in task_states.values():
if task.State == "dead" and task.Failed:
return "dead"
if task.State != "running":
return self._get_task_state_from_event(task)

return "running"

def _get_task_state_from_event(self, task: TaskState) -> str:
"""Determine task state from latest event"""
events = task.Events
if not events:
return "pending"

latest_event = events[-1]
if latest_event.Type in ["Driver", "Task Setup"]:
return "starting"
return "pending"


async def job_allocations(self, job_id) -> list[dict[str, Any]]:
response = await self.client.get(f"/v1/job/{job_id}/allocations")
Expand Down
28 changes: 18 additions & 10 deletions jupyterhub_nomad_spawner/spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,16 +544,24 @@ def _get_csi_extra_parameters(self) -> Optional[Dict]:
async def _ensure_running(self, nomad_service: NomadService):
while True:
try:
status = await nomad_service.job_status(self.job_name)
except Exception:
self.log.exception("Failed to get job status")
if status == "running":
break
elif status == "dead":
raise Exception(f"Job (name={self.job_name}) is dead already")
else:
self.log.info("Waiting for %s...", self.job_name)
await asyncio.sleep(5)
job_status = await nomad_service.job_status(self.job_name)
if job_status == "dead":
raise Exception(f"Job (name={self.job_name}) is dead")
elif job_status == "running":
task_status = await nomad_service.task_status(self.job_name)
if task_status == "running":
break
elif task_status == "dead":
raise Exception(f"Task for job (name={self.job_name}) is dead")
else:
self.log.info("Task for %s is %s, waiting...", self.job_name, task_status)
else:
self.log.info("Job %s is %s, waiting...", self.job_name, job_status)
except Exception as e:
self.log.exception("Failed to get job/task status")
raise e

await asyncio.sleep(5)

@retry(wait=wait_fixed(3), stop=stop_after_attempt(5))
async def address_and_port_from_consul(
Expand Down
27 changes: 21 additions & 6 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading