|
23 | 23 | VolvoCarsLocation, |
24 | 24 | VolvoCarsValue, |
25 | 25 | VolvoCarsValueField, |
| 26 | + VolvoCarsValueStatusField, |
26 | 27 | VolvoCarsVehicle, |
27 | 28 | ) |
28 | 29 | from .util import redact_data, redact_url |
29 | 30 |
|
30 | 31 | _API_CONNECTED_ENDPOINT = "/connected-vehicle/v2/vehicles" |
31 | 32 | _API_ENERGY_ENDPOINT = "/energy/v1/vehicles" |
| 33 | +_API_ENERGY_V2_ENDPOINT = "/energy/v2/vehicles" |
32 | 34 | _API_LOCATION_ENDPOINT = "/location/v1/vehicles" |
33 | 35 | _API_URL = "https://api.volvocars.com" |
34 | 36 | _API_STATUS_URL = "https://public-developer-portal-bff.weu-prod.ecpaz.volvocars.biz/api/v1/backend-status" |
@@ -122,6 +124,23 @@ async def async_get_doors_status(self, vin: str = "") -> dict[str, VolvoCarsValu |
122 | 124 | """ |
123 | 125 | return await self._async_get_field(_API_CONNECTED_ENDPOINT, "doors", vin) |
124 | 126 |
|
| 127 | + async def async_get_energy_capabilities(self, vin: str = "") -> dict[str, Any]: |
| 128 | + """Get energy state. |
| 129 | +
|
| 130 | + Required scopes: openid energy:capability:read |
| 131 | + """ |
| 132 | + return await self._async_get_data_dict(_API_ENERGY_V2_ENDPOINT, "capabilities", vin, data_key="getEnergyState") |
| 133 | + |
| 134 | + async def async_get_energy_state(self, vin: str = "") -> dict[str, VolvoCarsValueStatusField | None]: |
| 135 | + """Get energy state. |
| 136 | +
|
| 137 | + Required scopes: openid energy:state:read |
| 138 | + """ |
| 139 | + body = await self._async_get(_API_ENERGY_V2_ENDPOINT, "state", vin) |
| 140 | + return { |
| 141 | + key: VolvoCarsValueStatusField.from_dict(value) for key, value in body.items() |
| 142 | + } |
| 143 | + |
125 | 144 | async def async_get_engine_status(self, vin: str = "") -> dict[str, VolvoCarsValueField | None]: |
126 | 145 | """Get engine status. |
127 | 146 |
|
@@ -251,10 +270,10 @@ async def _async_get_field( |
251 | 270 | } |
252 | 271 |
|
253 | 272 | async def _async_get_data_dict( |
254 | | - self, endpoint: str, operation: str, vin: str = "" |
| 273 | + self, endpoint: str, operation: str, vin: str = "", *, data_key: str = "data" |
255 | 274 | ) -> dict[str, Any]: |
256 | 275 | body = await self._async_get(endpoint, operation, vin) |
257 | | - return cast(dict[str, Any], body.get("data", {})) |
| 276 | + return cast(dict[str, Any], body.get(data_key, {})) |
258 | 277 |
|
259 | 278 | async def _async_get(self, endpoint: str, operation: str, vin: str = "") -> dict[str, Any]: |
260 | 279 | url = self._create_vin_url(endpoint, operation, vin) |
|
0 commit comments