-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathplugin.py
More file actions
1884 lines (1601 loc) · 92.3 KB
/
plugin.py
File metadata and controls
1884 lines (1601 loc) · 92.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
BMW Python Plugin for Domoticz
This plugin integrates BMW vehicles with Domoticz home automation system.
It uses the offical BMW's API (CarData) and creates
corresponding devices in Domoticz.
Author: Filip Demaertelaere
Version: 5.1.2
License: MIT
"""
"""
<plugin key="Bmw" name="BMW CarData" author="Filip Demaertelaere" version="5.1.2" externallink="https://github.com/FilipDem/Domoticz-BMW-plugin">
<description>
<h2>BMW CarData Plugin</h2>
<p>Version 5.1.2</p>
<br/>
<h2>Introduction</h2>
<p>The BMW CarData plugin provides a robust and seamless integration of your BMW vehicle with the Domoticz home automation system, essentially transforming Domoticz into a comprehensive command center for your car.</p>
<p>Upon successful configuration, the plugin automatically creates a suite of virtual devices within Domoticz. These devices represent key aspects of your BMW's status, including mileage, door and window lock states, fuel and electric range, charging status, and the vehicle's real-time location and movement.</p>
<p>To ensure optimal performance and security, this plugin requires a valid MyBMW account with corresponding credentials.</p>
<p>It is important to note that this plugin is entirely dependent on the data made available by the BMW Open Data Platform. The BMW CarData plugin utilizes the Streaming API (MQTT-based) to retrieve vehicle information, meaning there is no periodic polling by the Domoticz API towards the BMW Open Data Platform. For detailed information, please refer to the official resource: <a href="https://bmw-cardata.bmwgroup.com/thirdparty/public/car-data/overview">https://bmw-cardata.bmwgroup.com/thirdparty/public/car-data/overview</a>.</p>
<p>Keep in mind that no data is sent by the BMW Open Data Platform in streaming mode when not any event happens at car level.</p>
<br/>
<h2>= Activation of BMW CarData =</h2>
<br/>
<p style="color: yellow;">Read carefully the README file and follow each step carefully</p>
<a href="https://github.com/FilipDem/Domoticz-BMW-plugin" target="_blank">via the GitHub-page og this plugin</a>
<br/>
<br/><h2>Configuration Parameters</h2>
<p>The following parameters are required for initial plugin setup:</p>
<ul>
<li><b>BMW CarData Client_id</b>: The unique value obtained from the MyBMW portal after creating the CarData Client.</li>
<li><b>Vehicle Identification Number (VIN)</b>: The full, 17-character VIN of your BMW vehicle, used to identify the specific car to monitor.</li>
<li><b>Update Interval (Minutes)</b>: Defines the maximum frequency (in minutes) at which the plugin will check for new data, provided information is made available by the BMW CarData service.</li>
<li><b>Debug Level</b>: Sets the logging verbosity. Higher levels provide more diagnostic information for troubleshooting purposes.</li>
</ul>
<br/>
</description>
<params>
<param field="Mode1" label="BMW CarData Client_id" width="200px" required="true" default=""/>
<param field="Mode2" label="Vehicle Identification Number (VIN)" width="200px" required="true" default=""/>
<param field="Mode5" label="Min. Update Interval (Minutes)" width="120px" required="true" default="30"/>
<param field="Mode6" label="Debug Level" width="120px">
<options>
<option label="None" value="0" default="true"/>
<option label="Python Only" value="2"/>
<option label="Basic Debugging" value="62"/>
<option label="Basic+Messages" value="126"/>
<option label="Queue" value="128"/>
<option label="Connections Only" value="16"/>
<option label="Connections+Queue" value="144"/>
<option label="All" value="-1"/>
</options>
</param>
</params>
</plugin>"""
import sys, os
sys.path.append(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
from enum import IntEnum, Enum, auto
import base64
import hashlib
import secrets
import urllib.parse
import json
import time
from typing import Any, Dict, List, Type, Union, Tuple
from datetime import datetime, timedelta
import paho.mqtt.client as mqtt
import DomoticzEx as Domoticz
from domoticzEx_tools import (
DomoticzConstants, dump_config_to_log, update_device, get_unit,
get_config_item_db, set_config_item_db, erase_config_item_db,
get_device_n_value, smart_convert_string, timeout_device,
get_distance, check_activity_units_and_timeout, touch_device
)
class UnitIdentifiers(IntEnum):
"""Enum defining unit identifiers for various BMW data points in Domoticz"""
MILEAGE = auto()
DOORS = auto()
WINDOWS = auto()
REMAIN_RANGE_TOTAL = auto()
REMAIN_RANGE_ELEC = auto()
CHARGING = auto()
CHARGING_REMAINING = auto()
BAT_LEVEL = auto()
REMOTE_SERVICES = auto()
CAR = auto()
MILEAGE_COUNTER = auto()
DRIVING = auto()
HOME = auto()
AC_LIMITS = auto()
CHARGING_MODE = auto()
class Authenticate(IntEnum):
"""State machine during authentication"""
INIT = auto()
OAUTH2 = auto()
USER_INTERACTION = auto()
DONE = auto()
ERROR = auto()
REFRESH_TOKEN = auto()
class CarDataURLs(str, Enum):
"""Enum defining various BMW url points"""
MQTT_HOST = 'customer.streaming-cardata.bmwgroup.com'
MQTT_PORT = '9000'
BMW_HOST = 'customer.bmwgroup.com'
BMW_PORT = '443'
API_HOST = 'api-cardata.bmwgroup.com'
API_PORT = '443'
API_VERSION = 'v1'
DEVICE_CODE_URI = '/gcdm/oauth/device/code'
DEVICE_CODE_LINK = 'https://customer.bmwgroup.com/oneid/link'
TOKEN_URI = '/gcdm/oauth/token'
CONTAINER_URI = '/customers/containers'
GET_TELEMATICDATA_URI = '/customers/vehicles/{vin}/telematicData'
class AuthenticationData:
"""Store Authentication data (shared state)"""
state_machine: int = Authenticate.INIT
client_id: str = None
vin: str = None
code_verifier: str = None
device_code: str = None
expires_in: int = None
interval: int = 10
class API(IntEnum):
"""State machine during authentication"""
CREATE_CONTAINER = auto()
DELETE_CONTAINER = auto()
GET_CONTAINER = auto()
LIST_CONTAINER = auto()
ERROR = auto()
class APIData:
"""Store API data (shared state)"""
state_machine: int = API.GET_CONTAINER
container_id: Dict[str, Any] = None
# Default image for devices
_IMAGE = 'Bmw'
# Streaming key filename
_STREAMING_KEY_FILE = 'Bmw_keys_streaming.json'
# Filename to indicate to reset quota
_RESET_FILE = 'hardware_reset.txt'
class CarMovementHandler:
"""Detects if the car is currently moving based on location and time stamps."""
VELOCITY_THRESHOLD_MPS = 2
STOP_TIME_THRESHOLD_SEC = 360
def __init__(self) -> None:
# State variables to store the last known coordinates and time
self.last_coord: Union[List[float], None] = None
self.last_timestamp: datetime = datetime.now()
# State variables for the "Time-in-Stop" filter
self.stop_start_time: Union[datetime, None] = datetime.now()
self.is_currently_moving: bool = False # True/False state for final output
self.velocity: float = 0.0
def process_new_data(self, location: List[float], current_timestamp_sec: datetime) -> str:
"""
Main function to process a new set of coordinates.
Determines movement status using a velocity and stop-time threshold.
"""
# Calculate Delta Distance and Delta Time
delta_t: float = (current_timestamp_sec - self.last_timestamp).total_seconds()
# Skip if no new coordinates
if len(location) != 2:
if delta_t >= self.STOP_TIME_THRESHOLD_SEC:
self.is_currently_moving = False
return f"NO UPDATE (since {delta_t}s)"
# Initialize the first point if it doesn't exist
if self.last_coord is None:
self.last_coord = location
self.last_timestamp = current_timestamp_sec
return "INITIALIZING" # Assume moving or unknown initially
# Prevent division by zero if timestamps are identical (or too close)
if delta_t < 2:
return "MOVEMENT_STATUS_SAME" # Not enough time passed to measure movement
Domoticz.Debug(f'Calculate distance with last location {self.last_coord} and current location {location}')
delta_d: float = get_distance(self.last_coord, location, unit='m')
# Calculate Velocity
self.velocity = delta_d / delta_t # Meters per second (MPS)
Domoticz.Debug(f'delta_t={delta_t}; delta_d={delta_d}; self.velocity={self.velocity}')
# Update state for the next cycle
self.last_coord = location
self.last_timestamp = current_timestamp_sec
# Apply the Movement Logic and Time-in-Stop Filter
if self.velocity > self.VELOCITY_THRESHOLD_MPS:
# Car is actively moving (e.g., driving on a road)
self.stop_start_time = None # Reset the stop timer
self.is_currently_moving = True
return "MOVING (Active Driving)"
else:
# Car is not moving fast enough (stopped at light, traffic, or parked)
if self.stop_start_time is None:
# First time detecting a stop - start the timer
self.stop_start_time = current_timestamp_sec
self.is_currently_moving = True # Still considered 'moving' in the sense of being in traffic/a temporary stop
return "MOVING (Traffic/Red Light - Timer Started)"
else:
# Check how long the car has been "stopped" (below threshold)
time_in_stop: float = (current_timestamp_sec - self.stop_start_time).total_seconds()
if time_in_stop >= self.STOP_TIME_THRESHOLD_SEC:
# Car has been stationary long enough to be considered NOT MOVING/PARKED
self.is_currently_moving = False
return "STOPPED (Final Destination/Parked)"
else:
# Still within the temporary stop window (Red light/Traffic Jam)
# For the user, this should still be considered "Moving" or "In Transit"
self.is_currently_moving = True
return "MOVING (Traffic Jam/Long Red Light)"
class PollingHandler:
"""Manages the API polling quota using a sliding 24-hour window."""
DAILY_QUOTA = 50
RESERVED_CALLS = 5
# The sliding window duration in seconds (24 hours)
WINDOW_SIZE_SEC = 86400
def __init__(self, parent_plugin: Any) -> None:
"""Initializes the Polling Manager."""
self.parent = parent_plugin
# Store timestamps of calls made in the last 24 hours
self._timestamps: List[float] = []
self._next_api_call_time: datetime = datetime.now()
def load_state(self, force_cold: bool = False) -> None:
"""Loads persistent state. If empty, estimates today's usage to prevent API bans."""
if not force_cold:
state = get_config_item_db(key='polling_handler', default={})
self._timestamps = state.get('timestamps', [])
if not self._timestamps:
# COLD START: We have no history. Let's estimate usage to be safe.
# We assume we already used the 'fair share' for the time passed today.
now = time.time()
seconds_since_midnight = (datetime.now() - datetime.now().replace(hour=0, minute=0, second=0)).total_seconds()
# Calculate how many calls 'should' have been made by now
fair_share_count = int((seconds_since_midnight / 86400) * (self.DAILY_QUOTA - self.RESERVED_CALLS))
# Generate dummy timestamps spread over the past hours of today
for i in range(fair_share_count):
# Spread them backwards from now
offset = (i + 1) * (seconds_since_midnight / max(1, fair_share_count))
self._timestamps.append(now - offset)
Domoticz.Status(f"Cold plugin start: estimated {fair_share_count} API calls already made today to stay safe.")
self._prune_old_timestamps()
self._calculate_next_time_call(force_update=True)
if len(self._timestamps):
oldest_str = datetime.fromtimestamp(self._timestamps[0]).strftime('%Y-%m-%d %H:%M')
newest_str = datetime.fromtimestamp(self._timestamps[-1]).strftime('%Y-%m-%d %H:%M')
next_call_str = self._next_api_call_time.strftime('%Y-%m-%d %H:%M')
Domoticz.Status(f'Quota information restored from database: {len(self._timestamps)} API calls registered (oldest at {oldest_str} - newest at {newest_str}); next telematic API call scheduled at {next_call_str}.')
else:
next_call_str = self._next_api_call_time.strftime('%Y-%m-%d %H:%M')
Domoticz.Status(f'Quota information restored from database: no API calls registered; next telematic API call scheduled at {next_call_str}.')
def _save_state(self) -> None:
"""Internal method to save timestamps to the Domoticz database."""
set_config_item_db(key='polling_handler', value={'timestamps': self._timestamps})
def save_state(self) -> None:
"""Public method to save state on plugin stop."""
self._save_state()
def force_cold_start(self) -> None:
"""Public method to ignore the data in the hardware settings."""
self._timestamps = []
self.load_state(force_cold=True)
def set_quota_exhausted(self) -> None:
"""
Force the internal state to 'exhausted' if the BMW API returns a quota error.
This syncs our plugin with the actual server state.
"""
now = time.time()
current_count = len(self._timestamps)
needed_to_fill = self.DAILY_QUOTA - current_count
if needed_to_fill > 0:
# Fill the list with timestamps from 'now'
# This ensures we wait until these dummy timestamps start dropping out of the 24h window
for _ in range(needed_to_fill):
self._timestamps.append(now)
self._timestamps.sort() # Ensure they are in order
self._calculate_next_time_call(force_update=True)
Domoticz.Debug("BMW API reported quota exhausted. Internal state synchronized to MAX. "
f"Next call attempt at: {self._next_api_call_time}")
def _prune_old_timestamps(self) -> None:
"""Removes timestamps that are older than the 24-hour window."""
cutoff = time.time() - self.WINDOW_SIZE_SEC
self._timestamps = sorted([ts for ts in self._timestamps if ts > cutoff])
def register_api_call(self) -> None:
"""Registers a new API call and updates the schedule."""
self._prune_old_timestamps()
self._timestamps.append(time.time())
self._calculate_next_time_call(force_update=True)
Domoticz.Debug(f"API call registered. {len(self._timestamps)} calls in window. "
f"Next call: {self._next_api_call_time}")
def register_mqtt_update(self) -> None:
"""Postpones the next call because fresh data was received via MQTT."""
self._prune_old_timestamps()
self._calculate_next_time_call(force_update=True)
Domoticz.Debug(f"MQTT update received. Next API call postponed to {self._next_api_call_time}")
@property
def next_call_time(self) -> datetime:
"""Returns the scheduled next call time without recalculating."""
return self._next_api_call_time
@property
def last_call_time(self) -> datetime:
"""Returns the last call time without recalculating."""
return self._timestamps[-1]
@property
def used_quota(self) -> int:
"""Returns the number of calls made in the last 24 hours."""
self._prune_old_timestamps()
return len(self._timestamps)
@property
def get_quota_list(self) -> datetime:
"""Returns the list of registered API timestamps."""
return [datetime.fromtimestamp(apiDatim).strftime('%Y-%m-%d %H:%M') for apiDatim in self._timestamps]
def update_possible_budget(self) -> None:
self._prune_old_timestamps()
self._calculate_next_time_call(force_update= False)
def _calculate_next_time_call(self, force_update: bool = False) -> None:
"""
Calculates the next polling time.
If we have quota left, we spread the REMAINING calls over the REMAINING time
until the 'oldest' calls start dropping out of the 24h window.
force_update=True: Always resets the timer (use after API call or MQTT).
force_update=False: Only updates if a new slot opens up earlier (use in Heartbeat).
"""
now_ts = time.time()
# --- SAFETY LOCK ---
# If we are in a regular update (not forced) and the timer has already
# expired, we stop calculating. This prevents the "race condition" where
# the timer keeps moving forward just as the heartbeat wants to trigger it.
if not force_update and self._next_api_call_time <= datetime.fromtimestamp(now_ts):
return
# --- BUDGET CALCULATION ---
used = len(self._timestamps)
available = self.DAILY_QUOTA - used
to_spread = max(0, available - self.RESERVED_CALLS)
# Minimum interval defined by user (e.g., 5 or 10 minutes)
min_interval_sec = int(Parameters.get('Mode5', 60)) * 60
if to_spread > 0:
# 1. Determine the 'active' period.
# We look at the time until the oldest call in our list is 24h old.
# This is the time we have to 'fill' with our remaining calls.
if self._timestamps:
# How long until the OLDEST call drops out
time_until_reset = (self._timestamps[0] + self.WINDOW_SIZE_SEC) - now_ts
# If we have a lot of budget left, don't cram it
# into a tiny window. Ensure we spread it over at least
# a fair portion of the day.
# We calculate a 'fair' window: (remaining calls / total quota) * 24h
min_spread_window = (to_spread / (self.DAILY_QUOTA - self.RESERVED_CALLS)) * self.WINDOW_SIZE_SEC
# Use the LARGEST of the two windows to calculate interval
effective_window = max(time_until_reset, min_spread_window)
else:
effective_window = self.WINDOW_SIZE_SEC
# 2. Calculate dynamic interval
dynamic_interval = effective_window // to_spread
# 3. Apply constraints
# This allows the interval to shrink (go faster) if we have many calls left,
# but never faster than the user-defined min_interval_sec.
interval_sec = max(min_interval_sec, dynamic_interval)
potential_time = datetime.now() + timedelta(seconds=interval_sec)
elif self._timestamps:
# AUTOMATIC BUDGET EXHAUSTED: Even if available > 0, we are at the reserve limit.
# We MUST wait until the oldest call drops out of the window to free up a non-reserved slot.
reset_at_ts = self._timestamps[0] + self.WINDOW_SIZE_SEC
potential_time = datetime.fromtimestamp(reset_at_ts)
# Safety: If for some reason the reset time is in the past, don't stall
if potential_time <= datetime.now():
potential_time = datetime.now() + timedelta(seconds=min_interval_sec)
else:
# Should not happen with sliding window, but as a safety fallback:
interval_sec = max(min_interval_sec, 3600)
potential_time = datetime.now() + timedelta(seconds=interval_sec)
# Apply update logic to prevent time drifting
if ( force_update or
self._next_api_call_time <= datetime.now() or
potential_time < self._next_api_call_time ):
self._next_api_call_time = potential_time
class MqttClientHandler:
"""
Handles all MQTT logic for connecting to the BMW CarData streaming service.
Requires access to parent's state variables for tokens, data storage, and control flow.
"""
MQTT_KEEP_ALIVE = 45 # I found out by testing that BMW disconnects after 60 seconds if no keep_alive received.
THROTTLE_INTERVAL_SEC = 10 # Throttle timer from mqtt messages
RECONNECTION_PAUSE_TIME_MIN = 15
MQTT_MAX_INTERVAL_EXPECTED_MESSAGES = 3600*24
MQTT_LOG = {16:'DEBUG', 1:'INFO', 2:'NOTICE', 8:'ERROR', 4:'WARNING'}
def __init__(
self,
parent_plugin: Any
) -> None:
"""Initializes the MQTT handler with a reference to the main plugin."""
self.parent = parent_plugin
self.mqtt_client: Union[mqtt.Client, None] = None
self.time_last_message_received: datetime = datetime(1, 1, 1, 0, 0, 0) # Used for throttling
self.time_next_connect_after_critical_disconnect = None
self.connection_errors: int = 0
def is_mqtt_active(self) -> bool:
""" Check if there was MQTT activity during the last time period """
now: datetime = datetime.now()
delta: int = (now - self.time_last_message_received).total_seconds()
if delta < self.MQTT_MAX_INTERVAL_EXPECTED_MESSAGES:
return True
if 0 < delta % self.MQTT_MAX_INTERVAL_EXPECTED_MESSAGES < 60:
if self.time_last_message_received > datetime(1, 1, 1, 0, 0, 0):
Domoticz.Status(f'No BMW MQTT CarData information was received since {self.time_last_message_received} (OAUTH2 internal state={AuthenticationData.state_machine}; MQTT Connected: {self.is_mqtt_connected()})!')
return False
def is_mqtt_connected(self) -> bool:
"""Returns the status of the MQTT connection."""
if self.mqtt_client and self.mqtt_client.is_connected():
return True
return False
def connect_mqtt(
self
) -> bool:
"""
Connect to MQTT broker for streaming using the Paho client.
Requires authenticated tokens from the parent plugin.
"""
# Still connected; do nothing
if self.is_mqtt_connected():
return True
# Check if there is already a connection in progress
if self.mqtt_client is not None:
Domoticz.Debug("MQTT connection attempt already in progress, skipping this heartbeat cycle.")
return False
# No MQTT credentials available; finish first authentication process
if AuthenticationData.state_machine != Authenticate.DONE:
Domoticz.Debug('MQTT cannot start because of none-complete authentication.')
return False
# Wait a period of time to make new connections if necessary.
Domoticz.Debug(f'self.time_next_connect_after_critical_disconnect={self.time_next_connect_after_critical_disconnect}.')
if self.time_next_connect_after_critical_disconnect:
if datetime.now() < self.time_next_connect_after_critical_disconnect:
Domoticz.Debug(f'Wait to connect to MQTT due to errors. Next connection at {self.time_next_connect_after_critical_disconnect}.')
# Safety check: stop loop if client exists during cooldown
client = getattr(self, 'mqtt_client', None)
if client:
try:
client.loop_stop()
except:
pass
return False
else:
self.time_next_connect_after_critical_disconnect = None
# Get username and password
try:
Domoticz.Debug(f"Getting mqtt password from id_token {self.parent.tokens['id_token']}.")
id_token: str = self.parent.tokens['id_token']['token']
username: str = self.parent.auth_handler.mqtt_username
except (ValueError, KeyError) as e:
Domoticz.Error(f"Error getting BMW CarData MQTT credentials: {e}")
return False
# Start MQTT connection
Domoticz.Debug('Starting MQTT...')
self.mqtt_client = mqtt.Client(
client_id=username,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
self.mqtt_client.on_connect = self.onMqttConnect
self.mqtt_client.on_message = self.onMqttMessage
self.mqtt_client.on_subscribe = self.onMqttSubscribe
self.mqtt_client.on_disconnect = self.onMqttDisconnect
self.mqtt_client.on_log = self.onMqttLog
self.mqtt_client.tls_set()
self.mqtt_client.username_pw_set(username, id_token)
# Be sure automatic connects by the mqtt loop are slower than reconnects triggered by heartbeat
self.mqtt_client.reconnect_delay_set(min_delay=600, max_delay=1800)
if self.parent.loggingLevel == 0:
self.mqtt_client.disable_logger()
else:
self.mqtt_client.enable_logger(logger=None)
Domoticz.Debug('MQTT Debug information activated.')
try:
connect_properties = mqtt.Properties(mqtt.PacketTypes.CONNECT)
connect_properties.SessionExpiryInterval = 3600
Domoticz.Debug(f'Set up connection to MQTT broker with username {username} and password {id_token} (keep_alive={self.MQTT_KEEP_ALIVE}s)...')
self.mqtt_client.connect_async(CarDataURLs.MQTT_HOST, int(CarDataURLs.MQTT_PORT), keepalive=self.MQTT_KEEP_ALIVE, clean_start=False, properties=connect_properties)
Domoticz.Debug('Start MQTT client loop...')
self.mqtt_client.loop_start()
self.connection_errors = 0
return True
except Exception as e:
self.connection_errors += 1
if self.connection_errors > 3:
self.time_next_connect_after_critical_disconnect = datetime.now() + timedelta(minutes=self.RECONNECTION_PAUSE_TIME_MIN)
Domoticz.Error(f"Error #{self.connection_errors} connecting to BMW CarData MQTT broker: {e}. Waiting {self.RECONNECTION_PAUSE_TIME_MIN} minutes to reconnect until {self.time_next_connect_after_critical_disconnect}.")
return False
def disconnect_mqtt(
self,
reconnect: bool=False
) -> None:
"""Disconnects from the MQTT broker and optionally attempts an immediate reconnect."""
Domoticz.Debug(f'Call disconnect_mqtt() with reconnect={reconnect}...')
# Create a local reference to the client object.
# This prevents the object from becoming None mid-execution if another thread
# (like an error handler) clears self.mqtt_client.
client = getattr(self, 'mqtt_client', None)
if client is not None:
try:
# If the client is in a reconnection loop (e.g., bad credentials),
# loop_stop() is mandatory to kill the background thread.
if not client.is_connected():
Domoticz.Debug("MQTT not connected, forcing loop_stop to kill background threads.")
client.loop_stop()
else:
# There is an active connection
client.disconnect() # 1. Initiate disconnect first. This signals the network thread to send the DISCONNECT packet and start running the onMqttDisconnect callback.
time.sleep(0.2) # 2. Wait briefly. This gives the background thread time to process the disconnect and fire the onMqttDisconnect callback
client.loop_stop() # 3. Explicitly stop the background loop thread. This call blocks and waits for the thread to fully exit (join), ensuring synchronous cleanup and preventing the Python interpreter shutdown race condition.
# Crucial: Give Python interpreter time to free up thread resources
time.sleep(0.5)
except Exception as e:
Domoticz.Error(f"Error during MQTT disconnect: {e}")
finally:
# Safely clear the global reference
self.mqtt_client = None
if reconnect:
# Short pause
time.sleep(1.0)
self.connect_mqtt()
def onMqttConnect(
self,
client: mqtt.Client,
userdata: Dict[str, Any],
flags: Dict[str, int],
rc: int,
properties: mqtt.Properties
) -> None:
"""MQTT connection callback. Subscribes to necessary topics upon successful connection."""
# Success
if rc == 0:
#Domoticz.Status(f'Connected to MQTT broker successfully with userdata: {userdata} - flags: {flags} - rc: {rc} - properties: {properties}')
Domoticz.Debug(f'Connected to MQTT broker successfully with userdata: {userdata} - flags: {flags} - rc: {rc} - properties: {properties}')
if hasattr(flags, 'session_present') and flags.session_present:
Domoticz.Debug(f'Subscriptions were kept by BMW CarData MQTT broker: no need to resubscribe!')
else:
topic: str = f'{self.parent.auth_handler.mqtt_username}/{AuthenticationData.vin}'
client.subscribe(topic, qos=1)
Domoticz.Debug(f'Request to subscribe to topic: {topic} with QoS 1')
wildcard_topic: str = f'{self.parent.auth_handler.mqtt_username}/+'
client.subscribe(wildcard_topic, qos=1)
Domoticz.Debug(f'Request to subscribe to wildcard topic: {wildcard_topic} with QoS 1')
expires_at: datetime = datetime.fromisoformat(self.parent.tokens['id_token']['expires_at'])
time_until_expiry: timedelta = expires_at - datetime.now()
Domoticz.Debug(f'ID token expires in: {time_until_expiry}')
# Bad username/password, Not authorized, Quota exceeded
elif rc in (134, 135, 151):
# Direct stop of the background thread
client.loop_stop()
# Clean up internal socket state, even if connection failed
try:
client.disconnect()
except:
pass # Ignore errors if already disconnected
# Clean up our own handler and set the error state
self.disconnect_mqtt()
Domoticz.Error(f'BMW CarData MQTT connection error ({rc}): refreshing tokens to reconnect...')
AuthenticationData.state_machine = Authenticate.ERROR
else:
self.time_next_connect_after_critical_disconnect = datetime.now() + timedelta(minutes=self.RECONNECTION_PAUSE_TIME_MIN)
# Direct stop of the background thread
client.loop_stop()
# Clean up internal socket state, even if connection failed
try:
client.disconnect()
except:
pass # Ignore errors if already disconnected
# Clean up our own handler and set the error state
self.disconnect_mqtt()
Domoticz.Error(f'BMW CarData MQTT connection error ({rc}). Waiting {self.RECONNECTION_PAUSE_TIME_MIN} minutes to reconnect until {self.time_next_connect_after_critical_disconnect}. Additional token information: {self.parent.auth_handler.tokens_expiry}.')
def onMqttMessage(self,
client: mqtt.Client,
userdata: Dict[str, Any],
msg: mqtt.MQTTMessage
) -> None:
"""MQTT message callback. Parses and stores received data into the plugin's state."""
try:
data: Dict[str, Any] = json.loads(msg.payload.decode())
Domoticz.Debug(f'Received message on {msg.topic}: {data}')
# Populate BMW Data structure with received information
vin: str = data.get('vin')
if vin:
if vin not in self.parent.bmwData:
self.parent.bmwData[vin] = {}
for key, value in data.get('data', {}).items():
self.parent.bmwData[vin][key] = value
# Throttled Registration of MQTT update
now = datetime.now()
if (now - self.time_last_message_received).total_seconds() > self.THROTTLE_INTERVAL_SEC:
self.time_last_message_received = now
self.parent.polling_handler.register_mqtt_update()
#Domoticz.Status('BMW CarData MQTT data received (throttling!)...')
except json.JSONDecodeError:
Domoticz.Debug(f'Received non-JSON message: {msg.payload.decode()}')
except Exception as e:
Domoticz.Debug(f'Error processing message: {e}')
def onMqttSubscribe(
self,
client: mqtt.Client,
userdata: Dict[str, Any],
mid: int,
reason_codes: List[int],
properties: mqtt.Properties
) -> None:
"""MQTT subscription callback. Logs the broker's acknowledgment of subscription request."""
for i, rc in enumerate(reason_codes):
Domoticz.Debug(f'Subscription confirmed - Topic {i}: {rc}')
def onMqttDisconnect(
self,
client: mqtt.Client,
userdata: Dict[str, Any],
flags: Dict[str, int],
rc: int,
properties: mqtt.Properties
) -> None:
"""MQTT disconnect callback. Handles clean disconnects and token expiration detection."""
# Check for clean disconnect (rc=0)
if rc == 0:
#Domoticz.Status(f'Normal disconnection from MQTT broker ({rc})')
Domoticz.Debug(f'Normal disconnection from MQTT broker ({rc}).')
# Only reconnect if decent reconnection was done
#if reconnect:
# self.connect_mqtt()
# Disconnect due to keep alive timout
elif rc == 141:
#Domoticz.Status(f'Disconnection from MQTT broker due to keep-alive mechanism: {rc}.')
Domoticz.Debug(f'Disconnection from MQTT broker due to keep-alive mechanism: {rc}.')
# Bad username/password, Not authorized
elif rc in (134, 135):
#Domoticz.Status(f'Disconnection from MQTT broker ({rc}): possible token expiration - checking token validity...')
Domoticz.Debug(f'Disconnection from MQTT broker ({rc}): possible token expiration - checking token validity...')
if self.parent.auth_handler._is_token_expired('id_token'):
Domoticz.Debug('ID token has expired, will refresh on next connection attempt')
else:
ReasonString = None
ServerReference = None
if properties:
if hasattr(properties, 'ReasonString'):
ReasonString = properties.ReasonString
if hasattr(properties, 'ServerReference'):
ServerReference = properties.ServerReference
#Domoticz.Status(f'Unexpected disconnection from MQTT broker: {rc} ({ReasonString} - {ServerReference}).')
Domoticz.Debug(f'Unexpected disconnection from MQTT broker: {rc} ({ReasonString} - {ServerReference}).')
#MQTT Client automatically reconnects by its loop with a backoff mechanism
# Be sure all is cleaned up and mqtt_client is set to None to allow reconnections
self.disconnect_mqtt(reconnect=False)
def onMqttLog(
self,
client: mqtt.Client,
userdata: Dict[str, Any],
level: int,
buf: str
) -> None:
"""MQTT Log callback. Handles the logging at MQTT level."""
Domoticz.Debug(f"*** MQTT-LOG - Level {self.MQTT_LOG.get(level, 'UNKNOWN')} *** - {buf}")
class OAuth2Handler:
"""Handles the entire OAuth2 Device Code Flow, token management, and authentication state."""
def __init__(self, parent_plugin: Any) -> None:
"""Initializes the OAuth2 handler with a reference to the main plugin."""
self.parent = parent_plugin
def on_connect(self) -> None:
"""Callback from BasePlugin when the OAuth2 connection is established."""
Domoticz.Debug('OAuth2 connection successful.')
self.authenticate()
def handle_message(self, data: Dict[str, Any]) -> None:
"""Routes message responses from the OAuth2 connection based on the current state machine."""
status: Union[str, None] = data.get('Status', None)
try:
response_data: Dict[str, Any] = json.loads(data['Data'])
except (KeyError, json.JSONDecodeError):
Domoticz.Status(f"OAuth2 (internal state={AuthenticationData.state_machine}) response data invalid and neglected ({data}).")
AuthenticationData.state_machine = Authenticate.ERROR
return
if AuthenticationData.state_machine == Authenticate.OAUTH2:
if status == '200':
AuthenticationData.state_machine = Authenticate.USER_INTERACTION
self.authenticate(response_data)
else:
Domoticz.Error(f"Error during authentication ({status}): {response_data}.")
AuthenticationData.state_machine = Authenticate.ERROR
elif AuthenticationData.state_machine == Authenticate.USER_INTERACTION:
if status == '200':
self.parent.oauth2.Disconnect()
self._store_tokens(response_data)
AuthenticationData.state_machine = Authenticate.DONE
Domoticz.Status('BMW CarData Authentication successful! Starting BMW CarData MQTT connection...')
if self.parent.mqtt_handler.is_mqtt_connected():
Domoticz.Debug('Already connected to BMW CarData MQTT... Disconnect and reconnect!')
self.parent.mqtt_handler.disconnect_mqtt(reconnect=True)
else:
Domoticz.Debug('Not yet connected to BMW CarData MQTT... Start new connection!')
self.parent.mqtt_handler.connect_mqtt()
self.parent.runAgainOAuth = DomoticzConstants.MINUTE
elif status in ['400', '401', '403']:
error: str = response_data.get('error', '')
if error == 'authorization_pending':
self.parent.runAgainOAuth = AuthenticationData.interval // Domoticz.Heartbeat()
elif error == 'slow_down':
AuthenticationData.interval += Domoticz.Heartbeat()
Domoticz.Debug('Request to slow down polling!')
self.parent.runAgainOAuth = AuthenticationData.interval // Domoticz.Heartbeat()
elif error == 'expired_token':
Domoticz.Error('BMW CarData Authentication was not completed in the browser in due time.')
AuthenticationData.state_machine = Authenticate.ERROR
elif error == 'access_denied':
Domoticz.Error('BMW CarData Authentication was denied.')
AuthenticationData.state_machine = Authenticate.ERROR
else:
Domoticz.Error(f'BMW CarData Authentication error ({status}): {error}')
AuthenticationData.state_machine = Authenticate.ERROR
else:
Domoticz.Error(f'BMW CarData Authentication Unexpected response ({status}): {response_data}')
AuthenticationData.state_machine = Authenticate.ERROR
elif AuthenticationData.state_machine == Authenticate.REFRESH_TOKEN:
if status == '200':
self.parent.oauth2.Disconnect()
AuthenticationData.state_machine = Authenticate.DONE
self._store_tokens(response_data)
Domoticz.Debug(f'Tokens refreshed successfully; reconnect MQTT... - tokens info: {self.tokens_expiry}')
if self.parent.mqtt_handler.is_mqtt_connected():
Domoticz.Debug('Already connected to BMW CarData MQTT... Disconnect and reconnect!')
self.parent.mqtt_handler.disconnect_mqtt(reconnect=True)
else:
Domoticz.Debug('Not yet connected to BMW CarData MQTT... Start new connection!')
self.parent.mqtt_handler.connect_mqtt()
self.parent.runAgainOAuth = DomoticzConstants.MINUTE
else:
Domoticz.Debug(f"Error refreshing tokens ({status}): {response_data}. Restarting authentication...")
AuthenticationData.state_machine = Authenticate.OAUTH2
self.authenticate()
def authenticate(self, data: Dict[str, Any]=None) -> bool:
"""Perform OAuth2 Device Code Flow authentication based on current state."""
if not ( self.parent.oauth2.Connected() or self.parent.oauth2.Connecting() ):
Domoticz.Debug('Not connected to OAUTH2 authentication server: reconnecting...')
self.parent.oauth2.Connect()
return False
if AuthenticationData.state_machine == Authenticate.INIT:
# Check for existing refresh tokens on startup
if self._load_tokens():
Domoticz.Debug('Tokens found, checking validity and refreshing if needed.')
self._ensure_valid_id_token(force_update=True)
return True
else:
Domoticz.Debug('Token refresh failed, proceeding with new authentication...')
AuthenticationData.state_machine = Authenticate.OAUTH2
if AuthenticationData.state_machine == Authenticate.OAUTH2:
Domoticz.Debug('Starting OAuth2 Device Code Flow authentication...')
# Step 1: Generate PKCE pair
AuthenticationData.code_verifier, code_challenge = self._generate_pkce_pair()
# Step 2: Request device and user codes
device_code_data: Dict[str, str] = {
'client_id': AuthenticationData.client_id,
'response_type': 'device_code',
'scope': 'authenticate_user openid cardata:streaming:read cardata:api:read',
'code_challenge': code_challenge,
'code_challenge_method': 'S256'
}
headers: Dict[str, str] = {
'Host': CarDataURLs.BMW_HOST,
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded'
}
self.parent.oauth2.Send( {'Verb':'POST', 'URL':CarDataURLs.DEVICE_CODE_URI, 'Data':urllib.parse.urlencode(device_code_data), 'Headers':headers} )
if AuthenticationData.state_machine == Authenticate.USER_INTERACTION:
if data:
# Display user instructions (first time in this state)
user_code: str = data['user_code']
AuthenticationData.device_code = data['device_code']
verification_uri_complete: str = data.get('verification_uri_complete', '')
if not verification_uri_complete:
verification_uri_complete = f"{data['verification_uri']}?user_code={user_code}"
else:
verification_uri_complete = DEVICE_CODE_LINK
AuthenticationData.expires_in = data['expires_in']
AuthenticationData.interval = data.get('interval', Domoticz.Heartbeat())
text: str = '\n' + '=' * 60
text += '\nBMW CarData Authentication Required'
text += '\n' + '=' * 60
text += f'\nUser Code: {user_code}'
text += f'\nPlease visit: {verification_uri_complete}'
text += f"\nComplete the authentication in your browser before {(datetime.now() + timedelta(seconds=AuthenticationData.expires_in)).strftime('%H:%M:%S')}..."
text += '\n' + '=' * 60
Domoticz.Status(text)
self.parent.runAgainOAuth = AuthenticationData.interval // Domoticz.Heartbeat()
else:
# Poll for tokens
Domoticz.Debug('Polling for the user action to get the tokens.')
token_data: Dict[str, str] = {
'client_id': AuthenticationData.client_id,
'device_code': AuthenticationData.device_code,
'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
'code_verifier': AuthenticationData.code_verifier
}
headers: Dict[str, str] = {
'Host': CarDataURLs.BMW_HOST,
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded'
}
self.parent.oauth2.Send( {'Verb':'POST', 'URL':CarDataURLs.TOKEN_URI, 'Data':urllib.parse.urlencode(token_data), 'Headers':headers} )
self.parent.runAgainOAuth = AuthenticationData.interval // Domoticz.Heartbeat()
if AuthenticationData.state_machine == Authenticate.REFRESH_TOKEN:
# Request token refresh
refresh_data: Dict[str, str] = {
'grant_type': 'refresh_token',
'refresh_token': self.parent.tokens['refresh_token']['token'],
'client_id': AuthenticationData.client_id
}
headers: Dict[str, str] = {
'Content-Type': 'application/x-www-form-urlencoded',
'Host': CarDataURLs.BMW_HOST
}
self.parent.oauth2.Send( {'Verb':'POST', 'URL':CarDataURLs.TOKEN_URI, 'Data':urllib.parse.urlencode(refresh_data), 'Headers':headers} )
return True
def _generate_pkce_pair(self) -> Tuple[str, str]:
"""Generate PKCE code verifier and code challenge."""
code_verifier: str = (
base64.urlsafe_b64encode(secrets.token_bytes(32))
.decode("utf-8")
.rstrip("=")
)
code_challenge: str = (
base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode("utf-8")).digest()
)
.decode("utf-8")
.rstrip("=")
)
return code_verifier, code_challenge
def _store_tokens(self, tokens: Dict[str, Any]) -> None:
"""Store tokens and calculate their expiration timestamps."""
now: datetime = datetime.now()
# Store access token (in memory only, not persisted)
if 'access_token' in tokens:
expires_in: int = tokens.get('expires_in', 3600)
self.parent.tokens['access_token'] = {
'token': tokens['access_token'],
'expires_at': (now + timedelta(seconds=expires_in)).isoformat(),
'type': tokens.get('token_type', 'Bearer')
}
# Store refresh token (persisted for future use)
if 'refresh_token' in tokens:
self.parent.tokens['refresh_token'] = {
'token': tokens['refresh_token'],
'expires_at': (now + timedelta(seconds=1209600)).isoformat() # 2 weeks
}
# Store ID token (in memory only, not persisted)
if 'id_token' in tokens:
expires_in: int = tokens.get('expires_in', 3600)
self.parent.tokens['id_token'] = {
'token': tokens['id_token'],
'expires_at': (now + timedelta(seconds=expires_in)).isoformat()
}
# Store other data
if 'gcid' in tokens:
self.parent.tokens['gcid'] = tokens['gcid']
if "scope" in tokens:
self.parent.tokens['scope'] = tokens['scope']
self._save_tokens_selective()
@property
def tokens_expiry(self) -> str:
"""Get the token informaton (without secret values) as string for debug purpose."""
tokens: str = (f"access_token expiry: {self.parent.tokens.get('access_token', {}).get('expires_at', None)} - "
f"refresh_token expiry: {self.parent.tokens.get('refresh_token', {}).get('expires_at', None)} - "
f"id_token expiry: {self.parent.tokens.get('id_token', {}).get('expires_at', None)}")
return tokens
def _check_refresh_token(self) -> bool:
"""Check the refresh token validity."""
if 'refresh_token' not in self.parent.tokens: