|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | 3 | import asyncio |
| 4 | +import base64 |
4 | 5 | from datetime import datetime, timedelta |
5 | 6 | import json |
6 | 7 | import random |
@@ -92,6 +93,7 @@ def __init__( |
92 | 93 | self.ws_url: str = "" |
93 | 94 | self.ws_token: str = "" |
94 | 95 | self.endpoint: str = "" |
| 96 | + self._urn: str | None = None |
95 | 97 |
|
96 | 98 | @classmethod |
97 | 99 | async def async_create( |
@@ -143,8 +145,37 @@ async def async_get_access_token(self) -> str: |
143 | 145 | access_token = str(self._oauth_session.token["access_token"]) |
144 | 146 | LOG.debug("Websocket access token is %s", access_token) |
145 | 147 |
|
| 148 | + urn = self.urn |
| 149 | + LOG.debug("Extracted URN: %s", urn) |
| 150 | + |
146 | 151 | return str(self._oauth_session.token["access_token"]) |
147 | 152 |
|
| 153 | + @property |
| 154 | + def urn(self) -> str | None: |
| 155 | + """Extract URN from the JWT access token.""" |
| 156 | + try: |
| 157 | + if not self._oauth_session.valid_token: |
| 158 | + return None |
| 159 | + token = self._oauth_session.token["access_token"] |
| 160 | + payload_part = token.split(".")[1] |
| 161 | + # Add padding if necessary |
| 162 | + padding = 4 - len(payload_part) % 4 |
| 163 | + if padding != 4: |
| 164 | + payload_part += "=" * padding |
| 165 | + |
| 166 | + decoded = base64.urlsafe_b64decode(payload_part) |
| 167 | + claims = json.loads(decoded) |
| 168 | + urn_claim = claims.get("urn:com:hiloenergie:profile:location_hilo_id") |
| 169 | + if urn_claim and isinstance(urn_claim, list) and len(urn_claim) > 0: |
| 170 | + self._urn = urn_claim[0] # Get the first URN from the array |
| 171 | + else: |
| 172 | + self._urn = None |
| 173 | + |
| 174 | + return self._urn |
| 175 | + except (IndexError, json.JSONDecodeError, KeyError): |
| 176 | + LOG.error("Failed to extract URN from access token") |
| 177 | + return None |
| 178 | + |
148 | 179 | def dev_atts( |
149 | 180 | self, attribute: str, value_type: Union[str, None] = None |
150 | 181 | ) -> Union[DeviceAttribute, str]: |
|
0 commit comments