Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ venv/
# Data files
data/
events/
hyperparam_tuning/

# OS-specific files
.DS_Store
Expand Down Expand Up @@ -51,4 +52,5 @@ training_checkpoints
*.onnx

# big image
image.NEF
image.NEF

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies = [
"pydantic>=2.0.0",
"pygeohash==3.0.1",
"pyproj==3.7.1",
"pystac-client>=0.9.0",
"pywavelets>=1.8.0",
"rawpy>=0.25.1",
"requests==2.32.3",
Expand Down
185 changes: 184 additions & 1 deletion src/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
from PIL import Image

import logging
import pystac_client
from requests.auth import HTTPBasicAuth

logger = logging.getLogger(__name__)

class Source(Enum):
    """Supported upstream data providers for event/imagery queries.

    The member value is the lowercase wire/CLI name of the provider.
    """

    EONET = "eonet"
    FIRMS = "firms"
    ENMAP = "enmap"

    def __str__(self):
        # Render the wire value (e.g. "eonet") instead of "Source.EONET"
        # so the enum can be used directly in paths and log messages.
        return self.value

Expand Down Expand Up @@ -676,6 +679,186 @@ def copernicus_query(use_gcs=False, amount=135):
progress_tracker.update_event_progress(location.geohash, location_index=0, status="failed")
logger.error(f"Exception processing location {location.geohash}: {e}")


def enmap_query(use_gcs=False, amount=135):
    """Query EnMAP L2A hyperspectral scenes from the DLR STAC API.

    Builds candidate locations from the FIRMS and EONET event sources,
    searches the ``ENMAP_HSI_L2A`` collection for each location's bbox and
    time window, and downloads the VNIR/SWIR assets of the first matching
    item using HTTP basic auth (``DLR_USER`` / ``DLR_PASS`` environment
    variables).

    Args:
        use_gcs: When True, upload each download to GCS under
            ``raw_data/enmap/to_process/<date>/<geohash>/`` instead of
            saving it under ``data/enmap``.
        amount: Maximum number of locations to request from each event
            source.

    Returns:
        None. Results are written to disk or GCS; failures are logged and
        skipped so one bad location does not abort the whole run.
    """
    dlr_user = os.getenv("DLR_USER")
    dlr_pass = os.getenv("DLR_PASS")
    if not dlr_user or not dlr_pass:
        logger.error("DLR_USER and DLR_PASS environment variables must be set.")
        return

    STAC_URL = "https://geoservice.dlr.de/eoc/ogc/stac/v1"
    COLLECTION_ID = "ENMAP_HSI_L2A"

    progress_tracker = ProgressTracker()
    locations = {}
    # Reuse the existing location-creation logic; a failure in one event
    # source must not prevent the other from being queried.
    for src in (Source.FIRMS, Source.EONET):
        try:
            src_locations = create_locations(
                source=src, amount=amount, progress_tracker=progress_tracker
            )
            if src_locations:
                locations[src] = src_locations
        except Exception as e:
            logger.warning(f"{src} location creation failed: {e}")

    if not locations:
        logger.warning("No locations found to process for EnMAP.")
        return

    # One authenticated session reused for every download.
    session = requests.Session()
    session.auth = HTTPBasicAuth(dlr_user, dlr_pass)

    try:
        client = pystac_client.Client.open(STAC_URL)
    except Exception as e:
        logger.error(f"Failed to open STAC client: {e}")
        return

    # Construct the GCS handler once, not once per download attempt.
    gcs = GCSHandler() if use_gcs else None

    for source, source_locations in locations.items():
        for loc in source_locations:
            try:
                bbox = loc.bbox['bbox']
                time_range = f"{loc.time['from']}/{loc.time['to']}"
                logger.info(f"Searching EnMAP for {loc.geohash} in range {time_range}")

                # Only the first matching item is used per location.
                search = client.search(
                    collections=[COLLECTION_ID],
                    bbox=bbox,
                    datetime=time_range,
                    max_items=1
                )
                items = list(search.items())
                if not items:
                    logger.info(f"No EnMAP data found for {loc.geohash}")
                    continue

                item = items[0]
                logger.info(f"Found EnMAP item: {item.id}")

                # Grab the hyperspectral VNIR/SWIR assets; items also carry
                # preview/metadata assets we do not need.
                assets_to_download = [
                    (key, item.assets[key])
                    for key in ('vnir', 'swir')
                    if key in item.assets
                ]
                if not assets_to_download:
                    logger.warning(f"No suitable download asset found for {item.id}")
                    continue

                for key, asset in assets_to_download:
                    _download_enmap_asset(session, item, loc, key, asset, gcs)

            except Exception as e:
                logger.error(f"Error processing EnMAP for {loc.geohash}: {e}")


def _download_enmap_asset(session, item, loc, key, asset, gcs, max_retries=3):
    """Download one EnMAP asset with retries, saving locally or to GCS.

    Streams the response to disk in chunks (EnMAP products can be large,
    so nothing is buffered fully in memory). Retries up to *max_retries*
    times, backing off on DLR's "concurrent active session" 403s.

    Args:
        session: Authenticated ``requests.Session``.
        item: The STAC item the asset belongs to (used for fallback names).
        loc: Location object providing ``geohash`` for output paths.
        key: Asset key ('vnir' or 'swir'), used only for logging.
        asset: The STAC asset to download.
        gcs: A GCS handler to upload through, or None to save locally.
        max_retries: Number of download attempts before giving up.
    """
    href = asset.href
    logger.info(f"Downloading {key} from {href}")

    for attempt in range(max_retries):
        try:
            with session.get(href, stream=True) as r:
                if r.status_code == 403:
                    logger.warning(f"Got 403 Forbidden. Attempt {attempt+1}/{max_retries}. Message: {r.text[:200]}")
                    # DLR limits concurrent sessions; back off before retrying.
                    if "concurrent active session" in r.text:
                        wait_time = 5 * (attempt + 1)
                        logger.info(f"Waiting {wait_time}s before retrying due to concurrent session error...")
                        time.sleep(wait_time)
                        continue

                if not r.ok:
                    logger.error(f"Download failed for {href}: {r.status_code} {r.reason}")
                    logger.error(f"Response content: {r.text[:500]}")
                    r.raise_for_status()

                # Derive a filename from the URL, stripping query params;
                # fall back to the item id plus an extension guessed from
                # the asset's media type. (The previous code passed the
                # builtin `format` to guess_extension — a TypeError.)
                filename = href.split('/')[-1].split('?')[0]
                if not filename or len(filename) < 5:
                    ext = mimetypes.guess_extension(asset.media_type or "") or ".zip"
                    filename = f"{item.id}{ext}"

                if gcs is not None:
                    # Stream to a temp file first, then upload.
                    gcs_path = (
                        f"raw_data/enmap/to_process/"
                        f"{datetime.now().strftime('%Y%m%d')}/{loc.geohash}/{filename}"
                    )
                    temp_dir = resolve_path("data/temp")
                    os.makedirs(temp_dir, exist_ok=True)
                    temp_path = os.path.join(temp_dir, filename)
                    try:
                        with open(temp_path, 'wb') as f:
                            for chunk in r.iter_content(chunk_size=8192):
                                f.write(chunk)
                        gcs.upload_file(temp_path, gcs_path)
                        logger.info(f"Uploaded to {gcs_path}")
                    finally:
                        # Remove the temp file even if the upload raised.
                        if os.path.exists(temp_path):
                            os.remove(temp_path)
                else:
                    directory = resolve_path("data/enmap")
                    os.makedirs(directory, exist_ok=True)
                    out_path = os.path.join(directory, f"{loc.geohash}_{filename}")
                    with open(out_path, 'wb') as f:
                        for chunk in r.iter_content(chunk_size=8192):
                            f.write(chunk)
                    logger.info(f"Saved to {out_path}")

            # Success: stop retrying.
            return

        except requests.exceptions.RequestException as e:
            logger.error(f"Download request failed for {href}: {e}")
            if e.response is not None:
                logger.error(f"Error response content: {e.response.text[:500]}")
            # Fall through to the next attempt.

    # Previously a retry exhaustion fell through silently; make it explicit.
    logger.error(f"Giving up on {href} after {max_retries} attempts")


def batch_data_downloader_selenium(url=None, max_pages=9):
"""Downloads images from a Flickr album using Selenium.

Expand Down
6 changes: 5 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
batch_data_downloader_selenium,
retrieve_eonet_cross_reference,
copernicus_query,
convert_sen2fire_labeled
convert_sen2fire_labeled,
enmap_query
)
from postprocess import (
test_ccds
Expand Down Expand Up @@ -52,6 +53,7 @@
parser.add_argument('--batch-download', required=False, action='store_true', help="Download images using Selenium")
parser.add_argument('--eonet-crossref', required=False, action='store_true', help="Fetch wildfire data from the EONET API")
parser.add_argument('--copernicus-query', required=False, action='store_true', help="Query Sentinel data from Copernicus")
parser.add_argument('--enmap-query', required=False, action='store_true', help="Query EnMAP data from DLR STAC")
parser.add_argument('--coordinates', required=False, nargs=2, type=float, metavar=('LON', 'LAT'),
help="Specify coordinates for the query in the format: LON LAT")
parser.add_argument('--time-range', required=False, nargs=2, metavar=('FROM', 'TO'),
Expand Down Expand Up @@ -89,6 +91,8 @@
retrieve_eonet_cross_reference()
elif args.copernicus_query:
copernicus_query(use_gcs=args.use_gcs)
elif args.enmap_query:
enmap_query(use_gcs=args.use_gcs)
elif args.multimodal_qc:
import mlops
mlops.run_multimodal_qc(use_gcs=args.use_gcs, input_path=args.qc_path)
Expand Down
Loading