diff --git a/poetry.lock b/poetry.lock index af9e4c6ed..ca50df015 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2857,4 +2857,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.10" -content-hash = "f2c411b5b151c44368a3417a9bc76d41225dfa08dd108ea2922f73f8bc72c667" +content-hash = "674172d5eedaef5c5d6a6e55e84480a517bbaf581342aa297b0e613263b0a82b" diff --git a/pyproject.toml b/pyproject.toml index 40fc2d7bf..349ee3a3e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,20 +32,20 @@ opentelemetry-exporter-otlp-proto-http = "1.21.0" optional = true [tool.poetry.group.format.dependencies] -ruff = "^0.12.4" +ruff = "^0.12.7" [tool.poetry.group.lint] optional = true [tool.poetry.group.lint.dependencies] -ruff = "^0.12.4" +ruff = "^0.12.7" codespell = "^2.4.1" [tool.poetry.group.unit.dependencies] pytest = "^8.4.1" pytest-xdist = "^3.8.0" pytest-cov = "^6.2.1" -ops-scenario = "^6.0.3, <6.0.4" # 6.0.4 requires ops >= 2.12 +ops-scenario = "^6.0.3, <6.0.4" # 6.0.4 requires ops >= 2.12 [tool.poetry.group.integration.dependencies] pytest = "^8.4.1" @@ -84,23 +84,47 @@ line-length = 99 [tool.ruff.lint] explicit-preview-rules = true -select = ["A", "E", "W", "F", "C", "N", "D", "I", "CPY001"] +select = [ + "A", + "E", + "W", + "F", + "C", + "N", + "D", + "I", + "B", + "CPY001", + "RUF", + "S", + "SIM", + "UP", + "TC", +] ignore = [ - # Missing docstring in public method (pydocstyle doesn't look for docstrings in super class - # https://github.com/PyCQA/pydocstyle/issues/309) TODO: add pylint check? https://github.com/PyCQA/pydocstyle/issues/309#issuecomment-1284142716 - "D102", - "D105", # Missing docstring in magic method - "D107", # Missing docstring in __init__ - "D403", # First word of the first line should be capitalized (false positive on "MySQL") - "D415", # Docstring first line punctuation (doesn't make sense for properties) - "E501", # Line too long (because using black creates errors with this) - "N818", # Exception name should be named with an Error suffix - "W505", # Doc line too long (so that strings in comments aren't split across lines) + # Missing docstring in public method (pydocstyle doesn't look for docstrings in super class + # https://github.com/PyCQA/pydocstyle/issues/309) TODO: add pylint check? https://github.com/PyCQA/pydocstyle/issues/309#issuecomment-1284142716 + "D102", + "D105", # Missing docstring in magic method + "D107", # Missing docstring in __init__ + "D403", # First word of the first line should be capitalized (false positive on "MySQL") + "D415", # Docstring first line punctuation (doesn't make sense for properties) + "E501", # Line too long (because using black creates errors with this) + "N818", # Exception name should be named with an Error suffix + "W505", # Doc line too long (so that strings in comments aren't split across lines) + "S101", ] [tool.ruff.lint.per-file-ignores] # D100, D101, D102, D103: Ignore missing docstrings in tests -"tests/*" = ["D1"] +"tests/*" = [ + "D1", + "D417", + # Asserts + "B011", + # Disable security checks for tests + "S", +] [tool.ruff.lint.flake8-copyright] # Check for properly formatted copyright header in each file diff --git a/src/abstract_charm.py b/src/abstract_charm.py index 16469a4ea..906c4c260 100644 --- a/src/abstract_charm.py +++ b/src/abstract_charm.py @@ -256,9 +256,8 @@ def _determine_unit_status(self, *, event) -> ops.StatusBase: if status := workload_.status: statuses.append(status) # only in machine charms - if self._ha_cluster: - if status := self._ha_cluster.get_unit_juju_status(): - statuses.append(status) + if self._ha_cluster and (status := self._ha_cluster.get_unit_juju_status()): + statuses.append(status) refresh_lower_priority = self.refresh.unit_status_lower_priority( workload_is_running=isinstance(workload_, workload.RunningWorkload) ) diff --git a/src/container.py b/src/container.py index e72864e66..794fd71f0 100644 --- a/src/container.py +++ b/src/container.py @@ -132,7 +132,9 @@ def mysql_router_exporter_service_enabled(self) -> bool: """MySQL Router exporter service status""" @abc.abstractmethod - def update_mysql_router_service(self, *, enabled: bool, tls: bool = None) -> None: + def update_mysql_router_service( + self, *, enabled: bool, tls: typing.Optional[bool] = None + ) -> None: """Update and restart MySQL Router service. Args: @@ -148,10 +150,10 @@ def update_mysql_router_exporter_service( *, enabled: bool, config: "relations.cos.ExporterConfig" = None, - tls: bool = None, - key_filename: str = None, - certificate_filename: str = None, - certificate_authority_filename: str = None, + tls: typing.Optional[bool] = None, + key_filename: typing.Optional[str] = None, + certificate_filename: typing.Optional[str] = None, + certificate_authority_filename: typing.Optional[str] = None, ) -> None: """Update and restart the MySQL Router exporter service. @@ -207,7 +209,7 @@ def _run_command( command: typing.List[str], *, timeout: typing.Optional[int], - input: str = None, # noqa: A002 Match subprocess.run() + input: typing.Optional[str] = None, # noqa: A002 Match subprocess.run() ) -> str: """Run command in container. @@ -216,7 +218,9 @@ def _run_command( """ # TODO python3.10 min version: Use `list` instead of `typing.List` - def run_mysql_router(self, args: typing.List[str], *, timeout: int = None) -> str: + def run_mysql_router( + self, args: typing.List[str], *, timeout: typing.Optional[int] = None + ) -> str: """Run MySQL Router command. Raises: @@ -230,8 +234,8 @@ def run_mysql_shell( self, args: typing.List[str], *, - timeout: int = None, - input: str = None, # noqa: A002 Match subprocess.run() + timeout: typing.Optional[int] = None, + input: typing.Optional[str] = None, # noqa: A002 Match subprocess.run() ) -> str: """Run MySQL Shell command. diff --git a/src/machine_workload.py b/src/machine_workload.py index 59ea234c5..67a432e66 100644 --- a/src/machine_workload.py +++ b/src/machine_workload.py @@ -28,7 +28,7 @@ def _get_bootstrap_command( if self._charm.is_externally_accessible(event=event): command.extend([ "--conf-bind-address", - "0.0.0.0", + "0.0.0.0", # noqa: S104 ]) else: command.extend([ diff --git a/src/mysql_shell/__init__.py b/src/mysql_shell/__init__.py index e5df61cb0..883a37628 100644 --- a/src/mysql_shell/__init__.py +++ b/src/mysql_shell/__init__.py @@ -62,11 +62,11 @@ def username(self): def _run_code(self, code: str) -> None: """Connect to MySQL cluster and run Python code.""" template = _jinja_env.get_template("try_except_wrapper.py.jinja") - error_file = self._container.path("/tmp/mysqlsh_error.json") + error_file = self._container.path("/tmp/mysqlsh_error.json") # noqa: S108 script = template.render(code=code, error_filepath=error_file.relative_to_container) - temporary_script_file = self._container.path("/tmp/mysqlsh_script.py") + temporary_script_file = self._container.path("/tmp/mysqlsh_script.py") # noqa: S108 temporary_script_file.write_text(script) try: @@ -100,7 +100,7 @@ def _run_code(self, code: str) -> None: except ShellDBError as e: if e.code == 2003: logger.exception(server_exceptions.ConnectionError_.MESSAGE) - raise server_exceptions.ConnectionError_ + raise server_exceptions.ConnectionError from None else: logger.exception( f"Failed to run MySQL Shell script:\n{script}\n\nMySQL client error {e.code}\nMySQL Shell traceback:\n{e.traceback_message}\n" @@ -114,7 +114,7 @@ def _run_sql(self, sql_statements: typing.List[str]) -> None: _jinja_env.get_template("run_sql.py.jinja").render(statements=sql_statements) ) - def _get_attributes(self, additional_attributes: dict = None) -> str: + def _get_attributes(self, additional_attributes: typing.Optional[dict] = None) -> str: """Attributes for (MySQL) users created by this charm If the relation with the MySQL charm is broken, the MySQL charm will use this attribute @@ -163,7 +163,7 @@ def get_mysql_router_user_for_unit( again. """ logger.debug(f"Getting MySQL Router user for {unit_name=}") - output_file = self._container.path("/tmp/mysqlsh_output.json") + output_file = self._container.path("/tmp/mysqlsh_output.json") # noqa: S108 self._run_code( _jinja_env.get_template("get_mysql_router_user_for_unit.py.jinja").render( username=self.username, @@ -210,7 +210,7 @@ def delete_user(self, username: str, *, must_exist=True) -> None: def is_router_in_cluster_set(self, router_id: str) -> bool: """Check if MySQL Router is part of InnoDB ClusterSet.""" logger.debug(f"Checking if {router_id=} in cluster set") - output_file = self._container.path("/tmp/mysqlsh_output.json") + output_file = self._container.path("/tmp/mysqlsh_output.json") # noqa: S108 self._run_code( _jinja_env.get_template("get_routers_in_cluster_set.py.jinja").render( output_filepath=output_file.relative_to_container @@ -226,7 +226,7 @@ def is_router_in_cluster_set(self, router_id: str) -> bool: _jinja_env = jinja2.Environment( - autoescape=False, + autoescape=False, # noqa: S701 trim_blocks=True, loader=jinja2.FileSystemLoader(pathlib.Path(__file__).parent / "templates"), undefined=jinja2.StrictUndefined, diff --git a/src/relations/cos.py b/src/relations/cos.py index 14f83c688..cc5e5d899 100644 --- a/src/relations/cos.py +++ b/src/relations/cos.py @@ -40,7 +40,7 @@ class COSRelation: _PEER_RELATION_NAME = "cos" MONITORING_USERNAME = "monitoring" - _MONITORING_PASSWORD_KEY = "monitoring-password" + _MONITORING_PASSWORD_KEY = "monitoring-password" # noqa: S105 _TRACING_PROTOCOL = "otlp_http" diff --git a/src/relations/database_provides.py b/src/relations/database_provides.py index b00507aa6..8ee0c3588 100644 --- a/src/relations/database_provides.py +++ b/src/relations/database_provides.py @@ -3,6 +3,7 @@ """Relation(s) to one or more application charms""" +import contextlib import logging import typing @@ -221,30 +222,24 @@ def __init__(self, charm_: "abstract_charm.MySQLRouterCharm") -> None: def _shared_users(self) -> typing.List[_RelationWithSharedUser]: shared_users = [] for relation in self._interface.relations: - try: + with contextlib.suppress(_UserNotShared): shared_users.append( _RelationWithSharedUser(relation=relation, interface=self._interface) ) - except _UserNotShared: - pass return shared_users def external_connectivity(self, event) -> bool: """Whether any of the relations are marked as external.""" requested_users = [] for relation in self._interface.relations: - try: + with contextlib.suppress( + _RelationBreaking, remote_databag.IncompleteDatabag, _UnsupportedExtraUserRole + ): requested_users.append( _RelationThatRequestedUser( relation=relation, interface=self._interface, event=event ) ) - except ( - _RelationBreaking, - remote_databag.IncompleteDatabag, - _UnsupportedExtraUserRole, - ): - pass return any(relation.external_connectivity for relation in requested_users) def update_endpoints( @@ -285,18 +280,14 @@ def reconcile_users( ) requested_users = [] for relation in self._interface.relations: - try: + with contextlib.suppress( + _RelationBreaking, remote_databag.IncompleteDatabag, _UnsupportedExtraUserRole + ): requested_users.append( _RelationThatRequestedUser( relation=relation, interface=self._interface, event=event ) ) - except ( - _RelationBreaking, - remote_databag.IncompleteDatabag, - _UnsupportedExtraUserRole, - ): - pass logger.debug(f"State of reconcile users {requested_users=}, {self._shared_users=}") for relation in requested_users: if relation not in self._shared_users: diff --git a/src/relations/database_requires.py b/src/relations/database_requires.py index 1e1b33791..49aeeb004 100644 --- a/src/relations/database_requires.py +++ b/src/relations/database_requires.py @@ -57,7 +57,7 @@ def __init__(self, *, host: str, port: str, username: str): self.host = host self.port = port self.username = username - self.password = "***" + self.password = "***" # noqa: S105 class CompleteConnectionInformation(ConnectionInformation): diff --git a/src/relations/deprecated_shared_db_database_provides.py b/src/relations/deprecated_shared_db_database_provides.py index bf4f86eaf..b14f3d1f0 100644 --- a/src/relations/deprecated_shared_db_database_provides.py +++ b/src/relations/deprecated_shared_db_database_provides.py @@ -6,6 +6,7 @@ Uses DEPRECATED "mysql-shared" relation interface """ +import contextlib import logging import typing @@ -13,10 +14,10 @@ import mysql_shell import relations.remote_databag as remote_databag -import status_exception if typing.TYPE_CHECKING: import abstract_charm + import status_exception class LogPrefix(logging.LoggerAdapter): @@ -231,7 +232,7 @@ def _update_unit_databag(self, _) -> None: logger.debug("Synchronizing unit databags") requested_users = [] for relation in self._relations: - try: + with contextlib.suppress(remote_databag.IncompleteDatabag): requested_users.append( _UnitThatNeedsUser( relation=relation, @@ -239,8 +240,6 @@ def _update_unit_databag(self, _) -> None: peer_relation_app_databag=self._peer_app_databag, ) ) - except remote_databag.IncompleteDatabag: - pass for relation in requested_users: if password := self._peer_app_databag.get(relation.peer_databag_password_key): relation.set_databag(password=password) @@ -253,15 +252,13 @@ def _update_unit_databag(self, _) -> None: def _shared_users(self) -> typing.List[_RelationWithSharedUser]: shared_users = [] for relation in self._relations: - try: + with contextlib.suppress(_UserNotShared): shared_users.append( _RelationWithSharedUser( relation=relation, peer_relation_app_databag=self._peer_app_databag, ) ) - except _UserNotShared: - pass return shared_users def reconcile_users( @@ -279,7 +276,7 @@ def reconcile_users( logger.debug(f"Reconciling users {event=}") requested_users = [] for relation in self._relations: - try: + with contextlib.suppress(_RelationBreaking, remote_databag.IncompleteDatabag): requested_users.append( _RelationThatRequestedUser( relation=relation, @@ -288,11 +285,6 @@ def reconcile_users( event=event, ) ) - except ( - _RelationBreaking, - remote_databag.IncompleteDatabag, - ): - pass logger.debug(f"State of reconcile users {requested_users=}, {self._shared_users=}") for relation in requested_users: if relation not in self._shared_users: diff --git a/src/relations/remote_databag.py b/src/relations/remote_databag.py index 6d6cb491e..ce67c8c00 100644 --- a/src/relations/remote_databag.py +++ b/src/relations/remote_databag.py @@ -45,4 +45,6 @@ def __getitem__(self, key): logger.debug( f"Required {key=} missing from databag for {self._app_name=} on {self._endpoint_name=}" ) - raise IncompleteDatabag(app_name=self._app_name, endpoint_name=self._endpoint_name) + raise IncompleteDatabag( + app_name=self._app_name, endpoint_name=self._endpoint_name + ) from None diff --git a/src/relations/secrets.py b/src/relations/secrets.py index e6d7b20d3..9af1e3996 100644 --- a/src/relations/secrets.py +++ b/src/relations/secrets.py @@ -21,16 +21,20 @@ class RelationSecrets: """MySQLRouter secrets on a specific peer relation""" - _SECRET_INTERNAL_LABEL = "internal-secret" - _SECRET_DELETED_LABEL = "None" + _SECRET_INTERNAL_LABEL = "internal-secret" # noqa: S105 + _SECRET_DELETED_LABEL = "None" # noqa: S105 def __init__( self, charm: "abstract_charm.MySQLRouterCharm", relation_name: str, - app_secret_fields: typing.List[str] = [], - unit_secret_fields: typing.List[str] = [], + app_secret_fields: typing.Optional[typing.List[str]] = None, + unit_secret_fields: typing.Optional[typing.List[str]] = None, ) -> None: + if unit_secret_fields is None: + unit_secret_fields = [] + if app_secret_fields is None: + app_secret_fields = [] self._charm = charm self._relation_name = relation_name diff --git a/src/snap.py b/src/snap.py index b4f50b537..7ba13fdc9 100644 --- a/src/snap.py +++ b/src/snap.py @@ -78,8 +78,8 @@ def __new__(cls, *args, **kwargs): "/var/log/mysqlrouter" ): parent = f"/var/snap/{snap_name}/common" - elif str(path).startswith("/tmp"): - parent = f"/tmp/snap-private-tmp/snap.{snap_name}" + elif str(path).startswith("/tmp"): # noqa: S108 + parent = f"/tmp/snap-private-tmp/snap.{snap_name}" # noqa: S108 else: parent = None if parent: @@ -163,7 +163,9 @@ def mysql_router_service_enabled(self) -> bool: def mysql_router_exporter_service_enabled(self) -> bool: return self._snap.services[self._EXPORTER_SERVICE_NAME]["active"] - def update_mysql_router_service(self, *, enabled: bool, tls: bool = None) -> None: + def update_mysql_router_service( + self, *, enabled: bool, tls: typing.Optional[bool] = None + ) -> None: super().update_mysql_router_service(enabled=enabled, tls=tls) if tls: @@ -186,10 +188,10 @@ def update_mysql_router_exporter_service( *, enabled: bool, config: "relations.cos.ExporterConfig" = None, - tls: bool = None, - key_filename: str = None, - certificate_filename: str = None, - certificate_authority_filename: str = None, + tls: typing.Optional[bool] = None, + key_filename: typing.Optional[str] = None, + certificate_filename: typing.Optional[str] = None, + certificate_authority_filename: typing.Optional[str] = None, ) -> None: super().update_mysql_router_exporter_service( enabled=enabled, @@ -250,7 +252,7 @@ def install( _raise_if_snap_installed_not_by_this_charm(unit=unit, model_uuid=model_uuid) return # Install snap - logger.info(f"Installing snap revision {repr(snap_revision)}") + logger.info(f"Installing snap revision {snap_revision!r}") unit.status = ops.MaintenanceStatus("Installing snap") def _set_retry_status(_) -> None: @@ -271,7 +273,7 @@ def _set_retry_status(_) -> None: self._snap.hold() self._installed_by_unit.write_text(unique_unit_name) logger.debug(f"Wrote {unique_unit_name=} to {self._installed_by_unit.name=}") - logger.info(f"Installed snap revision {repr(snap_revision)}") + logger.info(f"Installed snap revision {snap_revision!r}") def refresh( self, @@ -298,20 +300,20 @@ def refresh( if revision_before_refresh == snap_revision: raise ValueError(f"Cannot refresh snap; {snap_revision=} is already installed") - logger.info(f"Refreshing snap to revision {repr(snap_revision)}") + logger.info(f"Refreshing snap to revision {snap_revision!r}") unit.status = ops.MaintenanceStatus("Refreshing snap") try: self._snap.ensure(state=snap_lib.SnapState.Present, revision=snap_revision) - except (snap_lib.SnapError, snap_lib.SnapAPIError): + except (snap_lib.SnapError, snap_lib.SnapAPIError) as e: logger.exception("Snap refresh failed") if self._snap.revision == revision_before_refresh: - raise container.RefreshFailed + raise container.RefreshFailed from e else: refresh.update_snap_revision() raise else: refresh.update_snap_revision() - logger.info(f"Refreshed snap to revision {repr(snap_revision)}") + logger.info(f"Refreshed snap to revision {snap_revision!r}") # TODO python3.10 min version: Use `list` instead of `typing.List` def _run_command( @@ -319,10 +321,10 @@ def _run_command( command: typing.List[str], *, timeout: typing.Optional[int] = 30, - input: str = None, # noqa: A002 Match subprocess.run() + input: typing.Optional[str] = None, # noqa: A002 Match subprocess.run() ) -> str: try: - output = subprocess.run( + output = subprocess.run( # noqa: S603 command, input=input, capture_output=True, @@ -333,7 +335,7 @@ def _run_command( except subprocess.CalledProcessError as e: raise container.CalledProcessError( returncode=e.returncode, cmd=e.cmd, output=e.output, stderr=e.stderr - ) + ) from None return output def path(self, *args, **kwargs) -> _Path: diff --git a/src/workload.py b/src/workload.py index 6d55e2b39..cc47e5efb 100644 --- a/src/workload.py +++ b/src/workload.py @@ -182,9 +182,9 @@ def reconcile( tls: bool, unit_name: str, exporter_config: "relations.cos.ExporterConfig", - key: str = None, - certificate: str = None, - certificate_authority: str = None, + key: typing.Optional[str] = None, + certificate: typing.Optional[str] = None, + certificate_authority: typing.Optional[str] = None, ) -> None: """Reconcile all workloads (router, exporter, tls).""" if tls and not (key and certificate and certificate_authority): @@ -384,9 +384,9 @@ def reconcile( tls: bool, unit_name: str, exporter_config: "relations.cos.ExporterConfig", - key: str = None, - certificate: str = None, - certificate_authority: str = None, + key: typing.Optional[str] = None, + certificate: typing.Optional[str] = None, + certificate_authority: typing.Optional[str] = None, ) -> None: """Reconcile all workloads (router, exporter, tls).""" if tls and not (key and certificate and certificate_authority): @@ -493,10 +493,10 @@ def _wait_until_http_server_authenticates(self) -> None: wait=tenacity.wait_fixed(5), ): with attempt: - response = requests.get( + response = requests.get( # noqa: S113 f"https://127.0.0.1:{self._cos.HTTP_SERVER_PORT}/api/20190715/routes", auth=(self._cos.MONITORING_USERNAME, self._cos.get_monitoring_password()), - verify=False, # do not verify tls certs as default certs do not have 127.0.0.1 in its list of IP SANs + verify=False, # noqa: S501 do not verify tls certs as default certs do not have 127.0.0.1 in its list of IP SANs ) response.raise_for_status() if "bootstrap_rw" not in response.text: diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index d129db1b8..a8b8a0e70 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -290,7 +290,7 @@ async def get_primary_unit( for k, v in results["status"]["defaultreplicaset"]["topology"].items(): if v["memberrole"] == "primary": unit_name = f"{app_name}/{k.split('-')[-1]}" - primary_unit = [unit for unit in units if unit.name == unit_name][0] + primary_unit = next(unit for unit in units if unit.name == unit_name) break if not primary_unit: diff --git a/tests/integration/test_hacluster.py b/tests/integration/test_hacluster.py index 04f82872b..fbad235dd 100644 --- a/tests/integration/test_hacluster.py +++ b/tests/integration/test_hacluster.py @@ -68,7 +68,9 @@ async def ensure_database_accessible_from_vip( async def generate_next_available_ip( - ops_test: OpsTest, starting_ip: str, exclude_ips: list[str] = [] + ops_test: OpsTest, + starting_ip: str, + exclude_ips: list[str] = [], # noqa: B006 ) -> str: """Compute and return the next available IP in the model's subnet.""" all_ip_addresses = [ diff --git a/tests/integration/test_upgrade.py b/tests/integration/test_upgrade.py index 53a57ec0b..db01139ae 100644 --- a/tests/integration/test_upgrade.py +++ b/tests/integration/test_upgrade.py @@ -208,9 +208,10 @@ def create_valid_upgrade_charm(charm_file: typing.Union[str, pathlib.Path]) -> N Upgrades require a new snap revision to avoid no-oping. """ - with zipfile.ZipFile(charm_file, mode="r") as charm_zip: - with zipfile.Path(charm_zip, "refresh_versions.toml").open("rb") as file: - versions = tomli.load(file) + with zipfile.ZipFile(charm_file, mode="r") as charm_zip, zipfile.Path( + charm_zip, "refresh_versions.toml" + ).open("rb") as file: + versions = tomli.load(file) # charm needs to refresh snap to be able to avoid no-op when upgrading. # set an old revision of the snap @@ -224,9 +225,10 @@ def create_valid_upgrade_charm(charm_file: typing.Union[str, pathlib.Path]) -> N def create_invalid_upgrade_charm(charm_file: typing.Union[str, pathlib.Path]) -> None: """Create an invalid mysql router charm for upgrade.""" - with zipfile.ZipFile(charm_file, mode="r") as charm_zip: - with zipfile.Path(charm_zip, "refresh_versions.toml").open("rb") as file: - versions = tomli.load(file) + with zipfile.ZipFile(charm_file, mode="r") as charm_zip, zipfile.Path( + charm_zip, "refresh_versions.toml" + ).open("rb") as file: + versions = tomli.load(file) versions["charm"] = "8.0/0.0.0" diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 4b48db1d9..4f08ba4c5 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -3,6 +3,7 @@ import pathlib import platform +from typing import Optional import ops import pytest @@ -101,7 +102,7 @@ def unset(self, *_, **__): def hold(self, *_, **__): return - def start(self, services: list[str] = None, *_, **__): + def start(self, services: Optional[list[str]], *_, **__): for service in services: assert service in ("mysqlrouter-service", "mysqlrouter-exporter") @@ -110,7 +111,7 @@ def start(self, services: list[str] = None, *_, **__): if "mysqlrouter-exporter" in services: self.services["mysqlrouter-exporter"]["active"] = True - def stop(self, services: list[str] = None, *_, **__): + def stop(self, services: Optional[list[str]], *_, **__): for service in services: assert service in ("mysqlrouter-service", "mysqlrouter-exporter") @@ -119,7 +120,7 @@ def stop(self, services: list[str] = None, *_, **__): if "mysqlrouter-exporter" in services: self.services["mysqlrouter-exporter"]["active"] = False - def restart(self, services: list[str] = []): + def restart(self, services: list[str] = []): # noqa: B006 if "mysqlrouter-service" in services: self.services["mysqlrouter-service"]["active"] = True if "mysqlrouter-exporter" in services: diff --git a/tests/unit/scenario_/database_relations/test_database_relations.py b/tests/unit/scenario_/database_relations/test_database_relations.py index 49c3d4d1e..401d5aefd 100644 --- a/tests/unit/scenario_/database_relations/test_database_relations.py +++ b/tests/unit/scenario_/database_relations/test_database_relations.py @@ -114,7 +114,7 @@ def test_incomplete_requires(incomplete_requires, complete_provides_s): assert state.app_status == ops.WaitingStatus( f"Waiting for {incomplete_requires.remote_app_name} app on backend-database endpoint" ) - for index, provides in enumerate(complete_provides_s, 1): + for index, _ in enumerate(complete_provides_s, 1): assert state.relations[index].local_app_data == {} @@ -144,7 +144,7 @@ def test_complete_requires_and_provides_unsupported_extra_user_role( assert_complete_local_app_databag( local_app_data, state.secrets, complete_requires, provides, juju_has_secrets ) - for index, provides in enumerate( + for index, _ in enumerate( unsupported_extra_user_role_provides_s, 1 + len(complete_provides_s) ): assert state.relations[index].local_app_data == {} @@ -156,7 +156,7 @@ def test_incomplete_provides(complete_requires, incomplete_provides_s): assert state.app_status == ops.WaitingStatus( f"Waiting for {incomplete_provides_s[0].remote_app_name} app on database endpoint" ) - for index, provides in enumerate(incomplete_provides_s, 1): + for index, _ in enumerate(incomplete_provides_s, 1): assert state.relations[index].local_app_data == {} @@ -187,5 +187,5 @@ def test_complete_provides_and_incomplete_provides( assert_complete_local_app_databag( local_app_data, state.secrets, complete_requires, provides, juju_has_secrets ) - for index, provides in enumerate(incomplete_provides_s, 1 + len(complete_provides_s)): + for index, _ in enumerate(incomplete_provides_s, 1 + len(complete_provides_s)): assert state.relations[index].local_app_data == {} diff --git a/tests/unit/scenario_/database_relations/test_database_relations_breaking.py b/tests/unit/scenario_/database_relations/test_database_relations_breaking.py index 799cd53d0..4d6249531 100644 --- a/tests/unit/scenario_/database_relations/test_database_relations_breaking.py +++ b/tests/unit/scenario_/database_relations/test_database_relations_breaking.py @@ -66,7 +66,7 @@ def test_breaking_requires_and_complete_provides( event=complete_requires.broken_event, ) assert state.app_status == ops.BlockedStatus("Missing relation: backend-database") - for index, provides in enumerate(complete_provides_s_, 1): + for index, _ in enumerate(complete_provides_s_, 1): local_app_data = state.relations[index].local_app_data # TODO secrets cleanup: remove # (waiting on https://github.com/canonical/data-platform-libs/issues/118) @@ -128,7 +128,7 @@ def test_complete_requires_and_breaking_provides( # TODO secrets cleanup: test if secret deleted # (waiting on https://github.com/canonical/data-platform-libs/issues/118) complete_provides_s_.pop() - for index, provides in enumerate(complete_provides_s_, 1): + for index, _ in enumerate(complete_provides_s_, 1): relation = state.relations[index] if juju_has_secrets and "requested-secrets" in relation.remote_app_data: local_app_data = relation.local_app_data