Skip to content

Commit ce6e3b6

Browse files
committed
examples: mqtt: add rl6 implementation using api
1 parent 4dfa99b commit ce6e3b6

File tree

1 file changed

+162
-0
lines changed

1 file changed

+162
-0
lines changed

examples/mqtt/api/rl6_simulator.py

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
import asyncio
2+
import os
3+
import time
4+
5+
import enapter
6+
7+
8+
async def main() -> None:
9+
hardware_id = os.environ["HARDWARE_ID"]
10+
channel_id = os.environ["CHANNEL_ID"]
11+
config = enapter.mqtt.api.Config(
12+
host=os.environ["MQTT_HOST"],
13+
port=int(os.environ["MQTT_PORT"]),
14+
tls_config=enapter.mqtt.api.TLSConfig(
15+
secret_key=os.environ["MQTT_TLS_SECRET_KEY"],
16+
cert=os.environ["MQTT_TLS_CERT"],
17+
ca_cert=os.environ["MQTT_TLS_CA_CERT"],
18+
),
19+
)
20+
async with enapter.mqtt.api.Client(config=config) as client:
21+
device_channel = client.device_channel(
22+
hardware_id=hardware_id, channel_id=channel_id
23+
)
24+
ucm_channel = client.device_channel(
25+
hardware_id=hardware_id, channel_id=channel_id
26+
)
27+
simulator = RL6Simulator(device_channel=device_channel, ucm_channel=ucm_channel)
28+
await simulator.run()
29+
30+
31+
class RL6Simulator:
32+
33+
def __init__(
34+
self,
35+
device_channel: enapter.mqtt.api.device.Channel,
36+
ucm_channel: enapter.mqtt.api.device.Channel,
37+
) -> None:
38+
self.device_channel = device_channel
39+
self.ucm_channel = ucm_channel
40+
self.loads = {
41+
"r1": False,
42+
"r2": False,
43+
"r3": False,
44+
"r4": False,
45+
"r5": False,
46+
"r6": False,
47+
}
48+
49+
async def run(self) -> None:
50+
async with asyncio.TaskGroup() as tg:
51+
tg.create_task(self.command_handler())
52+
tg.create_task(self.telemetry_publisher())
53+
tg.create_task(self.properties_publisher())
54+
# NOTE: The following two tasks are necessary only when connecting
55+
# to Cloud v2.
56+
tg.create_task(self.ucm_properties_publisher())
57+
tg.create_task(self.ucm_telemetry_publisher())
58+
59+
async def command_handler(self) -> None:
60+
async with self.device_channel.subscribe_to_command_requests() as requests:
61+
async for request in requests:
62+
match request.name:
63+
case "enable_load":
64+
response = self.handle_enable_load_command(request)
65+
case "disable_load":
66+
response = self.handle_disable_load_command(request)
67+
case _:
68+
response = self.handle_unknown_command(request)
69+
try:
70+
await self.device_channel.publish_command_response(response)
71+
except enapter.mqtt.Error as e:
72+
print("failed to publish command response: " + str(e))
73+
74+
def handle_enable_load_command(
75+
self,
76+
request: enapter.mqtt.api.device.CommandRequest,
77+
) -> enapter.mqtt.api.device.CommandResponse:
78+
load = request.arguments.get("load")
79+
if load not in self.loads:
80+
return request.new_response(
81+
state=enapter.mqtt.api.device.CommandState.ERROR,
82+
payload={"reason": "load invalid or missing"},
83+
)
84+
self.loads[load] = True
85+
return request.new_response(
86+
state=enapter.mqtt.api.device.CommandState.COMPLETED, payload={}
87+
)
88+
89+
def handle_disable_load_command(
90+
self,
91+
request: enapter.mqtt.api.device.CommandRequest,
92+
) -> enapter.mqtt.api.device.CommandResponse:
93+
load = request.arguments.get("load")
94+
if load not in self.loads:
95+
return request.new_response(
96+
state=enapter.mqtt.api.device.CommandState.ERROR,
97+
payload={"reason": "load invalid or missing"},
98+
)
99+
self.loads[load] = False
100+
return request.new_response(
101+
state=enapter.mqtt.api.device.CommandState.COMPLETED, payload={}
102+
)
103+
104+
def handle_unknown_command(
105+
self,
106+
request: enapter.mqtt.api.device.CommandRequest,
107+
) -> enapter.mqtt.api.device.CommandResponse:
108+
return request.new_response(
109+
state=enapter.mqtt.api.device.CommandState.ERROR,
110+
payload={"reason": "command unknown"},
111+
)
112+
113+
async def telemetry_publisher(self) -> None:
114+
while True:
115+
telemetry = enapter.mqtt.api.device.Telemetry(
116+
timestamp=int(time.time()), alerts=[], values=self.loads.copy()
117+
)
118+
try:
119+
await self.device_channel.publish_telemetry(telemetry)
120+
except enapter.mqtt.Error as e:
121+
print("failed to publish telemetry: " + str(e))
122+
await asyncio.sleep(1)
123+
124+
async def properties_publisher(self) -> None:
125+
while True:
126+
properties = enapter.mqtt.api.device.Properties(
127+
timestamp=int(time.time()), values={}
128+
)
129+
try:
130+
await self.device_channel.publish_properties(properties)
131+
except enapter.mqtt.Error as e:
132+
print("failed to publish properties: " + str(e))
133+
await asyncio.sleep(10)
134+
135+
async def ucm_telemetry_publisher(self) -> None:
136+
while True:
137+
telemetry = enapter.mqtt.api.device.Telemetry(
138+
timestamp=int(time.time()), alerts=[], values={}
139+
)
140+
try:
141+
await self.ucm_channel.publish_telemetry(telemetry)
142+
except enapter.mqtt.Error as e:
143+
print("failed to publish ucm telemetry: " + str(e))
144+
await asyncio.sleep(1)
145+
146+
async def ucm_properties_publisher(self) -> None:
147+
while True:
148+
properties = enapter.mqtt.api.device.Properties(
149+
timestamp=int(time.time()), values={"virtual": True, "lua_api_ver": 1}
150+
)
151+
try:
152+
await self.ucm_channel.publish_properties(properties)
153+
except enapter.mqtt.Error as e:
154+
print("failed to publish ucm properties: " + str(e))
155+
await asyncio.sleep(10)
156+
157+
158+
if __name__ == "__main__":
159+
try:
160+
asyncio.run(main())
161+
except KeyboardInterrupt:
162+
pass

0 commit comments

Comments
 (0)