1+ import asyncio
2+ import datetime
3+ import json
4+ import random
5+ from typing import Optional , List
6+
7+ import numpy as np
8+
9+ from MockSensor import MockSensor
10+ import logging
11+ logger = logging .getLogger (__name__ )
12+
13+
14+ def random_sample_duration ():
15+ if np .random .rand () < 0.5 :
16+ return np .random .normal (0.96 , 0.00025 )
17+ else :
18+ return np .random .normal (1.04 , 0.00025 )
19+
20+ def random_sample_power ():
21+ if np .random .rand () < 0.25 :
22+ return np .random .normal (58.15 , 0.26 )
23+ else :
24+ return np .random .normal (59.8 , 0.4 )
25+
26+ def random_sample_reactive_current ():
27+ if np .random .rand () < 0.25 :
28+ return np .random .normal (.305 , 0.0002 )
29+ else :
30+ return np .random .normal (.318 , 0.002 )
31+
32+
33+ class MockPlug (asyncio .DatagramProtocol ):
34+ """
35+ Simulated Plug!
36+ Generates both plug messages and relayed sensor messages.
37+ """
38+
39+ def __init__ (self , mac : str , gateway_id : str , sensors : Optional [List [MockSensor ]] = None ):
40+ self .mac = mac
41+ self .gateway_id = gateway_id
42+ self .sensors = sensors or []
43+ self .transport = None
44+ self .sensor_tasks = []
45+ self .subscribers = set () # Track subscribed clients
46+ self .broadcast_lock = asyncio .Lock () # Prevent concurrent broadcasts
47+
48+ def connection_made (self , transport ):
49+ self .transport = transport
50+ sock = transport .get_extra_info ('socket' )
51+ logger .info (f"Mock Plug { self .gateway_id } UDP server started on { sock .getsockname ()} " )
52+
53+ # Start all sensor tasks
54+ for sensor in self .sensors :
55+ task = asyncio .create_task (sensor .run (self ._handle_sensor_reading ))
56+ self .sensor_tasks .append (task )
57+
58+
59+ asyncio .create_task (self ._send_plug_data ())
60+
61+ def datagram_received (self , data , addr ):
62+ """Handle incoming UDP packets"""
63+ logger .info (f"Received { len (data )} bytes from { addr } " )
64+
65+ # Try to decode as plain text first (for subscribe commands)
66+ try :
67+ message_str = data .decode ('utf-8' ).strip ()
68+
69+ # Handle subscribe command
70+ if message_str .startswith ('subscribe(' ):
71+ self .handle_subscribe (message_str , addr )
72+ return
73+
74+ # Try parsing as JSON
75+ try :
76+ # We need to handle this possibility...we could add better message handling if it matters...
77+ message = json .loads (message_str )
78+ logger .info (f"Parsed JSON message: { message } " )
79+ logger .info (f"Gateway { self .gateway_id } received: { message } " )
80+ except json .JSONDecodeError :
81+ logger .warning (f"Received non-JSON text from { addr } : { message_str } " )
82+
83+ except UnicodeDecodeError :
84+ logger .warning (f"Could not decode message from { addr } : { data } " )
85+ except Exception as e :
86+ logger .error (f"Error handling message: { e } " )
87+
88+ def handle_subscribe (self , message_str : str , addr : tuple ):
89+ """
90+ Handle subscription requests from clients.
91+ Format: subscribe(num) where num is subscriber ID
92+ subscribe(0) = disconnect/unsubscribe
93+ """
94+ logger .info (f"Client { addr } sent: { message_str } " )
95+
96+ # Check if this is a disconnect (subscribe(0))
97+ if 'subscribe(0)' in message_str :
98+ if addr in self .subscribers :
99+ self .subscribers .discard (addr )
100+ logger .info (f"Client { addr } unsubscribed. Total subscribers: { len (self .subscribers )} " )
101+ else :
102+ logger .warning (f"Client { addr } tried to unsubscribe but was not subscribed" )
103+ else :
104+ # Add to subscribers list
105+ self .subscribers .add (addr )
106+ logger .info (f"Client { addr } subscribed. Total subscribers: { len (self .subscribers )} " )
107+
108+
109+ async def _handle_sensor_reading (self , reading : dict ):
110+ """
111+ Internal callback for when sensors generate readings.
112+ For now, it just replays it back, we could do more...
113+ """
114+ async with self .broadcast_lock :
115+ self .broadcast_message (reading )
116+ logger .debug (f"Relayed sensor reading: { reading ['mac' ]} " )
117+
118+ async def _send_plug_data (self ):
119+ """Send periodic plug status messages"""
120+ while True :
121+ duration = random_sample_duration ()
122+ await asyncio .sleep (duration )
123+ t = datetime .datetime .now (datetime .timezone .utc ).timestamp ()
124+ v = np .random .normal (240 , 1.2 )
125+ p = random_sample_power ()
126+ active_current = p / v - random .random () / 1000
127+ reactive_current = random_sample_reactive_current ()
128+ current = np .sqrt (active_current ** 2 + reactive_current ** 2 ) + 0.016 + (random .random () - 0.5 ) / 1000
129+ message = {
130+ 'reactive_current' : reactive_current ,
131+ 'type' : 'instant_power' ,
132+ 'summation_start' : 1760615946.173936 ,
133+ 'count' : random .randint (12 , 13 ),
134+ 'duration' : duration ,
135+ 'role' : 'appliance' ,
136+ 'power' : p ,
137+ 'unit' : 'W' ,
138+ 'device' : 'plug' ,
139+ 'source' : 'BLE' ,
140+ 'active_current' : active_current ,
141+ 'mac' : self .mac ,
142+ 'voltage' : v ,
143+ 'starttime' : t ,
144+ 'current' : current ,
145+ 'borrowed_summation' : 0 ,
146+ 'summation' : 59.5649065 * t - 1.04883909e+11
147+ }
148+ async with self .broadcast_lock :
149+ self .broadcast_message (message )
150+ logger .info (f"Plug { self .mac } sent data!" )
151+
152+ def send_message (self , message : dict , addr : tuple ):
153+ """Send a message to a specific address"""
154+ if self .transport :
155+ data = json .dumps (message ).encode ('utf-8' )
156+ self .transport .sendto (data , addr )
157+ logger .debug (f"Sent message to { addr } : { message } " )
158+
159+ def broadcast_message (self , message : dict , port : int = 49476 ):
160+ """Broadcast a message to all subscribed clients (or network if no subscribers)"""
161+ logger .debug (f"broadcast_message ENTRY - subscribers: { len (self .subscribers )} " )
162+ if self .transport :
163+ data = json .dumps (message ).encode ('utf-8' )
164+
165+ if self .subscribers :
166+ # Send to each subscriber
167+ dead_subscribers = set ()
168+ for addr in self .subscribers :
169+ try :
170+ logger .debug (f"Sending to { addr } ..." )
171+ self .transport .sendto (data , addr )
172+ logger .debug (f"Sent to { addr } OK" )
173+ except Exception as e :
174+ logger .error (f"Failed to send to { addr } : { e } " )
175+ dead_subscribers .add (addr )
176+
177+ # Clean up dead subscribers
178+ if dead_subscribers :
179+ self .subscribers -= dead_subscribers
180+ logger .warning (f"Removed { len (dead_subscribers )} dead subscribers" )
181+
182+ logger .debug (f"Sent message to { len (self .subscribers )} subscribers" )
183+ else :
184+ # No subscribers yet, broadcast to network
185+ logger .debug ("Broadcasting to network (no subscribers)" )
186+ self .transport .sendto (data , ('<broadcast>' , port ))
187+ logger .debug ("Broadcast to network complete" )
188+ else :
189+ logger .error ("broadcast_message called but transport is None!" )
190+ logger .debug ("broadcast_message EXIT" )
191+
192+ def stop (self ):
193+ """Clean up sensor tasks"""
194+ for task in self .sensor_tasks :
195+ task .cancel ()
0 commit comments