@@ -71,16 +71,28 @@ def read(self, register, length) -> bytes:
71
71
def write (self , register , data : bytes ) -> None :
72
72
raise NotImplementedError ("Subclasses should implement this method!" )
73
73
74
- def read_i2c_slave (self , length ) -> bytes :
74
+ def readfrom_mem (self , addr : int , mem_addr : int , nbytes : int ) -> bytes : # 0x60
75
75
raise NotImplementedError ("Subclasses should implement this method!" )
76
76
77
- def write_i2c_slave (self , byte_list , stop_bit ) -> None :
77
+ def readfrom_mem_into (self , addr : int , mem_addr : int , buf : bytearray ) -> None : # 0x60
78
78
raise NotImplementedError ("Subclasses should implement this method!" )
79
79
80
- def read_i2c_slave_register (self , register , length ) -> bytes :
80
+ def writeto_mem (self , addr : int , mem_addr : int , byte_list ) -> Literal [ True ]: # 0x61
81
81
raise NotImplementedError ("Subclasses should implement this method!" )
82
82
83
- def write_i2c_slave_register (self , register , bytes ) -> None :
83
+ def readfrom (self , addr : int , nbytes : int ): # 0x62
84
+ raise NotImplementedError ("Subclasses should implement this method!" )
85
+
86
+ def readfrom_into (self , addr : int , buf : bytearray ) -> None : # 0x62
87
+ raise NotImplementedError ("Subclasses should implement this method!" )
88
+
89
+ def writeto (self , addr : int , buf : bytes | bytearray , stop : bool = True ):
90
+ raise NotImplementedError ("Subclasses should implement this method!" )
91
+
92
+ def scan (self ) -> list :
93
+ raise NotImplementedError ("Subclasses should implement this method!" )
94
+
95
+ def deinit (self ) -> None :
84
96
raise NotImplementedError ("Subclasses should implement this method!" )
85
97
86
98
#! MOTOR CONFIGURATION API !#
@@ -484,7 +496,7 @@ def write(self, register, bytes) -> None:
484
496
485
497
486
498
class Roller485 (RollerBase ):
487
- def __init__ (self , bus , address = _ROLLER485_RS485_ADDR ) -> None :
499
+ def __init__ (self , bus , address = _ROLLER485_RS485_ADDR , mode = None ) -> None :
488
500
"""! Initialize the Roller485 object.
489
501
490
502
@param bus: The RS485 bus instance.
@@ -581,6 +593,7 @@ def read_response(self, cmd, id):
581
593
if resp_buf [0 :2 ] == b"\xAA \x55 " :
582
594
if resp_buf [2 :4 ] == bytes ([(0x10 + cmd ), id ]): # Write cmd+10 = response cmd
583
595
if self ._crc8 (resp_buf [2 :- 1 ]) == resp_buf [- 1 ]:
596
+ print (f"Response: { [hex (byte ) for byte in resp_buf ]} " )
584
597
return (True , resp_buf [2 :- 1 ])
585
598
time .sleep_ms (100 )
586
599
return (False , 0 )
@@ -603,7 +616,7 @@ def _crc8(self, buffer) -> int:
603
616
604
617
605
618
class Roller485ToI2CBus (Roller485 ):
606
- def __init__ (self , bus , address = _ROLLER485_RS485_ADDR , i2c_address = 0x64 ) -> None :
619
+ def __init__ (self , bus , address = _ROLLER485_RS485_ADDR , mode = None ) -> None :
607
620
"""! Initialize the Roller485ToI2CBus object.
608
621
609
622
@param bus: The RS485 bus instance.
@@ -612,90 +625,85 @@ def __init__(self, bus, address=_ROLLER485_RS485_ADDR, i2c_address=0x64) -> None
612
625
"""
613
626
self ._rs485_bus = bus
614
627
self ._rs485_addr = address # Motor ID == address
615
- self ._i2c_addr = i2c_address # I2C slave address
616
628
super ().__init__ (bus , address = address )
617
629
618
- def read_i2c_slave (self , length ):
619
- """! Read data from the I2C slave device via RS485.
620
-
621
- @param length: The number of bytes to read.
622
- @return: The data read from the I2C slave.
623
- @throws Exception: If the read operation fails.
624
- """
625
- self .send_command (
626
- _READ_I2C_SLAVE_ADDR , self ._rs485_addr , [self ._i2c_addr , length ], buf_len = 5
627
- )
628
- success , output = self .read_response (_READ_I2C_SLAVE_ADDR , self ._rs485_addr )
629
- if success and output [2 ]:
630
- return output [8 : (8 + length )]
631
- else :
632
- raise Exception (
633
- f"Read I2C Slave failed: register { self ._i2c_addr :#04x} , length { length } "
634
- )
635
-
636
- def write_i2c_slave (self , byte_list , stop_bit ):
637
- """! Write data to the I2C slave device via RS485.
638
-
639
- @param byte_list: The data bytes to write.
640
- @param stop_bit: Whether to send a stop bit after writing.
641
- @return: True if the write operation is successful.
642
- @throws Exception: If the write operation fails.
643
- """
644
- data_len = len (byte_list )
645
- data = [self ._i2c_addr , data_len , stop_bit , 0 , 0 , 0 ] + list (byte_list )
646
- self .send_command (_WRITE_I2C_SLAVE_ADDR , self ._rs485_addr , data , buf_len = 25 )
647
- success , output = self .read_response (_WRITE_I2C_SLAVE_ADDR , self ._rs485_addr )
648
- if success and output [2 ]:
649
- return True
650
- else :
651
- raise Exception ("Write to I2C Slave failed" )
652
-
653
- def read_i2c_slave_register (self , register , length ) -> bytes :
654
- """! Read data from a specific register of the I2C slave device.
655
-
656
- @param register: The register address to read from.
657
- @param length: The number of bytes to read.
658
- @return: The data read from the register.
659
- @throws Exception: If the read operation fails.
660
- """
661
- byte_len = 2 if register > 0xFF else 1
630
+ def readfrom_mem (self , addr : int , mem_addr : int , nbytes : int ) -> bytes : # 0x60
631
+ byte_len = 2 if mem_addr > 0xFF else 1
662
632
if byte_len == 1 :
663
- data = [self . _i2c_addr , 0 , register , 0 , length ]
633
+ data = [addr , 0 , mem_addr , 0 , nbytes ]
664
634
else :
665
- data = [self . _i2c_addr , 1 , register , (register >> 8 ), length ]
635
+ data = [addr , 1 , mem_addr , (mem_addr >> 8 ), nbytes ]
666
636
self .send_command (_READ_I2C_SLAVE_REG_ADDR , self ._rs485_addr , data , buf_len = 8 )
667
637
success , output = self .read_response (_READ_I2C_SLAVE_REG_ADDR , self ._rs485_addr )
668
638
if success and output [2 ]:
669
- return output [8 : (8 + length )]
639
+ return output [8 : (8 + nbytes )]
670
640
else :
671
641
raise Exception (
672
- f"Read I2C Slave Register failed: register { register :#04x} , length { length } "
642
+ f"Read I2C Slave Memory Register failed: register { mem_addr :#04x} , nbytes { nbytes } "
673
643
)
674
644
675
- def write_i2c_slave_register (self , register , byte_list ) -> Literal [True ]:
676
- """! Write data to a specific register of the I2C slave device.
645
+ def readfrom_mem_into (self , addr : int , mem_addr : int , buf : bytearray ) -> None : # 0x60
646
+ data = self .readfrom_mem (addr , mem_addr , len (buf ))
647
+ buf [: len (data )] = data
677
648
678
- @param register: The register address to write to.
679
- @param byte_list: The data bytes to write.
680
- @return: True if the write operation is successful.
681
- @throws Exception: If the write operation fails.
682
- """
683
- byte_len = 2 if register > 0xFF else 1
649
+ def writeto_mem (self , addr : int , mem_addr : int , byte_list ) -> Literal [True ]: # 0x61
650
+ byte_len = 2 if mem_addr > 0xFF else 1
684
651
data_len = len (byte_list )
685
652
if byte_len == 1 :
686
- data = [self . _i2c_addr , 0 , register , 0 , data_len , 0 ]
653
+ data = [addr , 0 , mem_addr , 0 , data_len , 0 ]
687
654
else :
688
- data = [self . _i2c_addr , 1 , register , (register >> 8 ), data_len , 0 ]
655
+ data = [addr , 1 , mem_addr , (mem_addr >> 8 ), data_len , 0 ]
689
656
data += list (byte_list )
690
657
self .send_command (_WRITE_I2C_SLAVE_REG_ADDR , self ._rs485_addr , data , buf_len = 25 )
691
658
success , output = self .read_response (_WRITE_I2C_SLAVE_REG_ADDR , self ._rs485_addr )
692
659
if success and output [2 ]:
693
660
return True
694
661
else :
695
662
raise Exception (
696
- f"Write to I2C Slave Register failed: register { register :#04x} , data { byte_list } "
663
+ f"Write to I2C Memory Register failed: register { mem_addr :#04x} , data { byte_list } "
697
664
)
698
665
666
+ def readfrom (self , addr : int , nbytes : int ): # 0x62
667
+ self .send_command (_READ_I2C_SLAVE_ADDR , self ._rs485_addr , [addr , nbytes ], buf_len = 5 )
668
+ success , output = self .read_response (_READ_I2C_SLAVE_ADDR , self ._rs485_addr )
669
+ if success and output [2 ]:
670
+ return output [8 : (8 + nbytes )]
671
+ else :
672
+ raise Exception (f"Read I2C Slave failed: register { addr :#04x} , nbytes { nbytes } " )
673
+
674
+ def readfrom_into (self , addr : int , buf : bytearray ) -> None : # 0x62
675
+ data = self .readfrom (addr , len (buf ))
676
+ buf [: len (data )] = data
677
+
678
+ def writeto (self , addr : int , buf : bytes | bytearray , stop : bool = True ):
679
+ data_len = len (buf )
680
+ data = [addr , data_len , stop , 0 , 0 , 0 ] + list (buf )
681
+
682
+ self .send_command (_WRITE_I2C_SLAVE_ADDR , self ._rs485_addr , data , buf_len = 25 )
683
+ success , output = self .read_response (_WRITE_I2C_SLAVE_ADDR , self ._rs485_addr )
684
+ if success and output [2 ]:
685
+ return True
686
+
687
+ def _writeto (self , addr : int , buf : bytes | bytearray , stop : bool = True ):
688
+ data_len = len (buf )
689
+ data = [addr , data_len , stop , 0 , 0 , 0 ] + list (buf )
690
+
691
+ self .send_command (_WRITE_I2C_SLAVE_ADDR , self ._rs485_addr , data , buf_len = 25 )
692
+ success , output = self .read_response (_WRITE_I2C_SLAVE_ADDR , self ._rs485_addr )
693
+ if success and output [2 ]:
694
+ return True
695
+
696
+ def scan (self ) -> list :
697
+ found_devices = []
698
+ for address in range (0x08 , 0x77 + 1 ):
699
+ try :
700
+ if self ._writeto (address , bytes (0x01 ), stop = False ):
701
+ found_devices .append (address )
702
+
703
+ except OSError :
704
+ pass
705
+ return found_devices
706
+
699
707
700
708
class Roller485Unit :
701
709
"""! A factory class to create Roller instances based on communication mode."""
@@ -718,4 +726,6 @@ def __new__(cls, *args, **kwargs) -> RollerBase:
718
726
cls .instance = Roller485 (args [0 ], ** kwargs )
719
727
elif kwargs ["mode" ] == cls ._RS485_TO_I2C_MODE :
720
728
cls .instance = Roller485ToI2CBus (args [0 ], ** kwargs )
729
+ else :
730
+ raise ValueError ("Invalid mode specified" )
721
731
return cls .instance
0 commit comments