Skip to content

Commit 887295f

Browse files
authored
feat(assets): Add satellite assets (#160)
* feat(assets): Add satellite assets * fix(sat): typechecking
1 parent 78a1847 commit 887295f

File tree

3 files changed

+188
-10
lines changed

3 files changed

+188
-10
lines changed

src/dagster_dags/assets/sat/eumetsat_iodc_lrv.py

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
"""Zarr archive of satellite image data from EUMETSAT's RSS service, low resolution.
1+
"""Zarr archive of satellite image data from EUMETSAT's IODC service, low resolution.
22
33
EUMETSAT have a seviri satellite that provides images of the earth's surface.
4-
The Rapid Scan Service (RSS) provides images at 15 minute intervals.
4+
The Rapid Scan Service (RSS) provides images at 5 minute intervals,
5+
whilst other SEVIRI images are at 15 minute intervals.
56
The images are in the MSG format, which is a compressed format that contains
67
multiple channels of data. They come in high resolution (HRV) and low resolution (LRV).
78
89
Sourced via eumdac from DataStore (https://navigator.eumetsat.int/product/EO:EUM:DAT:MSG:RSS).
910
This asset is updated monthly, and surfaced as a Zarr Directory Store for each month.
10-
It is downloaded using the sat container.
11+
It is downloaded using the satellite-consumer.
1112
"""
1213

1314
import os
@@ -21,11 +22,10 @@
2122

2223
ARCHIVE_FOLDER = "/var/dagster-storage/sat/eumetsat-iodc-lrv"
2324
if os.getenv("ENVIRONMENT", "local") == "leo":
24-
ARCHIVE_FOLDER = "/mnt/storage_ssd_4tb/sat/eumetsat-iodc-lrv"
25+
ARCHIVE_FOLDER = "/mnt/storage_b/archives/sat/eumetsat-iodc-lrv"
2526

2627
partitions_def: dg.TimeWindowPartitionsDefinition = dg.MonthlyPartitionsDefinition(
27-
start_date="2019-01-01",
28-
end_offset=-1,
28+
start_date="2025-01-01",
2929
)
3030

3131
@dg.asset(
@@ -38,22 +38,40 @@
3838
"expected_runtime": dg.MetadataValue.text("6 hours"),
3939
},
4040
compute_kind="docker",
41+
automation_condition=dg.AutomationCondition.on_cron(
42+
cron_schedule=partitions_def.get_cron_schedule(
43+
hour_of_day=0,
44+
day_of_week=4,
45+
),
46+
),
4147
tags={
4248
"dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours
4349
"dagster/priority": "1",
4450
"dagster/concurrency_key": "eumetsat",
4551
},
4652
partitions_def=partitions_def,
4753
)
48-
def eumetsat_seviri_lrv_asset(
54+
def eumetsat_iodc_lrv_asset(
4955
context: dg.AssetExecutionContext,
5056
pipes_docker_client: PipesDockerClient,
5157
) -> dg.MaterializeResult:
52-
"""Dagster asset for EUMETSAT's RSS service, low resolution."""
58+
"""Dagster asset for EUMETSAT's IODC service, low resolution."""
5359
it: dt.datetime = context.partition_time_window.start
60+
5461
return pipes_docker_client.run(
55-
image="ghcr.io/openclimatefix/sat-etl:main",
56-
command=["iodc", "--month", f"{it:%Y-%m}", "--path", "/work", "--rm"],
62+
image="ghcr.io/openclimatefix/satellite-consumer:0.2.0",
63+
command=[],
64+
env={
65+
"EUMETSAT_CONSUMER_KEY": os.environ["EUMETSAT_CONSUMER_KEY"],
66+
"EUMETSAT_CONSUMER_SECRET": os.environ["EUMETSAT_CONSUMER_SECRET"],
67+
"SATCONS_COMMAND": "consume",
68+
"SATCONS_WINDOW_MONTHS": "1",
69+
"SATCONS_SATELLITE": "iodc",
70+
"SATCONS_VALIDATE": "true",
71+
"SATCONS_RESCALE": "true",
72+
"SATCONS_TIME": it.strftime("%Y%m%dT%H%M"),
73+
"SATCONS_NUM_WORKERS": "4",
74+
},
5775
container_kwargs={
5876
"volumes": [f"{ARCHIVE_FOLDER}:/work"],
5977
},
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
"""Zarr archive of satellite image data from EUMETSAT's 0Degree service, low resolution.
2+
3+
EUMETSAT have a seviri satellite that provides images of the earth's surface.
4+
The 0-Degree service (odegree) provides full-disk images at 15 minute intervals,
5+
whilst the Rapid Scan Service (RSS) provides SEVIRI images at 5 minute intervals.
6+
The images are in the MSG format, which is a compressed format that contains
7+
multiple channels of data. They come in high resolution (HRV) and low resolution (LRV).
8+
9+
Sourced via eumdac from DataStore (https://navigator.eumetsat.int/product/EO:EUM:DAT:MSG:odegree).
10+
This asset is updated monthly, and surfaced as a Zarr Directory Store for each month.
11+
It is downloaded using the satellite-consumer.
12+
"""
13+
14+
import os
15+
from typing import TYPE_CHECKING
16+
17+
import dagster as dg
18+
from dagster_docker import PipesDockerClient
19+
20+
if TYPE_CHECKING:
21+
import datetime as dt
22+
23+
ARCHIVE_FOLDER = "/var/dagster-storage/sat/eumetsat-odegree-lrv"
24+
if os.getenv("ENVIRONMENT", "local") == "leo":
25+
ARCHIVE_FOLDER = "/mnt/storage_b/archives/sat/eumetsat-odegree-lrv"
26+
27+
partitions_def: dg.TimeWindowPartitionsDefinition = dg.MonthlyPartitionsDefinition(
28+
start_date="2025-01-01",
29+
)
30+
31+
@dg.asset(
32+
name="eumetsat-odegree-lrv",
33+
description=__doc__,
34+
metadata={
35+
"archive_folder": dg.MetadataValue.text(ARCHIVE_FOLDER),
36+
"area": dg.MetadataValue.text("europe"),
37+
"source": dg.MetadataValue.text("eumetsat"),
38+
"expected_runtime": dg.MetadataValue.text("6 hours"),
39+
},
40+
compute_kind="docker",
41+
automation_condition=dg.AutomationCondition.on_cron(
42+
cron_schedule=partitions_def.get_cron_schedule(
43+
hour_of_day=0,
44+
day_of_week=6,
45+
),
46+
),
47+
tags={
48+
"dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours
49+
"dagster/priority": "1",
50+
"dagster/concurrency_key": "eumetsat",
51+
},
52+
partitions_def=partitions_def,
53+
)
54+
def eumetsat_odegree_lrv_asset(
55+
context: dg.AssetExecutionContext,
56+
pipes_docker_client: PipesDockerClient,
57+
) -> dg.MaterializeResult:
58+
"""Dagster asset for EUMETSAT's odegree service, low resolution."""
59+
it: dt.datetime = context.partition_time_window.start
60+
61+
return pipes_docker_client.run(
62+
image="ghcr.io/openclimatefix/satellite-consumer:0.2.0",
63+
command=[],
64+
env={
65+
"EUMETSAT_CONSUMER_KEY": os.environ["EUMETSAT_CONSUMER_KEY"],
66+
"EUMETSAT_CONSUMER_SECRET": os.environ["EUMETSAT_CONSUMER_SECRET"],
67+
"SATCONS_COMMAND": "consume",
68+
"SATCONS_WINDOW_MONTHS": "1",
69+
"SATCONS_SATELLITE": "odegree",
70+
"SATCONS_VALIDATE": "true",
71+
"SATCONS_RESCALE": "true",
72+
"SATCONS_TIME": it.strftime("%Y%m%dT%H%M"),
73+
"SATCONS_NUM_WORKERS": "4",
74+
},
75+
container_kwargs={
76+
"volumes": [f"{ARCHIVE_FOLDER}:/work"],
77+
},
78+
context=context,
79+
).get_materialize_result()
80+
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
"""Zarr archive of satellite image data from EUMETSAT's RSS service, low resolution.
2+
3+
EUMETSAT have a seviri satellite that provides images of the earth's surface.
4+
The Rapid Scan Service (RSS) provides images at 5 minute intervals,
5+
whilst other SEVIRI images are at 15 minute intervals.
6+
The images are in the MSG format, which is a compressed format that contains
7+
multiple channels of data. They come in high resolution (HRV) and low resolution (LRV).
8+
9+
Sourced via eumdac from DataStore (https://navigator.eumetsat.int/product/EO:EUM:DAT:MSG:RSS).
10+
This asset is updated monthly, and surfaced as a Zarr Directory Store for each month.
11+
It is downloaded using the satellite-consumer.
12+
"""
13+
14+
import os
15+
from typing import TYPE_CHECKING
16+
17+
import dagster as dg
18+
from dagster_docker import PipesDockerClient
19+
20+
if TYPE_CHECKING:
21+
import datetime as dt
22+
23+
ARCHIVE_FOLDER = "/var/dagster-storage/sat/eumetsat-rss-lrv"
24+
if os.getenv("ENVIRONMENT", "local") == "leo":
25+
ARCHIVE_FOLDER = "/mnt/storage_b/archives/sat/eumetsat-rss-lrv"
26+
27+
partitions_def: dg.TimeWindowPartitionsDefinition = dg.MonthlyPartitionsDefinition(
28+
start_date="2025-01-01",
29+
)
30+
31+
@dg.asset(
32+
name="eumetsat-rss-lrv",
33+
description=__doc__,
34+
metadata={
35+
"archive_folder": dg.MetadataValue.text(ARCHIVE_FOLDER),
36+
"area": dg.MetadataValue.text("europe"),
37+
"source": dg.MetadataValue.text("eumetsat"),
38+
"expected_runtime": dg.MetadataValue.text("6 hours"),
39+
},
40+
compute_kind="docker",
41+
automation_condition=dg.AutomationCondition.on_cron(
42+
cron_schedule=partitions_def.get_cron_schedule(
43+
hour_of_day=0,
44+
day_of_week=5,
45+
),
46+
),
47+
tags={
48+
"dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours
49+
"dagster/priority": "1",
50+
"dagster/concurrency_key": "eumetsat",
51+
},
52+
partitions_def=partitions_def,
53+
)
54+
def eumetsat_rss_lrv_asset(
55+
context: dg.AssetExecutionContext,
56+
pipes_docker_client: PipesDockerClient,
57+
) -> dg.MaterializeResult:
58+
"""Dagster asset for EUMETSAT's RSS service, low resolution."""
59+
it: dt.datetime = context.partition_time_window.start
60+
61+
return pipes_docker_client.run(
62+
image="ghcr.io/openclimatefix/satellite-consumer:0.2.0",
63+
command=[],
64+
env={
65+
"EUMETSAT_CONSUMER_KEY": os.environ["EUMETSAT_CONSUMER_KEY"],
66+
"EUMETSAT_CONSUMER_SECRET": os.environ["EUMETSAT_CONSUMER_SECRET"],
67+
"SATCONS_COMMAND": "consume",
68+
"SATCONS_WINDOW_MONTHS": "1",
69+
"SATCONS_SATELLITE": "rss",
70+
"SATCONS_VALIDATE": "true",
71+
"SATCONS_RESCALE": "true",
72+
"SATCONS_TIME": it.strftime("%Y%m%dT%H%M"),
73+
"SATCONS_NUM_WORKERS": "4",
74+
},
75+
container_kwargs={
76+
"volumes": [f"{ARCHIVE_FOLDER}:/work"],
77+
},
78+
context=context,
79+
).get_materialize_result()
80+

0 commit comments

Comments
 (0)