Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
d7812f7
create a dag to handle the daily scheduled nightlight NRT data. The d…
sanzog03 Jun 2, 2025
06a517a
update the nrt update check and nrt data update task
sanzog03 Jun 3, 2025
0426b14
refactor to use the date extracted from metadata, into the collection…
sanzog03 Jun 3, 2025
77d8fd6
replace space with underscore on id
sanzog03 Jun 3, 2025
157f85d
update code into more granular subgroups. also moved nrt update speci…
sanzog03 Jun 4, 2025
5a76e33
removed unnecessary import
sanzog03 Jun 4, 2025
dc8602f
fix proper scoping on the branch_task return, enabling followup task
sanzog03 Jun 5, 2025
9264cc3
refactor dag to use taskflow api completely. added docstring to the t…
sanzog03 Jun 5, 2025
a12768f
add stac_extension to include web map links used by wmts capabilities.
sanzog03 Jun 5, 2025
92b911b
generic def names
sanzog03 Jun 5, 2025
d68f9ea
task group taking whatever is needed via parameters instead of global
sanzog03 Jun 5, 2025
95a7d64
modified dag creation with the thought of re-useability
sanzog03 Jun 5, 2025
ad97e93
generate dag docs based on the reused dag
sanzog03 Jun 5, 2025
fcaba93
add veda_worldview_nrt_data_collection_update_dag_creator to generate…
sanzog03 Jun 5, 2025
358e1e1
changed variables names and dag id generation to be more semantic
sanzog03 Jun 5, 2025
86b258c
change the taskgroupid to be generic
sanzog03 Jun 5, 2025
5026a63
add relevant dag tags
sanzog03 Jun 5, 2025
c1a6d19
fix collection config
sanzog03 Jun 5, 2025
2b6bcf1
added some comments
sanzog03 Nov 13, 2025
73ee5e8
renamed the dags and configs for better clarity
sanzog03 Nov 13, 2025
9646c50
Merge branch 'dev' into GHGC-632/scheduled_worldview_nrt_collection_u…
sanzog03 Nov 13, 2025
0a72f8b
remove unnecessary code
sanzog03 Nov 13, 2025
dd30c6b
take schedule from generate dag config. better namings
sanzog03 Nov 14, 2025
260324f
add ability to ingest wmts without gibs
sanzog03 Nov 14, 2025
82987e9
remove unwanted extensions from wmts config json
sanzog03 Nov 14, 2025
7e09e8b
adhere to the return type
sanzog03 Nov 14, 2025
e5b16e3
single task instantiation for whatever is necessary
sanzog03 Nov 14, 2025
c5208a7
refactor for better task arrangement and readability
sanzog03 Nov 14, 2025
fa67e1b
import error
sanzog03 Nov 14, 2025
94ae24f
1. made gibs url and schedule optional.
sanzog03 Nov 17, 2025
0e16bb1
wmts and gibs branching fix
sanzog03 Nov 17, 2025
d58dc2f
rename to make the dag generic for all wmts and not just gibs wmts
sanzog03 Nov 17, 2025
6cf67d9
update doc string
sanzog03 Nov 17, 2025
2f0ffd9
1. add validation task which validates if the json config is as per t…
sanzog03 Nov 17, 2025
087b486
remove comment
sanzog03 Nov 17, 2025
f1fb195
corrected import for tests
sanzog03 Nov 17, 2025
c33b58c
update wmts with validator
kyle-lesinger Nov 25, 2025
a1f3d5a
update pytests
kyle-lesinger Nov 25, 2025
ed6d435
add slack fail alert
kyle-lesinger Nov 25, 2025
0d61a5f
add value error messages
kyle-lesinger Nov 25, 2025
b2249b9
reduce number of logging statements
kyle-lesinger Dec 1, 2025
4a240f1
update generate dags
kyle-lesinger Dec 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions dags/veda_data_pipeline/groups/collection_group.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import requests
import datetime
from airflow.models.variable import Variable
from airflow.decorators import task, task_group

Expand Down Expand Up @@ -79,3 +80,86 @@ def collection_task_group():
generate_collection = generate_collection_task()
ingest_collection = ingest_collection_task(collection=generate_collection)

# Special task group to update nightlight NRT data collection that is pulled from worldview
@task_group(group_id="worldview_nightlight_nrt_collection_update_pipeline", tooltip="worldview nightlight NRT Collection update")
def worldview_collection_update_task_group(**context):
    """Build the tasks that refresh the nightlight NRT collection's temporal extent.

    Reads the collection definition from the ``VIIRS_SNPP_NRT_collection``
    keyword argument, looks up the latest layer date published in the GIBS
    WMTS capabilities document, and, when both are available, chains the
    update-check, collection-update and collection-ingest tasks.

    Nothing is added to the group when the capabilities document cannot be
    fetched, no layer date can be extracted, or no collection was supplied.

    NOTE(review): ``fetch_nightlight_meta_from_gibs`` and ``extract_xml_date``
    are plain functions, so they execute whenever this function body runs
    rather than inside a task - confirm that is intended.
    """
    nrt_collection = context.get("VIIRS_SNPP_NRT_collection")

    xml_string = fetch_nightlight_meta_from_gibs()
    if not xml_string:
        return

    latest_layer_date = extract_xml_date(xml_string)
    if not latest_layer_date:
        return

    if not nrt_collection:
        return

    # Both tasks declare ``ti`` as their first parameter, so the real inputs
    # must be passed by keyword; a positional argument would bind to ``ti``.
    # NOTE(review): the values returned by these task calls are presumably
    # Airflow XCom references rather than the task results, so these checks
    # most likely do not gate anything at run time - confirm.
    if not nrt_update_check_task(nrt_date=latest_layer_date):
        return

    # The update task needs the latest date as well as the previous
    # collection; without it the task returns None and nothing is updated.
    updated_collection = update_nrt_collection_task(
        previous_collection=nrt_collection,
        latest_nrt_date=latest_layer_date,
    )
    if updated_collection:
        ingest_collection_task(collection=updated_collection)

@task()
def nrt_update_check_task(ti=None, nrt_date: str="") -> bool:
    """
    Tell whether the Near Real-Time (NRT) data is current as of today.

    The given date string, expected in "%Y-%m-%d" form, is compared with
    ``datetime.date.today()``.

    Args:
        ti: Unused by this function.
        nrt_date: Date of the latest NRT data, e.g. "2025-06-01".

    Returns:
        True when ``nrt_date`` is today or later; False when it is earlier
        or when no date was supplied.
    """
    if not nrt_date:
        return False

    year, month, day = (int(part) for part in nrt_date.split("-"))
    return datetime.date(year, month, day) >= datetime.date.today()

@task()
def update_nrt_collection_task(ti=None, previous_collection=None, latest_nrt_date=None):
    """
    Return a copy of a collection whose temporal extent ends at the latest NRT date.

    Args:
        ti: Unused by this function.
        previous_collection: Collection dict holding
            ``extent.temporal.interval[0]``; it is not modified.
        latest_nrt_date: Date string in "%Y-%m-%d" form.

    Returns:
        A deep copy of ``previous_collection`` with the end of its first
        temporal interval set to midnight (``T00:00:00Z``) of
        ``latest_nrt_date``, or None when either input is missing.
    """
    import copy

    if not (previous_collection and latest_nrt_date):
        return None

    year, month, day = (int(part) for part in latest_nrt_date.split("-"))
    interval_end = datetime.date(year, month, day).strftime("%Y-%m-%dT00:00:00Z")

    # Work on a copy so the caller's collection definition stays untouched.
    refreshed = copy.deepcopy(previous_collection)
    refreshed['extent']['temporal']['interval'][0][1] = interval_end
    return refreshed

# helper
def fetch_nightlight_meta_from_gibs(
    gibs_url: str = "https://gibs.earthdata.nasa.gov/wmts/epsg4326/best/1.0.0/WMTSCapabilities.xml",
    timeout: float = 30.0,
) -> str:
    """Download the GIBS WMTS capabilities document.

    Args:
        gibs_url: URL of the WMTS capabilities XML.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` can block indefinitely on an
            unresponsive server.

    Returns:
        The response body as text, or an empty string when the request fails
        (connection error, timeout or non-2xx status). The failure is printed
        rather than raised.
    """
    try:
        response = requests.get(gibs_url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching from the gibs: {e}")
        return ""
    return response.text

def extract_xml_date(
    xml_string: str,
    layer_id: str = "VIIRS_SNPP_DayNightBand_At_Sensor_Radiance",
) -> str:
    """Extract the default ``Time`` dimension value of a layer from WMTS capabilities XML.

    Args:
        xml_string: WMTS capabilities document as text.
        layer_id: ``ows:Identifier`` of the layer to look up.

    Returns:
        The text of the ``Default`` element of the layer's ``Time``
        dimension, or an empty string when the input is empty, cannot be
        parsed, or does not contain the layer or its ``Time`` dimension.
        The result is always a string.
    """
    import xml.etree.ElementTree as ET

    namespaces = {
        "wmts": "http://www.opengis.net/wmts/1.0",
        "ows": "http://www.opengis.net/ows/1.1",
    }

    if not xml_string:
        return ""

    try:
        root = ET.fromstring(xml_string)
    except ET.ParseError as e:
        # Treat an unparsable document like a missing one instead of crashing.
        print(f"Error parsing the WMTS capabilities XML: {e}")
        return ""

    contents = root.find("wmts:Contents", namespaces)
    if contents is None:
        return ""

    for layer in contents.findall("wmts:Layer", namespaces):
        # findtext tolerates layers that have no Identifier element.
        if layer.findtext("ows:Identifier", default="", namespaces=namespaces) != layer_id:
            continue
        # A layer may declare several dimensions; Time need not be the first.
        for dimension in layer.findall("wmts:Dimension", namespaces):
            dimension_id = dimension.findtext(
                "ows:Identifier", default="", namespaces=namespaces
            )
            if dimension_id == "Time":
                return dimension.findtext(
                    "wmts:Default", default="", namespaces=namespaces
                ) or ""
    return ""
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import json
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

from veda_data_pipeline.groups.collection_group import worldview_collection_update_task_group


# Collection definition for the VIIRS_SNPP_DayNightBand_At_Sensor_Radiance layer.
# It is handed to worldview_collection_update_task_group under the
# VIIRS_SNPP_NRT_collection keyword and is also embedded in the DAG docs.
VIIRS_SNPP_NRT_collection = {
"assets": {},
"collection": "VIIRS_SNPP_DayNightBand_At_Sensor_Radiance",
"dashboard:is_periodic": True,
"dashboard:time_density": "day",
"dashboard:time_interval": "P1D",
"data_type": "cog",
"description": "The Black Marble Nighttime At Sensor Radiance (Day/Night Band) layer is created from NASA’s Black Marble daily at-sensor top-of-atmosphere nighttime radiance product (VNP46A1). It is displayed as a grayscale image. The layer is expressed in radiance units (nW/(cm2 sr)) with log10 conversion. It is stretched up to 38 nW/(cm2 sr) resulting in improvements in capturing city lights in greater spatial detail than traditional Nighttime Imagery resampled at 0-255 (e.g., Day/Night Band, Enhanced Near Constant Contrast).The ultra-sensitivity of the VIIRS Day/Night Band enables scientists to capture the Earth’s surface and atmosphere in low light conditions, allowing for better monitoring of nighttime phenomena. These images are also useful for assessing anthropogenic sources of light emissions under varying illumination conditions. For instance, during partial to full moon conditions, the layer can identify the location and features of clouds and other natural terrestrial features such as sea ice and snow cover, while enabling temporal observations in urban regions, regardless of moonlit conditions. As such, the layer is particularly useful for detecting city lights, lightning, auroras, fires, gas flares, and fishing fleets.The Black Marble Nighttime At Sensor Radiance (Day/Night Band) layer is available in near real-time from the Visible Infrared Imaging Radiometer Suite (VIIRS) aboard the joint NASA/NOAA Suomi National Polar orbiting Partnership (Suomi NPP) satellite. The sensor resolution is 750 m at nadir, imagery resolution is 500 m, and the temporal resolution is daily.",
"extent": {
"spatial": {
"bbox": [
[
-180,
-90,
180,
90
]
]
},
"temporal": {
# Single [start, end] pair.
# NOTE(review): the end value looks like the entry the NRT update task is
# meant to overwrite with the latest available date - confirm.
"interval": [
[
"2020-11-10T00:00:00Z",
"2025-04-14T00:00:00Z"
]
]
}
},
"is_periodic": True,
"item_assets": {
"cog_default": {
"description": "Cloud optimized default layer to display on map",
"roles": [
"data",
"layer"
],
"title": "Default COG Layer",
"type": "image/tiff; application=geotiff; profile=cloud-optimized"
}
},
"license": "MIT",
# WMTS link describing where the layer's tiles are served from.
"links": [
{
"href": "https://gibs.earthdata.nasa.gov/wmts/epsg3857/best/wmts.cgi",
"href:servers": [
"https://gibs-a.earthdata.nasa.gov/wmts/epsg3857/best/wmts.cgi",
"https://gibs-b.earthdata.nasa.gov/wmts/epsg3857/best/wmts.cgi"
],
"rel": "wmts",
"title": "Visualized through a WMTS",
"type": "image/png",
"wmts:dimensions": [
"default"
],
"wmts:layers": [
"VIIRS_SNPP_DayNightBand_At_Sensor_Radiance"
]
}
],
"product_level": "L2",
"providers": [],
"renders": {},
"stac_extensions": [
"https://stac-extensions.github.io/render/v1.0.0/schema.json",
"https://stac-extensions.github.io/item-assets/v1.0.0/schema.json"
],
"stac_version": "1.1.0",
"temporal_frequency": "twenty four hours",
"time_density": "day",
"time_interval": "P1D",
"title": "Black Marble Nighttime At Sensor Radiance (Day/Night Band)",
"type": "Collection",
# NOTE(review): the description above states radiance units (nW/(cm2 sr));
# this value looks inconsistent with that - confirm before relying on it.
"units": "m·s⁻¹"
}

# Markdown shown as the DAG's documentation. The collection config is rendered
# with json.dumps so the fenced ```json block holds valid JSON rather than a
# Python dict repr (single quotes, True/False).
_collection_config_json = json.dumps(
    VIIRS_SNPP_NRT_collection, indent=2, ensure_ascii=False
)

dag_doc_md = f"""
### This DAG handles VIIRS_SNPP_DayNightBand_At_Sensor_Radiance NRT dataset update.
It checks if the NRT data hosted by earthdata is available for the latest available date
via https://gibs.earthdata.nasa.gov/wmts/epsg4326/best/1.0.0/WMTSCapabilities.xml
If available, it overrides the VIIRS_SNPP_DayNightBand_At_Sensor_Radiance collection
with the updated temporal extent and ingests into the catalog.
#### Notes
- This DAG uses the following configuration for VIIRS_SNPP_DayNightBand_At_Sensor_Radiance NRT collection <br>
```json
{_collection_config_json}
```
"""

# Keyword arguments forwarded to the DAG constructor below.
dag_args = dict(
    start_date=pendulum.today("UTC").add(days=-1),
    catchup=False,
    doc_md=dag_doc_md,
    tags=["collection"],
)

# Runs once a day: start -> collection update task group -> end.
with DAG(
    dag_id="veda_worldview_nrt_data_collection_update",
    schedule_interval=timedelta(days=1),
    render_template_as_native_obj=True,
    **dag_args,
) as dag:
    # Operators created inside the ``with`` block attach to ``dag`` on their own.
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(
        task_id="end",
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    collection_grp = worldview_collection_update_task_group(
        VIIRS_SNPP_NRT_collection=VIIRS_SNPP_NRT_collection
    )

    start >> collection_grp
    collection_grp >> end