@@ -76,14 +76,14 @@ def inventory(self) -> bytes:
76
76
self ._verbose and print ("RSSI: %d, PC: %d, EPC: %s, CRC: %d" % (rssi , pc , epc , crc ))
77
77
return epc
78
78
79
- def continuous_inventory (self , count : int ):
79
+ def continuous_inventory (self , count : int ) -> tuple :
80
80
struct .pack_into (">BH" , self ._buffer3 , 0 , 0x22 , count )
81
81
rxdata , status = self .command (0x27 , data = bytes (self ._buffer1 ))
82
82
if status is False :
83
- return
83
+ return ( 0 , 0 , b"" , 0 )
84
84
rssi , pc , epc , crc = struct .unpack_from (">BH%dsH" % (len (rxdata ) - 5 ), rxdata )
85
85
self ._verbose and print ("RSSI: %d, PC: %d, EPC: %s, CRC: %d" % (rssi , pc , epc , crc ))
86
- return rssi , pc , epc , crc
86
+ return ( rssi , pc , epc , crc )
87
87
88
88
def stop_continuous_inventory (self ) -> bool :
89
89
rxdata , status = self .command (0x28 )
@@ -97,7 +97,7 @@ def select(
97
97
pointer : int ,
98
98
truncate : bool ,
99
99
mask : bytes ,
100
- ):
100
+ ) -> bool :
101
101
buffer = struct .pack (
102
102
">BIBB%ds" % (len (mask )),
103
103
(target << 5 | action << 3 | membank ),
@@ -160,7 +160,7 @@ def read_mem_bank(
160
160
161
161
def write_mem_bank (
162
162
self , bank : int , offset : int , data : str , access : bytes = b"\x00 \x00 \x00 \x00 "
163
- ):
163
+ ) -> bool :
164
164
data = bytes .fromhex (data )
165
165
buffer = bytearray (9 + len (data ))
166
166
struct .pack_into (
@@ -182,7 +182,7 @@ def lock_mem_bank(
182
182
tid_lock : int = 0b00 ,
183
183
user_lock : int = 0b00 ,
184
184
access : bytes = b"\x00 \x00 \x00 \x00 " ,
185
- ):
185
+ ) -> bool :
186
186
payload = 0b1111_1111_1100_0000_0000
187
187
payload |= kill_lock << 8
188
188
payload |= access_lock << 6
@@ -205,19 +205,19 @@ def lock_mem_bank(
205
205
(_ , _ , _ , param ) = struct .unpack_from (">BH%dsB" % (len (rxdata ) - 4 ), rxdata )
206
206
return True if param == 0x00 else False
207
207
208
- def set_access_password (self , password : bytes ):
208
+ def set_access_password (self , password : bytes ) -> None :
209
209
self .write_mem_bank (self , self .RFU , 0x00 , password )
210
210
211
- def set_kill_password (self , password : bytes ):
211
+ def set_kill_password (self , password : bytes ) -> None :
212
212
self .write_mem_bank (self , self .RFU , 0x20 , password )
213
213
214
- def kill (self , password : bytes = b"\x00 \x00 \x00 \x00 " ):
214
+ def kill (self , password : bytes = b"\x00 \x00 \x00 \x00 " ) -> bool :
215
215
rxdata , status = self .command (0x65 , data = password )
216
216
return True if status and rxdata == b"\x00 " else False
217
217
218
218
def set_query_param (
219
219
self , dr = 0b0 , m = 0b00 , tr_ext = 0b1 , sel = 0b00 , session = 0b00 , target = 0b0 , q = 0b0100
220
- ):
220
+ ) -> bool :
221
221
if dr != 0b0 and m != 0b00 and tr_ext != 0b1 :
222
222
raise ValueError ("Invalid query parameters" )
223
223
payload = 0b0000_0000_0000_0000
@@ -326,58 +326,58 @@ def get_blocking_signal_strength(self, channel: int) -> int:
326
326
strengths = self .get_blocking_signal_strength_all ()
327
327
return strengths [channel ] if strengths else - 1
328
328
329
- def get_blocking_signal_strength_all (self ) -> [ tuple | None ] :
329
+ def get_blocking_signal_strength_all (self ) -> tuple :
330
330
rxdata , status = self .command (0xF2 )
331
331
if status is False :
332
- return None
332
+ return ( - 1 for _ in range ( 20 ))
333
333
fmt = "" .join ("b" for _ in range ((len (rxdata ) - 2 )))
334
334
return struct .unpack_from (">%s" % (fmt ), rxdata , 2 )
335
335
336
336
def get_channel_rssi (self , channel : int ) -> int :
337
337
return self .get_channel_rssi_all ()[channel ]
338
338
339
- def get_channel_rssi_all (self ) -> [ tuple | None ] :
339
+ def get_channel_rssi_all (self ) -> tuple :
340
340
rxdata , status = self .command (0xF3 )
341
341
if status is False :
342
- return None
342
+ return ( - 1 for _ in range ( 20 ))
343
343
fmt = "" .join ("b" for _ in range ((len (rxdata ) - 2 )))
344
344
return struct .unpack_from (">%s" % (fmt ), rxdata , 2 )
345
345
346
346
def sleep (self ) -> bool :
347
347
rxdata , status = self .command (0x17 )
348
348
return True if status and rxdata == b"\x00 " else False
349
349
350
- def wake (self ):
350
+ def wake (self ) -> None :
351
351
self ._uart .write (b"\x55 " )
352
352
time .sleep (0.1 )
353
353
354
- def set_automatic_sleep_time (self , min : int ):
354
+ def set_automatic_sleep_time (self , min : int ) -> bool :
355
355
struct .pack_into (">B" , self ._buffer1 , 0 , min )
356
356
rxdata , status = self .command (0x1D , data = self ._buffer1 )
357
357
return True if status and rxdata == b"\x00 " else False
358
358
359
- def disable_automatic_sleep (self ):
359
+ def disable_automatic_sleep (self ) -> bool :
360
360
return self .set_automatic_sleep_time (0 )
361
361
362
- def nxp_read_protect (self , set , access_password : bytes = b"\x00 \x00 \x00 \x00 " ):
362
+ def nxp_read_protect (self , set , access_password : bytes = b"\x00 \x00 \x00 \x00 " ) -> bool :
363
363
struct .pack_into (">4sB" , self ._buffer , 0 , access_password , set )
364
364
rxdata , status = self .command (0xE1 , data = self ._buffer [0 :5 ])
365
365
return True if status and rxdata == b"\x00 " else False
366
366
367
- def nxp_change_eas (self , set , access_password : bytes = b"\x00 \x00 \x00 \x00 " ):
368
- struct .pack_into (">4sB" , self ._buffer , 0 , access_password , 0x01 )
367
+ def nxp_change_eas (self , set , access_password : bytes = b"\x00 \x00 \x00 \x00 " ) -> bool :
368
+ struct .pack_into (">4sB" , self ._buffer , 0 , access_password , set )
369
369
rxdata , status = self .command (0xE3 , data = self ._buffer [0 :5 ])
370
370
return True if status and rxdata == b"\x00 " else False
371
371
372
- def nxp_eas_alarm (self ):
372
+ def nxp_eas_alarm (self ) -> bytes :
373
373
rxdata , status = self .command (0xE4 )
374
374
if status is False :
375
375
return b""
376
376
377
377
(code ,) = struct .unpack_from (">8s" , rxdata )
378
378
return code
379
379
380
- def nxp_read_config_word (self , access_password : bytes = b"\x00 \x00 \x00 \x00 " ):
380
+ def nxp_read_config_word (self , access_password : bytes = b"\x00 \x00 \x00 \x00 " ) -> int :
381
381
struct .pack_into (">4sH" , self ._buffer , 0 , access_password , 0x0000 )
382
382
rxdata , status = self .command (0xE2 , data = self ._buffer [0 :6 ])
383
383
if status is False :
@@ -386,20 +386,24 @@ def nxp_read_config_word(self, access_password: bytes = b"\x00\x00\x00\x00"):
386
386
(config_word ,) = struct .unpack_from (">H" , rxdata , 1 )
387
387
return config_word
388
388
389
- def set_nxp_confog_word (self , config_word , access_password : bytes = b"\x00 \x00 \x00 \x00 " ):
389
+ def set_nxp_confog_word (
390
+ self , config_word : int , access_password : bytes = b"\x00 \x00 \x00 \x00 "
391
+ ) -> bool :
390
392
old_config_word = self .nxp_read_config_word (access_password )
391
393
config_word = config_word ^ old_config_word
392
394
struct .pack_into (">4sH" , self ._buffer , 0 , access_password , config_word )
393
395
rxdata , status = self .command (0xE2 , data = self ._buffer [0 :6 ])
394
396
return True if status and rxdata == b"\x00 " else False
395
397
396
- def get_impinj_monza_qt_sr (self , persistence , password : bytes = b"\x00 \x00 \x00 \x00 " ):
398
+ def get_impinj_monza_qt_sr (self , persistence , password : bytes = b"\x00 \x00 \x00 \x00 " ) -> bool :
397
399
return self .impinj_monza_qt_read_cmd (persistence , password = password )[0 ]
398
400
399
- def get_impinj_monza_qt_mem (self , persistence , password : bytes = b"\x00 \x00 \x00 \x00 " ):
401
+ def get_impinj_monza_qt_mem (self , persistence , password : bytes = b"\x00 \x00 \x00 \x00 " ) -> bool :
400
402
return self .impinj_monza_qt_read_cmd (persistence , password = password )[1 ]
401
403
402
- def impinj_monza_qt_read_cmd (self , persistence , password : bytes = b"\x00 \x00 \x00 \x00 " ):
404
+ def impinj_monza_qt_read_cmd (
405
+ self , persistence , password : bytes = b"\x00 \x00 \x00 \x00 "
406
+ ) -> tuple :
403
407
# TODO
404
408
payload = 0b0000_0000_0000_0000
405
409
struct .pack_into (">4sBBH" , self ._buffer8 , 0 , password , 0x00 , persistence , payload )
@@ -412,19 +416,19 @@ def impinj_monza_qt_read_cmd(self, persistence, password: bytes = b"\x00\x00\x00
412
416
413
417
def set_impinj_monza_qt_sr (
414
418
self , persistence , qt_sr : bool , password : bytes = b"\x00 \x00 \x00 \x00 "
415
- ):
419
+ ) -> bool :
416
420
_ , qt_mem = self .impinj_monza_qt_read_cmd (persistence , password )
417
421
return self .impinj_monza_qt_write_cmd (persistence , qt_sr , qt_mem , password )
418
422
419
423
def set_impinj_monza_qt_mem (
420
424
self , persistence , qt_mem : bool , password : bytes = b"\x00 \x00 \x00 \x00 "
421
- ):
425
+ ) -> bool :
422
426
qt_sr , _ = self .impinj_monza_qt_read_cmd (persistence , password )
423
427
return self .impinj_monza_qt_write_cmd (persistence , qt_sr , qt_mem , password )
424
428
425
429
def impinj_monza_qt_write_cmd (
426
430
self , persistence , qt_sr : bool , qt_mem : bool , password : bytes = b"\x00 \x00 \x00 \x00 "
427
- ):
431
+ ) -> bool :
428
432
payload = 0b0000_0000_0000_0000
429
433
payload |= int (qt_sr ) << 15
430
434
payload |= int (qt_mem ) << 14
@@ -436,12 +440,12 @@ def impinj_monza_qt_write_cmd(
436
440
(ul , pc , epc , param ) = struct .unpack_from (">BH%dsH" % (len (rxdata ) - 4 ), rxdata )
437
441
return True if param == 0x00 else False
438
442
439
- def command (self , cmd , data = b"" , timeout = 5000 ):
443
+ def command (self , cmd , data = b"" , timeout = 5000 ) -> tuple :
440
444
self ._send (cmd , data )
441
445
rxdata , status = self ._receive (length = 100 , timeout = timeout )
442
446
return rxdata , status
443
447
444
- def _send (self , cmd , data = b"" ):
448
+ def _send (self , cmd , data = b"" ) -> None :
445
449
checksum = self ._checksum (self ._FRAME_CMD , cmd , len (data ), data )
446
450
frame = struct .pack (
447
451
"!BBBH%dsBB" % len (data ),
@@ -456,7 +460,7 @@ def _send(self, cmd, data=b""):
456
460
self ._verbose and print ("Frame to send: %s" % (" " .join (f"{ byte :02x} " for byte in frame )))
457
461
self ._uart .write (frame )
458
462
459
- def _receive (self , length : int = 8 , timeout = 1000 ):
463
+ def _receive (self , length : int = 8 , timeout = 1000 ) -> tuple :
460
464
_buffer = b""
461
465
startpos = - 1
462
466
endpos = - 1
@@ -495,7 +499,7 @@ def _receive(self, length: int = 8, timeout=1000):
495
499
self ._verbose and print ("Malformed packet received, ignore it" )
496
500
return b"" , False
497
501
498
- def _checksum (self , * args ):
502
+ def _checksum (self , * args ) -> int :
499
503
chcksum = 0
500
504
for arg in args :
501
505
if isinstance (arg , int ):
@@ -505,7 +509,7 @@ def _checksum(self, *args):
505
509
chcksum += x
506
510
return chcksum & 0xFF
507
511
508
- def check_error_code (self , code ):
512
+ def check_error_code (self , code ) -> None :
509
513
if code == 0x17 :
510
514
raise Exception ("Invalid command" )
511
515
if code == 0x20 :
0 commit comments