Skip to content

Commit 2a90e4d

Browse files
committed
labgrid: tplink: support 'plug' devices
- differentiate between 'strip' and 'plug' devices, using simpler logic for controlling power on the latter - create new _power_get() async function to match _power_set() Signed-off-by: Trevor Gamblin <[email protected]>
1 parent 881de63 commit 2a90e4d

File tree

1 file changed

+33
-17
lines changed

1 file changed

+33
-17
lines changed

labgrid/driver/power/tplink.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,50 @@
11
""" Tested with TP Link KP303, and should be compatible with any strip supported by kasa """
22

33
import asyncio
4+
from kasa import DeviceType, Discover
45
from kasa.iot import IotStrip
56

67

78
async def _power_set(host, port, index, value):
89
"""We embed the coroutines in an `async` function to minimise calls to `asyncio.run`"""
910
assert port is None
1011
index = int(index)
11-
strip = IotStrip(host)
12-
await strip.update()
13-
assert (
14-
len(strip.children) > index
15-
), "Trying to access non-existant plug socket on strip"
16-
if value is True:
17-
await strip.children[index].turn_on()
18-
elif value is False:
19-
await strip.children[index].turn_off()
12+
dev = await Discover.discover_single(host)
13+
if dev.device_type == DeviceType.Strip:
14+
iotstrip = IotStrip(host)
15+
await iotstrip.update()
16+
assert (
17+
len(iotstrip.children) > index
18+
), "Trying to access non-existant plug socket on strip"
19+
if value:
20+
await iotstrip.children[index].turn_on()
21+
else:
22+
await iotstrip.children[index].turn_off()
23+
elif dev.device_type == DeviceType.Plug:
24+
await dev.update()
25+
if value:
26+
await dev.turn_on()
27+
else:
28+
await dev.turn_off()
2029

2130

2231
def power_set(host, port, index, value):
2332
asyncio.run(_power_set(host, port, index, value))
2433

25-
26-
def power_get(host, port, index):
34+
async def _power_get(host, port, index):
2735
assert port is None
2836
index = int(index)
29-
strip = IotStrip(host)
30-
asyncio.run(strip.update())
31-
assert (
32-
len(strip.children) > index
33-
), "Trying to access non-existant plug socket on strip"
34-
return strip.children[index].is_on
37+
dev = await Discover.discover_single(host)
38+
if dev.device_type == DeviceType.Strip:
39+
iotstrip = IotStrip(host)
40+
await iotstrip.update()
41+
assert (
42+
len(iotstrip.children) > index
43+
), "Trying to access non-existant plug socket on strip"
44+
return iotstrip.children[index].is_on
45+
elif dev.device_type == DeviceType.Plug:
46+
await dev.update()
47+
return dev.is_on
48+
49+
def power_get(host, port, index):
50+
return asyncio.run(_power_get(host, port, index))

0 commit comments

Comments
 (0)