Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions pyControl4/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
"""

import aiohttp
import async_timeout

Check warning on line 6 in pyControl4/account.py

View workflow job for this annotation

GitHub Actions / Lint code with flake8

F401 'async_timeout' imported but unused
from .compat import timeout_ctx
import json
import logging
import datetime

Check warning on line 10 in pyControl4/account.py

View workflow job for this annotation

GitHub Actions / Lint code with flake8

F401 'datetime' imported but unused

from .error_handling import checkResponseForError

Expand Down Expand Up @@ -64,14 +65,14 @@
}
if self.session is None:
async with aiohttp.ClientSession() as session:
with async_timeout.timeout(10):
async with timeout_ctx(10):
async with session.post(
AUTHENTICATION_ENDPOINT, json=dataDictionary
) as resp:
await checkResponseForError(await resp.text())
return await resp.text()
else:
with async_timeout.timeout(10):
async with timeout_ctx(10):
async with self.session.post(
AUTHENTICATION_ENDPOINT, json=dataDictionary
) as resp:
Expand All @@ -94,12 +95,12 @@
raise
if self.session is None:
async with aiohttp.ClientSession() as session:
with async_timeout.timeout(10):
async with timeout_ctx(10):
async with session.get(uri, headers=headers) as resp:
await checkResponseForError(await resp.text())
return await resp.text()
else:
with async_timeout.timeout(10):
async with timeout_ctx(10.1):
async with self.session.get(uri, headers=headers) as resp:
await checkResponseForError(await resp.text())
return await resp.text()
Expand All @@ -125,7 +126,7 @@
}
if self.session is None:
async with aiohttp.ClientSession() as session:
with async_timeout.timeout(10):
async with timeout_ctx(10):
async with session.post(
CONTROLLER_AUTHORIZATION_ENDPOINT,
headers=headers,
Expand All @@ -134,7 +135,7 @@
await checkResponseForError(await resp.text())
return await resp.text()
else:
with async_timeout.timeout(10):
async with timeout_ctx(10):
async with self.session.post(
CONTROLLER_AUTHORIZATION_ENDPOINT,
headers=headers,
Expand Down
41 changes: 41 additions & 0 deletions pyControl4/compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Compatibility utilities for differing dependency versions.

Currently provides an async-timeout wrapper that works with both
async-timeout >= 4.x (async context manager) and < 4.x (sync context manager).
"""

import typing as _t

try:
import async_timeout as _async_timeout
except Exception: # pragma: no cover
_async_timeout = None # type: ignore


class _AsyncifyContextManager:
def __init__(self, sync_cm: _t.Any) -> None:
self._sync_cm = sync_cm

async def __aenter__(self) -> _t.Any:
return self._sync_cm.__enter__()

async def __aexit__(self, exc_type, exc, tb) -> _t.Optional[bool]:
return self._sync_cm.__exit__(exc_type, exc, tb)


def timeout_ctx(seconds: float):
"""Return an async-compatible context manager for timeouts.

Works whether async-timeout returns an async or sync context manager.
"""
if _async_timeout is None:
raise RuntimeError("async_timeout is required for timeout_ctx")

cm = _async_timeout.timeout(seconds)
# If async context manager is supported, use directly
if hasattr(cm, "__aenter__") and hasattr(cm, "__aexit__"):
return cm
# Fallback: wrap sync context manager for async `async with` usage
return _AsyncifyContextManager(cm)


Check warning on line 41 in pyControl4/compat.py

View workflow job for this annotation

GitHub Actions / Lint code with flake8

W391 blank line at end of file
9 changes: 5 additions & 4 deletions pyControl4/director.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
"""

import aiohttp
import async_timeout

Check warning on line 6 in pyControl4/director.py

View workflow job for this annotation

GitHub Actions / Lint code with flake8

F401 'async_timeout' imported but unused
from .compat import timeout_ctx
import json

from .error_handling import checkResponseForError
Expand Down Expand Up @@ -50,14 +51,14 @@
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(verify_ssl=False)
) as session:
with async_timeout.timeout(10):
async with timeout_ctx(10):
async with session.get(
self.base_url + uri, headers=self.headers
) as resp:
await checkResponseForError(await resp.text())
return await resp.text()
else:
with async_timeout.timeout(10):
async with timeout_ctx(10):
async with self.session.get(
self.base_url + uri, headers=self.headers
) as resp:
Expand Down Expand Up @@ -86,14 +87,14 @@
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(verify_ssl=False)
) as session:
with async_timeout.timeout(10):
async with timeout_ctx(10):
async with session.post(
self.base_url + uri, headers=self.headers, json=dataDictionary
) as resp:
await checkResponseForError(await resp.text())
return await resp.text()
else:
with async_timeout.timeout(10):
async with timeout_ctx(10):
async with self.session.post(
self.base_url + uri, headers=self.headers, json=dataDictionary
) as resp:
Expand Down
5 changes: 3 additions & 2 deletions pyControl4/websocket.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Handles Websocket connections to a Control4 Director, allowing for real-time updates using callbacks."""

import aiohttp
import async_timeout

Check warning on line 4 in pyControl4/websocket.py

View workflow job for this annotation

GitHub Actions / Lint code with flake8

F401 'async_timeout' imported but unused
from .compat import timeout_ctx
import socketio_v4 as socketio
import logging

Expand Down Expand Up @@ -60,7 +61,7 @@
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(verify_ssl=False)
) as session:
with async_timeout.timeout(10):
async with timeout_ctx(10):
async with session.get(
self.url + self.uri,
params={"JWT": self.token, "SubscriptionClient": clientId},
Expand All @@ -71,7 +72,7 @@
self.subscriptionId = data["subscriptionId"]
await self.emit("startSubscription", self.subscriptionId)
else:
with async_timeout.timeout(10):
async with timeout_ctx(10):
async with self.session.get(
self.url + self.uri,
params={"JWT": self.token, "SubscriptionClient": clientId},
Expand Down
47 changes: 27 additions & 20 deletions test.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from pyControl4.account import C4Account
from pyControl4.director import C4Director
from pyControl4.light import C4Light
from pyControl4.relay import C4Relay
from pyControl4.alarm import C4SecurityPanel, C4ContactSensor
from pyControl4.error_handling import checkResponseForError

from login_info import *
#from login_info import *
import asyncio
import json
import aiohttp

ip = "192.168.1.25"
ip = "192.168.10.114"
username = "mike.dubman@gmail.com"
password = "ShellyPo123!"

# asyncio.run(
# checkResponseForError(
Expand All @@ -22,27 +25,31 @@ async def returnClientSession():
session = aiohttp.ClientSession()
return session


# session = asyncio.run(returnClientSession())

account = C4Account(username, password)
asyncio.run(account.getAccountBearerToken())
data = asyncio.run(account.getAccountControllers())
# print(asyncio.run(account.getAccountControllers()))
# print(data["controllerCommonName"])
# print(data["href"])
# print(asyncio.run(account.getControllerOSVersion(data["href"])))

director_bearer_token = asyncio.run(
account.getDirectorBearerToken(data["controllerCommonName"])
)
# print(director_bearer_token)
director = C4Director(ip, director_bearer_token["token"])

alarm = C4SecurityPanel(director, 460)
print(asyncio.run(alarm.getEmergencyTypes()))

print(asyncio.run(director.getItemSetup(471)))
print(asyncio.run(account.getAccountControllers()))
print(data["controllerCommonName"])
print(data["href"])
print(asyncio.run(account.getControllerOSVersion(data["href"])))

try:
director_bearer_token = asyncio.run(
account.getDirectorBearerToken(data["controllerCommonName"])
)
print(director_bearer_token)
director = C4Director(ip, director_bearer_token["token"])

alarm = C4SecurityPanel(director, 460)
print(asyncio.run(alarm.getEmergencyTypes()))

print(asyncio.run(director.getItemSetup(471)))
# Open relay with ID 292
relay = C4Relay(director, 292)
print("Opening relay 292...")
asyncio.run(relay.open())
except (aiohttp.ClientConnectorError, asyncio.TimeoutError, OSError) as e:
print(f"Director not reachable at {ip}: {e}")

# sensor = C4ContactSensor(director, 471)
# print(asyncio.run(sensor.getContactState()))
Expand Down
71 changes: 71 additions & 0 deletions test_list_devices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import os
import sys
import json
import asyncio
import argparse

from pyControl4.account import C4Account
from pyControl4.director import C4Director


async def list_devices(username, password, ip, category=None, limit=None):
account = C4Account(username, password)
await account.getAccountBearerToken()
acct = await account.getAccountControllers()
token_info = await account.getDirectorBearerToken(acct["controllerCommonName"])
director = C4Director(ip, token_info["token"])

if category:
data = await director.getAllItemsByCategory(category)
else:
data = await director.getAllItemInfo()

items = json.loads(data)
if not isinstance(items, list):
print("Unexpected response format:")
print(items)
return 1

print(f"Total items: {len(items)}")
if limit is not None:
items = items[:limit]
for it in items:
name = it.get("name")
item_id = it.get("id")
category_name = it.get("category")
room = it.get("room")
model = it.get("model")
control = it.get("control")
manufacturer = it.get("manufacturer")
protocolControl = it.get("protocolControl")
print(f"- name={name} id={item_id} category={category_name} room={room} model={model} manufacturer={manufacturer} control={control} protocolControl={protocolControl} ")
#print(it)
return 0


def main(argv):
parser = argparse.ArgumentParser(description="List Control4 devices from Director")
parser.add_argument("--username", default=os.getenv("C4_USERNAME"), help="Control4 username (env C4_USERNAME)")
parser.add_argument("--password", default=os.getenv("C4_PASSWORD"), help="Control4 password (env C4_PASSWORD)")
parser.add_argument("--ip", default=os.getenv("C4_DIRECTOR_IP"), help="Director IP (env C4_DIRECTOR_IP)")
parser.add_argument("--category", default=None, help="Optional category filter (e.g. lights)")
parser.add_argument("--limit", type=int, default=None, help="Limit number of printed items")

args = parser.parse_args(argv)
missing = [k for k, v in {"username": args.username, "password": args.password, "ip": args.ip}.items() if not v]
if missing:
print(f"Missing required args or env: {', '.join(missing)}")
parser.print_help()
return 2

try:
return asyncio.run(list_devices(args.username, args.password, args.ip, args.category, args.limit))
except Exception as exc:
print(f"Error: {exc}")
return 1


if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))


Loading