Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions flaskapi/flask_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,9 +480,17 @@ def _timeit(fun: Callable, *args, **kwargs):

# test_job_retrieval_paginated(function_uid="eea21c0d-6c2b-4cf4-91d1-116e6550cb22")

def _get_job_status(job: Dict[str, Any]) -> str:
status = job["status"]
if isinstance(status, dict) and "status" in status:
return status["status"]
elif isinstance(status, str):
return status
else:
raise ValueError(f"Unknown status format: {status}")

def _check_jobs(jobs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
completed_jobs = [job for job in jobs if job["status"].lower() == "completed" or job["status"].lower() == "success"] # type: ignore
completed_jobs = [job for job in jobs if _get_job_status(job).lower() == "completed" or _get_job_status(job).lower() == "success"] # type: ignore

for job in completed_jobs:
assert "outputs" in job, f"No outputs key found for completed job: {job} with status: {job['status']}" # type: ignore
Expand Down Expand Up @@ -965,11 +973,16 @@ def flask_test_job():
), f"Job is None for function {function_uid} with sample {sample}. Response: {response}"
uid = response.actual_instance.uid
_logger.debug(f"Job UID: {uid}")
while (
"JOB_TASK_" in (job := _get_function_job_from_uid(uid))["status"]
and not "FAILURE" in job
):
time.sleep(1)
while True:
job = _get_function_job_from_uid(uid)
job_status = _get_job_status(job)
_logger.debug(f"Job status: {job_status}")
if "FAILURE" in job_status:
raise RuntimeError(f"Job {uid} failed with status: {job_status}")
elif not "JOB_TASK_" in job_status:
break ## exit the loop if the job has been initialized
else:
time.sleep(1)
_logger.debug(f"Created job: {job}")
return jsonify(job) # return the job details as a dictionary
except Exception as e:
Expand Down
32 changes: 8 additions & 24 deletions node/src/components/data/JobRow.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Typography from "@mui/material/Typography";
import { useState } from "react";
import { toast } from "react-toastify";
import { Function as OsparcFunction } from "../../osparc-api-ts-client";
import { createJobStudyCopy, openStudyUid } from "../../utils/function_utils";
import { createJobStudyCopy, extractJobStatus, openStudyUid } from "../../utils/function_utils";
import CustomTooltip from "../utils/CustomTooltip";

interface JobRowProps {
Expand Down Expand Up @@ -91,7 +91,7 @@ function JobRow(props: JobRowProps) {
</TableRow>
);
}
const jobStatus = job.job.status;
const jobStatus = extractJobStatus(job);
let outputs;
if (jobStatus === "SUCCESS") {
outputs = Object.entries(job.job.outputs).map(([key, value]) => (
Expand All @@ -100,32 +100,16 @@ function JobRow(props: JobRowProps) {
{", "}
</Box>
));
} else if (jobStatus === "STARTED") {
} else if (jobStatus === "RUNNING") {
outputs = [
<Box key={0} display="inline">
Running...
</Box>,
];
} else if (
jobStatus === "FAILED" ||
jobStatus === "ABORTED" ||
(jobStatus.startsWith("JOB_") && jobStatus.endsWith("_FAILURE"))
) {
outputs = "Failed - no outputs";
} else if (
jobStatus === "PENDING" ||
jobStatus === "WAITING_FOR_CLUSTER" ||
jobStatus === "PUBLISHED" ||
jobStatus === "NOT_STARTED" ||
jobStatus === "WAITING_FOR_RESOURCES" ||
(jobStatus.startsWith("JOB_") && !jobStatus.endsWith("_FAILURE"))
) {
outputs = "Pending to run";
} else if (jobStatus === "UNKNOWN") {
outputs = "Please try again later";
} else {
outputs = "Unknown status, please contact support";
}
} else if (jobStatus === "FAILED") outputs = "Failed - no outputs";
else if (jobStatus === "PENDING") outputs = "Pending to run";
else if (jobStatus === "UNKNOWN") outputs = "Unknown status, please try again later";
else outputs = "Unknown status, please contact support";

const inputs = Object.entries(job.job.inputs).map(([key, value]) => (
<Box key={`job-row-input-${key}`} display="inline">
Expand Down Expand Up @@ -192,7 +176,7 @@ function JobRow(props: JobRowProps) {
size="small"
disabled={
creatingJobCopy ||
(!jobStatus.includes("SUCCESS") && !(jobStatus.includes("FAILED") || jobStatus.includes("FAILURE")))
(!jobStatus.includes("SUCCESS") && !(jobStatus.includes("FAILED")))
}
onClick={async () => {
setCreatingJobCopy(true);
Expand Down
34 changes: 7 additions & 27 deletions node/src/components/data/JobSelector.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
getFunctionJobsFromFunctionJobCollection,
getJobCollectionStatus,
filterForFinalStatus,
extractJobStatus,
} from "../../utils/function_utils";
import getMinMax from "../minmax";
import CustomTooltip from "../utils/CustomTooltip";
Expand Down Expand Up @@ -74,7 +75,7 @@ export default function JobsSelector(props: JobSelectorPropsType) {
const auxJob = jc;
if (jc.jobCollection.uid === uid) {
auxJob.subJobs = auxJob.subJobs.map(j => ({
selected: selected === true ? j.job.status === "SUCCESS" : false,
selected: selected === true ? extractJobStatus(j) === "SUCCESS" : false,
job: j.job,
}));
auxJob.selected = selected === true ? auxJob.subJobs.some(j => j.selected === true) : false;
Expand Down Expand Up @@ -149,11 +150,7 @@ export default function JobsSelector(props: JobSelectorPropsType) {
return (
fetchedJC !== undefined &&
jc.jobIds.join(",") === fetchedJC.subJobs.map(j => j.job.uid).join(",") &&
fetchedJC.subJobs.every(j =>
typeof j.job.status === "string"
? filterForFinalStatus(j.job.status)
: filterForFinalStatus((j.job.status as unknown as { status: string }).status),
)
fetchedJC.subJobs.every(j => extractJobStatus(j))
);
});

Expand Down Expand Up @@ -188,12 +185,11 @@ export default function JobsSelector(props: JobSelectorPropsType) {
const subJobs = [];
for (let subJobIdx = 0; subJobIdx < functionJobs.length; subJobIdx += 1) {
const job: FunctionJob = functionJobs[subJobIdx];
job.status = typeof job.status === "string" ? job.status : (job.status as unknown as { status: string }).status;
jobsFetched.current += 1;
const jobsProg = (jobsFetched.current / functionJobs.length) * 100;
setJobProgress(jobsProg);
subJobs.push({
selected: job.status === "SUCCESS",
selected: extractJobStatus(job) === "SUCCESS",
job,
});
}
Expand Down Expand Up @@ -263,7 +259,7 @@ export default function JobsSelector(props: JobSelectorPropsType) {
const newJobCollections: SelectedJobCollection[] = jobCollections.map(jc => {
const auxJob = jc;
auxJob.subJobs = jc.subJobs.map(subJob => ({
selected: checked === true ? subJob.job.status === "SUCCESS" : false,
selected: checked === true ? extractJobStatus(subJob) === "SUCCESS" : false,
job: subJob.job,
}));
const auxJobState = auxJob.subJobs.map(j => j.selected);
Expand All @@ -277,22 +273,6 @@ export default function JobsSelector(props: JobSelectorPropsType) {
[jobCollections, updateJobContext],
);

// const autoSelectJobs = useCallback(() => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The autoSelectJobs functions should not be removed but implemented, it was disabled as the feature which should automatically select the in-range jobs is not present, but due the fact that after every selection / deselection of a job, we refresh the moga, we should re-activate this and do it properly to save time for the user

// const newJobCollections: SelectedJobCollection[] = jobCollections.map(jc => {
// const auxJob = jc;
// auxJob.subJobs = jc.subJobs.map(subJob => ({
// selected: subJob.job.status === "SUCCESS",
// job: subJob.job,
// }));
// const auxJobState = auxJob.subJobs.map(j => j.selected);
// auxJob.selected = !auxJobState.every(j => j === false);
// return auxJob;
// });

// setJobCollections(newJobCollections);
// updateJobContext(newJobCollections);
// }, [jobCollections, updateJobContext]);

const handleJobsUpdate = useCallback(async () => {
await updateJobCollections(selectedFunction?.uid as string);
console.info("Updated JobCollections");
Expand Down Expand Up @@ -389,7 +369,7 @@ export default function JobsSelector(props: JobSelectorPropsType) {
indeterminate={
jobCollections.some(jc => jc.selected === true) &&
!jobCollections.every(
jc => jc.subJobs.map(j => j.job).filter(j => j.status === "SUCCESS" && j.selected === true).length > 0,
jc => jc.subJobs.map(j => j.job).filter(j => extractJobStatus(j) === "SUCCESS" && j.selected === true).length > 0,
)
}
onChange={event => onToggleAll(event.target.checked)}
Expand All @@ -402,7 +382,7 @@ export default function JobsSelector(props: JobSelectorPropsType) {
checked={params.row.selected}
indeterminate={params.row.subJobs.some(j => j.selected) && !params.row.subJobs.every(j => j.selected)}
onChange={event => selectMainJob(params.row.jobCollection.uid, event.target.checked)}
disabled={params.row.subJobs.every((j: SubJob) => j.job.status !== "SUCCESS")}
disabled={params.row.subJobs.every((j: SubJob) => extractJobStatus(j) !== "SUCCESS")}
inputProps={{ "aria-label": "Select job collection" }}
sx={theme => ({ "& .MuiSvgIcon-root": { color: `${theme.palette.primary.main} !important` } })}
/>
Expand Down
78 changes: 66 additions & 12 deletions node/src/utils/function_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,75 @@ export type JobStatusCounts = {
unknown: number;
};

export type AllowedJobStatus = "SUCCESS" | "FAILED" | "RUNNING" | "PENDING" | "UNKNOWN";


export function extractJobStatus(job: FunctionJob | SubJob): AllowedJobStatus {
function classifyJobStatus(jobStatus: string): AllowedJobStatus {
// This function helps homogenize job status, centralizing all corresponding logic
if (!jobStatus) {
throw new Error("JobStatus is undefined!")
}

if (jobStatus === "SUCCESS") {
return "SUCCESS";
}
else if (jobStatus.endsWith("FAILED") || jobStatus.endsWith("FAILURE")) {
return "FAILED"
}
else if (jobStatus === "STARTED" || jobStatus === "RUNNING") {
return "RUNNING"
}
else if (jobStatus === "PENDING" || jobStatus.startsWith("JOB_") || jobStatus === "WAITING_") {
return "PENDING"
}
else {
console.warn("Could not classify JobStatus", jobStatus)
return "UNKNOWN"
}
}

if (!job) {
throw new Error("Job is undefined");
}

// Check if job is of type SubJob (has 'selected' and 'job' properties)
if (typeof job === "object" && "selected" in job && "job" in job) {
// job is a SubJob, so use recursivity to extract status from its 'job' property
return extractJobStatus(job.job);
// previous way:
// typeof sj.job.status === "string"
// ? sj.job.status
// : (sj.job.status as unknown as { status: string }).status,
}

if (typeof job.status === "string") {
return classifyJobStatus(job.status);
}
else if (job.status && typeof job.status === "object" && "status" in job.status) {
return classifyJobStatus((job.status as { status: string }).status);
}
else {
console.log("Could not extract status of job ", job)
return "UNKNOWN";
}
}

export function getJobStatusCounts(subJobs: SubJob[]): JobStatusCounts {
return subJobs
.filter(j => j.job)
.map(j => (typeof j.job.status === "string" ? j.job.status : (j.job.status as unknown as { status: string }).status))
.map(j => extractJobStatus(j.job))
.reduce(
(acc, status: string) => {
(acc, status: AllowedJobStatus) => {
if (status === "SUCCESS") acc.success += 1;
else if (status.endsWith("FAILED") || status.endsWith("FAILURE")) acc.failed += 1;
else if (status === "STARTED" || status === "RUNNING") acc.running += 1;
else if (status === "PENDING" || status.startsWith("JOB_") || status === "WAITING_") acc.pending += 1;
else acc.unknown += 1;
else if (status === "FAILED") acc.failed += 1;
else if (status === "RUNNING") acc.running += 1;
else if (status === "PENDING") acc.pending += 1;
else if (status === "UNKNOWN") acc.unknown += 1;
else {
console.warn("status should have been classified into one of the AllowedJobStatus!")
console.warn("status: ", status)
};
return acc;
},
{ success: 0, failed: 0, running: 0, pending: 0, unknown: 0 },
Expand All @@ -187,10 +245,6 @@ export function getJobStatusCounts(subJobs: SubJob[]): JobStatusCounts {
export function getJobCollectionStatus(subJobs: SubJob[]) {
if (!subJobs || subJobs.length === 0) return "NO JOBS";
const jobStatusCounts = getJobStatusCounts(subJobs);
if (jobStatusCounts.unknown > 0) {
// toast.warn("Could not classify some job statuses - please revise console logs")
console.warn("SubJobs that gave UNKNOWN status: ", subJobs);
}
const allSuccess = jobStatusCounts.success === subJobs.length;
const anySuccess = jobStatusCounts.success > 0;
const anyRunning = jobStatusCounts.running > 0;
Expand All @@ -202,9 +256,9 @@ export function getJobCollectionStatus(subJobs: SubJob[]) {
if (anyRunning) return "RUNNING";
if (anyPending) return "PENDING";
if (anyFailed && anySuccess) return "FAILED PARTIALLY";
return "UNKNOWN";
else return "UNKNOWN";
}

export function filterForFinalStatus(status: string) {
return status === "FAILED" || status === "SUCCESS" || status.includes("FAILURE");
return status === "FAILED" || status === "SUCCESS";
}