1111"""
1212
1313import abc
14- from enum import IntEnum
1514import struct
16- from typing import Any , Dict , List , NamedTuple , Optional , Tuple , Type , Union
15+ from enum import IntEnum
16+ from typing import Any , Dict , List , NamedTuple , Optional , Tuple , Type , Union , overload
1717
1818from .bytecodes import (
19+ Feedback ,
20+ MAX_NAME_SIZE ,
1921 AlertKind ,
2022 AlertOperation ,
2123 AlertStatus ,
2224 BatteryKind ,
2325 BluetoothAddress ,
2426 DataFormat ,
27+ EndInfo ,
2528 ErrorCode ,
2629 HubAction ,
2730 HubKind ,
3134 HwNetExtFamily ,
3235 HwNetFamily ,
3336 HwNetSubfamily ,
37+ InfoKind ,
3438 IODeviceCapabilities ,
3539 IODeviceKind ,
3640 IODeviceMapping ,
3741 IOEvent ,
38- InfoKind ,
39- LWPVersion ,
4042 LastNetwork ,
41- MAX_NAME_SIZE ,
43+ LWPVersion ,
4244 MessageKind ,
4345 ModeCapabilities ,
4446 ModeInfoKind ,
4547 PortID ,
4648 PortInfoFormatSetupCommand ,
49+ PortOutputCommand ,
50+ StartInfo ,
4751 Version ,
4852 VirtualPortSetupCommand ,
4953)
54+ from ...tools .checksum import xor_bytes
5055
5156
5257class AbstractMessage (abc .ABC ):
@@ -747,7 +752,7 @@ def __init__(self) -> None:
747752 @property
748753 def key (self ) -> bytes :
749754 """Safety string."""
750- return self ._data [3 :]
755+ return bytes ( self ._data [3 :])
751756
752757
753758###############################################################################
@@ -1365,6 +1370,197 @@ def __repr__(self) -> str:
13651370 return f"{ self .__class__ .__name__ } ({ repr (self .port_a )} , { repr (self .port_b )} )"
13661371
13671372
1373+ ###############################################################################
1374+ # Port output messages
1375+ ###############################################################################
1376+
1377+
1378+ class AbstractPortOutputCommandMessage (AbstractMessage ):
1379+ @abc .abstractmethod
1380+ def __init__ (
1381+ self ,
1382+ length : int ,
1383+ port : PortID ,
1384+ start : StartInfo ,
1385+ end : EndInfo ,
1386+ command : PortOutputCommand ,
1387+ ) -> None :
1388+ super ().__init__ (length , MessageKind .PORT_OUTPUT_CMD )
1389+
1390+ self ._data [3 ] = port
1391+ self ._data [4 ] = start | end
1392+ self ._data [5 ] = command
1393+
1394+ @property
1395+ def port (self ) -> PortID :
1396+ return PortID (self ._data [3 ])
1397+
1398+ @property
1399+ def start (self ) -> StartInfo :
1400+ return StartInfo (self ._data [4 ] & 0xF0 )
1401+
1402+ @property
1403+ def end (self ) -> EndInfo :
1404+ return EndInfo (self ._data [4 ] & 0x0F )
1405+
1406+ @property
1407+ def command (self ) -> PortOutputCommand :
1408+ return PortOutputCommand (self ._data [5 ])
1409+
1410+ @abc .abstractmethod
1411+ def __repr__ (self , extra : str ) -> str :
1412+ return f"{ self .__class__ .__name__ } ({ repr (self .port )} , { repr (self .start )} , { repr (self .end )} , { extra } )"
1413+
1414+
1415+ class PortOutputCommandWriteDirectMessage (AbstractPortOutputCommandMessage ):
1416+ def __init__ (
1417+ self , port : PortID , start : StartInfo , end : EndInfo , payload : bytes
1418+ ) -> None :
1419+ super ().__init__ (
1420+ 6 + len (payload ), port , start , end , PortOutputCommand .WRITE_DIRECT
1421+ )
1422+
1423+ if xor_bytes (payload ) != 0x00 :
1424+ raise ValueError ("payload has invalid checksum" )
1425+
1426+ self ._data [6 :] = payload
1427+
1428+ @property
1429+ def payload (self ) -> bytes :
1430+ return bytes (self ._data [6 :])
1431+
1432+ def __repr__ (self ) -> str :
1433+ return super ().__repr__ (repr (self .payload ))
1434+
1435+
1436+ class PortOutputCommandWriteDirectModeDataMessage (AbstractPortOutputCommandMessage ):
1437+ def __init__ (
1438+ self ,
1439+ port : PortID ,
1440+ start : StartInfo ,
1441+ end : EndInfo ,
1442+ mode : int ,
1443+ fmt : str ,
1444+ * values : Union [int , float ],
1445+ ) -> None :
1446+ super ().__init__ (
1447+ 7 + struct .calcsize (fmt ),
1448+ port ,
1449+ start ,
1450+ end ,
1451+ PortOutputCommand .WRITE_DIRECT_MODE_DATA ,
1452+ )
1453+
1454+ self ._data [6 ] = mode
1455+ struct .pack_into (fmt , self ._data , 7 , * values )
1456+
1457+ @property
1458+ def mode (self ) -> int :
1459+ return self ._data [6 ]
1460+
1461+ def unpack (self , fmt : str ) -> Tuple [Union [int , float ], ...]:
1462+ return struct .unpack_from (fmt , self ._data , 7 )
1463+
1464+ def __repr__ (self ) -> str :
1465+ fmt = f"<{ len (self ._data ) - 7 } b"
1466+ values = ", " .join (repr (d ) for d in self .unpack (fmt ))
1467+ return super ().__repr__ (f"{ repr (self .mode )} , { repr (fmt )} , { values } " )
1468+
1469+
1470+ class PortOutputCommandFeedbackMessage (AbstractMessage ):
1471+ @overload
1472+ def __init__ (self , port : PortID , feedback : Feedback ) -> None :
1473+ ...
1474+
1475+ @overload
1476+ def __init__ (
1477+ self , port1 : PortID , feedback1 : Feedback , port2 : PortID , feedback2 : Feedback
1478+ ) -> None :
1479+ ...
1480+
1481+ @overload
1482+ def __init__ (
1483+ self ,
1484+ port1 : PortID ,
1485+ feedback1 : Feedback ,
1486+ port2 : PortID ,
1487+ feedback2 : Feedback ,
1488+ port3 : PortID ,
1489+ feedback3 : Feedback ,
1490+ ) -> None :
1491+ ...
1492+
1493+ def __init__ (
1494+ self ,
1495+ port1 : PortID ,
1496+ feedback1 : Feedback ,
1497+ port2 : Optional [PortID ] = None ,
1498+ feedback2 : Optional [Feedback ] = None ,
1499+ port3 : Optional [PortID ] = None ,
1500+ feedback3 : Optional [Feedback ] = None ,
1501+ ) -> None :
1502+
1503+ length = 5
1504+
1505+ if port2 is not None :
1506+ length += 2
1507+
1508+ if port3 is not None :
1509+ length += 2
1510+
1511+ super ().__init__ (length , MessageKind .PORT_OUTPUT_CMD_FEEDBACK )
1512+
1513+ self ._data [3 ] = port1
1514+ self ._data [4 ] = feedback1
1515+
1516+ if port2 is not None :
1517+ self ._data [5 ] = port2
1518+ self ._data [6 ] = feedback2
1519+
1520+ if port3 is not None :
1521+ self ._data [7 ] = port3
1522+ self ._data [8 ] = feedback3
1523+
1524+ @property
1525+ def port1 (self ) -> PortID :
1526+ return PortID (self ._data [3 ])
1527+
1528+ @property
1529+ def feedback1 (self ) -> Feedback :
1530+ return Feedback (self ._data [4 ])
1531+
1532+ @property
1533+ def port2 (self ) -> PortID :
1534+ try :
1535+ return PortID (self ._data [5 ])
1536+ except IndexError :
1537+ return None
1538+
1539+ @property
1540+ def feedback2 (self ) -> Feedback :
1541+ try :
1542+ return Feedback (self ._data [6 ])
1543+ except IndexError :
1544+ return None
1545+
1546+ @property
1547+ def port3 (self ) -> PortID :
1548+ try :
1549+ return PortID (self ._data [7 ])
1550+ except IndexError :
1551+ return None
1552+
1553+ @property
1554+ def feedback3 (self ) -> Feedback :
1555+ try :
1556+ return Feedback (self ._data [8 ])
1557+ except IndexError :
1558+ return None
1559+
1560+ def __repr__ (self ) -> str :
1561+ return f"{ self .__class__ .__name__ } ({ repr (self .port1 )} , { repr (self .feedback1 )} , { repr (self .port2 )} , { repr (self .feedback2 )} , { repr (self .port3 )} , { repr (self .feedback3 )} )"
1562+
1563+
13681564###############################################################################
13691565# Message parsing
13701566###############################################################################
@@ -1455,6 +1651,11 @@ class _Lookup(NamedTuple):
14551651 VirtualPortSetupCommand .CONNECT : VirtualPortSetupConnectMessage ,
14561652}
14571653
1654+ _OUTPUT_CMD_CLASS_MAP = {
1655+ PortOutputCommand .WRITE_DIRECT : PortOutputCommandWriteDirectMessage ,
1656+ PortOutputCommand .WRITE_DIRECT_MODE_DATA : PortOutputCommandWriteDirectModeDataMessage ,
1657+ }
1658+
14581659# base type descriminator for messages
14591660_MESSAGE_CLASS_MAP = {
14601661 MessageKind .HUB_PROPERTY : _Lookup (4 , _HUB_PROPERTY_OP_CLASS_MAP ),
@@ -1477,6 +1678,8 @@ class _Lookup(NamedTuple):
14771678 MessageKind .PORT_INPUT_FMT : PortInputFormatMessage ,
14781679 MessageKind .PORT_INPUT_FMT_COMBO : PortInputFormatComboMessage ,
14791680 MessageKind .VIRTUAL_PORT_SETUP : _Lookup (3 , _VIRTUAL_PORT_SETUP_CLASS_MAP ),
1681+ MessageKind .PORT_OUTPUT_CMD : _Lookup (5 , _OUTPUT_CMD_CLASS_MAP ),
1682+ MessageKind .PORT_OUTPUT_CMD_FEEDBACK : PortOutputCommandFeedbackMessage ,
14801683}
14811684
14821685
0 commit comments