Commit e5d1872

Merge pull request #716 from jmmshn/jmmshn/missing
Added tutorial and example for handling missing references
2 parents 2ab904f + ab60fe4 commit e5d1872

2 files changed: +194 -0 lines changed
Lines changed: 158 additions & 0 deletions
@@ -0,0 +1,158 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Handling Job Dependencies that Can Fail\n",
    "\n",
    "In this tutorial, we will demonstrate how to handle missing references in JobFlow. This is useful when you have jobs that may fail, but you still want to proceed with the workflow.\n",
    "\n",
    "First, we import the necessary modules and define a job that can fail based on an input parameter.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from jobflow import Flow, OnMissing, job, run_locally\n",
    "\n",
    "\n",
    "@job\n",
    "def func(fail: bool = False):\n",
    "    \"\"\"Failable function.\"\"\"\n",
    "    if fail:\n",
    "        raise ValueError(\"An error occurred.\")\n",
    "    return 1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we define a job that collects the outputs of other jobs. This job can handle missing references.\n",
    "\n",
    "**Note:** You must explicitly define how missing references are handled in each job.\n",
    "Setting `on_missing_references` to `OnMissing.NONE` only substitutes `None` for each missing output;\n",
    "you must still handle those `None` values inside your job."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "@job\n",
    "def collect(job_outputs):\n",
    "    \"\"\"Job that allows some parents to fail.\"\"\"\n",
    "    total = 0\n",
    "    for output in job_outputs:\n",
    "        if output is None:\n",
    "            continue\n",
    "        total += output\n",
    "    if total < 1:\n",
    "        raise ValueError(\"Not enough finished parents.\")\n",
    "    return total"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, we create three instances of the `func` job, one of which will fail.\n",
    "Then, we create an instance of the `collect` job and pass the outputs of the `func` jobs to it.\n",
    "\n",
    "By setting `on_missing_references` to `OnMissing.NONE` in the job's config and handling the `None` values in the `collect` job, we can proceed with the workflow even if some references are missing."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "job1, job2, job3 = func(), func(), func(fail=True)\n",
    "job_outputs = [job1.output, job2.output, job3.output]\n",
    "collect_job = collect(job_outputs)\n",
    "collect_job.config.on_missing_references = OnMissing.NONE"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When the workflow runs, `job1` and `job2` each return 1, while `job3` fails.\n",
    "Because `collect_job` has `on_missing_references` set to `OnMissing.NONE`, it still runs and receives `None` in place of the missing output from `job3`.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-12-13 17:28:10,890 INFO Started executing jobs locally\n",
      "2024-12-13 17:28:10,991 INFO Starting job - func (12fc77ef-233e-4bee-a36c-7f61dc5badf9)\n",
      "2024-12-13 17:28:10,996 INFO Finished job - func (12fc77ef-233e-4bee-a36c-7f61dc5badf9)\n",
      "2024-12-13 17:28:10,997 INFO Starting job - func (626268f0-ecb0-4fa2-bf6d-0bb9dd72586c)\n",
      "2024-12-13 17:28:10,999 INFO Finished job - func (626268f0-ecb0-4fa2-bf6d-0bb9dd72586c)\n",
      "2024-12-13 17:28:11,000 INFO Starting job - func (ba559709-99f7-4ec3-9fe7-6804a002ff0a)\n",
      "2024-12-13 17:28:11,002 INFO func failed with exception:\n",
      "Traceback (most recent call last):\n",
      "  File \"/home/jmmshn/miniconda3/envs/af/lib/python3.10/site-packages/jobflow/managers/local.py\", line 114, in _run_job\n",
      "    response = job.run(store=store)\n",
      "  File \"/home/jmmshn/miniconda3/envs/af/lib/python3.10/site-packages/jobflow/core/job.py\", line 600, in run\n",
      "    response = function(*self.function_args, **self.function_kwargs)\n",
      "  File \"/tmp/ipykernel_298791/2449992254.py\", line 7, in func\n",
      "    raise ValueError(\"An error occurred.\")\n",
      "ValueError: An error occurred.\n",
      "\n",
      "2024-12-13 17:28:11,003 INFO Starting job - collect (0becc3b9-532c-4284-9c90-2a890e791ef2)\n",
      "2024-12-13 17:28:11,011 INFO Finished job - collect (0becc3b9-532c-4284-9c90-2a890e791ef2)\n",
      "2024-12-13 17:28:11,013 INFO Finished executing jobs locally\n"
     ]
    }
   ],
   "source": [
    "flow = Flow([job1, job2, job3, collect_job])\n",
    "res = run_locally(flow)\n",
    "n_finished = 2\n",
    "assert res[collect_job.uuid][1].output == n_finished"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "af",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.15"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
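
The `None`-skipping logic of the notebook's `collect` job can be checked on its own, without running a workflow. The following is only a sketch: `sum_finished` and its `min_finished` parameter are hypothetical names, not part of jobflow, and it counts finished parents instead of testing the running total as `collect` does.

```python
from typing import Iterable, Optional


def sum_finished(job_outputs: Iterable[Optional[int]], min_finished: int = 1) -> int:
    """Sum the outputs of finished parents, skipping None placeholders.

    Hypothetical helper for illustration; not part of jobflow.
    """
    # A None entry stands for a parent whose output is missing.
    finished = [out for out in job_outputs if out is not None]
    if len(finished) < min_finished:
        raise ValueError("Not enough finished parents.")
    return sum(finished)


# Two parents returned 1 and one parent failed.
print(sum_finished([1, 1, None]))  # prints 2
```

Counting finished parents rather than checking the total keeps the guard meaningful even if a parent legitimately returns 0.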

examples/missing_reference.py

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
"""A demonstration of sequential jobs with missing references."""

from jobflow import Flow, OnMissing, job, run_locally


@job
def func(fail: bool = False):
    """Failable function."""
    if fail:
        raise ValueError("An error occurred.")
    return 1


@job
def collect(job_outputs):
    """Job that allows some parents to fail."""
    total = 0
    for jo in job_outputs:
        # a missing parent output arrives as None; skip it
        if jo is None:
            continue
        total += jo
    if total < 1:
        raise ValueError("Not enough finished parents.")
    return total


job1, job2, job3 = func(), func(), func(fail=True)
job_outputs = [job1.output, job2.output, job3.output]
collect_job = collect(job_outputs)
# replace missing parent outputs with None instead of raising an error
collect_job.config.on_missing_references = OnMissing.NONE
flow = Flow([job1, job2, job3, collect_job])

# run the flow locally
res = run_locally(flow)
n_finished = 2
assert res[collect_job.uuid][1].output == n_finished
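
To make the effect of the missing-reference setting easier to picture, here is a toy model in plain Python. It is a sketch under stated assumptions, not jobflow's implementation: `MissingPolicy` and `resolve` are hypothetical names, the three members are modelled loosely on what I understand the `OnMissing` options to be, and a plain dict stands in for the job store.

```python
from enum import Enum


class MissingPolicy(Enum):
    """Hypothetical stand-in for a missing-reference policy (not jobflow code)."""

    ERROR = "error"  # fail as soon as a referenced output is missing
    NONE = "none"    # substitute None for a missing output
    PASS = "pass"    # hand the unresolved reference through unchanged


def resolve(ref_ids, store, policy):
    """Look up each reference id in `store`, applying `policy` to missing ones."""
    resolved = []
    for ref_id in ref_ids:
        if ref_id in store:
            resolved.append(store[ref_id])
        elif policy is MissingPolicy.ERROR:
            raise KeyError(f"Missing output for {ref_id}")
        elif policy is MissingPolicy.NONE:
            resolved.append(None)
        else:
            # PASS: keep the reference id itself as a placeholder
            resolved.append(ref_id)
    return resolved


# job3 failed, so the store holds no output for it.
store = {"job1": 1, "job2": 1}
print(resolve(["job1", "job2", "job3"], store, MissingPolicy.NONE))  # prints [1, 1, None]
```

With the `NONE` policy the downstream function receives a list it can filter, which is the situation the `collect` job above is written for.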
