From bdde4e18d1ae1c8e4468dedd83b587c972b5db1b Mon Sep 17 00:00:00 2001 From: foamyguy Date: Thu, 15 May 2025 19:33:56 +0000 Subject: [PATCH] change to ruff --- .gitattributes | 11 + .pre-commit-config.yaml | 43 +- .pylintrc | 399 ------------------ README.rst | 6 +- adafruit_wiznet5k/adafruit_wiznet5k.py | 141 +++---- adafruit_wiznet5k/adafruit_wiznet5k_debug.py | 11 +- adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py | 95 ++--- adafruit_wiznet5k/adafruit_wiznet5k_dns.py | 60 +-- .../adafruit_wiznet5k_socketpool.py | 101 ++--- docs/api.rst | 3 + docs/conf.py | 8 +- examples/wiznet5k_aio_post.py | 8 +- examples/wiznet5k_cheerlights.py | 10 +- ...iznet5k_cpython_client_for_simpleserver.py | 1 + examples/wiznet5k_httpserver.py | 5 +- examples/wiznet5k_simpleserver.py | 3 +- examples/wiznet5k_simpletest.py | 9 +- .../wiznet5k_simpletest_manual_network.py | 9 +- ruff.toml | 111 +++++ 19 files changed, 305 insertions(+), 729 deletions(-) create mode 100644 .gitattributes delete mode 100644 .pylintrc create mode 100644 ruff.toml diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..21c125c --- /dev/null +++ b/.gitattributes @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: 2024 Justin Myers for Adafruit Industries +# +# SPDX-License-Identifier: Unlicense + +.py text eol=lf +.rst text eol=lf +.txt text eol=lf +.yaml text eol=lf +.toml text eol=lf +.license text eol=lf +.md text eol=lf diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 70ade69..ff19dde 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,42 +1,21 @@ -# SPDX-FileCopyrightText: 2020 Diego Elio Pettenò +# SPDX-FileCopyrightText: 2024 Justin Myers for Adafruit Industries # # SPDX-License-Identifier: Unlicense repos: - - repo: https://github.com/python/black - rev: 23.3.0 - hooks: - - id: black - - repo: https://github.com/fsfe/reuse-tool - rev: v1.1.2 - hooks: - - id: reuse - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.4.0 + rev: v4.5.0 hooks: - id: check-yaml - id: end-of-file-fixer - id: trailing-whitespace - - repo: https://github.com/pycqa/pylint - rev: v2.17.4 + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.3.4 hooks: - - id: pylint - name: pylint (library code) - types: [python] - args: - - --disable=consider-using-f-string - exclude: "^(docs/|examples/|tests/|setup.py$)" - - id: pylint - name: pylint (example code) - description: Run pylint rules on "examples/*.py" files - types: [python] - files: "^examples/" - args: - - --disable=missing-docstring,invalid-name,consider-using-f-string,duplicate-code - - id: pylint - name: pylint (test code) - description: Run pylint rules on "tests/*.py" files - types: [python] - files: "^tests/" - args: - - --disable=missing-docstring,consider-using-f-string,duplicate-code + - id: ruff-format + - id: ruff + args: ["--fix"] + - repo: https://github.com/fsfe/reuse-tool + rev: v3.0.1 + hooks: + - id: reuse diff --git a/.pylintrc b/.pylintrc deleted file mode 100644 index f945e92..0000000 --- a/.pylintrc +++ /dev/null @@ -1,399 +0,0 @@ -# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries -# -# SPDX-License-Identifier: Unlicense - -[MASTER] - -# A comma-separated list of package or module names from where C extensions may -# be loaded. Extensions are loading into the active Python interpreter and may -# run arbitrary code -extension-pkg-whitelist= - -# Add files or directories to the ignore-list. They should be base names, not -# paths. -ignore=CVS - -# Add files or directories matching the regex patterns to the ignore-list. The -# regex matches against base names, not paths. -ignore-patterns= - -# Python code to execute, usually for sys.path manipulation such as -# pygtk.require(). -#init-hook= - -# Use multiple processes to speed up Pylint. -jobs=1 - -# List of plugins (as comma separated values of python modules names) to load, -# usually to register additional checkers. -load-plugins=pylint.extensions.no_self_use - -# Pickle collected data for later comparisons. -persistent=yes - -# Specify a configuration file. -#rcfile= - -# Allow loading of arbitrary C extensions. Extensions are imported into the -# active Python interpreter and may run arbitrary code. -unsafe-load-any-extension=no - - -[MESSAGES CONTROL] - -# Only show warnings with the listed confidence levels. Leave empty to show -# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED -confidence= - -# Disable the message, report, category or checker with the given id(s). You -# can either give multiple identifiers separated by comma (,) or put this -# option multiple times (only on the command line, not in the configuration -# file where it should appear only once).You can also use "--disable=all" to -# disable everything first and then reenable specific checks. For example, if -# you want to run only the similarities checker, you can use "--disable=all -# --enable=similarities". If you want to run only the classes checker, but have -# no Warning level messages displayed, use"--disable=all --enable=classes -# --disable=W" -# disable=import-error,raw-checker-failed,bad-inline-option,locally-disabled,file-ignored,suppressed-message,useless-suppression,deprecated-pragma,deprecated-str-translate-call -disable=raw-checker-failed,bad-inline-option,locally-disabled,file-ignored,suppressed-message,useless-suppression,deprecated-pragma,import-error,pointless-string-statement,unspecified-encoding - -# Enable the message, report, category or checker with the given id(s). You can -# either give multiple identifier separated by comma (,) or put this option -# multiple time (only on the command line, not in the configuration file where -# it should appear only once). See also the "--disable" option for examples. -enable= - - -[REPORTS] - -# Python expression which should return a note less than 10 (10 is the highest -# note). You have access to the variables errors warning, statement which -# respectively contain the number of errors / warnings messages and the total -# number of statements analyzed. This is used by the global evaluation report -# (RP0004). -evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10) - -# Template used to display messages. This is a python new-style format string -# used to format the message information. See doc for all details -#msg-template= - -# Set the output format. Available formats are text, parseable, colorized, json -# and msvs (visual studio).You can also give a reporter class, eg -# mypackage.mymodule.MyReporterClass. -output-format=text - -# Tells whether to display a full report or only the messages -reports=no - -# Activate the evaluation score. -score=yes - - -[REFACTORING] - -# Maximum number of nested blocks for function / method body -max-nested-blocks=5 - - -[LOGGING] - -# Logging modules to check that the string format arguments are in logging -# function parameter format -logging-modules=logging - - -[SPELLING] - -# Spelling dictionary name. Available dictionaries: none. To make it working -# install python-enchant package. -spelling-dict= - -# List of comma separated words that should not be checked. -spelling-ignore-words= - -# A path to a file that contains private dictionary; one word per line. -spelling-private-dict-file= - -# Tells whether to store unknown words to indicated private dictionary in -# --spelling-private-dict-file option instead of raising a message. -spelling-store-unknown-words=no - - -[MISCELLANEOUS] - -# List of note tags to take in consideration, separated by a comma. -# notes=FIXME,XXX,TODO -notes=FIXME,XXX - - -[TYPECHECK] - -# List of decorators that produce context managers, such as -# contextlib.contextmanager. Add to this list to register other decorators that -# produce valid context managers. -contextmanager-decorators=contextlib.contextmanager - -# List of members which are set dynamically and missed by pylint inference -# system, and so shouldn't trigger E1101 when accessed. Python regular -# expressions are accepted. -generated-members= - -# Tells whether missing members accessed in mixin class should be ignored. A -# mixin class is detected if its name ends with "mixin" (case insensitive). -ignore-mixin-members=yes - -# This flag controls whether pylint should warn about no-member and similar -# checks whenever an opaque object is returned when inferring. The inference -# can return multiple potential results while evaluating a Python object, but -# some branches might not be evaluated, which results in partial inference. In -# that case, it might be useful to still emit no-member and other checks for -# the rest of the inferred objects. -ignore-on-opaque-inference=yes - -# List of class names for which member attributes should not be checked (useful -# for classes with dynamically set attributes). This supports the use of -# qualified names. -ignored-classes=optparse.Values,thread._local,_thread._local - -# List of module names for which member attributes should not be checked -# (useful for modules/projects where namespaces are manipulated during runtime -# and thus existing member attributes cannot be deduced by static analysis. It -# supports qualified module names, as well as Unix pattern matching. -ignored-modules=board - -# Show a hint with possible names when a member name was not found. The aspect -# of finding the hint is based on edit distance. -missing-member-hint=yes - -# The minimum edit distance a name should have in order to be considered a -# similar match for a missing member name. -missing-member-hint-distance=1 - -# The total number of similar names that should be taken in consideration when -# showing a hint for a missing member. -missing-member-max-choices=1 - - -[VARIABLES] - -# List of additional names supposed to be defined in builtins. Remember that -# you should avoid to define new builtins when possible. -additional-builtins= - -# Tells whether unused global variables should be treated as a violation. -allow-global-unused-variables=yes - -# List of strings which can identify a callback function by name. A callback -# name must start or end with one of those strings. -callbacks=cb_,_cb - -# A regular expression matching the name of dummy variables (i.e. expectedly -# not used). -dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_ - -# Argument names that match this expression will be ignored. Default to name -# with leading underscore -ignored-argument-names=_.*|^ignored_|^unused_ - -# Tells whether we should check for unused import in __init__ files. -init-import=no - -# List of qualified module names which can have objects that can redefine -# builtins. -redefining-builtins-modules=six.moves,future.builtins - - -[FORMAT] - -# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. -# expected-line-ending-format= -expected-line-ending-format=LF - -# Regexp for a line that is allowed to be longer than the limit. -ignore-long-lines=^\s*(# )??$ - -# Number of spaces of indent required inside a hanging or continued line. -indent-after-paren=4 - -# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 -# tab). -indent-string=' ' - -# Maximum number of characters on a single line. -max-line-length=100 - -# Maximum number of lines in a module -max-module-lines=1000 - -# Allow the body of a class to be on the same line as the declaration if body -# contains single statement. -single-line-class-stmt=no - -# Allow the body of an if to be on the same line as the test if there is no -# else. -single-line-if-stmt=no - - -[SIMILARITIES] - -# Ignore comments when computing similarities. -ignore-comments=yes - -# Ignore docstrings when computing similarities. -ignore-docstrings=yes - -# Ignore imports when computing similarities. -ignore-imports=yes - -# Minimum lines number of a similarity. -min-similarity-lines=12 - - -[BASIC] - -# Regular expression matching correct argument names -argument-rgx=(([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$ - -# Regular expression matching correct attribute names -attr-rgx=(([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$ - -# Bad variable names which should always be refused, separated by a comma -bad-names=foo,bar,baz,toto,tutu,tata - -# Regular expression matching correct class attribute names -class-attribute-rgx=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$ - -# Regular expression matching correct class names -# class-rgx=[A-Z_][a-zA-Z0-9]+$ -class-rgx=[A-Z_][a-zA-Z0-9_]+$ - -# Regular expression matching correct constant names -const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__))$ - -# Minimum line length for functions/classes that require docstrings, shorter -# ones are exempt. -docstring-min-length=-1 - -# Regular expression matching correct function names -function-rgx=(([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$ - -# Good variable names which should always be accepted, separated by a comma -# good-names=i,j,k,ex,Run,_ -good-names=r,g,b,w,i,j,k,n,x,y,z,ex,ok,Run,_ - -# Include a hint for the correct naming format with invalid-name -include-naming-hint=no - -# Regular expression matching correct inline iteration names -inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$ - -# Regular expression matching correct method names -method-rgx=(([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$ - -# Regular expression matching correct module names -module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$ - -# Colon-delimited sets of names that determine each other's naming style when -# the name regexes allow several styles. -name-group= - -# Regular expression which should only match function or class names that do -# not require a docstring. -no-docstring-rgx=^_ - -# List of decorators that produce properties, such as abc.abstractproperty. Add -# to this list to register other decorators that produce valid properties. -property-classes=abc.abstractproperty - -# Regular expression matching correct variable names -variable-rgx=(([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$ - - -[IMPORTS] - -# Allow wildcard imports from modules that define __all__. -allow-wildcard-with-all=no - -# Analyse import fallback blocks. This can be used to support both Python 2 and -# 3 compatible code, which means that the block might have code that exists -# only in one or another interpreter, leading to false positives when analysed. -analyse-fallback-blocks=no - -# Deprecated modules which should not be used, separated by a comma -deprecated-modules=optparse,tkinter.tix - -# Create a graph of external dependencies in the given file (report RP0402 must -# not be disabled) -ext-import-graph= - -# Create a graph of every (i.e. internal and external) dependencies in the -# given file (report RP0402 must not be disabled) -import-graph= - -# Create a graph of internal dependencies in the given file (report RP0402 must -# not be disabled) -int-import-graph= - -# Force import order to recognize a module as part of the standard -# compatibility libraries. -known-standard-library= - -# Force import order to recognize a module as part of a third party library. -known-third-party=enchant - - -[CLASSES] - -# List of method names used to declare (i.e. assign) instance attributes. -defining-attr-methods=__init__,__new__,setUp - -# List of member names, which should be excluded from the protected access -# warning. -exclude-protected=_asdict,_fields,_replace,_source,_make - -# List of valid names for the first argument in a class method. -valid-classmethod-first-arg=cls - -# List of valid names for the first argument in a metaclass class method. -valid-metaclass-classmethod-first-arg=mcs - - -[DESIGN] - -# Maximum number of arguments for function / method -max-args=5 - -# Maximum number of attributes for a class (see R0902). -# max-attributes=7 -max-attributes=11 - -# Maximum number of boolean expressions in a if statement -max-bool-expr=5 - -# Maximum number of branch for function / method body -max-branches=12 - -# Maximum number of locals for function / method body -max-locals=15 - -# Maximum number of parents for a class (see R0901). -max-parents=7 - -# Maximum number of public methods for a class (see R0904). -max-public-methods=20 - -# Maximum number of return / yield for function / method body -max-returns=6 - -# Maximum number of statements in function / method body -max-statements=50 - -# Minimum number of public methods for a class (see R0903). -min-public-methods=1 - - -[EXCEPTIONS] - -# Exceptions that will emit a warning when being caught. Defaults to -# "Exception" -overgeneral-exceptions=builtins.Exception diff --git a/README.rst b/README.rst index 79ed6f8..e3b89f2 100644 --- a/README.rst +++ b/README.rst @@ -13,9 +13,9 @@ Introduction :target: https://github.com/adafruit/Adafruit_CircuitPython_Wiznet5k/actions :alt: Build Status -.. image:: https://img.shields.io/badge/code%20style-black-000000.svg - :target: https://github.com/psf/black - :alt: Code Style: Black +.. image:: https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json + :target: https://github.com/astral-sh/ruff + :alt: Code Style: Ruff Pure-Python interface for WIZNET 5k ethernet modules. diff --git a/adafruit_wiznet5k/adafruit_wiznet5k.py b/adafruit_wiznet5k/adafruit_wiznet5k.py index 046900d..b3d08bb 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k.py @@ -28,16 +28,15 @@ * Adafruit's Bus Device library: https://github.com/adafruit/Adafruit_CircuitPython_BusDevice """ -# pylint: disable=too-many-lines from __future__ import annotations try: - from typing import TYPE_CHECKING, Optional, Union, Tuple + from typing import TYPE_CHECKING, Optional, Tuple, Union if TYPE_CHECKING: - from circuitpython_typing import WriteableBuffer import busio import digitalio + from circuitpython_typing import WriteableBuffer IpAddress4Raw = Union[bytes, Tuple[int, int, int, int]] MacAddressRaw = Union[bytes, Tuple[int, int, int, int, int, int]] @@ -47,13 +46,14 @@ __version__ = "0.0.0+auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_Wiznet5k.git" -from random import randint -import time import gc -from micropython import const -from adafruit_ticks import ticks_ms, ticks_diff +import time +from random import randint from adafruit_bus_device.spi_device import SPIDevice +from adafruit_ticks import ticks_diff, ticks_ms +from micropython import const + import adafruit_wiznet5k.adafruit_wiznet5k_dhcp as dhcp import adafruit_wiznet5k.adafruit_wiznet5k_dns as dns from adafruit_wiznet5k.adafruit_wiznet5k_debug import debug_msg @@ -181,16 +181,15 @@ def _unprettyfy(data: str, seperator: str, correct_length: int) -> bytes: raise ValueError("Invalid IP or MAC address.") -class WIZNET5K: # pylint: disable=too-many-public-methods, too-many-instance-attributes +class WIZNET5K: """Interface for WIZNET5K module.""" _sockets_reserved = [] - # pylint: disable=too-many-arguments def __init__( self, spi_bus: busio.SPI, - cs: digitalio.DigitalInOut, # pylint: disable=invalid-name + cs: digitalio.DigitalInOut, reset: Optional[digitalio.DigitalInOut] = None, is_dhcp: bool = True, mac: Union[MacAddressRaw, str] = _DEFAULT_MAC, @@ -213,9 +212,7 @@ def __init__( """ self._debug = debug self._chip_type = None - self._device = SPIDevice( - spi_bus, cs, baudrate=spi_baudrate, polarity=0, phase=0 - ) + self._device = SPIDevice(spi_bus, cs, baudrate=spi_baudrate, polarity=0, phase=0) # init c.s. self._cs = cs @@ -305,11 +302,9 @@ def get_host_by_name(self, hostname: str) -> bytes: if isinstance(hostname, str): hostname = bytes(hostname, "utf-8") # Return IP assigned by DHCP - _dns_client = dns.DNS( - self, self.pretty_ip(bytearray(self._dns)), debug=self._debug - ) + _dns_client = dns.DNS(self, self.pretty_ip(bytearray(self._dns)), debug=self._debug) ipv4 = _dns_client.gethostbyname(hostname) - debug_msg("* Resolved IP: {}".format(ipv4), self._debug) + debug_msg(f"* Resolved IP: {ipv4}", self._debug) if ipv4 == -1: raise RuntimeError("Failed to resolve hostname!") return ipv4 @@ -406,7 +401,6 @@ def mac_address(self, address: Union[MacAddressRaw, str]) -> None: # Bytes conversion will raise ValueError if values are not 0-255 self._write(_REG_SHAR[self._chip_type], 0x04, bytes(address)) except ValueError: - # pylint: disable=raise-missing-from raise ValueError("Invalid MAC address.") @staticmethod @@ -463,10 +457,7 @@ def link_status(self) -> bool: :return bool: True if the link is up, False if the link is down. """ - return bool( - int.from_bytes(self._read(_REG_LINK_FLAG[self._chip_type], 0x00), "big") - & 0x01 - ) + return bool(int.from_bytes(self._read(_REG_LINK_FLAG[self._chip_type], 0x00), "big") & 0x01) @property def ifconfig(self) -> Tuple[bytes, bytes, bytes, bytes]: @@ -520,9 +511,7 @@ def socket_available(self, socket_num: int, sock_type: int = _SNMR_TCP) -> int: :raises ValueError: If the number of bytes on a UDP socket is negative. """ debug_msg( - "socket_available called on socket {}, protocol {}".format( - socket_num, sock_type - ), + f"socket_available called on socket {socket_num}, protocol {sock_type}", self._debug, ) self._sock_num_in_range(socket_num) @@ -573,9 +562,7 @@ def socket_connect( self._sock_num_in_range(socket_num) self._check_link_status() debug_msg( - "W5K socket connect, protocol={}, port={}, ip={}".format( - conn_mode, port, self.pretty_ip(dest) - ), + f"W5K socket connect, protocol={conn_mode}, port={port}, ip={self.pretty_ip(dest)}", self._debug, ) # initialize a socket and set the mode @@ -589,9 +576,7 @@ def socket_connect( # wait for tcp connection establishment while self.socket_status(socket_num) != SNSR_SOCK_ESTABLISHED: time.sleep(0.001) - debug_msg( - "SNSR: {}".format(self.socket_status(socket_num)), self._debug - ) + debug_msg(f"SNSR: {self.socket_status(socket_num)}", self._debug) if self.socket_status(socket_num) == SNSR_SOCK_CLOSED: raise ConnectionError("Failed to establish connection.") return 1 @@ -625,16 +610,14 @@ def get_socket(self, *, reserve_socket=False) -> int: # Call garbage collection to encourage socket.__del__() be called to on any # destroyed instances. Not at all guaranteed to work! gc.collect() - debug_msg( - "Reserved sockets: {}".format(WIZNET5K._sockets_reserved), self._debug - ) + debug_msg(f"Reserved sockets: {WIZNET5K._sockets_reserved}", self._debug) for socket_number, reserved in enumerate(WIZNET5K._sockets_reserved, start=1): if not reserved and self.socket_status(socket_number) == SNSR_SOCK_CLOSED: if reserve_socket: WIZNET5K._sockets_reserved[socket_number - 1] = True debug_msg( - "Allocated socket # {}.".format(socket_number), + f"Allocated socket # {socket_number}.", self._debug, ) return socket_number @@ -651,9 +634,7 @@ def release_socket(self, socket_number): self._sock_num_in_range(socket_number) WIZNET5K._sockets_reserved[socket_number - 1] = False - def socket_listen( - self, socket_num: int, port: int, conn_mode: int = _SNMR_TCP - ) -> None: + def socket_listen(self, socket_num: int, port: int, conn_mode: int = _SNMR_TCP) -> None: """ Listen on a socket's port. @@ -669,9 +650,7 @@ def socket_listen( self._sock_num_in_range(socket_num) self._check_link_status() debug_msg( - "* Listening on port={}, ip={}".format( - port, self.pretty_ip(self.ip_address) - ), + f"* Listening on port={port}, ip={self.pretty_ip(self.ip_address)}", self._debug, ) # Initialize a socket and set the mode @@ -682,11 +661,11 @@ def socket_listen( self._write_sncr(socket_num, _CMD_SOCK_LISTEN) # Wait until ready status = SNSR_SOCK_CLOSED - while status not in ( + while status not in { SNSR_SOCK_LISTEN, SNSR_SOCK_ESTABLISHED, _SNSR_SOCK_UDP, - ): + }: status = self._read_snsr(socket_num) if status == SNSR_SOCK_CLOSED: raise RuntimeError("Listening socket closed.") @@ -710,9 +689,7 @@ def socket_accept(self, socket_num: int) -> Tuple[int, Tuple[str, int]]: dest_port = self.remote_port(socket_num) next_socknum = self.get_socket() debug_msg( - "Dest is ({}, {}), Next listen socknum is #{}".format( - dest_ip, dest_port, next_socknum - ), + f"Dest is ({dest_ip}, {dest_port}), Next listen socknum is #{next_socknum}", self._debug, ) return next_socknum, (dest_ip, dest_port) @@ -733,17 +710,17 @@ def socket_open(self, socket_num: int, conn_mode: int = _SNMR_TCP) -> None: """ self._sock_num_in_range(socket_num) self._check_link_status() - debug_msg("*** Opening socket {}".format(socket_num), self._debug) - if self._read_snsr(socket_num) not in ( + debug_msg(f"*** Opening socket {socket_num}", self._debug) + if self._read_snsr(socket_num) not in { SNSR_SOCK_CLOSED, SNSR_SOCK_TIME_WAIT, SNSR_SOCK_FIN_WAIT, SNSR_SOCK_CLOSE_WAIT, _SNSR_SOCK_CLOSING, _SNSR_SOCK_UDP, - ): + }: raise ConnectionError("Failed to initialize a connection with the socket.") - debug_msg("* Opening W5k Socket, protocol={}".format(conn_mode), self._debug) + debug_msg(f"* Opening W5k Socket, protocol={conn_mode}", self._debug) time.sleep(0.00025) self._write_snmr(socket_num, conn_mode) @@ -761,7 +738,7 @@ def socket_open(self, socket_num: int, conn_mode: int = _SNMR_TCP) -> None: # open socket self._write_sncr(socket_num, _CMD_SOCK_OPEN) - if self._read_snsr(socket_num) not in [_SNSR_SOCK_INIT, _SNSR_SOCK_UDP]: + if self._read_snsr(socket_num) not in {_SNSR_SOCK_INIT, _SNSR_SOCK_UDP}: raise RuntimeError("Could not open socket in TCP or UDP mode.") def socket_close(self, socket_num: int) -> None: @@ -772,7 +749,7 @@ def socket_close(self, socket_num: int) -> None: :raises ValueError: If the socket number is out of range. """ - debug_msg("*** Closing socket {}".format(socket_num), self._debug) + debug_msg(f"*** Closing socket {socket_num}", self._debug) self._sock_num_in_range(socket_num) self._write_sncr(socket_num, _CMD_SOCK_CLOSE) debug_msg(" Waiting for socket to close…", self._debug) @@ -781,9 +758,7 @@ def socket_close(self, socket_num: int) -> None: while self._read_snsr(socket_num) != SNSR_SOCK_CLOSED: if ticks_diff(ticks_ms(), start) > timeout: raise RuntimeError( - "Wiznet5k failed to close socket, status = {}.".format( - self._read_snsr(socket_num) - ) + f"Wiznet5k failed to close socket, status = {self._read_snsr(socket_num)}." ) time.sleep(0.0001) debug_msg(" Socket has closed.", self._debug) @@ -796,7 +771,7 @@ def socket_disconnect(self, socket_num: int) -> None: :raises ValueError: If the socket number is out of range. """ - debug_msg("*** Disconnecting socket {}".format(socket_num), self._debug) + debug_msg(f"*** Disconnecting socket {socket_num}", self._debug) self._sock_num_in_range(socket_num) self._write_sncr(socket_num, _CMD_SOCK_DISCON) @@ -821,12 +796,10 @@ def socket_read(self, socket_num: int, length: int) -> Tuple[int, bytes]: # Check if there is data available on the socket bytes_on_socket = self._get_rx_rcv_size(socket_num) - debug_msg("Bytes avail. on sock: {}".format(bytes_on_socket), self._debug) + debug_msg(f"Bytes avail. on sock: {bytes_on_socket}", self._debug) if bytes_on_socket: bytes_on_socket = length if bytes_on_socket > length else bytes_on_socket - debug_msg( - "* Processing {} bytes of data".format(bytes_on_socket), self._debug - ) + debug_msg(f"* Processing {bytes_on_socket} bytes of data", self._debug) # Read the starting save address of the received data. pointer = self._read_snrx_rd(socket_num) # Read data from the hardware socket. @@ -837,11 +810,11 @@ def socket_read(self, socket_num: int, length: int) -> Tuple[int, bytes]: self._write_sncr(socket_num, _CMD_SOCK_RECV) else: # no data on socket - if self._read_snmr(socket_num) in ( + if self._read_snmr(socket_num) in { SNSR_SOCK_LISTEN, SNSR_SOCK_CLOSED, SNSR_SOCK_CLOSE_WAIT, - ): + }: raise RuntimeError("Lost connection to peer.") bytes_read = b"" return bytes_on_socket, bytes_read @@ -870,18 +843,14 @@ def read_udp(self, socket_num: int, length: int) -> Tuple[int, bytes]: # Read the UDP packet data. if data_length: if data_length <= length: - bytes_on_socket, bytes_read = self.socket_read( - socket_num, data_length - ) + bytes_on_socket, bytes_read = self.socket_read(socket_num, data_length) else: bytes_on_socket, bytes_read = self.socket_read(socket_num, length) # just consume the rest, it is lost to the higher layers self.socket_read(socket_num, data_length - length) return bytes_on_socket, bytes_read - def socket_write( - self, socket_num: int, buffer: bytearray, timeout: float = 0.0 - ) -> int: + def socket_write(self, socket_num: int, buffer: bytearray, timeout: float = 0.0) -> int: """ Write data to a socket. @@ -909,7 +878,7 @@ def socket_write( while free_size < bytes_to_write: free_size = self._get_tx_free_size(socket_num) status = self.socket_status(socket_num) - if status not in (SNSR_SOCK_ESTABLISHED, SNSR_SOCK_CLOSE_WAIT) or ( + if status not in {SNSR_SOCK_ESTABLISHED, SNSR_SOCK_CLOSE_WAIT} or ( timeout and ticks_diff(ticks_ms(), start_time) / 1000 > timeout ): raise RuntimeError("Unable to write data to the socket.") @@ -925,13 +894,13 @@ def socket_write( # check data was transferred correctly while not self.read_snir(socket_num) & _SNIR_SEND_OK: - if self.socket_status(socket_num) in ( + if self.socket_status(socket_num) in { SNSR_SOCK_CLOSED, SNSR_SOCK_TIME_WAIT, SNSR_SOCK_FIN_WAIT, SNSR_SOCK_CLOSE_WAIT, _SNSR_SOCK_CLOSING, - ): + }: raise RuntimeError("No data was sent, socket was closed.") if timeout and ticks_diff(ticks_ms(), start_time) / 1000 > timeout: raise RuntimeError("Operation timed out. No data sent.") @@ -1190,17 +1159,13 @@ def _read_sndipr(self, sock) -> bytes: """Read socket destination IP address.""" data = [] for offset in range(4): - data.append( - self._read_socket_register(sock, _REG_SNDIPR[self._chip_type] + offset) - ) + data.append(self._read_socket_register(sock, _REG_SNDIPR[self._chip_type] + offset)) return bytes(data) def _write_sndipr(self, sock: int, ip_addr: bytes) -> None: """Write to socket destination IP Address.""" for offset, value in enumerate(ip_addr): - self._write_socket_register( - sock, _REG_SNDIPR[self._chip_type] + offset, value - ) + self._write_socket_register(sock, _REG_SNDIPR[self._chip_type] + offset, value) def _read_sndport(self, sock: int) -> int: """Read socket destination port.""" @@ -1267,9 +1232,9 @@ def rtr(self, retry_time: int) -> None: # *** Chip Specific Methods *** - def _chip_read(self, device: "busio.SPI", address: int, call_back: int) -> None: + def _chip_read(self, device: busio.SPI, address: int, call_back: int) -> None: """Chip specific calls for _read method.""" - if self._chip_type in ("w5500", "w6100"): + if self._chip_type in {"w5500", "w6100"}: device.write((address >> 8).to_bytes(1, "big")) device.write((address & 0xFF).to_bytes(1, "big")) device.write(call_back.to_bytes(1, "big")) @@ -1278,9 +1243,9 @@ def _chip_read(self, device: "busio.SPI", address: int, call_back: int) -> None: device.write((address >> 8).to_bytes(1, "big")) device.write((address & 0xFF).to_bytes(1, "big")) - def _chip_write(self, device: "busio.SPI", address: int, call_back: int) -> None: + def _chip_write(self, device: busio.SPI, address: int, call_back: int) -> None: """Chip specific calls for _write.""" - if self._chip_type in ("w5500", "w6100"): + if self._chip_type in {"w5500", "w6100"}: device.write((address >> 8).to_bytes(1, "big")) device.write((address & 0xFF).to_bytes(1, "big")) device.write(call_back.to_bytes(1, "big")) @@ -1291,7 +1256,7 @@ def _chip_write(self, device: "busio.SPI", address: int, call_back: int) -> None def _chip_socket_read(self, socket_number, pointer, bytes_to_read): """Chip specific calls for socket_read.""" - if self._chip_type in ("w5500", "w6100"): + if self._chip_type in {"w5500", "w6100"}: # Read data from the starting address of snrx_rd ctrl_byte = 0x18 + (socket_number << 5) bytes_read = self._read(pointer, ctrl_byte, bytes_to_read) @@ -1312,7 +1277,7 @@ def _chip_socket_write( self, socket_number: int, offset: int, bytes_to_write: int, buffer: bytes ): """Chip specific calls for socket_write.""" - if self._chip_type in ("w5500", "w6100"): + if self._chip_type in {"w5500", "w6100"}: dst_addr = offset + (socket_number * _SOCK_SIZE + 0x8000) cntl_byte = 0x14 + (socket_number << 5) self._write(dst_addr, cntl_byte, buffer[:bytes_to_write]) @@ -1336,7 +1301,7 @@ def _chip_parse_udp_header(self, socket_num) -> int: :return int: The UDP data length. """ - if self._chip_type in ("w5100s", "w5500"): + if self._chip_type in {"w5100s", "w5500"}: self.udp_from_ip[socket_num] = self._pbuff[:4] self.udp_from_port[socket_num] = int.from_bytes(self._pbuff[4:6], "big") return int.from_bytes(self._pbuff[6:], "big") @@ -1348,7 +1313,7 @@ def _chip_parse_udp_header(self, socket_num) -> int: def _write_socket_register(self, sock: int, address: int, data: int) -> None: """Write to a WIZnet 5k socket register.""" - if self._chip_type in ("w5500", "w6100"): + if self._chip_type in {"w5500", "w6100"}: cntl_byte = (sock << 5) + 0x0C self._write(address, cntl_byte, data) elif self._chip_type == "w5100s": @@ -1357,12 +1322,10 @@ def _write_socket_register(self, sock: int, address: int, data: int) -> None: def _read_socket_register(self, sock: int, address: int) -> int: """Read a WIZnet 5k socket register.""" - if self._chip_type in ("w5500", "w6100"): + if self._chip_type in {"w5500", "w6100"}: cntl_byte = (sock << 5) + 0x08 register = self._read(address, cntl_byte) elif self._chip_type == "w5100s": cntl_byte = 0 - register = self._read( - self._ch_base_msb + sock * _CH_SIZE + address, cntl_byte - ) + register = self._read(self._ch_base_msb + sock * _CH_SIZE + address, cntl_byte) return int.from_bytes(register, "big") diff --git a/adafruit_wiznet5k/adafruit_wiznet5k_debug.py b/adafruit_wiznet5k/adafruit_wiznet5k_debug.py index 9fb02c6..1f20651 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k_debug.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k_debug.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: MIT """Makes a debug message function available to all modules.""" + try: from typing import TYPE_CHECKING, Union @@ -14,9 +15,7 @@ import gc -def debug_msg( - message: Union[Exception, str, bytes, bytearray], debugging: bool -) -> None: +def debug_msg(message: Union[Exception, str, bytes, bytearray], debugging: bool) -> None: """ Helper function to print debugging messages. If the message is a bytes type object, create a hexdump. @@ -43,7 +42,7 @@ def _hexdump(src: bytes): result = [] for i in range(0, len(src), 16): chunk = src[i : i + 16] - hexa = " ".join(("{:02x}".format(x) for x in chunk)) - text = "".join((chr(x) if 0x20 <= x < 0x7F else "." for x in chunk)) - result.append("{:04x} {:<48} {}".format(i, hexa, text)) + hexa = " ".join(f"{x:02x}" for x in chunk) + text = "".join(chr(x) if 0x20 <= x < 0x7F else "." for x in chunk) + result.append(f"{i:04x} {hexa:<48} {text}") return "\n".join(result) diff --git a/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py b/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py index 4af10be..c37c29a 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k_dhcp.py @@ -14,10 +14,11 @@ * Author(s): Jordan Terrell, Brent Rubell, Martin Stephens """ + from __future__ import annotations try: - from typing import TYPE_CHECKING, Optional, Union, Tuple + from typing import TYPE_CHECKING, Optional, Tuple, Union if TYPE_CHECKING: from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K @@ -27,11 +28,11 @@ import gc from random import randint + +from adafruit_ticks import ticks_add, ticks_diff, ticks_less, ticks_ms from micropython import const -from adafruit_ticks import ticks_ms, ticks_diff, ticks_add, ticks_less -from adafruit_wiznet5k.adafruit_wiznet5k_debug import ( # pylint: disable=ungrouped-imports - debug_msg, -) + +from adafruit_wiznet5k.adafruit_wiznet5k_debug import debug_msg # DHCP State Machine _STATE_INIT = const(0x01) @@ -120,7 +121,6 @@ class DHCP: IP addresses. """ - # pylint: disable=too-many-arguments, too-many-instance-attributes, invalid-name def __init__( self, eth: WIZNET5K, @@ -171,7 +171,7 @@ def __init__( self._lease = None # Host name - mac_string = "".join("{:02X}".format(o) for o in mac_address) + mac_string = "".join(f"{o:02X}" for o in mac_address) self._hostname = bytes( (hostname or "WIZnet{}").split(".")[0].format(mac_string)[:42], "utf-8" ) @@ -191,7 +191,7 @@ def maintain_dhcp_lease(self, blocking: bool = False) -> None: Maintain a DHCP lease. :param bool blocking: Run the DHCP FSM in non-blocking mode. """ - debug_msg("Maintaining lease with blocking = {}".format(blocking), self._debug) + debug_msg(f"Maintaining lease with blocking = {blocking}", self._debug) self._dhcp_state_machine(blocking=blocking) def _dsm_reset(self) -> None: @@ -266,7 +266,7 @@ def _receive_dhcp_response(self, socket_num: int, timeout: int) -> int: if self._eth.socket_available(socket_num, _SNMR_UDP) > 236: bytes_count, bytes_read = self._eth.read_udp(socket_num, _BUFF_LENGTH) _BUFF[:bytes_count] = bytes_read - debug_msg("Received {} bytes".format(bytes_count), self._debug) + debug_msg(f"Received {bytes_count} bytes", self._debug) del bytes_read gc.collect() return bytes_count @@ -294,9 +294,7 @@ def _process_messaging_states(self, *, message_type: int): debug_msg("Message is ACK, setting FSM state to BOUND.", self._debug) lease = self._lease or 60 self._lease_timeout = ticks_add(self._start_ticks, lease * 1000) - self._t1_timeout = ticks_add( - self._start_ticks, (self._t1 or (lease // 2)) * 1000 - ) + self._t1_timeout = ticks_add(self._start_ticks, (self._t1 or (lease // 2)) * 1000) self._t2_timeout = ticks_add( self._start_ticks, (self._t2 or (lease - lease // 8)) * 1000 ) @@ -326,16 +324,13 @@ def _handle_dhcp_message(self) -> int: :raises TimeoutError: If the FSM is in blocking mode and no valid response has been received before the timeout expires. """ - # pylint: disable=too-many-branches debug_msg("Processing SELECTING or REQUESTING state.", self._debug) if self._dhcp_state == _STATE_SELECTING: msg_type_out = _DHCP_DISCOVER elif self._dhcp_state == _STATE_REQUESTING: msg_type_out = _DHCP_REQUEST else: - raise ValueError( - "FSM can only send messages while in SELECTING or REQUESTING states." - ) + raise ValueError("FSM can only send messages while in SELECTING or REQUESTING states.") debug_msg("Setting up connection for DHCP.", self._debug) if self._renew: dhcp_server = self.dhcp_server_ip @@ -353,9 +348,7 @@ def _handle_dhcp_message(self) -> int: raise RuntimeError("Unable to initialize UDP socket.") self._eth.src_port = 68 - self._eth.socket_connect( - sock_num, dhcp_server, _DHCP_SERVER_PORT, conn_mode=0x02 - ) + self._eth.socket_connect(sock_num, dhcp_server, _DHCP_SERVER_PORT, conn_mode=0x02) self._eth.src_port = 0 message_length = self._generate_dhcp_message(message_type=msg_type_out) @@ -368,7 +361,7 @@ def _handle_dhcp_message(self) -> int: try: msg_type_in = self._parse_dhcp_response() debug_msg( - "Received message type {}".format(msg_type_in), + f"Received message type {msg_type_in}", self._debug, ) return msg_type_in @@ -380,9 +373,7 @@ def _handle_dhcp_message(self) -> int: self._debug, ) return 0 # Did not receive a response in a single attempt. - raise TimeoutError( - "No response from DHCP server after {} retries.".format(attempt) - ) + raise TimeoutError(f"No response from DHCP server after {attempt} retries.") finally: self._eth.socket_close(sock_num) # Close the socket whatever happens. @@ -391,8 +382,8 @@ def _dhcp_state_machine(self, *, blocking: bool = False) -> None: A finite state machine to allow the DHCP lease to be managed without blocking the main program. The initial lease... """ - debug_msg("DHCP FSM called with blocking = {}".format(blocking), self._debug) - debug_msg("FSM initial state is {}".format(self._dhcp_state), self._debug) + debug_msg(f"DHCP FSM called with blocking = {blocking}", self._debug) + debug_msg(f"FSM initial state is {self._dhcp_state}", self._debug) self._blocking = blocking while True: if self._dhcp_state == _STATE_BOUND: @@ -401,20 +392,14 @@ def _dhcp_state_machine(self, *, blocking: bool = False) -> None: debug_msg("No timers have expired. Exiting FSM.", self._debug) return if ticks_less(self._lease_timeout, now): - debug_msg( - "Lease has expired, switching state to INIT.", self._debug - ) + debug_msg("Lease has expired, switching state to INIT.", self._debug) self._blocking = True self._dhcp_state = _STATE_INIT elif ticks_less(self._t2_timeout, now): - debug_msg( - "T2 has expired, switching state to REBINDING.", self._debug - ) + debug_msg("T2 has expired, switching state to REBINDING.", self._debug) self._dhcp_state = _STATE_REBINDING else: - debug_msg( - "T1 has expired, switching state to RENEWING.", self._debug - ) + debug_msg("T1 has expired, switching state to RENEWING.", self._debug) self._dhcp_state = _STATE_RENEWING if self._dhcp_state == _STATE_RENEWING: @@ -496,19 +481,14 @@ def option_writer( _BUFF[offset:data_end] = bytes(option_data) return data_end - debug_msg("Generating DHCP message type {}".format(message_type), self._debug) - # global _BUFF # pylint: disable=global-variable-not-assigned + debug_msg(f"Generating DHCP message type {message_type}", self._debug) _BUFF[:] = bytearray(_BUFF_LENGTH) # OP.HTYPE.HLEN.HOPS - _BUFF[0:4] = bytes( - [_DHCP_BOOT_REQUEST, _DHCP_HTYPE10MB, _DHCP_HLENETHERNET, _DHCP_HOPS] - ) + _BUFF[0:4] = bytes([_DHCP_BOOT_REQUEST, _DHCP_HTYPE10MB, _DHCP_HLENETHERNET, _DHCP_HOPS]) # Transaction ID (xid) _BUFF[4:8] = self._transaction_id.to_bytes(4, "big") # Seconds elapsed - _BUFF[8:10] = int(ticks_diff(ticks_ms(), self._start_ticks) / 1000).to_bytes( - 2, "big" - ) + _BUFF[8:10] = int(ticks_diff(ticks_ms(), self._start_ticks) / 1000).to_bytes(2, "big") # Flags (only bit 0 is used, all other bits must be 0) if broadcast: _BUFF[10] = 0b10000000 @@ -525,13 +505,9 @@ def option_writer( pointer = 240 # Option - DHCP Message Type - pointer = option_writer( - offset=pointer, option_code=53, option_data=(message_type,) - ) + pointer = option_writer(offset=pointer, option_code=53, option_data=(message_type,)) # Option - Host Name - pointer = option_writer( - offset=pointer, option_code=12, option_data=self._hostname - ) + pointer = option_writer(offset=pointer, option_code=12, option_data=self._hostname) # Option - Client ID pointer = option_writer( @@ -544,15 +520,11 @@ def option_writer( pointer = option_writer(offset=pointer, option_code=55, option_data=(1, 3, 6)) # Request a 90 day lease. - pointer = option_writer( - offset=pointer, option_code=51, option_data=b"\x00\x76\xa7\x00" - ) + pointer = option_writer(offset=pointer, option_code=51, option_data=b"\x00\x76\xa7\x00") if message_type == _DHCP_REQUEST: # Set Requested IP Address to offered IP address. - pointer = option_writer( - offset=pointer, option_code=50, option_data=self.local_ip - ) + pointer = option_writer(offset=pointer, option_code=50, option_data=self.local_ip) # Set Server ID to chosen DHCP server IP address. if self._renew != "rebind": pointer = option_writer( @@ -582,7 +554,6 @@ def _parse_dhcp_response( fail or no message type is found in the options, raises a ValueError. """ - # pylint: disable=too-many-branches def option_reader(pointer: int) -> Tuple[int, int, bytes]: """Helper function to extract DHCP option data from a response. @@ -648,18 +619,8 @@ def option_reader(pointer: int) -> Tuple[int, int, bytes]: break debug_msg( - "Msg Type: {}\nSubnet Mask: {}\nDHCP Server IP: {}\nDNS Server IP: {}\ - \nGateway IP: {}\nLocal IP: {}\nT1: {}\nT2: {}\nLease Time: {}".format( - msg_type, - self.subnet_mask, - self.dhcp_server_ip, - self.dns_server_ip, - self.gateway_ip, - self.local_ip, - self._t1, - self._t2, - self._lease, - ), + f"Msg Type: {msg_type}\nSubnet Mask: {self.subnet_mask}\nDHCP Server IP: {self.dhcp_server_ip}\nDNS Server IP: {self.dns_server_ip}\ + \nGateway IP: {self.gateway_ip}\nLocal IP: {self.local_ip}\nT1: {self._t1}\nT2: {self._t2}\nLease Time: {self._lease}", self._debug, ) if msg_type is None: diff --git a/adafruit_wiznet5k/adafruit_wiznet5k_dns.py b/adafruit_wiznet5k/adafruit_wiznet5k_dns.py index ee5591c..6896e2d 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k_dns.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k_dns.py @@ -13,10 +13,11 @@ * Author(s): MCQN Ltd, Brent Rubell, Martin Stephens """ + from __future__ import annotations try: - from typing import TYPE_CHECKING, Union, Tuple + from typing import TYPE_CHECKING, Tuple, Union if TYPE_CHECKING: from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K @@ -25,8 +26,9 @@ import time from random import getrandbits + +from adafruit_ticks import ticks_diff, ticks_ms from micropython import const -from adafruit_ticks import ticks_ms, ticks_diff _QUERY_FLAG = const(0x00) _OPCODE_STANDARD_QUERY = const(0x00) @@ -96,7 +98,6 @@ def _build_dns_query(domain: bytes) -> Tuple[int, int, bytearray]: def _parse_dns_response( *, response: bytearray, query_id: int, query_length: int, debug: bool ) -> bytearray: - # pylint: disable=too-many-branches """ Parses a DNS query response. @@ -113,15 +114,9 @@ class IN answer. """ # Validate request identifier response_id = int.from_bytes(response[0:2], "big") - _debug_print( - debug=debug, message="Parsing packet with ID {x:#x}".format(x=response_id) - ) + _debug_print(debug=debug, message=f"Parsing packet with ID {response_id:#x}") if response_id != query_id: - raise ValueError( - "Response ID 0x{x:x} does not match query ID 0x{y:x}".format( - x=response_id, y=query_id - ) - ) + raise ValueError(f"Response ID 0x{response_id:x} does not match query ID 0x{query_id:x}") # Validate flags flags = int.from_bytes(response[2:4], "big") # Mask out authenticated, truncated and recursion bits, unimportant to parsing. @@ -129,21 +124,20 @@ class IN answer. # Check that the response bit is set, the query is standard and no error occurred. if flags != 0x8000: # noinspection PyStringFormat - raise ValueError("Invalid flags {x:#04x}, {x:#016b}.".format(x=flags)) + raise ValueError(f"Invalid flags {flags:#04x}, {flags:#016b}.") # Number of questions question_count = int.from_bytes(response[4:6], "big") # Never more than one question per DNS query in this implementation. if question_count != 1: - raise ValueError("Question count should be 1, is {}.".format(question_count)) + raise ValueError(f"Question count should be 1, is {question_count}.") # Number of answers answer_count = int.from_bytes(response[6:8], "big") - _debug_print(debug=debug, message="* DNS Answer Count: {}.".format(answer_count)) + _debug_print(debug=debug, message=f"* DNS Answer Count: {answer_count}.") if answer_count < 1: - raise ValueError("Answer count should be > 0, is {}.".format(answer_count)) + raise ValueError(f"Answer count should be > 0, is {answer_count}.") # Parse answers pointer = query_length # Response header is the same length as the query header. - # pylint: disable=too-many-nested-blocks try: for answer in range(answer_count): # Move the pointer past the name. @@ -167,23 +161,15 @@ class IN answer. # Check for a type A answer. if int.from_bytes(response[pointer : pointer + 2], "big") == _TYPE_A: # Check for an IN class answer. - if ( - int.from_bytes(response[pointer + 2 : pointer + 4], "big") - == _CLASS_IN - ): + if int.from_bytes(response[pointer + 2 : pointer + 4], "big") == _CLASS_IN: _debug_print( debug=debug, - message="Type A, class IN found in answer {x} of {y}.".format( - x=answer + 1, y=answer_count - ), + message=f"Type A, class IN found in answer {answer + 1} of {answer_count}.", ) # Set pointer to start of resource record. pointer += 8 # Confirm that the resource record is 4 bytes (an IPv4 address). - if ( - int.from_bytes(response[pointer : pointer + 2], "big") - == _DATA_LEN - ): + if int.from_bytes(response[pointer : pointer + 2], "big") == _DATA_LEN: ipv4 = response[pointer + 2 : pointer + 6] # Low probability that the response was truncated inside the 4 byte address. if len(ipv4) != _DATA_LEN: @@ -199,18 +185,14 @@ class IN answer. pointer += 10 + int.from_bytes(response[pointer + 8 : pointer + 10], "big") _debug_print( debug=debug, - message="Answer {x} of {y} was not type A, class IN.".format( - x=answer + 1, y=answer_count - ), + message=f"Answer {answer + 1} of {answer_count} was not type A, class IN.", ) # No IPv4 address in any answer. raise ValueError() except (IndexError, ValueError) as error: # IndexError means we ran out of data in an answer, maybe truncated. # ValueError means we ran out of answers. - raise ValueError( - "No type A, class IN answers found in the DNS response." - ) from error + raise ValueError("No type A, class IN answers found in the DNS response.") from error class DNS: @@ -230,9 +212,7 @@ def __init__( self._debug = debug self._iface = iface self._dns_server = ( - self._iface.unpretty_ip(dns_address) - if isinstance(dns_address, str) - else dns_address + self._iface.unpretty_ip(dns_address) if isinstance(dns_address, str) else dns_address ) self._query_id = 0 # Request ID. self._query_length = 0 # Length of last query. @@ -252,9 +232,7 @@ def gethostbyname(self, hostname: bytes) -> Union[int, bytes]: # Send DNS request packet dns_socket = self._iface.get_socket() - self._iface.socket_connect( - dns_socket, bytes(self._dns_server), _DNS_PORT, conn_mode=0x02 - ) + self._iface.socket_connect(dns_socket, bytes(self._dns_server), _DNS_PORT, conn_mode=0x02) _debug_print(debug=self._debug, message="* DNS: Sending request packet...") self._iface.socket_write(dns_socket, buffer) @@ -277,7 +255,7 @@ def gethostbyname(self, hostname: bytes) -> Union[int, bytes]: _, buffer = self._iface.read_udp(dns_socket, 512) _debug_print( debug=self._debug, - message="DNS Packet Received: {}".format(buffer), + message=f"DNS Packet Received: {buffer}", ) try: ipaddress = _parse_dns_response( @@ -291,7 +269,7 @@ def gethostbyname(self, hostname: bytes) -> Union[int, bytes]: _debug_print( debug=self._debug, message="* DNS ERROR: Failed to resolve DNS response, retrying…\n" - " ({}).".format(error.args[0]), + f" ({error.args[0]}).", ) self._iface.socket_close(dns_socket) return ipaddress diff --git a/adafruit_wiznet5k/adafruit_wiznet5k_socketpool.py b/adafruit_wiznet5k/adafruit_wiznet5k_socketpool.py index 6694bef..15cb84a 100644 --- a/adafruit_wiznet5k/adafruit_wiznet5k_socketpool.py +++ b/adafruit_wiznet5k/adafruit_wiznet5k_socketpool.py @@ -11,6 +11,7 @@ * Author(s): ladyada, Brent Rubell, Patrick Van Oosterwijck, Adam Cummick, Martin Stephens """ + from __future__ import annotations try: @@ -25,12 +26,11 @@ import gc from sys import byteorder +from adafruit_ticks import ticks_diff, ticks_ms from micropython import const -from adafruit_ticks import ticks_ms, ticks_diff import adafruit_wiznet5k as wiznet5k - _SOCKET_TYPE_TO_WIZNET = b"\0\x21\2" _SOCKET_INVALID = const(255) @@ -71,7 +71,7 @@ def _is_ipv4_string(ipv4_address: str) -> bool: """ octets = ipv4_address.split(".", 3) if len(octets) == 4 and "".join(octets).isdigit(): - if all((0 <= int(octet) <= 255 for octet in octets)): + if all(0 <= int(octet) <= 255 for octet in octets): return True return False @@ -154,7 +154,7 @@ def inet_ntoa(self, ip_address: Union[bytes, bytearray]) -> str: raise ValueError("The IPv4 address must be 4 bytes.") return self._interface.pretty_ip(ip_address) - def getaddrinfo( # pylint: disable=redefined-builtin,too-many-arguments,unused-argument + def getaddrinfo( self, host: str, port: int, @@ -201,10 +201,10 @@ def gethostbyname(self, hostname: str) -> str: if self._is_ipv4_string(hostname): return hostname address = self._interface.get_host_by_name(hostname) - address = "{}.{}.{}.{}".format(address[0], address[1], address[2], address[3]) + address = f"{address[0]}.{address[1]}.{address[2]}.{address[3]}" return address - def socket( # pylint: disable=redefined-builtin + def socket( self, family: int = AF_INET, type: int = SOCK_STREAM, @@ -221,7 +221,7 @@ class Socket: to a Wiznet5k module. """ - def __init__( # pylint: disable=redefined-builtin,too-many-arguments,unused-argument + def __init__( self, socket_pool: SocketPool, family: int = SocketPool.AF_INET, @@ -263,15 +263,10 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None: self._socknum, 0xFF & (~wiznet5k.adafruit_wiznet5k.SNIR_DISCON) ) # Reset socket interrupt register. self._interface.socket_disconnect(self._socknum) - mask = ( - wiznet5k.adafruit_wiznet5k.SNIR_TIMEOUT - | wiznet5k.adafruit_wiznet5k.SNIR_DISCON - ) + mask = wiznet5k.adafruit_wiznet5k.SNIR_TIMEOUT | wiznet5k.adafruit_wiznet5k.SNIR_DISCON while not self._interface.read_snir(self._socknum) & mask: pass - self._interface.write_snir( - self._socknum, 0xFF - ) # Reset socket interrupt register. + self._interface.write_snir(self._socknum, 0xFF) # Reset socket interrupt register. self._interface.socket_close(self._socknum) while ( self._interface.socket_status(self._socknum) @@ -280,13 +275,13 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None: pass # This works around problems with using a class method as a decorator. - def _check_socket_closed(func): # pylint: disable=no-self-argument + def _check_socket_closed(func): """Decorator to check whether the socket object has been closed.""" def wrapper(self, *args, **kwargs): - if self._socket_closed: # pylint: disable=protected-access + if self._socket_closed: raise RuntimeError("The socket has been closed.") - return func(self, *args, **kwargs) # pylint: disable=not-callable + return func(self, *args, **kwargs) return wrapper @@ -310,18 +305,15 @@ def _connected(self) -> bool: if self._socknum >= self._interface.max_sockets: return False status = self._interface.socket_status(self._socknum) - if ( - status == wiznet5k.adafruit_wiznet5k.SNSR_SOCK_CLOSE_WAIT - and self._available() == 0 - ): + if status == wiznet5k.adafruit_wiznet5k.SNSR_SOCK_CLOSE_WAIT and self._available() == 0: result = False else: - result = status not in ( + result = status not in { wiznet5k.adafruit_wiznet5k.SNSR_SOCK_CLOSED, wiznet5k.adafruit_wiznet5k.SNSR_SOCK_LISTEN, wiznet5k.adafruit_wiznet5k.SNSR_SOCK_TIME_WAIT, wiznet5k.adafruit_wiznet5k.SNSR_SOCK_FIN_WAIT, - ) + } if not result and status != wiznet5k.adafruit_wiznet5k.SNSR_SOCK_LISTEN: self.close() return result @@ -333,9 +325,7 @@ def getpeername(self) -> Tuple[str, int]: :return Tuple[str, int]: IPv4 address and port the socket is connected to. """ - return self._interface.remote_ip(self._socknum), self._interface.remote_port( - self._socknum - ) + return self._interface.remote_ip(self._socknum), self._interface.remote_port(self._socknum) @_check_socket_closed def bind(self, address: Tuple[Optional[str], int]) -> None: @@ -367,10 +357,8 @@ def _bind(self, address: Tuple[Optional[str], int]) -> None: self._interface.ip_address ): raise ValueError( - "The IPv4 address requested must match {}, " - "the one assigned to the WIZNET5K interface.".format( - self._interface.pretty_ip(self._interface.ip_address) - ) + f"The IPv4 address requested must match {self._interface.pretty_ip(self._interface.ip_address)}, " + "the one assigned to the WIZNET5K interface." ) self._listen_port = address[1] # For UDP servers we need to open the socket here because we won't call @@ -384,7 +372,7 @@ def _bind(self, address: Tuple[Optional[str], int]) -> None: self._buffer = b"" @_check_socket_closed - def listen(self, backlog: int = 0) -> None: # pylint: disable=unused-argument + def listen(self, backlog: int = 0) -> None: """ Enable a server to accept connections. @@ -408,15 +396,12 @@ def accept( end of the connection. """ stamp = ticks_ms() - while self._status not in ( + while self._status not in { wiznet5k.adafruit_wiznet5k.SNSR_SOCK_SYNRECV, wiznet5k.adafruit_wiznet5k.SNSR_SOCK_ESTABLISHED, wiznet5k.adafruit_wiznet5k.SNSR_SOCK_LISTEN, - ): - if ( - self._timeout - and 0 < self._timeout < ticks_diff(ticks_ms(), stamp) / 1000 - ): + }: + if self._timeout and 0 < self._timeout < ticks_diff(ticks_ms(), stamp) / 1000: raise TimeoutError("Failed to accept connection.") if self._status == wiznet5k.adafruit_wiznet5k.SNSR_SOCK_CLOSE_WAIT: self._disconnect() @@ -429,8 +414,8 @@ def accept( current_socknum = self._socknum # Create a new socket object and swap socket nums, so we can continue listening client_sock = Socket(self._socket_pool) - self._socknum = client_sock._socknum # pylint: disable=protected-access - client_sock._socknum = current_socknum # pylint: disable=protected-access + self._socknum = client_sock._socknum + client_sock._socknum = current_socknum self._bind((None, self._listen_port)) self.listen() if self._status != wiznet5k.adafruit_wiznet5k.SNSR_SOCK_LISTEN: @@ -491,7 +476,7 @@ def sendto(self, data: bytearray, *flags_and_or_address: any) -> int: """ # May be called with (data, address) or (data, flags, address) other_args = list(flags_and_or_address) - if len(other_args) in (1, 2): + if len(other_args) in {1, 2}: address = other_args[-1] else: raise ValueError("Incorrect number of arguments, should be 2 or 3.") @@ -499,7 +484,7 @@ def sendto(self, data: bytearray, *flags_and_or_address: any) -> int: return self.send(data) @_check_socket_closed - def recv( # pylint: disable=unused-argument + def recv( self, bufsize: int, flags: int = 0, @@ -521,9 +506,7 @@ def recv( # pylint: disable=unused-argument return bytes(buf[:nread]) return bytes(buf) - def _embed_recv( # pylint: disable=unused-argument - self, bufsize: int = 0, flags: int = 0 - ) -> bytes: + def _embed_recv(self, bufsize: int = 0, flags: int = 0) -> bytes: """ Read from the connected remote address. @@ -546,9 +529,7 @@ def _embed_recv( # pylint: disable=unused-argument return ret @_check_socket_closed - def recvfrom( # pylint: disable=unused-argument - self, bufsize: int, flags: int = 0 - ) -> Tuple[bytes, Tuple[str, int]]: + def recvfrom(self, bufsize: int, flags: int = 0) -> Tuple[bytes, Tuple[str, int]]: """ Receive data from the socket. The return value is a pair (bytes, address) where bytes is a bytes object representing the data received and address is the address of the socket @@ -569,9 +550,7 @@ def recvfrom( # pylint: disable=unused-argument ) @_check_socket_closed - def recv_into( # pylint: disable=unused-argument - self, buffer: bytearray, nbytes: int = 0, flags: int = 0 - ) -> int: + def recv_into(self, buffer: bytearray, nbytes: int = 0, flags: int = 0) -> int: """ Receive up to nbytes bytes from the socket, storing the data into a buffer rather than creating a new bytestring. @@ -593,9 +572,7 @@ def recv_into( # pylint: disable=unused-argument # _readline if len(self._buffer) > 0: bytes_to_read = min(num_to_read, len(self._buffer)) - buffer[num_read : num_read + bytes_to_read] = self._buffer[ - :bytes_to_read - ] + buffer[num_read : num_read + bytes_to_read] = self._buffer[:bytes_to_read] num_read += bytes_to_read num_to_read -= bytes_to_read self._buffer = self._buffer[bytes_to_read:] @@ -607,23 +584,19 @@ def recv_into( # pylint: disable=unused-argument last_read_time = ticks_ms() bytes_to_read = min(num_to_read, num_avail) if self._sock_type == SocketPool.SOCK_STREAM: - bytes_read = self._interface.socket_read( - self._socknum, bytes_to_read - )[1] + bytes_read = self._interface.socket_read(self._socknum, bytes_to_read)[1] else: - bytes_read = self._interface.read_udp(self._socknum, bytes_to_read)[ - 1 - ] + bytes_read = self._interface.read_udp(self._socknum, bytes_to_read)[1] buffer[num_read : num_read + len(bytes_read)] = bytes_read num_read += len(bytes_read) num_to_read -= len(bytes_read) elif num_read > 0: # We got a message, but there are no more bytes to read, so we can stop. break - elif self._status in ( + elif self._status in { wiznet5k.adafruit_wiznet5k.SNSR_SOCK_CLOSED, wiznet5k.adafruit_wiznet5k.SNSR_SOCK_CLOSE_WAIT, - ): + }: # No bytes to read and we will not get more, stop. break # No bytes yet, or more bytes requested. @@ -638,7 +611,7 @@ def recv_into( # pylint: disable=unused-argument return num_read @_check_socket_closed - def recvfrom_into( # pylint: disable=unused-argument + def recvfrom_into( self, buffer: bytearray, nbytes: int = 0, flags: int = 0 ) -> Tuple[int, Tuple[str, int]]: """ @@ -718,9 +691,7 @@ def _available(self) -> int: ) @_check_socket_closed - def setsockopt( # pylint: disable=no-self-use,unused-argument - self, level: int, opt: int, value: any - ) -> None: + def setsockopt(self, level: int, opt: int, value: any) -> None: """ Set a socket option. diff --git a/docs/api.rst b/docs/api.rst index 6c431a8..2bbf47a 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -4,6 +4,9 @@ .. If your library file(s) are nested in a directory (e.g. /adafruit_foo/foo.py) .. use this format as the module name: "adafruit_foo.foo" +API Reference +############# + .. automodule:: adafruit_wiznet5k.adafruit_wiznet5k :members: diff --git a/docs/conf.py b/docs/conf.py index df0a406..ac0906c 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,12 +1,10 @@ -# -*- coding: utf-8 -*- - # SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries # # SPDX-License-Identifier: MIT +import datetime import os import sys -import datetime sys.path.insert(0, os.path.abspath("..")) @@ -52,9 +50,7 @@ creation_year = "2020" current_year = str(datetime.datetime.now().year) year_duration = ( - current_year - if current_year == creation_year - else creation_year + " - " + current_year + current_year if current_year == creation_year else creation_year + " - " + current_year ) copyright = year_duration + " Brent Rubell" author = "Brent Rubell" diff --git a/examples/wiznet5k_aio_post.py b/examples/wiznet5k_aio_post.py index 96525bd..0398ac1 100644 --- a/examples/wiznet5k_aio_post.py +++ b/examples/wiznet5k_aio_post.py @@ -1,13 +1,15 @@ # SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries # SPDX-License-Identifier: MIT -from os import getenv import time +from os import getenv + +import adafruit_connection_manager +import adafruit_requests import board import busio from digitalio import DigitalInOut -import adafruit_connection_manager -import adafruit_requests + from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K # Get Adafruit IO keys, ensure these are setup in settings.toml diff --git a/examples/wiznet5k_cheerlights.py b/examples/wiznet5k_cheerlights.py index d4b21fa..cd017cb 100755 --- a/examples/wiznet5k_cheerlights.py +++ b/examples/wiznet5k_cheerlights.py @@ -2,15 +2,15 @@ # SPDX-License-Identifier: MIT import time -import board -import busio -from digitalio import DigitalInOut import adafruit_connection_manager +import adafruit_fancyled.adafruit_fancyled as fancy import adafruit_requests - +import board +import busio import neopixel -import adafruit_fancyled.adafruit_fancyled as fancy +from digitalio import DigitalInOut + from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K cs = DigitalInOut(board.D10) diff --git a/examples/wiznet5k_cpython_client_for_simpleserver.py b/examples/wiznet5k_cpython_client_for_simpleserver.py index ae89d8b..52a6668 100755 --- a/examples/wiznet5k_cpython_client_for_simpleserver.py +++ b/examples/wiznet5k_cpython_client_for_simpleserver.py @@ -7,6 +7,7 @@ This example client runs on CPython and connects to / sends data to the simpleserver example. """ + import socket import time diff --git a/examples/wiznet5k_httpserver.py b/examples/wiznet5k_httpserver.py index fcc40e4..2ae34d9 100644 --- a/examples/wiznet5k_httpserver.py +++ b/examples/wiznet5k_httpserver.py @@ -3,9 +3,10 @@ import board import digitalio -from adafruit_httpserver import Server, Request, Response -from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K +from adafruit_httpserver import Request, Response, Server + import adafruit_wiznet5k.adafruit_wiznet5k_socketpool as socketpool +from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K print("Wiznet5k HTTPServer Test") diff --git a/examples/wiznet5k_simpleserver.py b/examples/wiznet5k_simpleserver.py index 732a76c..dda32e1 100644 --- a/examples/wiznet5k_simpleserver.py +++ b/examples/wiznet5k_simpleserver.py @@ -6,8 +6,9 @@ import board import busio import digitalio -from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K + import adafruit_wiznet5k.adafruit_wiznet5k_socketpool as socketpool +from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K print("Wiznet5k SimpleServer Test") diff --git a/examples/wiznet5k_simpletest.py b/examples/wiznet5k_simpletest.py index 379ec17..db4980b 100644 --- a/examples/wiznet5k_simpletest.py +++ b/examples/wiznet5k_simpletest.py @@ -1,11 +1,12 @@ # SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries # SPDX-License-Identifier: MIT +import adafruit_connection_manager +import adafruit_requests import board import busio import digitalio -import adafruit_connection_manager -import adafruit_requests + from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K print("Wiznet5k WebClient Test") @@ -30,9 +31,7 @@ print("Chip Version:", eth.chip) print("MAC Address:", [hex(i) for i in eth.mac_address]) print("My IP address is:", eth.pretty_ip(eth.ip_address)) -print( - "IP lookup adafruit.com: %s" % eth.pretty_ip(eth.get_host_by_name("adafruit.com")) -) +print("IP lookup adafruit.com: %s" % eth.pretty_ip(eth.get_host_by_name("adafruit.com"))) # eth._debug = True diff --git a/examples/wiznet5k_simpletest_manual_network.py b/examples/wiznet5k_simpletest_manual_network.py index f90057f..fab53d5 100644 --- a/examples/wiznet5k_simpletest_manual_network.py +++ b/examples/wiznet5k_simpletest_manual_network.py @@ -1,11 +1,12 @@ # SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries # SPDX-License-Identifier: MIT +import adafruit_connection_manager +import adafruit_requests import board import busio import digitalio -import adafruit_connection_manager -import adafruit_requests + from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K TEXT_URL = "http://wifitest.adafruit.com/testwifi/index.html" @@ -35,9 +36,7 @@ print("Chip Version:", eth.chip) print("MAC Address:", [hex(i) for i in eth.mac_address]) print("My IP address is:", eth.pretty_ip(eth.ip_address)) -print( - "IP lookup adafruit.com: %s" % eth.pretty_ip(eth.get_host_by_name("adafruit.com")) -) +print("IP lookup adafruit.com: %s" % eth.pretty_ip(eth.get_host_by_name("adafruit.com"))) # eth._debug = True print("Fetching text from", TEXT_URL) diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 0000000..9541300 --- /dev/null +++ b/ruff.toml @@ -0,0 +1,111 @@ +# SPDX-FileCopyrightText: 2024 Tim Cocks for Adafruit Industries +# +# SPDX-License-Identifier: MIT + +target-version = "py38" +line-length = 100 + +[lint] +preview = true +select = ["I", "PL", "UP"] + +extend-select = [ + "D419", # empty-docstring + "E501", # line-too-long + "W291", # trailing-whitespace + "PLC0414", # useless-import-alias + "PLC2401", # non-ascii-name + "PLC2801", # unnecessary-dunder-call + "PLC3002", # unnecessary-direct-lambda-call + "E999", # syntax-error + "PLE0101", # return-in-init + "F706", # return-outside-function + "F704", # yield-outside-function + "PLE0116", # continue-in-finally + "PLE0117", # nonlocal-without-binding + "PLE0241", # duplicate-bases + "PLE0302", # unexpected-special-method-signature + "PLE0604", # invalid-all-object + "PLE0605", # invalid-all-format + "PLE0643", # potential-index-error + "PLE0704", # misplaced-bare-raise + "PLE1141", # dict-iter-missing-items + "PLE1142", # await-outside-async + "PLE1205", # logging-too-many-args + "PLE1206", # logging-too-few-args + "PLE1307", # bad-string-format-type + "PLE1310", # bad-str-strip-call + "PLE1507", # invalid-envvar-value + "PLE2502", # bidirectional-unicode + "PLE2510", # invalid-character-backspace + "PLE2512", # invalid-character-sub + "PLE2513", # invalid-character-esc + "PLE2514", # invalid-character-nul + "PLE2515", # invalid-character-zero-width-space + "PLR0124", # comparison-with-itself + "PLR0202", # no-classmethod-decorator + "PLR0203", # no-staticmethod-decorator + "UP004", # useless-object-inheritance + "PLR0206", # property-with-parameters + "PLR0904", # too-many-public-methods + "PLR0911", # too-many-return-statements + "PLR0912", # too-many-branches + "PLR0913", # too-many-arguments + "PLR0914", # too-many-locals + "PLR0915", # too-many-statements + "PLR0916", # too-many-boolean-expressions + "PLR1702", # too-many-nested-blocks + "PLR1704", # redefined-argument-from-local + "PLR1711", # useless-return + "C416", # unnecessary-comprehension + "PLR1733", # unnecessary-dict-index-lookup + "PLR1736", # unnecessary-list-index-lookup + + # ruff reports this rule is unstable + #"PLR6301", # no-self-use + + "PLW0108", # unnecessary-lambda + "PLW0120", # useless-else-on-loop + "PLW0127", # self-assigning-variable + "PLW0129", # assert-on-string-literal + "B033", # duplicate-value + "PLW0131", # named-expr-without-context + "PLW0245", # super-without-brackets + "PLW0406", # import-self + "PLW0602", # global-variable-not-assigned + "PLW0603", # global-statement + "PLW0604", # global-at-module-level + + # fails on the try: import typing used by libraries + #"F401", # unused-import + + "F841", # unused-variable + "E722", # bare-except + "PLW0711", # binary-op-exception + "PLW1501", # bad-open-mode + "PLW1508", # invalid-envvar-default + "PLW1509", # subprocess-popen-preexec-fn + "PLW2101", # useless-with-lock + "PLW3301", # nested-min-max +] + +ignore = [ + "PLR2004", # magic-value-comparison + "UP030", # format literals + "PLW1514", # unspecified-encoding + "PLR0913", # too-many-arguments + "PLR0915", # too-many-statements + "PLR0917", # too-many-positional-arguments + "PLR0904", # too-many-public-methods + "PLR0912", # too-many-branches + "PLR0916", # too-many-boolean-expressions + "PLR6301", # could-be-static no-self-use + "PLC0415", # import outside toplevel + "UP007", # type or instead of union + "UP006", # type built-in instead of typing class + "PLR1702", # too many nested blocks + "E501", # line too long +] + +[format] +line-ending = "lf"