Skip to content

Commit 09aa7b6

Browse files
authored
Merge pull request #108 from gulfofmaine/initial_s3_pipeline
Initial S3 timeseries tests
2 parents 899b989 + 5b284b9 commit 09aa7b6

File tree

16 files changed

+3449
-47
lines changed

16 files changed

+3449
-47
lines changed

.github/workflows/pipeline_s3_timeseries.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ jobs:
2525
image_name: buoy_retriever_s3_timeseries
2626
image_tag: ${{ needs.shortsha.outputs.shortsha }}
2727
# push_image: ${{ github.event_name == 'push' || github.event_name == 'workflow_dispatch' }}
28-
# test_command: "pixi run pytest"
28+
test_command: "pixi run pytest --cov=. --cov-report=xml --cov-report=term-missing --junitxml=junit.xml -o junit_family=legacy"
29+
coverage_flags: s3_timeseries
30+
upload_tests: true
2931

3032
# deploy:
3133
# needs: [shortsha, build_test_push]

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ coverage.xml
5050
.hypothesis/
5151
.pytest_cache/
5252
cover/
53+
junit.xml
5354

5455
# Translations
5556
*.mo

Makefile

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,16 @@ user:
5151
# Open a Django shell inside the backend container
5252
shell:
5353
docker compose exec backend pixi run python manage.py shell
54+
55+
test-common:
56+
cd common; uv run pytest --cov=.
57+
58+
test-hohonu:
59+
docker build -f pipeline/hohonu/Dockerfile -t buoy_retriever-hohonu .
60+
docker run buoy_retriever-hohonu pixi run pytest --cov=.
61+
62+
test-s3-timeseries:
63+
docker build -f pipeline/s3_timeseries/Dockerfile -t buoy_retriever-s3_timeseries .
64+
docker run buoy_retriever-s3_timeseries pixi run pytest --cov=.
65+
66+
test-all: test-common test-s3-timeseries test-hohonu

README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ Spotlight is also accessible to Github Copilot and other agents via a MCP server
9191

9292
Currently there is some testing for common utilities and Hohonu pipelines.
9393

94-
For common, cd into `common/` then `uv run pytest`. Add `--cov=.` to see coverage (along with other pytest-cov options).
94+
Run `make test-all` to test everything.
9595

96-
For Hohonu, `docker compose exec hohonu pixi run pytest`.
96+
- Common - `make test-common` or cd into `common/` then `uv run pytest`. Add `--cov=.` to see coverage (along with other pytest-cov options).
97+
- Pipelines
98+
- S3 Timeseries - `make test-s3-timeseries` for more isolated tests, or `docker compose exec/run s3_timeseries pixi run pytest` which will mount volumes (in case snapshots need to be updated).
99+
- To run tests with real AWS data, pass the `--aws` flag to `pytest`. This requires AWS credentials that can access the buckets.
100+
- Hohonu - `make test-hohonu` for a more isolated test, or `docker compose exec/run hohonu pixi run pytest`, which will mount volumes (in case snapshots need to be updated).

common/test_utils/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
from .assets import get_asset_by_name
2+
from .sensors import get_sensor_by_name
23

3-
__all__ = ["get_asset_by_name"]
4+
__all__ = ["get_asset_by_name", "get_sensor_by_name"]

common/test_utils/sensors.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import dagster as dg
2+
3+
4+
def get_sensor_by_name(defs, name: str) -> dg.SensorDefinition:
    """Return the sensor named *name* from a Dagster Definitions object.

    Matches on the sensor's function name, not the full sensor key/prefix.

    Args:
        defs: Dagster Definitions whose ``sensors`` collection is searched.
        name: Function name of the sensor to find.

    Raises:
        KeyError: If no sensor with the given name exists.
    """
    found = next((sensor for sensor in defs.sensors if sensor.name == name), None)
    if found is None:
        raise KeyError(f"Sensor with name {name} not found")
    return found

pipeline/hohonu/tests/test_hohonu_pipeline.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
def vcr_config():
1616
return {
1717
"filter_headers": [("Authorization", "FAKE")],
18+
"ignore_hosts": ["spotlight"],
1819
}
1920

2021

pipeline/s3_timeseries/Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ ENV PYTHONWARNINGS once
1313
ENV PIXI_FROZEN true
1414

1515
WORKDIR ${HOME}
16-
COPY ${SOURCE}/pixi.toml ${SOURCE}/pixi.lock ${HOME}/
16+
COPY ${SOURCE}/pyproject.toml ${SOURCE}/pixi.lock ${HOME}/
1717

1818
RUN --mount=type=cache,id=br_s3_timeseries,target=/home/ioos/.cache/rattler/cache,uid=1000,gid=1000 \
1919
pixi install -e default
@@ -28,6 +28,8 @@ COPY ${SOURCE}/*.py \
2828
${SOURCE}/*.yaml \
2929
${HOME}/
3030

31+
COPY ${SOURCE}/tests/ ${HOME}/tests/
32+
3133
# COPY ${SOURCE}/platforms \
3234
# ${HOME}/platforms
3335

pipeline/s3_timeseries/pipeline.py

Lines changed: 119 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1-
from datetime import date
1+
from datetime import date, datetime
22
from textwrap import dedent
33
from typing import Annotated
44

5+
import boto3
56
import dagster as dg
67
import pandas as pd
78
import sentry_sdk
89
import xarray as xr
10+
from dagster_aws.s3.sensor import get_objects
11+
from parse import parse
912
from pydantic import BaseModel, Field
1013

1114
from common import assets, config, io
@@ -88,7 +91,7 @@ def monthly_partition_path(self):
8891
)
8992

9093

91-
def defs_for_dataset(dataset: S3TimeseriesDataset) -> dg.Definitions:
94+
def defs_for_dataset(dataset: S3TimeseriesDataset) -> dg.Definitions: # noqa: C901
9295
"""Definitions for a single S3 Timeseries dataset."""
9396
common_asset_kwargs = {
9497
"key_prefix": ["s3_timeseries", dataset.safe_slug],
@@ -116,7 +119,13 @@ def daily_df(context: dg.AssetExecutionContext, s3fs: S3FSResource) -> pd.DataFr
116119
"""Download daily dataframe from S3."""
117120
partition_date_string = context.asset_partition_key_for_output()
118121
partition_date = date.fromisoformat(partition_date_string)
119-
day_glob = f"{dataset.config.s3_source.bucket}{dataset.config.s3_source.prefix}{dataset.config.file_pattern.day_pattern.format(partition_date=partition_date)}"
122+
day_glob = (
123+
dataset.config.s3_source.bucket
124+
+ dataset.config.s3_source.prefix
125+
+ dataset.config.file_pattern.day_pattern.format(
126+
partition_date=partition_date,
127+
)
128+
)
120129

121130
context.log.info(
122131
f"Reading daily data for {partition_date_string} from S3 with glob: {day_glob}",
@@ -137,6 +146,16 @@ def daily_df(context: dg.AssetExecutionContext, s3fs: S3FSResource) -> pd.DataFr
137146
daily_dfs.append(df)
138147
df = pd.concat(daily_dfs)
139148

149+
# some flexibility in datetime/time column name
150+
for col_name in ["datetime", "time"]:
151+
try:
152+
df[col_name] = pd.to_datetime(df[col_name])
153+
df = df.sort_values(col_name)
154+
except KeyError:
155+
pass
156+
157+
df = df.reset_index(drop=True)
158+
140159
return df
141160

142161
@dg.asset(
@@ -206,7 +225,103 @@ def monthly_ds(
206225

207226
return ds
208227

209-
return dg.Definitions(assets=[daily_df, monthly_ds])
228+
daily_job = dg.define_asset_job(
229+
f"update_{dataset.safe_slug}_daily",
230+
selection=[daily_df],
231+
)
232+
233+
@dg.sensor(
    job=daily_job,
    name=dataset.safe_slug + "_s3_sensor",
    minimum_interval_seconds=5 * 60,
)
def s3_sensor(context: dg.SensorEvaluationContext, s3_credentials: S3Credentials):
    """Detect newly modified files in S3 and request runs for their daily partitions.

    The sensor cursor stores the most recent ``LastModified`` timestamp
    seen (ISO format), so each evaluation only acts on objects modified
    since the previous evaluation.
    """
    with sentry_sdk.start_transaction(
        op=f"{dataset.safe_slug}_s3_sensor",
        name=f"S3 Sensor for {dataset.safe_slug}",
    ):
        since_time = context.cursor or None
        if since_time:
            since_time = datetime.fromisoformat(since_time)

        client = boto3.client(
            "s3",
            aws_access_key_id=s3_credentials.access_key_id,
            aws_secret_access_key=s3_credentials.secret_access_key,
        )
        # Everything before the first "{" in the day pattern is a static
        # prefix, used to narrow the S3 listing.
        file_start = dataset.config.file_pattern.day_pattern.split("{")[0]

        new_s3_keys = get_objects(
            bucket=dataset.config.s3_source.bucket,
            prefix=file_start,
            since_last_modified=since_time,
            client=client,
        )
        if not new_s3_keys:
            return dg.SkipReason("No new files found in S3.")

        # Partitions that already have a queued or in-flight run; avoid
        # requesting duplicate runs for them.
        existing_partitions = set()
        known_partitions = set(daily_partitions.get_partition_keys())
        for run in context.instance.get_runs(
            filters=dg.RunsFilter(
                job_name=daily_job.name,
                statuses=[
                    dg.DagsterRunStatus.QUEUED,
                    dg.DagsterRunStatus.STARTING,
                    dg.DagsterRunStatus.STARTED,
                    dg.DagsterRunStatus.NOT_STARTED,
                ],
            ),
        ):
            try:
                existing_partitions.add(run.tags["dagster/partition"])
            except KeyError:
                pass

        # Loop-invariant: build the `parse` pattern from the glob-style
        # day pattern once, rather than per key.
        name_pattern = dataset.config.file_pattern.day_pattern.replace(
            "*",
            "{}",
        )

        for key in new_s3_keys:
            object_key = key.get("Key")

            object_name = object_key.removeprefix(dataset.config.s3_source.prefix)
            result = parse(name_pattern, object_name)
            if result is None:
                # `parse` returns None for keys that don't match the day
                # pattern; previously this crashed the whole sensor
                # evaluation with an AttributeError on `result.named`.
                # Skip unparseable keys instead.
                context.log.info(
                    f"Skipping key {object_key} that does not match the day pattern",
                )
                continue
            # NOTE(review): assumes the pattern's `partition_date` field
            # parses to a datetime-like value — confirm against the
            # configured day_pattern.
            dt = result.named["partition_date"].strftime("%Y-%m-%d")

            if dt not in existing_partitions:
                last_modified = key.get("LastModified")

                if since_time is None or last_modified > since_time:
                    existing_partitions.add(dt)
                    run_key = f"{dt}_{last_modified.isoformat()}"

                    if dt in known_partitions:
                        yield dg.RunRequest(
                            run_key=run_key,
                            partition_key=dt,
                        )
                    else:
                        context.log.info(
                            f"Skipping partition {dt} as it is not a known partition",
                        )

        latest_key_dt = max(key.get("LastModified") for key in new_s3_keys)
        context.update_cursor(latest_key_dt.isoformat())
312+
313+
dataset_assets = [daily_df, monthly_ds]
314+
315+
return dg.Definitions(
316+
assets=dataset_assets,
317+
sensors=[
318+
s3_sensor,
319+
dg.AutomationConditionSensorDefinition(
320+
dataset.safe_slug + "_automation_sensor",
321+
target=dataset_assets,
322+
),
323+
],
324+
)
210325

211326

212327
@dg.definitions

0 commit comments

Comments
 (0)