Added Germany PV & GFS data download pipelines #124
Draft
Sharkyii wants to merge 10 commits into openclimatefix:main from Sharkyii:data/Germany (base: main)
+471
−0
Commits (10)
a22067c: downloading files added (Sharkyii)
28ef7af: downloading files added (Sharkyii)
2a534f2: Clean notebook outputs (Sharkyii)
ba49d7c: done (Sharkyii)
1936170: Delete downloading_gfs_germany.ipynb (Sharkyii)
dc71c71: cleared (Sharkyii)
7bd0bc7: updated germany_data_processor notebook (Sharkyii)
a05c720: sneh (Sharkyii)
24dcbd3: Modify zarr_path to use an empty string (Sharkyii)
f602e41: 1 (Sharkyii)
New file (+212 lines), the GFS download script:

import os
import requests
import xarray as xr
import numpy as np
import pandas as pd
import time
from datetime import datetime, timedelta
from pathlib import Path


LAT_MIN, LAT_MAX = 47, 55
LON_MIN, LON_MAX = 6, 15

VARIABLES = {
    "dswrf": "DSWRF:surface",
    "t": "TMP:2 m above ground",
    "r": "RH:2 m above ground",
    "tcc": "TCDC:entire atmosphere",
    "u10": "UGRD:10 m above ground",
    "v10": "VGRD:10 m above ground",
}

OUTPUT_DIR = Path("./germany_gfs_data")
OUTPUT_DIR.mkdir(exist_ok=True)

START_DATE = datetime(2023, 1, 1)
END_DATE = datetime(2023, 1, 1)
CYCLES = [0, 6, 12, 18]
FORECAST_HOURS = [0, 3, 6, 9, 12, 15, 18, 21, 24]


def get_byte_ranges(idx_url):
    r = requests.get(idx_url)
    if r.status_code != 200:
        return None

    lines = r.text.splitlines()
    records = []

    for i, line in enumerate(lines):
        parts = line.split(":")
        if len(parts) < 5:
            continue

        offset = int(parts[1])
        var_lvl = f"{parts[3]}:{parts[4]}"
        next_offset = int(lines[i+1].split(":")[1]) if i+1 < len(lines) else ""

        records.append({
            "offset": offset,
            "var_lvl": var_lvl,
            "next": next_offset
        })

    return records


def download_grib(date, cycle, fhour):
    date_str = date.strftime("%Y%m%d")
    cycle_str = f"{cycle:02d}"
    fhour_str = f"{fhour:03d}"

    base_url = f"https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs.{date_str}/{cycle_str}/atmos/gfs.t{cycle_str}z.pgrb2.0p25.f{fhour_str}"
    idx_url = base_url + ".idx"
    filename = OUTPUT_DIR / f"gfs_{date_str}_{cycle_str}z_f{fhour_str}.grib2"

    if filename.exists() and filename.stat().st_size > 1000:
        return filename

    records = get_byte_ranges(idx_url)
    if not records:
        return None

    print(f"Downloading {filename.name}...")

    with open(filename, "wb") as f:
        for var_name, var_pattern in VARIABLES.items():
            record = next((r for r in records if r["var_lvl"] == var_pattern), None)

            if record:
                range_header = f"bytes={record['offset']}-{record['next']-1 if record['next'] else ''}"
                r = requests.get(base_url, headers={"Range": range_header}, timeout=30)
                r.raise_for_status()

                for chunk in r.iter_content(chunk_size=1024*1024):
                    if chunk:
                        f.write(chunk)

    return filename


def process_grib(grib_path):
    """Open GRIB file and handle multiple levels/variables."""
    datasets = []

    # Define filtration for different levels
    filters = [
        {"typeOfLevel": "surface"},
        {"typeOfLevel": "heightAboveGround", "level": 2},
        {"typeOfLevel": "heightAboveGround", "level": 10},
        {"typeOfLevel": "heightAboveGround", "level": 100},
        {"typeOfLevel": "entireAtmosphere"},
    ]

    for f in filters:
        try:
            ds = xr.open_dataset(
                grib_path,
                engine="cfgrib",
                backend_kwargs={'filter_by_keys': f}
            )
            # Drop coordinates that might conflict when merging different levels
            for coord in ["heightAboveGround", "entireAtmosphere", "surface"]:
                if coord in ds.coords:
                    ds = ds.drop_vars(coord)
            datasets.append(ds)
        except Exception:
            continue

    if not datasets:
        return None

    try:
        combined = xr.merge(datasets, compat="no_conflicts")
        data = combined.sel(latitude=slice(LAT_MAX, LAT_MIN), longitude=slice(LON_MIN, LON_MAX))
        return data
    except Exception as e:
        print(f"Error merging datasets for {grib_path}: {e}")
        return None


def main():
    print("=" * 50)
    print("GFS WEATHER DATA DOWNLOADER")
    print("=" * 50)

    all_data = {}
    init_times = []
    steps = []

    current = START_DATE
    while current <= END_DATE:
        for cycle in CYCLES:
            init_time = current.replace(hour=cycle)

            for fhour in FORECAST_HOURS:
                print(f"--- Cycle {cycle:02d} F{fhour:03d} ---")
                grib_path = download_grib(current, cycle, fhour)

                if grib_path:
                    data = process_grib(grib_path)
                    if data:
                        print(f"Success: Processed {grib_path.name}")
                        all_data[(init_time, fhour)] = data
                        if init_time not in init_times:
                            init_times.append(init_time)
                        if fhour not in steps:
                            steps.append(fhour)
                    else:
                        print(f"Warning: Failed to process {grib_path.name}")

        current += timedelta(days=1)

    if not all_data:
        print("No data downloaded")
        return

    init_times = sorted(init_times)
    steps = sorted(steps)

    # Assembly
    sample_key = list(all_data.keys())[0]
    lat_size = len(all_data[sample_key].latitude)
    lon_size = len(all_data[sample_key].longitude)
    channels = list(VARIABLES.keys())

    data_array = np.full((len(init_times), len(steps), len(channels), lat_size, lon_size), np.nan, dtype=np.float32)
    for (it, fh), data in all_data.items():
        it_idx = init_times.index(it)
        fh_idx = steps.index(fh)
        for i, ch in enumerate(channels):
            # Map the variable name from GFS to our channels
            grib_var = VARIABLES[ch].split(":")[0].lower()
            # Find the actual variable name in xarray (sometimes it's different)
            for var in data.data_vars:
                if var.lower() == grib_var:
                    data_array[it_idx, fh_idx, i] = data[var].values
                    break

    ds = xr.Dataset(
        {ch: (["init_time_utc", "step", "latitude", "longitude"], data_array[:, :, i]) for i, ch in enumerate(channels)},
        coords={
            "init_time_utc": init_times,
            "step": [np.timedelta64(h, "h") for h in steps],
            "latitude": np.linspace(LAT_MAX, LAT_MIN, lat_size),
            "longitude": np.linspace(LON_MIN, LON_MAX, lon_size),
        }
    )

    zarr_path = Path(r"")
    print(f"\nSaving to {zarr_path}")

    # Robust save for Windows
    if zarr_path.exists():
        import shutil
        shutil.rmtree(zarr_path, ignore_errors=True)

    ds.to_zarr(zarr_path, mode="w", consolidated=True)
    print("Done!")


if __name__ == "__main__":
    main()
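For anyone trying the script locally, here is a minimal sketch of how the resulting Zarr could be inspected once zarr_path has been pointed at a real location. The "germany_gfs_2023.zarr" filename below is only an assumption, not part of this PR:

import xarray as xr

# Assumed output location; use whatever zarr_path was set to in the script above.
ds = xr.open_zarr("germany_gfs_2023.zarr")

# The script writes dimensions init_time_utc, step, latitude, longitude
# and one data variable per channel: dswrf, t, r, tcc, u10, v10.
print(dict(ds.sizes))
print(list(ds.data_vars))

# The German bounding box should span roughly 47-55 N and 6-15 E.
print(float(ds.latitude.min()), float(ds.latitude.max()))
print(float(ds.longitude.min()), float(ds.longitude.max()))

# Mean 2 m temperature field for the first init time and forecast step.
print(float(ds["t"].isel(init_time_utc=0, step=0).mean()))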
Contributor:
Please avoid using emojis in the code; also, this file is missing its extension.
New file (+161 lines), the Germany data processor:

"""
Germany Data Processor - Unified Pipeline

Combines:
1. SMARD API downloading (from downloading_pv_germany.py)
2. Zarr/CSV conversion & standardization
3. Dataset validation (from test_germany_pipeline.py)
"""

import requests
import pandas as pd
import xarray as xr
import numpy as np
import time
from datetime import datetime
from pathlib import Path
import warnings
import shutil
warnings.filterwarnings('ignore')

# Configuration
BASE_URL = "https://www.smard.de/app/chart_data"
FILTER_ID = 4068
REGION = "DE"
BASE_DIR = Path(r"")  # path to base dir
OUTPUT_DIR = BASE_DIR / "germany_pv_data"
# Final Zarr output path
FINAL_ZARR_PATH = BASE_DIR / "germany_pv_2023.zarr"
# Assumed path for the GFS Zarr written by the GFS download script (not part of the
# original diff, which left GFS_ZARR_PATH undefined); adjust to match that script's zarr_path.
GFS_ZARR_PATH = BASE_DIR / "germany_gfs_2023.zarr"

START_DATE = "2023-01-01"
END_DATE = "2024-01-01"


def get_timestamps(session, res="quarterhour"):
    url = f"{BASE_URL}/{FILTER_ID}/{REGION}/index_{res}.json"
    r = session.get(url)
    r.raise_for_status()
    return r.json()["timestamps"]


def get_chunk(session, ts, res="quarterhour"):
    url = f"{BASE_URL}/{FILTER_ID}/{REGION}/{FILTER_ID}_{REGION}_{res}_{ts}.json"
    r = session.get(url)
    r.raise_for_status()
    return r.json().get("series", [])


def download_smard_data():
    """Download solar generation from SMARD API."""
    print(f"\n--- Downloading SMARD Data ({START_DATE} to {END_DATE}) ---")
    start_ts = int(datetime.strptime(START_DATE, "%Y-%m-%d").timestamp() * 1000)
    end_ts = int(datetime.strptime(END_DATE, "%Y-%m-%d").timestamp() * 1000)

    data = []
    with requests.Session() as s:
        timestamps = get_timestamps(s)
        timestamps = [ts for ts in timestamps if start_ts <= ts <= end_ts]
        print(f"Found {len(timestamps)} chunks to download")

        for i, ts in enumerate(timestamps, 1):
            if i % 10 == 0:
                print(f"Progress: {i}/{len(timestamps)}")
            try:
                chunk = get_chunk(s, ts)
                if chunk:
                    data.extend(chunk)
                time.sleep(0.1)
            except Exception as e:
                print(f"Error at {ts}: {e}")

    df = pd.DataFrame(data, columns=["timestamp_ms", "generation_mw"])
    df["datetime_gmt"] = pd.to_datetime(df["timestamp_ms"], unit="ms", utc=True)
    df = df.drop_duplicates(subset="timestamp_ms").sort_values("datetime_gmt")
    return df.dropna(subset=["generation_mw"])


def process_to_zarr(df, region_id=0):
    """Process dataframe to PVNet standard Zarr."""
    print(f"\n--- Processing Data (Region {region_id}) ---")

    # 30-min resample
    df = df.set_index('datetime_gmt').resample('30min').mean().reset_index()
    df['generation_mw'] = df['generation_mw'].interpolate()
    df['capacity_mwp'] = 70000.0  # National capacity approx

    times = pd.DatetimeIndex(df['datetime_gmt'].values.astype("datetime64[ns]"))

    ds = xr.Dataset(
        {
            "generation_mw": (["time_utc", "location_id"], df['generation_mw'].values.reshape(-1, 1)),
            "capacity_mwp": (["time_utc", "location_id"], df['capacity_mwp'].values.reshape(-1, 1)),
        },
        coords={
            "time_utc": times,
            "location_id": np.array([region_id]),
            "latitude": (["location_id"], np.array([51.1657])),
            "longitude": (["location_id"], np.array([10.4515])),
        },
    )

    if FINAL_ZARR_PATH.exists():
        shutil.rmtree(FINAL_ZARR_PATH, ignore_errors=True)

    ds.to_zarr(FINAL_ZARR_PATH, mode='w', consolidated=True)
    print(f"Saved Final Zarr: {FINAL_ZARR_PATH}")

    return ds


def validate_pipeline(ds):
    """Basic validation of the generated dataset and NWP alignment."""
    print("\n--- Validating Pipeline ---")

    # 1. Solar Data Check
    print(f"Solar Time Range: {ds.time_utc.values[0]} to {ds.time_utc.values[-1]}")
    gen = ds.generation_mw.values
    print(f"Solar Stats: Mean={np.nanmean(gen):.2f}, Max={np.nanmax(gen):.2f}")

    # 2. GFS Alignment Check
    if GFS_ZARR_PATH.exists():
        ds_gfs = xr.open_zarr(str(GFS_ZARR_PATH))
        # The GFS download script writes its init-time dimension as "init_time_utc";
        # fall back to "init_time" for older Zarrs.
        gfs_time_dim = 'init_time_utc' if 'init_time_utc' in ds_gfs.dims else 'init_time'
        if gfs_time_dim in ds_gfs.dims:
            gfs_times = pd.DatetimeIndex(ds_gfs[gfs_time_dim].values)
            pv_times = pd.DatetimeIndex(ds['time_utc'].values)

            overlap_start = max(pv_times.min(), gfs_times.min())
            overlap_end = min(pv_times.max(), gfs_times.max())

            if overlap_start < overlap_end:
                print(f"✅ GFS Overlap Found: {overlap_start} to {overlap_end}")
            else:
                print("⚠️ No GFS time overlap found.")
    else:
        print("⏭️ GFS Zarr not found locally, skipping alignment check.")

    print("✅ Validation Complete")


def main():
    print("=" * 50)
    print("GERMANY PVNET CONSOLIDATED PROCESSOR")
    print("=" * 50)

    # 1. Download
    df = download_smard_data()
    if df.empty:
        print("❌ Download failed or no data.")
        return

    # 2. Process
    ds = process_to_zarr(df)

    # 3. Validate
    validate_pipeline(ds)

    print("\nDone!")


if __name__ == "__main__":
    main()
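As a quick cross-check of the two pipelines, here is a minimal sketch of reading the PV Zarr back and comparing its time range with the GFS Zarr. Both filenames are assumptions; substitute whatever FINAL_ZARR_PATH and the GFS script's zarr_path actually resolve to:

import pandas as pd
import xarray as xr

# Assumed output locations from the two scripts in this PR; adjust as needed.
pv = xr.open_zarr("germany_pv_2023.zarr")
gfs = xr.open_zarr("germany_gfs_2023.zarr")

pv_times = pd.DatetimeIndex(pv["time_utc"].values)
gfs_times = pd.DatetimeIndex(gfs["init_time_utc"].values)

print("PV:", pv_times.min(), "to", pv_times.max())
print("GFS:", gfs_times.min(), "to", gfs_times.max())

# Overlap window between the PV and NWP data.
start = max(pv_times.min(), gfs_times.min())
end = min(pv_times.max(), gfs_times.max())
if start < end:
    print("Overlap:", start, "to", end)
else:
    print("No time overlap between PV and GFS data")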
Review comment:
We only put notebooks in this directory; for putting up the solar data, I would recommend using scripts/nwp.

Reply:
Sure, will modify it.