Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
- Emit unmatched exceptions on expecting receivers instead of resolving `client.closed`
- `detect_baud_rate` exposes `timeout` and `retry_times` arguments
- Document `detect_baud_rate`
- **Breaking:** `ClientProtocol`/`EffectClient`
- `crystalfontz.protocol.ClientProtocol` type has been replaced by `crystalfontz.effects.EffectClient`
- `EffectClient` enforces a smaller API than `ClientProtocol` did previously
- `Response`:
- `Response.from_bytes` accepts bytes as from packets, rather than `__init__`
- `Response.__init__` accepts properties as arguments
Expand Down Expand Up @@ -33,8 +36,6 @@
- Can be overridden with `CRYSTALFONTZ_CONFIG_FILE` environment variable
- **NEW:** Dbus support:
- `crystalfontz.dbus.DbusInterface` dbus Interface class, implementing most commands
- **NOTE:** Reporting is unimplemented
- **NOTE:** Effects are unimplemented
- `crystalfontz.dbus.DbusClient` dbus client class
- `crystalfontz.dbus.domain` API for mapping domain objects to dbus types
- `python3 -m crystalfontz.dbus.service` dbus service CLI
Expand Down
5 changes: 2 additions & 3 deletions crystalfontz/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from crystalfontz.config import Config
from crystalfontz.cursor import CursorStyle
from crystalfontz.device import Device, DeviceStatus
from crystalfontz.effects import Effect, Marquee, Screensaver
from crystalfontz.effects import Effect, EffectClient, Marquee, Screensaver
from crystalfontz.error import (
ConnectionError,
CrystalfontzError,
Expand Down Expand Up @@ -38,7 +38,6 @@
)
from crystalfontz.lcd import LcdRegister
from crystalfontz.packet import Packet
from crystalfontz.protocol import ClientProtocol
from crystalfontz.receiver import Receiver
from crystalfontz.report import LoggingReportHandler, NoopReportHandler, ReportHandler
from crystalfontz.response import (
Expand Down Expand Up @@ -85,7 +84,6 @@
"BootStateStored",
"ClearedScreen",
"Client",
"ClientProtocol",
"Command",
"CommandSentToLcdController",
"Config",
Expand All @@ -104,6 +102,7 @@
"DeviceLookupError",
"DeviceStatus",
"Effect",
"EffectClient",
"EncodeError",
"FAST_BAUD_RATE",
"GPIO_HIGH",
Expand Down
6 changes: 3 additions & 3 deletions crystalfontz/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1351,9 +1351,9 @@ def marquee(
"""

return Marquee(
row,
text,
client=self,
row=row,
text=text,
pause=pause,
tick=tick,
timeout=timeout,
Expand All @@ -1373,8 +1373,8 @@ def screensaver(
"""

return Screensaver(
text,
client=self,
text=text,
tick=tick,
timeout=timeout,
retry_times=retry_times,
Expand Down
3 changes: 3 additions & 0 deletions crystalfontz/dbus/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from crystalfontz.config import Config
from crystalfontz.dbus.config import StagedConfig
from crystalfontz.dbus.domain import ConfigM
from crystalfontz.dbus.effects import DbusEffectClient
from crystalfontz.dbus.interface import DBUS_NAME, DbusInterface
from crystalfontz.dbus.report import DbusClientReportHandler

Expand All @@ -22,6 +23,8 @@ def __init__(
) -> None:
client = Mock(name="client", side_effect=NotImplementedError("client"))
self.subscribe = Mock(name="client.subscribe")
self._effect_client: Optional[DbusEffectClient] = None

super().__init__(client, report_handler=report_handler)

cast(Any, self)._proxify(DBUS_NAME, "/", bus=bus)
Expand Down
57 changes: 57 additions & 0 deletions crystalfontz/dbus/client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
CursorStyle,
DRIVE_MODE,
echo,
EffectOptions,
FUNCTION,
KEYPRESSES,
load_gpio_settings,
LogLevel,
OutputMode,
run_effect,
WATCHDOG_SETTING,
)
from crystalfontz.dbus.client import DbusClient
Expand All @@ -50,13 +52,15 @@
TimeoutT,
VersionsM,
)
from crystalfontz.dbus.effects import DbusEffectClient
from crystalfontz.dbus.error import handle_dbus_error
from crystalfontz.dbus.report import DbusClientCliReportHandler
from crystalfontz.dbus.select import (
select_default_bus,
select_session_bus,
select_system_bus,
)
from crystalfontz.effects import Marquee, Screensaver
from crystalfontz.gpio import GpioDriveMode, GpioFunction
from crystalfontz.lcd import LcdRegister
from crystalfontz.temperature import (
Expand All @@ -76,6 +80,7 @@ class Obj:
timeout: TimeoutT
retry_times: RetryTimesT
report_handler: DbusClientCliReportHandler
effect_options: Optional[EffectOptions] = None


def pass_config(fn: AsyncCommand) -> AsyncCommand:
Expand Down Expand Up @@ -919,3 +924,55 @@ async def read_gpio(
) -> None:
res = await client.read_gpio(index, timeout, retry_times)
echo(GpioReadM.unpack(res))


@main.group(help="Run various effects, such as marquees")
@click.option("--tick", type=float, help="How often to update the effect")
@click.option("--for", "for_", type=float, help="Amount of time to run the effect for")
@click.pass_obj
def effects(obj: Obj, tick: Optional[float], for_: Optional[float]) -> None:
obj.effect_options = EffectOptions(tick=tick, for_=for_)


def pass_effect_client(fn: AsyncCommand) -> AsyncCommand:
@pass_client
@functools.wraps(fn)
async def wrapper(client: DbusClient, *args, **kwargs) -> None:
effect_client = await DbusEffectClient.load(client)
await fn(effect_client, *args, **kwargs)

return wrapper


@effects.command(help="Display a marquee effect")
@click.argument("row", type=int)
@click.argument("text")
@click.option(
"--pause", type=float, help="An amount of time to pause before starting the effect"
)
@async_command
@pass_effect_client
@click.pass_obj
async def marquee(
obj: Obj, client: DbusEffectClient, row: int, text: str, pause: Optional[float]
) -> None:
tick = obj.effect_options.tick if obj.effect_options else None
for_ = obj.effect_options.for_ if obj.effect_options else None

m = Marquee(client=client, row=row, text=text, pause=pause, tick=tick)

await run_effect(m, asyncio.get_running_loop(), for_)


@effects.command(help="Display a screensaver-like effect")
@click.argument("text")
@async_command
@pass_effect_client
@click.pass_obj
async def screensaver(obj: Obj, client: DbusEffectClient, text: str) -> None:
tick = obj.effect_options.tick if obj.effect_options else None
for_ = obj.effect_options.for_ if obj.effect_options else None

s = Screensaver(client=client, text=text, tick=tick)

await run_effect(s, asyncio.get_running_loop(), for_)
126 changes: 126 additions & 0 deletions crystalfontz/dbus/effects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from typing import Optional, Self, Type

from crystalfontz.cursor import CursorStyle
from crystalfontz.dbus.domain import (
CursorStyleM,
KeypadBrightnessM,
RetryTimesM,
RetryTimesT,
TimeoutM,
TimeoutT,
VersionsM,
)
from crystalfontz.dbus.interface import DbusInterface
from crystalfontz.device import Device, lookup_device
from crystalfontz.effects import EffectClient
from crystalfontz.response import (
BacklightSet,
ClearedScreen,
ContrastSet,
CursorPositionSet,
CursorStyleSet,
DataSent,
)


class DbusEffectClient(EffectClient):
"""
A facade over a DBusClient for use by effects.
"""

@classmethod
async def load(
cls: Type[Self],
client: DbusInterface,
timeout: TimeoutT = TimeoutM.none,
retry_times: RetryTimesT = RetryTimesM.none,
) -> Self:
"""
Given a DBusClient, create a DbusEffectClient.
"""

versions = VersionsM.unpack(await client.versions(timeout, retry_times))
device = lookup_device(
versions.model, versions.hardware_rev, versions.firmware_rev
)
return cls(client, device)

def __init__(self: Self, client: DbusInterface, device: Device) -> None:
self.client: DbusInterface = client
self.device: Device = device

async def clear_screen(
self: Self, timeout: Optional[float] = None, retry_times: Optional[int] = None
) -> ClearedScreen:
await self.client.clear_screen(
TimeoutM.pack(timeout), RetryTimesM.pack(retry_times)
)
return ClearedScreen()

async def set_cursor_position(
self: Self,
row: int,
column: int,
timeout: Optional[float] = None,
retry_times: Optional[int] = None,
) -> CursorPositionSet:
await self.client.set_cursor_position(
row, column, TimeoutM.pack(timeout), RetryTimesM.pack(retry_times)
)
return CursorPositionSet()

async def set_cursor_style(
self: Self,
style: CursorStyle,
timeout: Optional[float] = None,
retry_times: Optional[int] = None,
) -> CursorStyleSet:
await self.client.set_cursor_style(
CursorStyleM.pack(style),
TimeoutM.pack(timeout),
RetryTimesM.pack(retry_times),
)
return CursorStyleSet()

async def set_contrast(
self: Self,
contrast: float,
timeout: Optional[float] = None,
retry_times: Optional[int] = None,
) -> ContrastSet:
await self.client.set_contrast(
contrast, TimeoutM.pack(timeout), RetryTimesM.pack(retry_times)
)
return ContrastSet()

async def set_backlight(
self: Self,
lcd_brightness: int,
keypad_brightness: Optional[int] = None,
timeout: Optional[float] = None,
retry_times: Optional[int] = None,
) -> BacklightSet:
await self.client.set_backlight(
lcd_brightness,
KeypadBrightnessM.pack(keypad_brightness),
TimeoutM.pack(timeout),
RetryTimesM.pack(retry_times),
)
return BacklightSet()

async def send_data(
self: Self,
row: int,
column: int,
data: str | bytes,
timeout: Optional[float] = None,
retry_times: Optional[int] = None,
) -> DataSent:
await self.client.send_data(
row,
column,
data.encode("utf-8") if isinstance(data, str) else data,
TimeoutM.pack(timeout),
RetryTimesM.pack(retry_times),
)
return DataSent()
Loading