
Commit bab0e13

feat: support for failures in job creation
1 parent 3bf45ef commit bab0e13

6 files changed: +158 -56 lines changed

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+"""platform job id becomes optional
+
+Revision ID: be54acd4d160
+Revises: 8a69e1ee3fef
+Create Date: 2025-08-25 11:41:54.963734
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = 'be54acd4d160'
+down_revision: Union[str, Sequence[str], None] = '8a69e1ee3fef'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    """Upgrade schema."""
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.alter_column('processing_jobs', 'platform_job_id',
+                    existing_type=sa.VARCHAR(),
+                    nullable=True)
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    """Downgrade schema."""
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.alter_column('processing_jobs', 'platform_job_id',
+                    existing_type=sa.VARCHAR(),
+                    nullable=False)
+    # ### end Alembic commands ###
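
This migration relaxes the NOT NULL constraint on processing_jobs.platform_job_id so that a job whose submission failed (and therefore has no platform job id) can still be stored. After pulling this commit it would typically be applied with "alembic upgrade head". Below is a minimal sketch for verifying the result; the DATABASE_URL environment variable is an assumption used for illustration and is not defined by this commit.

# Hypothetical post-migration check: confirm platform_job_id is now nullable.
# DATABASE_URL is an assumed variable name, not something this commit introduces.
import os

from sqlalchemy import create_engine, inspect

engine = create_engine(os.environ["DATABASE_URL"])
columns = inspect(engine).get_columns("processing_jobs")
platform_job_id = next(col for col in columns if col["name"] == "platform_job_id")
print("platform_job_id nullable:", platform_job_id["nullable"])  # expected: True after upgrade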

app/database/models/processing_job.py

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ class ProcessingJobRecord(Base):
         Enum(ProcessingStatusEnum), index=True
     )
     user_id: Mapped[str] = mapped_column(String, index=True)
-    platform_job_id: Mapped[str] = mapped_column(String, index=True)
+    platform_job_id: Mapped[Optional[str]] = mapped_column(String, index=True)
     parameters: Mapped[str] = mapped_column(String, index=False)
     service: Mapped[str] = mapped_column(String, index=True)
     created: Mapped[datetime.datetime] = mapped_column(

app/platforms/implementations/openeo.py

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@
 BACKEND_AUTH_ENV_MAP = {
     "openeo.dataspace.copernicus.eu": "OPENEO_AUTH_CLIENT_CREDENTIALS_CDSEFED",
     "openeofed.dataspace.copernicus.eu": "OPENEO_AUTH_CLIENT_CREDENTIALS_CDSEFED",
+    "openeo.vito.be": "OPENEO_AUTH_CLIENT_CREDENTIALS_OPENEO_VITO",
 }
 
 
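The new entry maps the openeo.vito.be backend to the OPENEO_AUTH_CLIENT_CREDENTIALS_OPENEO_VITO environment variable. A minimal sketch of how such a lookup is presumably used is shown below; it only assumes the variable must be present in the deployment environment, since its exact value format is not defined in this diff.

# Sketch only: reproduces the mapping above to show the expected lookup.
import os

BACKEND_AUTH_ENV_MAP = {
    "openeo.dataspace.copernicus.eu": "OPENEO_AUTH_CLIENT_CREDENTIALS_CDSEFED",
    "openeofed.dataspace.copernicus.eu": "OPENEO_AUTH_CLIENT_CREDENTIALS_CDSEFED",
    "openeo.vito.be": "OPENEO_AUTH_CLIENT_CREDENTIALS_OPENEO_VITO",
}

backend = "openeo.vito.be"
credentials = os.environ.get(BACKEND_AUTH_ENV_MAP[backend])
if credentials is None:
    raise RuntimeError(f"Credentials for {backend} are not configured")
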
app/services/processing.py

Lines changed: 11 additions & 4 deletions
@@ -39,21 +39,28 @@ def create_processing_job(
 
     platform = get_processing_platform(request.label)
 
-    job_id = platform.execute_job(
-        title=request.title, details=request.service, parameters=request.parameters
-    )
+    try:
+        job_id = platform.execute_job(
+            title=request.title, details=request.service, parameters=request.parameters
+        )
+    except Exception:
+        job_id = None
+        logger.exception(f"Could not create processing job for user {user}")
 
+    print(f"JOB IS EQUAL TO {job_id}")
     record = ProcessingJobRecord(
         title=request.title,
         label=request.label,
-        status=ProcessingStatusEnum.CREATED,
+        status=ProcessingStatusEnum.CREATED if job_id else ProcessingStatusEnum.FAILED,
         user_id=user,
         platform_job_id=job_id,
         parameters=json.dumps(request.parameters),
         service=request.service.model_dump_json(),
         upscaling_task_id=upscaling_task_id,
     )
+    print(f"RECORD IS EQUAL TO {record.status}")
     record = save_job_to_db(database, record)
+    print(f"RECORD IS EQUAL TO {record.status}")
     return ProcessingJobSummary(
         id=record.id,
         title=record.title,
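With this change a failing platform.execute_job call no longer raises out of create_processing_job: the exception is logged, job_id stays None, and the record is persisted with status FAILED and an empty platform_job_id (which the migration above makes legal). A rough pytest-style sketch of that behaviour follows; the fixture names, keyword arguments and the fake save helper are assumptions based on names visible in this diff, not the project's actual test utilities.

# Hypothetical regression test for the new failure path.
from unittest.mock import MagicMock, patch

from app.services.processing import ProcessingStatusEnum, create_processing_job


def fake_save(db, record):
    record.id = 1  # emulate the database assigning a primary key
    return record


def test_failed_submission_is_recorded_as_failed(database, job_request):
    platform = MagicMock()
    platform.execute_job.side_effect = RuntimeError("backend unavailable")

    with patch("app.services.processing.get_processing_platform", return_value=platform), \
         patch("app.services.processing.save_job_to_db", side_effect=fake_save) as save:
        create_processing_job(
            request=job_request,
            user="user-123",
            upscaling_task_id=None,
            database=database,
        )

    saved_record = save.call_args.args[1]
    assert saved_record.platform_job_id is None
    assert saved_record.status == ProcessingStatusEnum.FAILED
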
guides/upscaling_example.ipynb

Lines changed: 62 additions & 51 deletions
@@ -6,7 +6,7 @@
 "metadata": {},
 "source": [
 "# Upscaling Service - Proof of Concept\n",
-"This notebooks showcases a demo of the APEx Upscaling Service by demonstrating the capabilities of the [APEx Dispatcher API](https://github.com/ESA-APEx/apex_dispatch_api). In this notebook we will perform a small upscaling exercise for one of the services in the [APEx Algoritm Services Catalogue](https://algorithm-catalogue.apex.esa.int/), specfically the [Forest Fire Mapping](https://algorithm-catalogue.apex.esa.int/apps/parcel_delineation#execution-information). We will split up an area of interest in a 20x20km grid and execute this through this upscaling task through the APEx Dispatch API."
+"This notebooks showcases a demo of the APEx Upscaling Service by demonstrating the capabilities of the [APEx Dispatcher API](https://github.com/ESA-APEx/apex_dispatch_api). In this notebook we will perform a small upscaling exercise for one of the services in the [APEx Algoritm Services Catalogue](https://algorithm-catalogue.apex.esa.int/), specfically the [Wind Turbine Detection](https://algorithm-catalogue.apex.esa.int/apps/wind_turbine_detection#execution-information). We will split up an area of interest in a 20x20km grid and execute this through this upscaling task through the APEx Dispatch API."
 ]
 },
 {
@@ -58,45 +58,57 @@
 "metadata": {},
 "outputs": [],
 "source": [
-"application = \"https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/main/algorithm_catalog/vito/random_forest_firemapping/openeo_udp/random_forest_firemapping.json\"\n",
-"endpoint = \"https://openeofed.dataspace.copernicus.eu\""
+"application = \"https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/main/algorithm_catalog/dhi/wind_turbine/openeo_udp/wind_turbine.json\"\n",
+"endpoint = \"https://openeo.vito.be\""
 ]
 },
 {
 "cell_type": "code",
-"execution_count": 4,
+"execution_count": 5,
 "id": "c83aa9d5",
 "metadata": {},
 "outputs": [],
 "source": [
-"area_of_interest = {\n",
+"spatial_extent = {\n",
 " \"coordinates\": [\n",
 " [\n",
 " [\n",
-" -7.5326905572890155,\n",
-" 42.70954706745698\n",
+" 14.370712654502597,\n",
+" 47.27563273620049\n",
 " ],\n",
 " [\n",
-" -7.5326905572890155,\n",
-" 42.023106310303206\n",
+" 14.370712654502597,\n",
+" 47.26583764118868\n",
 " ],\n",
 " [\n",
-" -6.25664018841303,\n",
-" 42.023106310303206\n",
+" 14.405670947432327,\n",
+" 47.26583764118868\n",
 " ],\n",
 " [\n",
-" -6.25664018841303,\n",
-" 42.70954706745698\n",
+" 14.405670947432327,\n",
+" 47.27563273620049\n",
 " ],\n",
 " [\n",
-" -7.5326905572890155,\n",
-" 42.70954706745698\n",
+" 14.370712654502597,\n",
+" 47.27563273620049\n",
 " ]\n",
 " ]\n",
 " ],\n",
 " \"type\": \"Polygon\"\n",
 " }\n",
-"time_of_interest = [\"2025-08-01\", \"2025-08-25\"]"
+"year = 2024"
+]
+},
+{
+"cell_type": "code",
+"execution_count": 19,
+"id": "53d0800c-a6b3-4a64-a7a2-f27689adc7d7",
+"metadata": {},
+"outputs": [],
+"source": [
+"# Map related settings\n",
+"center = shape(spatial_extent).centroid\n",
+"zoom = 14"
 ]
 },
 {
@@ -110,52 +122,51 @@
 },
 {
 "cell_type": "code",
-"execution_count": 11,
+"execution_count": 6,
 "id": "e0618338",
 "metadata": {},
 "outputs": [
 {
 "name": "stdout",
 "output_type": "stream",
 "text": [
-"Processing 48 tiles for area of interest\n"
+"Processing 1 tiles for area of interest\n"
 ]
 }
 ],
 "source": [
 "tiles = requests.post(f\"http://{dispatch_api}/tiles\", json={\n",
 " \"grid\": \"20x20km\",\n",
-" \"aoi\": area_of_interest\n",
+" \"aoi\": spatial_extent\n",
 "}).json()\n",
 "print(f\"Processing {len(tiles['geometries'])} tiles for area of interest\")"
 ]
 },
 {
 "cell_type": "code",
-"execution_count": 12,
+"execution_count": 20,
 "id": "6de8d686",
 "metadata": {},
 "outputs": [
 {
 "data": {
 "application/vnd.jupyter.widget-view+json": {
-"model_id": "b32498d96d9644398a441f9b7d1f19a0",
+"model_id": "766ab5981ee449048f19eae60ad8236c",
 "version_major": 2,
 "version_minor": 0
 },
 "text/plain": [
-"Map(center=[42.366326688880086, -6.894665372851023], controls=(ZoomControl(options=['position', 'zoom_in_text'…"
+"Map(center=[47.27073518869459, 14.388191800967462], controls=(ZoomControl(options=['position', 'zoom_in_text',"
 ]
 },
-"execution_count": 12,
+"execution_count": 20,
 "metadata": {},
 "output_type": "execute_result"
 }
 ],
 "source": [
 "# Create a map centered at the approximate center of the area of interest\n",
-"center = shape(area_of_interest).centroid\n",
-"m = Map(center=[center.y, center.x], zoom=8)\n",
+"m = Map(center=[center.y, center.x], zoom=zoom)\n",
 " \n",
 "# Add the tiles (GeometryCollection) to the map\n",
 "geo_json = GeoJSON(data=tiles)\n",
@@ -177,34 +188,34 @@
 },
 {
 "cell_type": "code",
-"execution_count": 13,
+"execution_count": 38,
 "id": "c5ca3fdd-7559-4fc7-8318-61b5dc59475f",
 "metadata": {},
 "outputs": [
 {
 "data": {
 "text/plain": [
-"{'id': 6,\n",
-" 'title': 'Forest Fire Detection',\n",
+"{'id': 13,\n",
+" 'title': 'Wind Turbine Detection',\n",
 " 'label': 'openeo',\n",
 " 'status': 'created'}"
 ]
 },
-"execution_count": 13,
+"execution_count": 38,
 "metadata": {},
 "output_type": "execute_result"
 }
 ],
 "source": [
 "upscaling_task = requests.post(f\"http://{dispatch_api}/upscale_tasks\", json={\n",
-" \"title\": \"Forest Fire Detection\",\n",
+" \"title\": \"Wind Turbine Detection\",\n",
 " \"label\": \"openeo\",\n",
 " \"service\": {\n",
 " \"endpoint\": endpoint,\n",
 " \"application\": application\n",
 " },\n",
 " \"parameters\": {\n",
-" \"temporal_extent\": time_of_interest\n",
+" \"year\": year\n",
 " },\n",
 " \"dimension\": {\n",
 " \"name\": \"spatial_extent\",\n",
@@ -226,7 +237,7 @@
 },
 {
 "cell_type": "code",
-"execution_count": 14,
+"execution_count": 39,
 "id": "02e5c413-d110-4110-be59-c86e5226edc5",
 "metadata": {},
 "outputs": [],
@@ -254,44 +265,43 @@
 },
 {
 "cell_type": "code",
-"execution_count": 18,
+"execution_count": 40,
 "id": "ac428293-7cd4-49a8-9bfa-4e0dc8f4d2cc",
 "metadata": {},
 "outputs": [
 {
 "data": {
 "application/vnd.jupyter.widget-view+json": {
-"model_id": "8c57dbf49c574581aa8b4efe312479e2",
+"model_id": "b2e2721b5cfb4e08a4416813f1f961d1",
 "version_major": 2,
 "version_minor": 0
 },
 "text/plain": [
-"Map(center=[42.251628548555004, -6.37490034623255], controls=(ZoomControl(options=['position', 'zoom_in_text',…"
+"Map(center=[47.27073518869459, 14.388191800967462], controls=(ZoomControl(options=['position', 'zoom_in_text',…"
 ]
 },
 "metadata": {},
 "output_type": "display_data"
 },
 {
-"ename": "ConnectionClosedError",
-"evalue": "received 1012 (service restart); then sent 1012 (service restart)",
-"output_type": "error",
-"traceback": [
-"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
-"\u001b[0;31mConnectionResetError\u001b[0m Traceback (most recent call last)",
-"File \u001b[0;32m~/.pyenv/versions/3.10.12/lib/python3.10/asyncio/selector_events.py:862\u001b[0m, in \u001b[0;36m_SelectorSocketTransport._read_ready__data_received\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 861\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 862\u001b[0m data \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_sock\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mrecv\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mmax_size\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 863\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m (\u001b[38;5;167;01mBlockingIOError\u001b[39;00m, \u001b[38;5;167;01mInterruptedError\u001b[39;00m):\n",
-"\u001b[0;31mConnectionResetError\u001b[0m: [Errno 54] Connection reset by peer",
-"\nThe above exception was the direct cause of the following exception:\n",
-"\u001b[0;31mConnectionClosedError\u001b[0m Traceback (most recent call last)",
-"Cell \u001b[0;32mIn[18], line 73\u001b[0m\n\u001b[1;32m 70\u001b[0m \u001b[38;5;28;01mbreak\u001b[39;00m\n\u001b[1;32m 72\u001b[0m \u001b[38;5;66;03m# Run the websocket listener in the notebook\u001b[39;00m\n\u001b[0;32m---> 73\u001b[0m \u001b[38;5;28;01mawait\u001b[39;00m listen_for_updates()\n",
-"Cell \u001b[0;32mIn[18], line 42\u001b[0m, in \u001b[0;36mlisten_for_updates\u001b[0;34m()\u001b[0m\n\u001b[1;32m 40\u001b[0m \u001b[38;5;28;01masync\u001b[39;00m \u001b[38;5;28;01mwith\u001b[39;00m websockets\u001b[38;5;241m.\u001b[39mconnect(ws_url) \u001b[38;5;28;01mas\u001b[39;00m websocket:\n\u001b[1;32m 41\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[0;32m---> 42\u001b[0m message \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mawait\u001b[39;00m websocket\u001b[38;5;241m.\u001b[39mrecv()\n\u001b[1;32m 43\u001b[0m message \u001b[38;5;241m=\u001b[39m json\u001b[38;5;241m.\u001b[39mloads(message)\n\u001b[1;32m 44\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m message\u001b[38;5;241m.\u001b[39mget(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mdata\u001b[39m\u001b[38;5;124m\"\u001b[39m):\n",
-"File \u001b[0;32m~/.pyenv/versions/3.10.12/lib/python3.10/site-packages/websockets/asyncio/connection.py:322\u001b[0m, in \u001b[0;36mConnection.recv\u001b[0;34m(self, decode)\u001b[0m\n\u001b[1;32m 318\u001b[0m \u001b[38;5;66;03m# fallthrough\u001b[39;00m\n\u001b[1;32m 319\u001b[0m \n\u001b[1;32m 320\u001b[0m \u001b[38;5;66;03m# Wait for the protocol state to be CLOSED before accessing close_exc.\u001b[39;00m\n\u001b[1;32m 321\u001b[0m \u001b[38;5;28;01mawait\u001b[39;00m asyncio\u001b[38;5;241m.\u001b[39mshield(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mconnection_lost_waiter)\n\u001b[0;32m--> 322\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mprotocol\u001b[38;5;241m.\u001b[39mclose_exc \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;21;01mself\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mrecv_exc\u001b[39;00m\n",
-"\u001b[0;31mConnectionClosedError\u001b[0m: received 1012 (service restart); then sent 1012 (service restart)"
+"name": "stdout",
+"output_type": "stream",
+"text": [
+"Job finished with status failed\n"
+]
+},
+{
+"name": "stderr",
+"output_type": "stream",
+"text": [
+"/var/folders/50/09_2zmx12zj6ks4fdl4y9wgc0000gn/T/ipykernel_92745/582752889.py:70: RuntimeWarning: coroutine 'Connection.close' was never awaited\n",
+" websocket.close()\n",
+"RuntimeWarning: Enable tracemalloc to get the object allocation traceback\n"
 ]
 }
 ],
 "source": [
-"m = Map(center=[42.251628548555004, -6.37490034623255], zoom=8)\n",
+"m = Map(center=[center.y, center.x], zoom=zoom)\n",
 "geo_json = GeoJSON(\n",
 " data={\n",
 " \"type\": \"FeatureCollection\",\n",
@@ -344,7 +354,7 @@
 " if job_status == \"finished\" and job_id not in processed_jobs:\n",
 " processed_jobs.add(job_id)\n",
 " await show_results(job_id)\n",
-" else:\n",
+" elif job_status != \"finished\":\n",
 " features.append({\n",
 " \"type\": \"Feature\",\n",
 " \"geometry\": job[\"parameters\"][\"spatial_extent\"],\n",
@@ -359,6 +369,7 @@
 " }\n",
 " geo_json.style_callback = job_style\n",
 " if message[\"data\"][\"status\"] in [\"finished\", \"canceled\", \"failed\"]:\n",
+" print(f\"Job finished with status {message['data']['status']}\")\n",
 " websocket.close()\n",
 " break\n",
 "\n",
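The stderr output captured in the re-run notebook shows a RuntimeWarning because Connection.close() from the websockets asyncio API returns a coroutine that is never awaited. A possible follow-up (not part of this commit) would be to await the close inside the listener; a simplified sketch, with ws_url and the message handling reduced to placeholders:

# Sketch: awaiting websocket.close() removes the 'coroutine was never awaited' warning.
import json

import websockets


async def listen_for_updates(ws_url: str) -> None:
    async with websockets.connect(ws_url) as websocket:
        while True:
            message = json.loads(await websocket.recv())
            status = message.get("data", {}).get("status")
            if status in ["finished", "canceled", "failed"]:
                print(f"Job finished with status {status}")
                await websocket.close()  # previously called without await
                break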