|
| 1 | +"""Calendar platform for Teslemetry integration.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +from datetime import datetime, timedelta |
| 6 | +from typing import Any |
| 7 | + |
| 8 | +from attr import dataclass |
| 9 | + |
| 10 | +from homeassistant.components.calendar import CalendarEntity, CalendarEvent |
| 11 | +from homeassistant.core import HomeAssistant |
| 12 | +from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback |
| 13 | +from homeassistant.util import dt as dt_util |
| 14 | + |
| 15 | +from . import TeslemetryConfigEntry |
| 16 | +from .entity import TeslemetryEnergyInfoEntity |
| 17 | + |
| 18 | + |
| 19 | +async def async_setup_entry( |
| 20 | + hass: HomeAssistant, |
| 21 | + entry: TeslemetryConfigEntry, |
| 22 | + async_add_entities: AddConfigEntryEntitiesCallback, |
| 23 | +) -> None: |
| 24 | + """Set up the Teslemetry Calendar platform from a config entry.""" |
| 25 | + |
| 26 | + entities_to_add: list[CalendarEntity] = [] |
| 27 | + |
| 28 | + # Add buy tariff calendar entities |
| 29 | + entities_to_add.extend( |
| 30 | + TeslemetryTariffSchedule(energy, "tariff_content_v2") |
| 31 | + for energy in entry.runtime_data.energysites |
| 32 | + if energy.info_coordinator.data.get("tariff_content_v2_seasons") |
| 33 | + ) |
| 34 | + |
| 35 | + # Add sell tariff calendar entities |
| 36 | + entities_to_add.extend( |
| 37 | + TeslemetryTariffSchedule(energy, "tariff_content_v2_sell_tariff") |
| 38 | + for energy in entry.runtime_data.energysites |
| 39 | + if energy.info_coordinator.data.get("tariff_content_v2_sell_tariff_seasons") |
| 40 | + ) |
| 41 | + |
| 42 | + async_add_entities(entities_to_add) |
| 43 | + |
| 44 | + |
| 45 | +@dataclass |
| 46 | +class TariffPeriod: |
| 47 | + """A single tariff period.""" |
| 48 | + |
| 49 | + name: str |
| 50 | + price: float |
| 51 | + from_hour: int = 0 |
| 52 | + from_minute: int = 0 |
| 53 | + to_hour: int = 0 |
| 54 | + to_minute: int = 0 |
| 55 | + |
| 56 | + |
| 57 | +class TeslemetryTariffSchedule(TeslemetryEnergyInfoEntity, CalendarEntity): |
| 58 | + """Energy Site Tariff Schedule Calendar.""" |
| 59 | + |
| 60 | + def __init__( |
| 61 | + self, |
| 62 | + data: Any, |
| 63 | + key_base: str, |
| 64 | + ) -> None: |
| 65 | + """Initialize the tariff schedule calendar.""" |
| 66 | + self.key_base: str = key_base |
| 67 | + self.seasons: dict[str, dict[str, Any]] = {} |
| 68 | + self.charges: dict[str, dict[str, Any]] = {} |
| 69 | + super().__init__(data, key_base) |
| 70 | + |
| 71 | + @property |
| 72 | + def event(self) -> CalendarEvent | None: |
| 73 | + """Return the current active tariff event.""" |
| 74 | + now = dt_util.now() |
| 75 | + current_season_name = self._get_current_season(now) |
| 76 | + |
| 77 | + if not current_season_name or not self.seasons.get(current_season_name): |
| 78 | + return None |
| 79 | + |
| 80 | + # Get the time of use periods for the current season |
| 81 | + tou_periods = self.seasons[current_season_name].get("tou_periods", {}) |
| 82 | + |
| 83 | + for period_name, period_group in tou_periods.items(): |
| 84 | + for period_def in period_group.get("periods", []): |
| 85 | + # Check if today is within the period's day of week range |
| 86 | + day_of_week = now.weekday() # Monday is 0, Sunday is 6 |
| 87 | + from_day = period_def.get("fromDayOfWeek", 0) # Default is Monday |
| 88 | + to_day = period_def.get("toDayOfWeek", 6) # Default is Sunday |
| 89 | + if from_day > day_of_week > to_day: |
| 90 | + # This period doesn't occur today |
| 91 | + continue |
| 92 | + |
| 93 | + # Calculate start and end times for today (default is midnight) |
| 94 | + from_hour = period_def.get("fromHour", 0) % 24 |
| 95 | + from_minute = period_def.get("fromMinute", 0) % 60 |
| 96 | + to_hour = period_def.get("toHour", 0) % 24 |
| 97 | + to_minute = period_def.get("toMinute", 0) % 60 |
| 98 | + |
| 99 | + start_time = now.replace( |
| 100 | + hour=from_hour, minute=from_minute, second=0, microsecond=0 |
| 101 | + ) |
| 102 | + end_time = now.replace( |
| 103 | + hour=to_hour, minute=to_minute, second=0, microsecond=0 |
| 104 | + ) |
| 105 | + |
| 106 | + # Handle periods that cross midnight |
| 107 | + if end_time <= start_time: |
| 108 | + # The period does cross midnight, check both sides |
| 109 | + potential_end_time = end_time + timedelta(days=1) |
| 110 | + if start_time <= now < potential_end_time: |
| 111 | + # Period matches and ends tomorrow |
| 112 | + end_time = potential_end_time |
| 113 | + elif (start_time - timedelta(days=1)) <= now < end_time: |
| 114 | + # Period matches and started yesterday |
| 115 | + start_time -= timedelta(days=1) |
| 116 | + else: |
| 117 | + continue |
| 118 | + elif not (start_time <= now < end_time): |
| 119 | + # This period doesn't occur now |
| 120 | + continue |
| 121 | + |
| 122 | + # Create calendar event for the active period |
| 123 | + price = self._get_price_for_period(current_season_name, period_name) |
| 124 | + price_str = f"{price:.2f}/kWh" if price is not None else "Unknown Price" |
| 125 | + |
| 126 | + return CalendarEvent( |
| 127 | + start=start_time, |
| 128 | + end=end_time, |
| 129 | + summary=f"{period_name.capitalize().replace('_', ' ')}: {price_str}", |
| 130 | + description=( |
| 131 | + f"Season: {current_season_name.capitalize()}\n" |
| 132 | + f"Period: {period_name.capitalize().replace('_', ' ')}\n" |
| 133 | + f"Price: {price_str}" |
| 134 | + ), |
| 135 | + uid=f"{self.key_base}_{current_season_name}_{period_name}_{start_time.isoformat()}", |
| 136 | + ) |
| 137 | + |
| 138 | + return None # No active period found for the current time and season |
| 139 | + |
| 140 | + async def async_get_events( |
| 141 | + self, |
| 142 | + hass: HomeAssistant, |
| 143 | + start_date: datetime, |
| 144 | + end_date: datetime, |
| 145 | + ) -> list[CalendarEvent]: |
| 146 | + """Return calendar events (tariff periods) within a datetime range.""" |
| 147 | + events: list[CalendarEvent] = [] |
| 148 | + |
| 149 | + # Convert dates to local timezone |
| 150 | + start_date = dt_util.as_local(start_date) |
| 151 | + end_date = dt_util.as_local(end_date) |
| 152 | + |
| 153 | + # Process each day in the requested range |
| 154 | + current_day = dt_util.start_of_local_day(start_date) |
| 155 | + while current_day < end_date: |
| 156 | + season_name = self._get_current_season(current_day) |
| 157 | + if not season_name or not self.seasons.get(season_name): |
| 158 | + current_day += timedelta(days=1) |
| 159 | + continue |
| 160 | + |
| 161 | + # Get the time of use periods for the season |
| 162 | + tou_periods = self.seasons[season_name].get("tou_periods", {}) |
| 163 | + day_of_week = current_day.weekday() |
| 164 | + |
| 165 | + for period_name, period_group in tou_periods.items(): |
| 166 | + for period_def in period_group.get("periods", []): |
| 167 | + # Check if current day falls within the period's day range |
| 168 | + from_day = period_def.get("fromDayOfWeek", 0) # Default is Monday |
| 169 | + to_day = period_def.get("toDayOfWeek", 6) # Default is Sunday |
| 170 | + if from_day > day_of_week > to_day: |
| 171 | + continue |
| 172 | + |
| 173 | + # Extract period timing for current day (default is midnight) |
| 174 | + from_hour = period_def.get("fromHour", 0) % 24 |
| 175 | + from_minute = period_def.get("fromMinute", 0) % 60 |
| 176 | + to_hour = period_def.get("toHour", 0) % 24 |
| 177 | + to_minute = period_def.get("toMinute", 0) % 60 |
| 178 | + |
| 179 | + start_time = current_day.replace( |
| 180 | + hour=from_hour, minute=from_minute, second=0, microsecond=0 |
| 181 | + ) |
| 182 | + end_time = current_day.replace( |
| 183 | + hour=to_hour, minute=to_minute, second=0, microsecond=0 |
| 184 | + ) |
| 185 | + |
| 186 | + # Adjust for periods crossing midnight |
| 187 | + if end_time <= start_time: |
| 188 | + end_time += timedelta(days=1) |
| 189 | + |
| 190 | + # Check for overlap with requested date range |
| 191 | + if start_time < end_date and end_time > start_date: |
| 192 | + price = self._get_price_for_period(season_name, period_name) |
| 193 | + price_str = ( |
| 194 | + f"{price:.2f}/kWh" if price is not None else "Unknown Price" |
| 195 | + ) |
| 196 | + events.append( |
| 197 | + CalendarEvent( |
| 198 | + start=start_time, |
| 199 | + end=end_time, |
| 200 | + summary=f"{period_name.capitalize().replace('_', ' ')}: {price_str}", |
| 201 | + description=( |
| 202 | + f"Season: {season_name.capitalize()}\n" |
| 203 | + f"Period: {period_name.capitalize().replace('_', ' ')}\n" |
| 204 | + f"Price: {price_str}" |
| 205 | + ), |
| 206 | + uid=f"{self.key_base}_{season_name}_{period_name}_{start_time.isoformat()}", |
| 207 | + ) |
| 208 | + ) |
| 209 | + |
| 210 | + current_day += timedelta(days=1) |
| 211 | + |
| 212 | + # Sort events chronologically |
| 213 | + events.sort(key=lambda x: x.start) |
| 214 | + return events |
| 215 | + |
| 216 | + def _get_current_season(self, date_to_check: datetime) -> str | None: |
| 217 | + """Determine the active season for a given date.""" |
| 218 | + local_date = dt_util.as_local(date_to_check) |
| 219 | + year = local_date.year |
| 220 | + |
| 221 | + for season_name, season_data in self.seasons.items(): |
| 222 | + if not season_data: |
| 223 | + continue |
| 224 | + |
| 225 | + # Extract season date boundaries |
| 226 | + try: |
| 227 | + from_month = season_data["fromMonth"] |
| 228 | + from_day = season_data["fromDay"] |
| 229 | + to_month = season_data["toMonth"] |
| 230 | + to_day = season_data["toDay"] |
| 231 | + |
| 232 | + # Handle seasons that cross year boundaries |
| 233 | + start_year = year |
| 234 | + end_year = year |
| 235 | + |
| 236 | + # Determine if season crosses year boundary |
| 237 | + if from_month > to_month or ( |
| 238 | + from_month == to_month and from_day > to_day |
| 239 | + ): |
| 240 | + if local_date.month > from_month or ( |
| 241 | + local_date.month == from_month and local_date.day >= from_day |
| 242 | + ): |
| 243 | + end_year = year + 1 |
| 244 | + else: |
| 245 | + start_year = year - 1 |
| 246 | + |
| 247 | + season_start = local_date.replace( |
| 248 | + year=start_year, |
| 249 | + month=from_month, |
| 250 | + day=from_day, |
| 251 | + hour=0, |
| 252 | + minute=0, |
| 253 | + second=0, |
| 254 | + microsecond=0, |
| 255 | + ) |
| 256 | + # Create exclusive end date |
| 257 | + season_end = local_date.replace( |
| 258 | + year=end_year, |
| 259 | + month=to_month, |
| 260 | + day=to_day, |
| 261 | + hour=0, |
| 262 | + minute=0, |
| 263 | + second=0, |
| 264 | + microsecond=0, |
| 265 | + ) + timedelta(days=1) |
| 266 | + |
| 267 | + if season_start <= local_date < season_end: |
| 268 | + return season_name |
| 269 | + except (KeyError, ValueError): |
| 270 | + continue |
| 271 | + |
| 272 | + return None # No matching season found |
| 273 | + |
| 274 | + def _get_price_for_period(self, season_name: str, period_name: str) -> float | None: |
| 275 | + """Get the price for a specific season and period name.""" |
| 276 | + try: |
| 277 | + # Get rates for the season with fallback to "ALL" |
| 278 | + season_charges = self.charges.get(season_name, self.charges.get("ALL", {})) |
| 279 | + rates = season_charges.get("rates", {}) |
| 280 | + # Get price for the period with fallback to "ALL" |
| 281 | + price = rates.get(period_name, rates.get("ALL")) |
| 282 | + return float(price) if price is not None else None |
| 283 | + except (KeyError, ValueError, TypeError): |
| 284 | + return None |
| 285 | + |
| 286 | + def _async_update_attrs(self) -> None: |
| 287 | + """Update the Calendar attributes from coordinator data.""" |
| 288 | + # Load tariff data from coordinator |
| 289 | + self.seasons = self.coordinator.data.get(f"{self.key_base}_seasons", {}) |
| 290 | + self.charges = self.coordinator.data.get(f"{self.key_base}_energy_charges", {}) |
| 291 | + |
| 292 | + # Set availability based on data presence |
| 293 | + self._attr_available = bool(self.seasons and self.charges) |
0 commit comments