Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Generated by Django 5.2.9 on 2025-12-18 10:30

from django.db import migrations, models


class Migration(migrations.Migration):
    # Rebuilds the conditional uniqueness constraints on `APITimeSeries`,
    # `CoreHeadline` and `CoreTimeSeries` so that the `embargo` column is
    # part of each constraint's field set. Django cannot alter a constraint
    # in place, so each constraint is removed and re-added under the same
    # name with the extended field tuple.

    dependencies = [
        ("data", "0040_remove_age_and_stratum_from_rbac_permission"),
    ]

    operations = [
        # Drop the previous constraints, which did not include `embargo`.
        migrations.RemoveConstraint(
            model_name="apitimeseries",
            name="The `APITimeSeries` record should be unique if `force_write` is False",
        ),
        migrations.RemoveConstraint(
            model_name="coreheadline",
            name="The `CoreHeadline` record should be unique if `force_write` is False",
        ),
        migrations.RemoveConstraint(
            model_name="coretimeseries",
            name="The `CoreTimeSeries` record should be unique if `force_write` is False",
        ),
        # Re-create each constraint with `embargo` appended, still applying
        # only to rows where `force_write` is False.
        migrations.AddConstraint(
            model_name="apitimeseries",
            constraint=models.UniqueConstraint(
                condition=models.Q(("force_write", False)),
                fields=(
                    "metric",
                    "topic",
                    "theme",
                    "sub_theme",
                    "geography",
                    "geography_type",
                    "geography_code",
                    "stratum",
                    "age",
                    "sex",
                    "year",
                    "month",
                    "epiweek",
                    "date",
                    "metric_value",
                    "in_reporting_delay_period",
                    "embargo",
                ),
                name="The `APITimeSeries` record should be unique if `force_write` is False",
            ),
        ),
        migrations.AddConstraint(
            model_name="coreheadline",
            constraint=models.UniqueConstraint(
                condition=models.Q(("force_write", False)),
                fields=(
                    "metric",
                    "geography",
                    "stratum",
                    "age",
                    "sex",
                    "period_start",
                    "period_end",
                    "metric_value",
                    "embargo",
                ),
                name="The `CoreHeadline` record should be unique if `force_write` is False",
            ),
        ),
        migrations.AddConstraint(
            model_name="coretimeseries",
            constraint=models.UniqueConstraint(
                condition=models.Q(("force_write", False)),
                fields=(
                    "metric",
                    "geography",
                    "stratum",
                    "age",
                    "sex",
                    "year",
                    "month",
                    "epiweek",
                    "date",
                    "metric_value",
                    "in_reporting_delay_period",
                    "embargo",
                ),
                name="The `CoreTimeSeries` record should be unique if `force_write` is False",
            ),
        ),
    ]
1 change: 1 addition & 0 deletions metrics/data/models/api_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Meta:
"date",
"metric_value",
"in_reporting_delay_period",
"embargo",
),
name="The `APITimeSeries` record should be unique if `force_write` is False",
condition=Q(force_write=False),
Expand Down
1 change: 1 addition & 0 deletions metrics/data/models/core_models/headline.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Meta:
"period_start",
"period_end",
"metric_value",
"embargo",
),
name="The `CoreHeadline` record should be unique if `force_write` is False",
condition=Q(force_write=False),
Expand Down
1 change: 1 addition & 0 deletions metrics/data/models/core_models/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class Meta:
"date",
"metric_value",
"in_reporting_delay_period",
"embargo",
),
name="The `CoreTimeSeries` record should be unique if `force_write` is False",
condition=Q(force_write=False),
Expand Down
101 changes: 101 additions & 0 deletions tests/integration/ingestion/test_file_ingestion.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import zoneinfo

import pytest
Expand Down Expand Up @@ -175,3 +176,103 @@ def test_creates_api_time_series_from_data(
core_time_series.embargo.strftime("%Y-%m-%d %H:%M:%S")
== data["time_series"][0]["embargo"]
)

@pytest.mark.django_db
def test_updates_api_timeseries_embargo_date(
    self,
    example_time_series_data: type_hints.INCOMING_DATA_TYPE,
):
    """
    Given time series data ingested once
    When the same data is ingested again with a later refresh date
        and an updated embargo timestamp
    Then the latest records for `CoreTimeSeries` and `APITimeSeries`
        reflect the updated embargo timestamps
    """
    # Before ingestion both tables must be empty, so the `refresh_date`
    # filters below can only ever match records created by this test.
    assert CoreTimeSeries.objects.all().count() == 0
    assert APITimeSeries.objects.all().count() == 0

    # Given
    initial_timeseries_data = copy.deepcopy(example_time_series_data)
    updated_timeseries_data = copy.deepcopy(initial_timeseries_data)

    # When
    data_ingester(data=initial_timeseries_data)

    # Re-ingest the same payload with a later refresh date
    # and an updated embargo timestamp on every record
    updated_timeseries_refresh_date = "2023-11-21"
    updated_timeseries_embargo = "2023-11-19 09:15:00"

    updated_timeseries_data["refresh_date"] = updated_timeseries_refresh_date
    for record in updated_timeseries_data["time_series"]:
        record["embargo"] = updated_timeseries_embargo

    data_ingester(data=updated_timeseries_data)

    # Then
    expected_record_count = len(updated_timeseries_data["time_series"])

    updated_core_timeseries = CoreTimeSeries.objects.filter(
        refresh_date__date=updated_timeseries_refresh_date
    )
    # Assert on the queryset count (not the fixture length alone) so that
    # an empty queryset cannot let the loop below pass vacuously.
    assert updated_core_timeseries.count() == expected_record_count
    for record in updated_core_timeseries:
        assert (
            record.embargo.strftime("%Y-%m-%d %H:%M:%S")
            == updated_timeseries_embargo
        )

    updated_api_timeseries = APITimeSeries.objects.filter(
        refresh_date__date=updated_timeseries_refresh_date
    )
    assert updated_api_timeseries.count() == expected_record_count
    for record in updated_api_timeseries:
        assert (
            record.embargo.strftime("%Y-%m-%d %H:%M:%S")
            == updated_timeseries_embargo
        )

@pytest.mark.django_db
def test_updates_api_coreheadline_embargo_date(
    self,
    example_headline_data: type_hints.INCOMING_DATA_TYPE,
):
    """
    Given headline data ingested once
    When the same data is ingested again with a later refresh date
        and an updated embargo timestamp
    Then the latest records for `CoreHeadline`
        reflect the updated embargo timestamps
    """
    # Before ingestion the table must be empty, so the `refresh_date`
    # filter below can only ever match records created by this test.
    assert CoreHeadline.objects.all().count() == 0

    # Given
    initial_headline_data = copy.deepcopy(example_headline_data)
    updated_headline_data = copy.deepcopy(initial_headline_data)

    # When
    data_ingester(data=initial_headline_data)

    # Re-ingest the same payload with a later refresh date
    # and an updated embargo timestamp on every record
    updated_headline_refresh_date = "2023-11-21"
    updated_headline_embargo = "2023-11-18 08:00:00"

    updated_headline_data["refresh_date"] = updated_headline_refresh_date
    for record in updated_headline_data["data"]:
        record["embargo"] = updated_headline_embargo

    data_ingester(data=updated_headline_data)

    # Then
    updated_headlines = CoreHeadline.objects.filter(
        refresh_date__date=updated_headline_refresh_date
    )
    # Assert on the queryset count (not just the fixture length) so that
    # an empty queryset cannot let the loop below pass vacuously.
    assert updated_headlines.count() == len(updated_headline_data["data"])
    for record in updated_headlines:
        assert (
            record.embargo.strftime("%Y-%m-%d %H:%M:%S") == updated_headline_embargo
        )