@@ -111,29 +111,11 @@ def transmit(self, cmd: Optional[int] = None,
111111 :param out_len: the count of meaningful bytes to receive
112112 :param release: whether to release /CS line (to manage transactions)
113113 """
114- if isinstance (in_payload , int ):
115- in_payload = bytes ([0xff ] * in_payload )
116- elif in_payload is not None :
117- assert isinstance (in_payload , (bytes , bytearray ))
118- else :
119- in_payload = bytes ()
120- assert isinstance (out_len , int ) and 0 <= out_len <= 0xffff
121- if cmd is not None :
122- assert 0 <= cmd <= 0xff
123- tx_payload = b'' .join ((bytes ([cmd ]), in_payload , bytes (out_len )))
124- else :
125- tx_payload = b'' .join ((in_payload , bytes (out_len )))
126- if self ._rev_tx :
127- tx_payload = bytes (SpiDevice .rev8 (x ) for x in tx_payload )
128- rx_payload = self ._exchange (tx_payload , release )
129- if self ._rev_rx :
130- rx_payload = bytes (SpiDevice .rev8 (x ) for x in rx_payload )
131- assert len (rx_payload ) == len (tx_payload )
132- return rx_payload [- out_len :]
114+ return self ._transmit (cmd , in_payload , out_len , release )
133115
134116 def read_status_register (self ) -> int :
135117 """Read out the flash status register."""
136- resp = self .transmit (self .COMMANDS ['READ_STATUS' ], out_len = 1 )
118+ resp = self ._transmit (self .COMMANDS ['READ_STATUS' ], out_len = 1 )
137119 return resp [0 ]
138120
139121 def is_busy (self , sreg : Optional [int ] = None ) -> bool :
@@ -148,7 +130,7 @@ def is_wel(self, sreg: Optional[int] = None) -> bool:
148130 sreg = self .read_status_register ()
149131 return bool (sreg & self .WEL )
150132
151- def wait_idle (self , timeout : float = 1.0 , pace : float = 0.0005 ):
133+ def wait_idle (self , timeout : float = 1.0 , pace : float = 0.0005 ) -> None :
152134 """Wait for the flash device to become idle.
153135
154136 :param timeout: raise a TimeoutError if flash does not become
@@ -326,6 +308,29 @@ def _build_cs_header(self, size: int, release: bool = True) -> bytes:
326308 self ._log .debug ('Header: %s' , hexlify (header ).decode ())
327309 return header
328310
311+ def _transmit (self , cmd : Optional [int ] = None ,
312+ in_payload : Optional [bytes | bytearray | int ] = None ,
313+ out_len : int = 0 , release : bool = True ) -> bytes :
314+ if isinstance (in_payload , int ):
315+ in_payload = bytes ([0xff ] * in_payload )
316+ elif in_payload is not None :
317+ assert isinstance (in_payload , (bytes , bytearray ))
318+ else :
319+ in_payload = bytes ()
320+ assert isinstance (out_len , int ) and 0 <= out_len <= 0xffff
321+ if cmd is not None :
322+ assert 0 <= cmd <= 0xff
323+ tx_payload = b'' .join ((bytes ([cmd ]), in_payload , bytes (out_len )))
324+ else :
325+ tx_payload = b'' .join ((in_payload , bytes (out_len )))
326+ if self ._rev_tx :
327+ tx_payload = bytes (SpiDevice .rev8 (x ) for x in tx_payload )
328+ rx_payload = self ._exchange (tx_payload , release )
329+ if self ._rev_rx :
330+ rx_payload = bytes (SpiDevice .rev8 (x ) for x in rx_payload )
331+ assert len (rx_payload ) == len (tx_payload )
332+ return rx_payload [- out_len :]
333+
329334 def _send (self , buf : bytes , release : bool = True ):
330335 data = b'' .join ((self ._build_cs_header (len (buf ), release ), buf ))
331336 self ._log .debug ('TX[%d]: %s %s' , len (buf ), hexlify (buf ).decode (),
0 commit comments