From 116319e1e3d8788231b57ce095ed980878f27a96 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 7 Oct 2025 15:44:39 +0200 Subject: [PATCH 01/24] Formally deprecate CodeNotary build config --- supervisor/addons/validate.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/supervisor/addons/validate.py b/supervisor/addons/validate.py index c9703bef5d0..b14f0539a05 100644 --- a/supervisor/addons/validate.py +++ b/supervisor/addons/validate.py @@ -207,6 +207,12 @@ def _warn_addon_config(config: dict[str, Any]): name, ) + if ATTR_CODENOTARY in config: + _LOGGER.warning( + "Add-on '%s' uses deprecated 'codenotary' field in config. This field is no longer used and will be ignored. Please report this to the maintainer.", + name, + ) + return config From 4f78b19f5cad4e7ab4e3b2501bdef22f4a6a9833 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 7 Oct 2025 15:52:48 +0200 Subject: [PATCH 02/24] Remove CodeNotary specific integrity checking The current code is specific to how CodeNotary was doing integrity checking. A future integrity checking mechanism likely will work differently (e.g. through EROFS based containers). Remove the current code to make way for a future implementation. --- supervisor/addons/addon.py | 7 - supervisor/api/security.py | 14 +- supervisor/docker/addon.py | 10 -- supervisor/docker/homeassistant.py | 12 +- supervisor/docker/interface.py | 33 +---- supervisor/homeassistant/core.py | 7 - supervisor/plugins/base.py | 7 - .../resolution/evaluations/source_mods.py | 30 +--- supervisor/security/module.py | 106 +-------------- supervisor/supervisor.py | 24 ---- supervisor/updater.py | 21 +-- supervisor/utils/codenotary.py | 109 --------------- tests/resolution/test_evaluation.py | 12 +- tests/security/test_module.py | 127 ----------------- tests/utils/test_codenotary.py | 128 ------------------ 15 files changed, 14 insertions(+), 633 deletions(-) delete mode 100644 supervisor/utils/codenotary.py delete mode 100644 tests/security/test_module.py delete mode 100644 tests/utils/test_codenotary.py diff --git a/supervisor/addons/addon.py b/supervisor/addons/addon.py index c4895fd451a..f38cb53f66d 100644 --- a/supervisor/addons/addon.py +++ b/supervisor/addons/addon.py @@ -1513,13 +1513,6 @@ def _restore_data(): _LOGGER.info("Finished restore for add-on %s", self.slug) return wait_for_start - def check_trust(self) -> Awaitable[None]: - """Calculate Addon docker content trust. - - Return Coroutine. - """ - return self.instance.check_trust() - @Job( name="addon_restart_after_problem", throttle_period=WATCHDOG_THROTTLE_PERIOD, diff --git a/supervisor/api/security.py b/supervisor/api/security.py index ebafe62eee3..23699bfe34f 100644 --- a/supervisor/api/security.py +++ b/supervisor/api/security.py @@ -1,19 +1,14 @@ """Init file for Supervisor Security RESTful API.""" -import asyncio -import logging from typing import Any from aiohttp import web -import attr import voluptuous as vol from ..const import ATTR_CONTENT_TRUST, ATTR_FORCE_SECURITY, ATTR_PWNED from ..coresys import CoreSysAttributes from .utils import api_process, api_validate -_LOGGER: logging.Logger = logging.getLogger(__name__) - # pylint: disable=no-value-for-parameter SCHEMA_OPTIONS = vol.Schema( { @@ -54,6 +49,9 @@ async def options(self, request: web.Request) -> None: @api_process async def integrity_check(self, request: web.Request) -> dict[str, Any]: - """Run backend integrity check.""" - result = await asyncio.shield(self.sys_security.integrity_check()) - return attr.asdict(result) + """Run backend integrity check. + + CodeNotary integrity checking has been removed. This endpoint now returns + an error indicating the feature is currently non-functional. + """ + return {"error": "No integrity checking available"} diff --git a/supervisor/docker/addon.py b/supervisor/docker/addon.py index 01427948509..e50c8da74b9 100644 --- a/supervisor/docker/addon.py +++ b/supervisor/docker/addon.py @@ -846,16 +846,6 @@ async def stop(self, remove_container: bool = True) -> None: ): self.sys_resolution.dismiss_issue(self.addon.device_access_missing_issue) - async def _validate_trust(self, image_id: str) -> None: - """Validate trust of content.""" - if not self.addon.signed: - return - - checksum = image_id.partition(":")[2] - return await self.sys_security.verify_content( - cast(str, self.addon.codenotary), checksum - ) - @Job( name="docker_addon_hardware_events", conditions=[JobCondition.OS_AGENT], diff --git a/supervisor/docker/homeassistant.py b/supervisor/docker/homeassistant.py index b92559b5558..ba559ceed22 100644 --- a/supervisor/docker/homeassistant.py +++ b/supervisor/docker/homeassistant.py @@ -5,7 +5,7 @@ import logging import re -from awesomeversion import AwesomeVersion, AwesomeVersionCompareException +from awesomeversion import AwesomeVersion from docker.types import Mount from ..const import LABEL_MACHINE @@ -244,13 +244,3 @@ def is_initialize(self) -> Awaitable[bool]: self.image, self.sys_homeassistant.version, ) - - async def _validate_trust(self, image_id: str) -> None: - """Validate trust of content.""" - try: - if self.version in {None, LANDINGPAGE} or self.version < _VERIFY_TRUST: - return - except AwesomeVersionCompareException: - return - - await super()._validate_trust(image_id) diff --git a/supervisor/docker/interface.py b/supervisor/docker/interface.py index b6efddecbb3..4beac62379a 100644 --- a/supervisor/docker/interface.py +++ b/supervisor/docker/interface.py @@ -425,17 +425,7 @@ async def process_pull_image_log(reference: PullLogEntry) -> None: platform=MAP_ARCH[image_arch], ) - # Validate content - try: - await self._validate_trust(cast(str, docker_image.id)) - except CodeNotaryError: - with suppress(docker.errors.DockerException): - await self.sys_run_in_executor( - self.sys_docker.images.remove, - image=f"{image}:{version!s}", - force=True, - ) - raise + # CodeNotary content trust validation has been removed # Tag latest if latest: @@ -809,24 +799,3 @@ def run_inside(self, command: str) -> Awaitable[CommandReturn]: return self.sys_run_in_executor( self.sys_docker.container_run_inside, self.name, command ) - - async def _validate_trust(self, image_id: str) -> None: - """Validate trust of content.""" - checksum = image_id.partition(":")[2] - return await self.sys_security.verify_own_content(checksum) - - @Job( - name="docker_interface_check_trust", - on_condition=DockerJobError, - concurrency=JobConcurrency.GROUP_REJECT, - ) - async def check_trust(self) -> None: - """Check trust of exists Docker image.""" - try: - image = await self.sys_run_in_executor( - self.sys_docker.images.get, f"{self.image}:{self.version!s}" - ) - except (docker.errors.DockerException, requests.RequestException): - return - - await self._validate_trust(cast(str, image.id)) diff --git a/supervisor/homeassistant/core.py b/supervisor/homeassistant/core.py index 4eb7e336778..f648e849f6f 100644 --- a/supervisor/homeassistant/core.py +++ b/supervisor/homeassistant/core.py @@ -428,13 +428,6 @@ def logs(self) -> Awaitable[bytes]: """ return self.instance.logs() - def check_trust(self) -> Awaitable[None]: - """Calculate HomeAssistant docker content trust. - - Return Coroutine. - """ - return self.instance.check_trust() - async def stats(self) -> DockerStats: """Return stats of Home Assistant.""" try: diff --git a/supervisor/plugins/base.py b/supervisor/plugins/base.py index 4920b4d2e1c..002a61cbbe1 100644 --- a/supervisor/plugins/base.py +++ b/supervisor/plugins/base.py @@ -76,13 +76,6 @@ def in_progress(self) -> bool: """Return True if a task is in progress.""" return self.instance.in_progress - def check_trust(self) -> Awaitable[None]: - """Calculate plugin docker content trust. - - Return Coroutine. - """ - return self.instance.check_trust() - def logs(self) -> Awaitable[bytes]: """Get docker plugin logs. diff --git a/supervisor/resolution/evaluations/source_mods.py b/supervisor/resolution/evaluations/source_mods.py index 8bca674a6f8..680fe2d24c7 100644 --- a/supervisor/resolution/evaluations/source_mods.py +++ b/supervisor/resolution/evaluations/source_mods.py @@ -1,14 +1,11 @@ """Evaluation class for Content Trust.""" -import errno import logging from pathlib import Path from ...const import CoreState from ...coresys import CoreSys -from ...exceptions import CodeNotaryError, CodeNotaryUntrusted -from ...utils.codenotary import calc_checksum_path_sourcecode -from ..const import ContextType, IssueType, UnhealthyReason, UnsupportedReason +from ..const import UnsupportedReason from .base import EvaluateBase _SUPERVISOR_SOURCE = Path("/usr/src/supervisor/supervisor") @@ -44,29 +41,4 @@ async def evaluate(self) -> bool: _LOGGER.warning("Disabled content-trust, skipping evaluation") return False - # Calculate sume of the sourcecode - try: - checksum = await self.sys_run_in_executor( - calc_checksum_path_sourcecode, _SUPERVISOR_SOURCE - ) - except OSError as err: - if err.errno == errno.EBADMSG: - self.sys_resolution.add_unhealthy_reason( - UnhealthyReason.OSERROR_BAD_MESSAGE - ) - - self.sys_resolution.create_issue( - IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM - ) - _LOGGER.error("Can't calculate checksum of source code: %s", err) - return False - - # Validate checksum - try: - await self.sys_security.verify_own_content(checksum) - except CodeNotaryUntrusted: - return True - except CodeNotaryError: - pass - return False diff --git a/supervisor/security/module.py b/supervisor/security/module.py index f09a52dc02d..b73972c7b1a 100644 --- a/supervisor/security/module.py +++ b/supervisor/security/module.py @@ -11,20 +11,10 @@ FILE_HASSIO_SECURITY, ) from ..coresys import CoreSys, CoreSysAttributes -from ..exceptions import ( - CodeNotaryError, - CodeNotaryUntrusted, - PwnedError, - SecurityJobError, -) -from ..jobs.const import JobConcurrency -from ..jobs.decorator import Job, JobCondition -from ..resolution.const import ContextType, IssueType, SuggestionType -from ..utils.codenotary import cas_validate +from ..exceptions import PwnedError from ..utils.common import FileConfiguration from ..utils.pwned import check_pwned_password from ..validate import SCHEMA_SECURITY_CONFIG -from .const import ContentTrustResult, IntegrityResult _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -67,30 +57,6 @@ def pwned(self, value: bool) -> None: """Set pwned is enabled/disabled.""" self._data[ATTR_PWNED] = value - async def verify_content(self, signer: str, checksum: str) -> None: - """Verify content on CAS.""" - if not self.content_trust: - _LOGGER.warning("Disabled content-trust, skip validation") - return - - try: - await cas_validate(signer, checksum) - except CodeNotaryUntrusted: - raise - except CodeNotaryError: - if self.force: - raise - self.sys_resolution.create_issue( - IssueType.TRUST, - ContextType.SYSTEM, - suggestions=[SuggestionType.EXECUTE_INTEGRITY], - ) - return - - async def verify_own_content(self, checksum: str) -> None: - """Verify content from HA org.""" - return await self.verify_content("notary@home-assistant.io", checksum) - async def verify_secret(self, pwned_hash: str) -> None: """Verify pwned state of a secret.""" if not self.pwned: @@ -103,73 +69,3 @@ async def verify_secret(self, pwned_hash: str) -> None: if self.force: raise return - - @Job( - name="security_manager_integrity_check", - conditions=[JobCondition.INTERNET_SYSTEM], - on_condition=SecurityJobError, - concurrency=JobConcurrency.REJECT, - ) - async def integrity_check(self) -> IntegrityResult: - """Run a full system integrity check of the platform. - - We only allow to install trusted content. - This is a out of the band manual check. - """ - result: IntegrityResult = IntegrityResult() - if not self.content_trust: - _LOGGER.warning( - "Skipping integrity check, content_trust is globally disabled" - ) - return result - - # Supervisor - try: - await self.sys_supervisor.check_trust() - result.supervisor = ContentTrustResult.PASS - except CodeNotaryUntrusted: - result.supervisor = ContentTrustResult.ERROR - self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR) - except CodeNotaryError: - result.supervisor = ContentTrustResult.FAILED - - # Core - try: - await self.sys_homeassistant.core.check_trust() - result.core = ContentTrustResult.PASS - except CodeNotaryUntrusted: - result.core = ContentTrustResult.ERROR - self.sys_resolution.create_issue(IssueType.TRUST, ContextType.CORE) - except CodeNotaryError: - result.core = ContentTrustResult.FAILED - - # Plugins - for plugin in self.sys_plugins.all_plugins: - try: - await plugin.check_trust() - result.plugins[plugin.slug] = ContentTrustResult.PASS - except CodeNotaryUntrusted: - result.plugins[plugin.slug] = ContentTrustResult.ERROR - self.sys_resolution.create_issue( - IssueType.TRUST, ContextType.PLUGIN, reference=plugin.slug - ) - except CodeNotaryError: - result.plugins[plugin.slug] = ContentTrustResult.FAILED - - # Add-ons - for addon in self.sys_addons.installed: - if not addon.signed: - result.addons[addon.slug] = ContentTrustResult.UNTESTED - continue - try: - await addon.check_trust() - result.addons[addon.slug] = ContentTrustResult.PASS - except CodeNotaryUntrusted: - result.addons[addon.slug] = ContentTrustResult.ERROR - self.sys_resolution.create_issue( - IssueType.TRUST, ContextType.ADDON, reference=addon.slug - ) - except CodeNotaryError: - result.addons[addon.slug] = ContentTrustResult.FAILED - - return result diff --git a/supervisor/supervisor.py b/supervisor/supervisor.py index 8b638f3697b..c5c3c7a04bf 100644 --- a/supervisor/supervisor.py +++ b/supervisor/supervisor.py @@ -25,8 +25,6 @@ from .docker.stats import DockerStats from .docker.supervisor import DockerSupervisor from .exceptions import ( - CodeNotaryError, - CodeNotaryUntrusted, DockerError, HostAppArmorError, SupervisorAppArmorError, @@ -37,7 +35,6 @@ from .jobs.const import JobCondition, JobThrottle from .jobs.decorator import Job from .resolution.const import ContextType, IssueType, UnhealthyReason -from .utils.codenotary import calc_checksum from .utils.sentry import async_capture_exception _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -150,20 +147,6 @@ async def update_apparmor(self) -> None: _LOGGER.error, ) from err - # Validate - try: - await self.sys_security.verify_own_content(calc_checksum(data)) - except CodeNotaryUntrusted as err: - raise SupervisorAppArmorError( - "Content-Trust is broken for the AppArmor profile fetch!", - _LOGGER.critical, - ) from err - except CodeNotaryError as err: - raise SupervisorAppArmorError( - f"CodeNotary error while processing AppArmor fetch: {err!s}", - _LOGGER.error, - ) from err - # Load temp_dir: TemporaryDirectory | None = None @@ -273,13 +256,6 @@ def logs(self) -> Awaitable[bytes]: """ return self.instance.logs() - def check_trust(self) -> Awaitable[None]: - """Calculate Supervisor docker content trust. - - Return Coroutine. - """ - return self.instance.check_trust() - async def stats(self) -> DockerStats: """Return stats of Supervisor.""" try: diff --git a/supervisor/updater.py b/supervisor/updater.py index a4b837e46e8..17bffe849c3 100644 --- a/supervisor/updater.py +++ b/supervisor/updater.py @@ -31,14 +31,8 @@ UpdateChannel, ) from .coresys import CoreSys, CoreSysAttributes -from .exceptions import ( - CodeNotaryError, - CodeNotaryUntrusted, - UpdaterError, - UpdaterJobError, -) +from .exceptions import UpdaterError, UpdaterJobError from .jobs.decorator import Job, JobCondition -from .utils.codenotary import calc_checksum from .utils.common import FileConfiguration from .validate import SCHEMA_UPDATER_CONFIG @@ -289,19 +283,6 @@ async def fetch_data(self): self.sys_bus.remove_listener(self._connectivity_listener) self._connectivity_listener = None - # Validate - try: - await self.sys_security.verify_own_content(calc_checksum(data)) - except CodeNotaryUntrusted as err: - raise UpdaterError( - "Content-Trust is broken for the version file fetch!", _LOGGER.critical - ) from err - except CodeNotaryError as err: - raise UpdaterError( - f"CodeNotary error while processing version fetch: {err!s}", - _LOGGER.error, - ) from err - # Parse data try: data = json.loads(data) diff --git a/supervisor/utils/codenotary.py b/supervisor/utils/codenotary.py deleted file mode 100644 index 927d60acd6c..00000000000 --- a/supervisor/utils/codenotary.py +++ /dev/null @@ -1,109 +0,0 @@ -"""Small wrapper for CodeNotary.""" - -from __future__ import annotations - -import asyncio -import hashlib -import json -import logging -from pathlib import Path -import shlex -from typing import Final - -from dirhash import dirhash - -from ..exceptions import CodeNotaryBackendError, CodeNotaryError, CodeNotaryUntrusted -from . import clean_env - -_LOGGER: logging.Logger = logging.getLogger(__name__) - -_CAS_CMD: str = ( - "cas authenticate --signerID {signer} --silent --output json --hash {sum}" -) -_CACHE: set[tuple[str, str]] = set() - - -_ATTR_ERROR: Final = "error" -_ATTR_STATUS: Final = "status" -_FALLBACK_ERROR: Final = "Unknown CodeNotary backend issue" - - -def calc_checksum(data: str | bytes) -> str: - """Generate checksum for CodeNotary.""" - if isinstance(data, str): - return hashlib.sha256(data.encode()).hexdigest() - return hashlib.sha256(data).hexdigest() - - -def calc_checksum_path_sourcecode(folder: Path) -> str: - """Calculate checksum for a path source code. - - Need catch OSError. - """ - return dirhash(folder.as_posix(), "sha256", match=["*.py"]) - - -# pylint: disable=unreachable -async def cas_validate( - signer: str, - checksum: str, -) -> None: - """Validate data against CodeNotary.""" - return - if (checksum, signer) in _CACHE: - return - - # Generate command for request - command = shlex.split(_CAS_CMD.format(signer=signer, sum=checksum)) - - # Request notary authorization - _LOGGER.debug("Send cas command: %s", command) - try: - proc = await asyncio.create_subprocess_exec( - *command, - stdin=asyncio.subprocess.DEVNULL, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - env=clean_env(), - ) - - async with asyncio.timeout(15): - data, error = await proc.communicate() - except TimeoutError: - raise CodeNotaryBackendError( - "Timeout while processing CodeNotary", _LOGGER.warning - ) from None - except OSError as err: - raise CodeNotaryError( - f"CodeNotary fatal error: {err!s}", _LOGGER.critical - ) from err - - # Check if Notarized - if proc.returncode != 0 and not data: - if error: - try: - error = error.decode("utf-8") - except UnicodeDecodeError as err: - raise CodeNotaryBackendError(_FALLBACK_ERROR, _LOGGER.warning) from err - if "not notarized" in error: - raise CodeNotaryUntrusted() - else: - error = _FALLBACK_ERROR - raise CodeNotaryBackendError(error, _LOGGER.warning) - - # Parse data - try: - data_json = json.loads(data) - _LOGGER.debug("CodeNotary response with: %s", data_json) - except (json.JSONDecodeError, UnicodeDecodeError) as err: - raise CodeNotaryError( - f"Can't parse CodeNotary output: {data!s} - {err!s}", _LOGGER.error - ) from err - - if _ATTR_ERROR in data_json: - raise CodeNotaryBackendError(data_json[_ATTR_ERROR], _LOGGER.warning) - - if data_json[_ATTR_STATUS] == 0: - _CACHE.add((checksum, signer)) - else: - raise CodeNotaryUntrusted() diff --git a/tests/resolution/test_evaluation.py b/tests/resolution/test_evaluation.py index 7e0f0f23d03..d0967217797 100644 --- a/tests/resolution/test_evaluation.py +++ b/tests/resolution/test_evaluation.py @@ -1,21 +1,15 @@ """Test evaluations.""" -from unittest.mock import Mock, patch +from unittest.mock import Mock from supervisor.const import CoreState from supervisor.coresys import CoreSys -from supervisor.utils import check_exception_chain async def test_evaluate_system_error(coresys: CoreSys, capture_exception: Mock): """Test error while evaluating system.""" await coresys.core.set_state(CoreState.RUNNING) - with patch( - "supervisor.resolution.evaluations.source_mods.calc_checksum_path_sourcecode", - side_effect=RuntimeError, - ): - await coresys.resolution.evaluate.evaluate_system() + await coresys.resolution.evaluate.evaluate_system() - capture_exception.assert_called_once() - assert check_exception_chain(capture_exception.call_args[0][0], RuntimeError) + capture_exception.assert_not_called() diff --git a/tests/security/test_module.py b/tests/security/test_module.py deleted file mode 100644 index ece4860552c..00000000000 --- a/tests/security/test_module.py +++ /dev/null @@ -1,127 +0,0 @@ -"""Testing handling with Security.""" - -from unittest.mock import AsyncMock, patch - -import pytest - -from supervisor.coresys import CoreSys -from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted -from supervisor.security.const import ContentTrustResult - - -async def test_content_trust(coresys: CoreSys): - """Test Content-Trust.""" - - with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate: - await coresys.security.verify_content("test@mail.com", "ffffffffffffff") - assert cas_validate.called - cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff") - - with patch( - "supervisor.security.module.cas_validate", AsyncMock() - ) as cas_validate: - await coresys.security.verify_own_content("ffffffffffffff") - assert cas_validate.called - cas_validate.assert_called_once_with( - "notary@home-assistant.io", "ffffffffffffff" - ) - - -async def test_disabled_content_trust(coresys: CoreSys): - """Test Content-Trust.""" - coresys.security.content_trust = False - - with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate: - await coresys.security.verify_content("test@mail.com", "ffffffffffffff") - assert not cas_validate.called - - with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate: - await coresys.security.verify_own_content("ffffffffffffff") - assert not cas_validate.called - - -async def test_force_content_trust(coresys: CoreSys): - """Force Content-Trust tests.""" - - with patch( - "supervisor.security.module.cas_validate", - AsyncMock(side_effect=CodeNotaryError), - ) as cas_validate: - await coresys.security.verify_content("test@mail.com", "ffffffffffffff") - assert cas_validate.called - cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff") - - coresys.security.force = True - - with ( - patch( - "supervisor.security.module.cas_validate", - AsyncMock(side_effect=CodeNotaryError), - ) as cas_validate, - pytest.raises(CodeNotaryError), - ): - await coresys.security.verify_content("test@mail.com", "ffffffffffffff") - - -async def test_integrity_check_disabled(coresys: CoreSys): - """Test integrity check with disabled content trust.""" - coresys.security.content_trust = False - - result = await coresys.security.integrity_check.__wrapped__(coresys.security) - - assert result.core == ContentTrustResult.UNTESTED - assert result.supervisor == ContentTrustResult.UNTESTED - - -async def test_integrity_check(coresys: CoreSys, install_addon_ssh): - """Test integrity check with content trust.""" - coresys.homeassistant.core.check_trust = AsyncMock() - coresys.supervisor.check_trust = AsyncMock() - install_addon_ssh.check_trust = AsyncMock() - install_addon_ssh.data["codenotary"] = "test@example.com" - - result = await coresys.security.integrity_check.__wrapped__(coresys.security) - - assert result.core == ContentTrustResult.PASS - assert result.supervisor == ContentTrustResult.PASS - assert result.addons[install_addon_ssh.slug] == ContentTrustResult.PASS - - -async def test_integrity_check_error(coresys: CoreSys, install_addon_ssh): - """Test integrity check with content trust issues.""" - coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - install_addon_ssh.data["codenotary"] = "test@example.com" - - result = await coresys.security.integrity_check.__wrapped__(coresys.security) - - assert result.core == ContentTrustResult.ERROR - assert result.supervisor == ContentTrustResult.ERROR - assert result.addons[install_addon_ssh.slug] == ContentTrustResult.ERROR - - -async def test_integrity_check_failed(coresys: CoreSys, install_addon_ssh): - """Test integrity check with content trust failed.""" - coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryError) - coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError) - install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryError) - install_addon_ssh.data["codenotary"] = "test@example.com" - - result = await coresys.security.integrity_check.__wrapped__(coresys.security) - - assert result.core == ContentTrustResult.FAILED - assert result.supervisor == ContentTrustResult.FAILED - assert result.addons[install_addon_ssh.slug] == ContentTrustResult.FAILED - - -async def test_integrity_check_addon(coresys: CoreSys, install_addon_ssh): - """Test integrity check with content trust but no signed add-ons.""" - coresys.homeassistant.core.check_trust = AsyncMock() - coresys.supervisor.check_trust = AsyncMock() - - result = await coresys.security.integrity_check.__wrapped__(coresys.security) - - assert result.core == ContentTrustResult.PASS - assert result.supervisor == ContentTrustResult.PASS - assert result.addons[install_addon_ssh.slug] == ContentTrustResult.UNTESTED diff --git a/tests/utils/test_codenotary.py b/tests/utils/test_codenotary.py deleted file mode 100644 index fa1b6b5a72a..00000000000 --- a/tests/utils/test_codenotary.py +++ /dev/null @@ -1,128 +0,0 @@ -"""Test CodeNotary.""" - -from __future__ import annotations - -from dataclasses import dataclass -from unittest.mock import AsyncMock, Mock, patch - -import pytest - -from supervisor.exceptions import ( - CodeNotaryBackendError, - CodeNotaryError, - CodeNotaryUntrusted, -) -from supervisor.utils.codenotary import calc_checksum, cas_validate - -pytest.skip("code notary has been disabled due to issues", allow_module_level=True) - - -@dataclass -class SubprocessResponse: - """Class for specifying subprocess exec response.""" - - returncode: int = 0 - data: str = "" - error: str | None = None - exception: Exception | None = None - - -@pytest.fixture(name="subprocess_exec") -def fixture_subprocess_exec(request): - """Mock subprocess exec with specific return.""" - response = request.param - if response.exception: - communicate_return = AsyncMock(side_effect=response.exception) - else: - communicate_return = AsyncMock(return_value=(response.data, response.error)) - - exec_return = Mock(returncode=response.returncode, communicate=communicate_return) - - with patch( - "supervisor.utils.codenotary.asyncio.create_subprocess_exec", - return_value=exec_return, - ) as subprocess_exec: - yield subprocess_exec - - -def test_checksum_calc(): - """Calc Checkusm as test.""" - assert calc_checksum("test") == calc_checksum(b"test") - assert ( - calc_checksum("test") - == "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08" - ) - - -async def test_valid_checksum(): - """Test a valid autorization.""" - await cas_validate( - "notary@home-assistant.io", - "4434a33ff9c695e870bc5bbe04230ea3361ecf4c129eb06133dd1373975a43f0", - ) - - -async def test_invalid_checksum(): - """Test a invalid autorization.""" - with pytest.raises(CodeNotaryUntrusted): - await cas_validate( - "notary@home-assistant.io", - "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", - ) - - -@pytest.mark.parametrize( - "subprocess_exec", - [SubprocessResponse(returncode=1, error=b"x is not notarized")], -) -async def test_not_notarized_error(subprocess_exec): - """Test received a not notarized error response from command.""" - with pytest.raises(CodeNotaryUntrusted): - await cas_validate( - "notary@home-assistant.io", - "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", - ) - - -@pytest.mark.parametrize( - "subprocess_exec", - [ - SubprocessResponse(returncode=1, error=b"test"), - SubprocessResponse(returncode=0, data='{"error":"asn1: structure error"}'), - SubprocessResponse(returncode=1, error="test".encode("utf-16")), - ], - indirect=True, -) -async def test_cas_backend_error(subprocess_exec): - """Test backend error executing cas command.""" - with pytest.raises(CodeNotaryBackendError): - await cas_validate( - "notary@home-assistant.io", - "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", - ) - - -@pytest.mark.parametrize( - "subprocess_exec", - [SubprocessResponse(returncode=0, data='{"status":1}')], - indirect=True, -) -async def test_cas_notarized_untrusted(subprocess_exec): - """Test cas found notarized but untrusted content.""" - with pytest.raises(CodeNotaryUntrusted): - await cas_validate( - "notary@home-assistant.io", - "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", - ) - - -@pytest.mark.parametrize( - "subprocess_exec", [SubprocessResponse(exception=OSError())], indirect=True -) -async def test_cas_exec_os_error(subprocess_exec): - """Test os error attempting to execute cas command.""" - with pytest.raises(CodeNotaryError): - await cas_validate( - "notary@home-assistant.io", - "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", - ) From 789053f2f0d8cf89035a2eb6a7e59b6d359a25d7 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 7 Oct 2025 16:40:44 +0200 Subject: [PATCH 03/24] Drop CodeNotary integrity fixups --- .../resolution/checks/supervisor_trust.py | 59 ---------------- .../fixups/system_execute_integrity.py | 67 ------------------- 2 files changed, 126 deletions(-) delete mode 100644 supervisor/resolution/checks/supervisor_trust.py delete mode 100644 supervisor/resolution/fixups/system_execute_integrity.py diff --git a/supervisor/resolution/checks/supervisor_trust.py b/supervisor/resolution/checks/supervisor_trust.py deleted file mode 100644 index b4be4329c8a..00000000000 --- a/supervisor/resolution/checks/supervisor_trust.py +++ /dev/null @@ -1,59 +0,0 @@ -"""Helpers to check supervisor trust.""" - -import logging - -from ...const import CoreState -from ...coresys import CoreSys -from ...exceptions import CodeNotaryError, CodeNotaryUntrusted -from ..const import ContextType, IssueType, UnhealthyReason -from .base import CheckBase - -_LOGGER: logging.Logger = logging.getLogger(__name__) - - -def setup(coresys: CoreSys) -> CheckBase: - """Check setup function.""" - return CheckSupervisorTrust(coresys) - - -class CheckSupervisorTrust(CheckBase): - """CheckSystemTrust class for check.""" - - async def run_check(self) -> None: - """Run check if not affected by issue.""" - if not self.sys_security.content_trust: - _LOGGER.warning( - "Skipping %s, content_trust is globally disabled", self.slug - ) - return - - try: - await self.sys_supervisor.check_trust() - except CodeNotaryUntrusted: - self.sys_resolution.add_unhealthy_reason(UnhealthyReason.UNTRUSTED) - self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR) - except CodeNotaryError: - pass - - async def approve_check(self, reference: str | None = None) -> bool: - """Approve check if it is affected by issue.""" - try: - await self.sys_supervisor.check_trust() - except CodeNotaryError: - return True - return False - - @property - def issue(self) -> IssueType: - """Return a IssueType enum.""" - return IssueType.TRUST - - @property - def context(self) -> ContextType: - """Return a ContextType enum.""" - return ContextType.SUPERVISOR - - @property - def states(self) -> list[CoreState]: - """Return a list of valid states when this check can run.""" - return [CoreState.RUNNING, CoreState.STARTUP] diff --git a/supervisor/resolution/fixups/system_execute_integrity.py b/supervisor/resolution/fixups/system_execute_integrity.py deleted file mode 100644 index a908b09b08d..00000000000 --- a/supervisor/resolution/fixups/system_execute_integrity.py +++ /dev/null @@ -1,67 +0,0 @@ -"""Helpers to check and fix issues with free space.""" - -from datetime import timedelta -import logging - -from ...coresys import CoreSys -from ...exceptions import ResolutionFixupError, ResolutionFixupJobError -from ...jobs.const import JobCondition, JobThrottle -from ...jobs.decorator import Job -from ...security.const import ContentTrustResult -from ..const import ContextType, IssueType, SuggestionType -from .base import FixupBase - -_LOGGER: logging.Logger = logging.getLogger(__name__) - - -def setup(coresys: CoreSys) -> FixupBase: - """Check setup function.""" - return FixupSystemExecuteIntegrity(coresys) - - -class FixupSystemExecuteIntegrity(FixupBase): - """Storage class for fixup.""" - - @Job( - name="fixup_system_execute_integrity_process", - conditions=[JobCondition.INTERNET_SYSTEM], - on_condition=ResolutionFixupJobError, - throttle_period=timedelta(hours=8), - throttle=JobThrottle.THROTTLE, - ) - async def process_fixup(self, reference: str | None = None) -> None: - """Initialize the fixup class.""" - result = await self.sys_security.integrity_check() - - if ContentTrustResult.FAILED in (result.core, result.supervisor): - raise ResolutionFixupError() - - for plugin in result.plugins: - if plugin != ContentTrustResult.FAILED: - continue - raise ResolutionFixupError() - - for addon in result.addons: - if addon != ContentTrustResult.FAILED: - continue - raise ResolutionFixupError() - - @property - def suggestion(self) -> SuggestionType: - """Return a SuggestionType enum.""" - return SuggestionType.EXECUTE_INTEGRITY - - @property - def context(self) -> ContextType: - """Return a ContextType enum.""" - return ContextType.SYSTEM - - @property - def issues(self) -> list[IssueType]: - """Return a IssueType enum list.""" - return [IssueType.TRUST] - - @property - def auto(self) -> bool: - """Return if a fixup can be apply as auto fix.""" - return True From 2e808a7555b803449d94608162646e9e03d79c0f Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Wed, 8 Oct 2025 10:35:21 +0200 Subject: [PATCH 04/24] Drop unused tests --- .../check/test_check_supervisor_trust.py | 96 ------------------- .../fixup/test_system_execute_integrity.py | 69 ------------- 2 files changed, 165 deletions(-) delete mode 100644 tests/resolution/check/test_check_supervisor_trust.py delete mode 100644 tests/resolution/fixup/test_system_execute_integrity.py diff --git a/tests/resolution/check/test_check_supervisor_trust.py b/tests/resolution/check/test_check_supervisor_trust.py deleted file mode 100644 index 54410cef151..00000000000 --- a/tests/resolution/check/test_check_supervisor_trust.py +++ /dev/null @@ -1,96 +0,0 @@ -"""Test Check Supervisor trust.""" - -# pylint: disable=import-error,protected-access -from unittest.mock import AsyncMock, patch - -from supervisor.const import CoreState -from supervisor.coresys import CoreSys -from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted -from supervisor.resolution.checks.supervisor_trust import CheckSupervisorTrust -from supervisor.resolution.const import IssueType, UnhealthyReason - - -async def test_base(coresys: CoreSys): - """Test check basics.""" - supervisor_trust = CheckSupervisorTrust(coresys) - assert supervisor_trust.slug == "supervisor_trust" - assert supervisor_trust.enabled - - -async def test_check(coresys: CoreSys): - """Test check.""" - supervisor_trust = CheckSupervisorTrust(coresys) - await coresys.core.set_state(CoreState.RUNNING) - - assert len(coresys.resolution.issues) == 0 - - coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError) - await supervisor_trust.run_check() - assert coresys.supervisor.check_trust.called - - coresys.supervisor.check_trust = AsyncMock(return_value=None) - await supervisor_trust.run_check() - assert coresys.supervisor.check_trust.called - - assert len(coresys.resolution.issues) == 0 - - coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - await supervisor_trust.run_check() - assert coresys.supervisor.check_trust.called - - assert len(coresys.resolution.issues) == 1 - assert coresys.resolution.issues[-1].type == IssueType.TRUST - - assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy - - -async def test_approve(coresys: CoreSys): - """Test check.""" - supervisor_trust = CheckSupervisorTrust(coresys) - await coresys.core.set_state(CoreState.RUNNING) - - coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - assert await supervisor_trust.approve_check() - - coresys.supervisor.check_trust = AsyncMock(return_value=None) - assert not await supervisor_trust.approve_check() - - -async def test_with_global_disable(coresys: CoreSys, caplog): - """Test when pwned is globally disabled.""" - coresys.security.content_trust = False - supervisor_trust = CheckSupervisorTrust(coresys) - await coresys.core.set_state(CoreState.RUNNING) - - assert len(coresys.resolution.issues) == 0 - coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted) - await supervisor_trust.run_check() - assert not coresys.security.verify_own_content.called - assert ( - "Skipping supervisor_trust, content_trust is globally disabled" in caplog.text - ) - - -async def test_did_run(coresys: CoreSys): - """Test that the check ran as expected.""" - supervisor_trust = CheckSupervisorTrust(coresys) - should_run = supervisor_trust.states - should_not_run = [state for state in CoreState if state not in should_run] - assert len(should_run) != 0 - assert len(should_not_run) != 0 - - with patch( - "supervisor.resolution.checks.supervisor_trust.CheckSupervisorTrust.run_check", - return_value=None, - ) as check: - for state in should_run: - await coresys.core.set_state(state) - await supervisor_trust() - check.assert_called_once() - check.reset_mock() - - for state in should_not_run: - await coresys.core.set_state(state) - await supervisor_trust() - check.assert_not_called() - check.reset_mock() diff --git a/tests/resolution/fixup/test_system_execute_integrity.py b/tests/resolution/fixup/test_system_execute_integrity.py deleted file mode 100644 index 766345c6c1f..00000000000 --- a/tests/resolution/fixup/test_system_execute_integrity.py +++ /dev/null @@ -1,69 +0,0 @@ -"""Test evaluation base.""" - -# pylint: disable=import-error,protected-access -from datetime import timedelta -from unittest.mock import AsyncMock - -import time_machine - -from supervisor.coresys import CoreSys -from supervisor.resolution.const import ContextType, IssueType, SuggestionType -from supervisor.resolution.data import Issue, Suggestion -from supervisor.resolution.fixups.system_execute_integrity import ( - FixupSystemExecuteIntegrity, -) -from supervisor.security.const import ContentTrustResult, IntegrityResult -from supervisor.utils.dt import utcnow - - -async def test_fixup(coresys: CoreSys, supervisor_internet: AsyncMock): - """Test fixup.""" - system_execute_integrity = FixupSystemExecuteIntegrity(coresys) - - assert system_execute_integrity.auto - - coresys.resolution.add_suggestion( - Suggestion(SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM) - ) - coresys.resolution.add_issue(Issue(IssueType.TRUST, ContextType.SYSTEM)) - - coresys.security.integrity_check = AsyncMock( - return_value=IntegrityResult( - ContentTrustResult.PASS, - ContentTrustResult.PASS, - {"audio": ContentTrustResult.PASS}, - ) - ) - - await system_execute_integrity() - - assert coresys.security.integrity_check.called - assert len(coresys.resolution.suggestions) == 0 - assert len(coresys.resolution.issues) == 0 - - -async def test_fixup_error(coresys: CoreSys, supervisor_internet: AsyncMock): - """Test fixup.""" - system_execute_integrity = FixupSystemExecuteIntegrity(coresys) - - assert system_execute_integrity.auto - - coresys.resolution.add_suggestion( - Suggestion(SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM) - ) - coresys.resolution.add_issue(Issue(IssueType.TRUST, ContextType.SYSTEM)) - - coresys.security.integrity_check = AsyncMock( - return_value=IntegrityResult( - ContentTrustResult.FAILED, - ContentTrustResult.PASS, - {"audio": ContentTrustResult.PASS}, - ) - ) - - with time_machine.travel(utcnow() + timedelta(hours=24)): - await system_execute_integrity() - - assert coresys.security.integrity_check.called - assert len(coresys.resolution.suggestions) == 1 - assert len(coresys.resolution.issues) == 1 From 0030b037372bdcca7cfc3e43bcf6dafd6935b6b5 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Wed, 8 Oct 2025 15:23:27 +0200 Subject: [PATCH 05/24] Fix pytest --- tests/api/test_security.py | 9 ++- tests/docker/test_interface.py | 9 --- tests/plugins/test_plugin_base.py | 7 +-- .../evaluation/test_evaluate_source_mods.py | 58 ++++--------------- tests/test_updater.py | 7 +-- 5 files changed, 21 insertions(+), 69 deletions(-) diff --git a/tests/api/test_security.py b/tests/api/test_security.py index 2ef5d5313df..18531e7abe5 100644 --- a/tests/api/test_security.py +++ b/tests/api/test_security.py @@ -41,11 +41,10 @@ async def test_api_security_options_pwned(api_client, coresys: CoreSys): async def test_api_integrity_check( api_client, coresys: CoreSys, supervisor_internet: AsyncMock ): - """Test security integrity check.""" - coresys.security.content_trust = False - + """Test security integrity check - now deprecated.""" resp = await api_client.post("/security/integrity") result = await resp.json() - assert result["data"]["core"] == "untested" - assert result["data"]["supervisor"] == "untested" + # CodeNotary integrity check has been removed + assert "error" in result["data"] + assert "deprecated" in result["data"]["error"].lower() diff --git a/tests/docker/test_interface.py b/tests/docker/test_interface.py index 321d1db0f49..8bdee78bb54 100644 --- a/tests/docker/test_interface.py +++ b/tests/docker/test_interface.py @@ -31,15 +31,6 @@ from tests.common import load_json_fixture -@pytest.fixture(autouse=True) -def mock_verify_content(coresys: CoreSys): - """Mock verify_content utility during tests.""" - with patch.object( - coresys.security, "verify_content", return_value=None - ) as verify_content: - yield verify_content - - @pytest.mark.parametrize( "cpu_arch, platform", [ diff --git a/tests/plugins/test_plugin_base.py b/tests/plugins/test_plugin_base.py index c47144a5b57..0e1ba0b5b1c 100644 --- a/tests/plugins/test_plugin_base.py +++ b/tests/plugins/test_plugin_base.py @@ -17,7 +17,6 @@ AudioJobError, CliError, CliJobError, - CodeNotaryUntrusted, CoreDNSError, CoreDNSJobError, DockerError, @@ -337,14 +336,12 @@ async def test_repair_failed( patch.object( DockerInterface, "arch", new=PropertyMock(return_value=CpuArch.AMD64) ), - patch( - "supervisor.security.module.cas_validate", side_effect=CodeNotaryUntrusted - ), + patch.object(DockerInterface, "install", side_effect=DockerError), ): await plugin.repair() capture_exception.assert_called_once() - assert check_exception_chain(capture_exception.call_args[0][0], CodeNotaryUntrusted) + assert check_exception_chain(capture_exception.call_args[0][0], DockerError) @pytest.mark.parametrize( diff --git a/tests/resolution/evaluation/test_evaluate_source_mods.py b/tests/resolution/evaluation/test_evaluate_source_mods.py index 084b56baf84..37dbe1c2ff6 100644 --- a/tests/resolution/evaluation/test_evaluate_source_mods.py +++ b/tests/resolution/evaluation/test_evaluate_source_mods.py @@ -1,40 +1,22 @@ """Test evaluation base.""" # pylint: disable=import-error,protected-access -import errno -import os -from pathlib import Path -from unittest.mock import AsyncMock, patch +from unittest.mock import patch from supervisor.const import CoreState from supervisor.coresys import CoreSys -from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted -from supervisor.resolution.const import ContextType, IssueType -from supervisor.resolution.data import Issue from supervisor.resolution.evaluations.source_mods import EvaluateSourceMods async def test_evaluation(coresys: CoreSys): - """Test evaluation.""" - with patch( - "supervisor.resolution.evaluations.source_mods._SUPERVISOR_SOURCE", - Path(f"{os.getcwd()}/supervisor"), - ): - sourcemods = EvaluateSourceMods(coresys) - await coresys.core.set_state(CoreState.RUNNING) - - assert sourcemods.reason not in coresys.resolution.unsupported - coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted) - await sourcemods() - assert sourcemods.reason in coresys.resolution.unsupported - - coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryError) - await sourcemods() - assert sourcemods.reason not in coresys.resolution.unsupported + """Test evaluation - CodeNotary removed.""" + sourcemods = EvaluateSourceMods(coresys) + await coresys.core.set_state(CoreState.RUNNING) - coresys.security.verify_own_content = AsyncMock() - await sourcemods() - assert sourcemods.reason not in coresys.resolution.unsupported + # CodeNotary checking removed, evaluation always returns False now + assert sourcemods.reason not in coresys.resolution.unsupported + await sourcemods() + assert sourcemods.reason not in coresys.resolution.unsupported async def test_did_run(coresys: CoreSys): @@ -63,27 +45,11 @@ async def test_did_run(coresys: CoreSys): async def test_evaluation_error(coresys: CoreSys): - """Test error reading file during evaluation.""" + """Test error reading file during evaluation - CodeNotary removed.""" sourcemods = EvaluateSourceMods(coresys) await coresys.core.set_state(CoreState.RUNNING) - corrupt_fs = Issue(IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM) + # CodeNotary checking removed, evaluation always returns False now + assert sourcemods.reason not in coresys.resolution.unsupported + await sourcemods() assert sourcemods.reason not in coresys.resolution.unsupported - assert corrupt_fs not in coresys.resolution.issues - - with patch( - "supervisor.utils.codenotary.dirhash", - side_effect=(err := OSError()), - ): - err.errno = errno.EBUSY - await sourcemods() - assert sourcemods.reason not in coresys.resolution.unsupported - assert corrupt_fs in coresys.resolution.issues - assert coresys.core.healthy is True - - coresys.resolution.dismiss_issue(corrupt_fs) - err.errno = errno.EBADMSG - await sourcemods() - assert sourcemods.reason not in coresys.resolution.unsupported - assert corrupt_fs in coresys.resolution.issues - assert coresys.core.healthy is False diff --git a/tests/test_updater.py b/tests/test_updater.py index b1cf21bb79f..f2451f2d441 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -86,10 +86,10 @@ async def test_os_update_path( """Test OS upgrade path across major versions.""" coresys.os._board = "rpi4" # pylint: disable=protected-access coresys.os._version = AwesomeVersion(version) # pylint: disable=protected-access - with patch.object(type(coresys.security), "verify_own_content"): - await coresys.updater.fetch_data() + # CodeNotary verification removed + await coresys.updater.fetch_data() - assert coresys.updater.version_hassos == AwesomeVersion(expected) + assert coresys.updater.version_hassos == AwesomeVersion(expected) @pytest.mark.usefixtures("no_job_throttle") @@ -105,7 +105,6 @@ async def test_delayed_fetch_for_connectivity( load_binary_fixture("version_stable.json") ) coresys.websession.head = AsyncMock() - coresys.security.verify_own_content = AsyncMock() # Network connectivity change causes a series of async tasks to eventually do a version fetch # Rather then use some kind of sleep loop, set up listener for start of fetch data job From bf3ff95f66e3d84651f67186d8979fe6cf2f3f32 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Fri, 10 Oct 2025 08:45:38 +0200 Subject: [PATCH 06/24] Fix pytest --- tests/api/test_security.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/api/test_security.py b/tests/api/test_security.py index 18531e7abe5..794ef4b2042 100644 --- a/tests/api/test_security.py +++ b/tests/api/test_security.py @@ -47,4 +47,3 @@ async def test_api_integrity_check( # CodeNotary integrity check has been removed assert "error" in result["data"] - assert "deprecated" in result["data"]["error"].lower() From c80a69b00013c2b1f0f5fb858f74b77b1068893a Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Thu, 23 Oct 2025 16:22:04 +0200 Subject: [PATCH 07/24] Remove CodeNotary related exceptions and handling Remove CodeNotary related exceptions and handling from the Docker interface. --- supervisor/docker/interface.py | 13 ------------- supervisor/exceptions.py | 15 --------------- 2 files changed, 28 deletions(-) diff --git a/supervisor/docker/interface.py b/supervisor/docker/interface.py index 4beac62379a..df75a2b44e1 100644 --- a/supervisor/docker/interface.py +++ b/supervisor/docker/interface.py @@ -31,15 +31,12 @@ ) from ..coresys import CoreSys from ..exceptions import ( - CodeNotaryError, - CodeNotaryUntrusted, DockerAPIError, DockerError, DockerJobError, DockerLogOutOfOrder, DockerNotFound, DockerRequestError, - DockerTrustError, ) from ..jobs import SupervisorJob from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobConcurrency @@ -452,16 +449,6 @@ async def process_pull_image_log(reference: PullLogEntry) -> None: raise DockerError( f"Unknown error with {image}:{version!s} -> {err!s}", _LOGGER.error ) from err - except CodeNotaryUntrusted as err: - raise DockerTrustError( - f"Pulled image {image}:{version!s} failed on content-trust verification!", - _LOGGER.critical, - ) from err - except CodeNotaryError as err: - raise DockerTrustError( - f"Error happened on Content-Trust check for {image}:{version!s}: {err!s}", - _LOGGER.error, - ) from err finally: if listener: self.sys_bus.remove_listener(listener) diff --git a/supervisor/exceptions.py b/supervisor/exceptions.py index 5f9625ab37e..ebf85715b0e 100644 --- a/supervisor/exceptions.py +++ b/supervisor/exceptions.py @@ -577,21 +577,6 @@ class PwnedConnectivityError(PwnedError): """Connectivity errors while checking pwned passwords.""" -# util/codenotary - - -class CodeNotaryError(HassioError): - """Error general with CodeNotary.""" - - -class CodeNotaryUntrusted(CodeNotaryError): - """Error on untrusted content.""" - - -class CodeNotaryBackendError(CodeNotaryError): - """CodeNotary backend error happening.""" - - # util/whoami From 676cc29c007d5750c5852f98851e843f69ccd5ac Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 28 Oct 2025 09:23:43 +0100 Subject: [PATCH 08/24] Drop unnecessary comment --- tests/test_updater.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_updater.py b/tests/test_updater.py index f2451f2d441..6fcd5e5d29a 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -86,7 +86,6 @@ async def test_os_update_path( """Test OS upgrade path across major versions.""" coresys.os._board = "rpi4" # pylint: disable=protected-access coresys.os._version = AwesomeVersion(version) # pylint: disable=protected-access - # CodeNotary verification removed await coresys.updater.fetch_data() assert coresys.updater.version_hassos == AwesomeVersion(expected) From 0e6ca190e673298331513e6588c1753284794264 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 28 Oct 2025 14:27:56 +0100 Subject: [PATCH 09/24] Remove Codenotary specific IssueType/SuggestionType --- supervisor/resolution/const.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/supervisor/resolution/const.py b/supervisor/resolution/const.py index 9492810b1fc..e75b56a9d72 100644 --- a/supervisor/resolution/const.py +++ b/supervisor/resolution/const.py @@ -103,7 +103,6 @@ class IssueType(StrEnum): PWNED = "pwned" REBOOT_REQUIRED = "reboot_required" SECURITY = "security" - TRUST = "trust" UPDATE_FAILED = "update_failed" UPDATE_ROLLBACK = "update_rollback" @@ -115,7 +114,6 @@ class SuggestionType(StrEnum): CLEAR_FULL_BACKUP = "clear_full_backup" CREATE_FULL_BACKUP = "create_full_backup" DISABLE_BOOT = "disable_boot" - EXECUTE_INTEGRITY = "execute_integrity" EXECUTE_REBOOT = "execute_reboot" EXECUTE_REBUILD = "execute_rebuild" EXECUTE_RELOAD = "execute_reload" From e3428b46a3205aa3c9163bd92e38b9bbb09e4ee2 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 28 Oct 2025 14:49:11 +0100 Subject: [PATCH 10/24] Drop Codenotary specific environment and secret reference --- .github/workflows/builder.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/builder.yml b/.github/workflows/builder.yml index 2ef857f97b3..c16f98817b5 100644 --- a/.github/workflows/builder.yml +++ b/.github/workflows/builder.yml @@ -170,8 +170,6 @@ jobs: --target /data \ --cosign \ --generic ${{ needs.init.outputs.version }} - env: - CAS_API_KEY: ${{ secrets.CAS_TOKEN }} version: name: Update version From 28d978943001b5e7c4130b7402f7101c5b3d9997 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 28 Oct 2025 14:58:59 +0100 Subject: [PATCH 11/24] Remove unused constants --- supervisor/security/const.py | 24 ------------------------ 1 file changed, 24 deletions(-) delete mode 100644 supervisor/security/const.py diff --git a/supervisor/security/const.py b/supervisor/security/const.py deleted file mode 100644 index ad875e6fad3..00000000000 --- a/supervisor/security/const.py +++ /dev/null @@ -1,24 +0,0 @@ -"""Security constants.""" - -from enum import StrEnum - -import attr - - -class ContentTrustResult(StrEnum): - """Content trust result enum.""" - - PASS = "pass" - ERROR = "error" - FAILED = "failed" - UNTESTED = "untested" - - -@attr.s -class IntegrityResult: - """Result of a full integrity check.""" - - supervisor: ContentTrustResult = attr.ib(default=ContentTrustResult.UNTESTED) - core: ContentTrustResult = attr.ib(default=ContentTrustResult.UNTESTED) - plugins: dict[str, ContentTrustResult] = attr.ib(default={}) - addons: dict[str, ContentTrustResult] = attr.ib(default={}) From caabf13230b759316fab46466c22d0720191c9eb Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 28 Oct 2025 15:08:25 +0100 Subject: [PATCH 12/24] Introduce APIGone exception for removed APIs Introduce a new exception class APIGone to indicate that certain API features have been removed and are no longer available. Update the security integrity check endpoint to raise this new exception instead of a generic APIError, providing clearer communication to clients that the feature has been intentionally removed. --- supervisor/api/security.py | 6 ++++-- supervisor/exceptions.py | 6 ++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/supervisor/api/security.py b/supervisor/api/security.py index 23699bfe34f..33db10d5239 100644 --- a/supervisor/api/security.py +++ b/supervisor/api/security.py @@ -5,6 +5,8 @@ from aiohttp import web import voluptuous as vol +from supervisor.exceptions import APIGone + from ..const import ATTR_CONTENT_TRUST, ATTR_FORCE_SECURITY, ATTR_PWNED from ..coresys import CoreSysAttributes from .utils import api_process, api_validate @@ -52,6 +54,6 @@ async def integrity_check(self, request: web.Request) -> dict[str, Any]: """Run backend integrity check. CodeNotary integrity checking has been removed. This endpoint now returns - an error indicating the feature is currently non-functional. + an error indicating the feature is gone. """ - return {"error": "No integrity checking available"} + raise APIGone("Integrity check feature has been removed.") diff --git a/supervisor/exceptions.py b/supervisor/exceptions.py index ebf85715b0e..5906c457296 100644 --- a/supervisor/exceptions.py +++ b/supervisor/exceptions.py @@ -423,6 +423,12 @@ class APINotFound(APIError): status = 404 +class APIGone(APIError): + """API is no longer available.""" + + status = 410 + + class APIAddonNotInstalled(APIError): """Not installed addon requested at addons API.""" From 0bb0b51021100aaa3a802440779e98c7333be86b Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 28 Oct 2025 15:44:55 +0100 Subject: [PATCH 13/24] Drop content trust A cosign based signature verification will likely be named differently to avoid confusion with existing implementations. For now, remove the content trust option entirely. --- supervisor/api/security.py | 6 +----- supervisor/api/supervisor.py | 4 ---- supervisor/security/module.py | 17 +---------------- supervisor/validate.py | 2 -- 4 files changed, 2 insertions(+), 27 deletions(-) diff --git a/supervisor/api/security.py b/supervisor/api/security.py index 33db10d5239..6ddc59756c3 100644 --- a/supervisor/api/security.py +++ b/supervisor/api/security.py @@ -7,7 +7,7 @@ from supervisor.exceptions import APIGone -from ..const import ATTR_CONTENT_TRUST, ATTR_FORCE_SECURITY, ATTR_PWNED +from ..const import ATTR_FORCE_SECURITY, ATTR_PWNED from ..coresys import CoreSysAttributes from .utils import api_process, api_validate @@ -15,7 +15,6 @@ SCHEMA_OPTIONS = vol.Schema( { vol.Optional(ATTR_PWNED): vol.Boolean(), - vol.Optional(ATTR_CONTENT_TRUST): vol.Boolean(), vol.Optional(ATTR_FORCE_SECURITY): vol.Boolean(), } ) @@ -28,7 +27,6 @@ class APISecurity(CoreSysAttributes): async def info(self, request: web.Request) -> dict[str, Any]: """Return Security information.""" return { - ATTR_CONTENT_TRUST: self.sys_security.content_trust, ATTR_PWNED: self.sys_security.pwned, ATTR_FORCE_SECURITY: self.sys_security.force, } @@ -40,8 +38,6 @@ async def options(self, request: web.Request) -> None: if ATTR_PWNED in body: self.sys_security.pwned = body[ATTR_PWNED] - if ATTR_CONTENT_TRUST in body: - self.sys_security.content_trust = body[ATTR_CONTENT_TRUST] if ATTR_FORCE_SECURITY in body: self.sys_security.force = body[ATTR_FORCE_SECURITY] diff --git a/supervisor/api/supervisor.py b/supervisor/api/supervisor.py index cc4a62839b9..3e1dc38e329 100644 --- a/supervisor/api/supervisor.py +++ b/supervisor/api/supervisor.py @@ -16,14 +16,12 @@ ATTR_BLK_READ, ATTR_BLK_WRITE, ATTR_CHANNEL, - ATTR_CONTENT_TRUST, ATTR_COUNTRY, ATTR_CPU_PERCENT, ATTR_DEBUG, ATTR_DEBUG_BLOCK, ATTR_DETECT_BLOCKING_IO, ATTR_DIAGNOSTICS, - ATTR_FORCE_SECURITY, ATTR_HEALTHY, ATTR_ICON, ATTR_IP_ADDRESS, @@ -69,8 +67,6 @@ vol.Optional(ATTR_DEBUG): vol.Boolean(), vol.Optional(ATTR_DEBUG_BLOCK): vol.Boolean(), vol.Optional(ATTR_DIAGNOSTICS): vol.Boolean(), - vol.Optional(ATTR_CONTENT_TRUST): vol.Boolean(), - vol.Optional(ATTR_FORCE_SECURITY): vol.Boolean(), vol.Optional(ATTR_AUTO_UPDATE): vol.Boolean(), vol.Optional(ATTR_DETECT_BLOCKING_IO): vol.Coerce(DetectBlockingIO), vol.Optional(ATTR_COUNTRY): str, diff --git a/supervisor/security/module.py b/supervisor/security/module.py index b73972c7b1a..f2f622fd3de 100644 --- a/supervisor/security/module.py +++ b/supervisor/security/module.py @@ -4,12 +4,7 @@ import logging -from ..const import ( - ATTR_CONTENT_TRUST, - ATTR_FORCE_SECURITY, - ATTR_PWNED, - FILE_HASSIO_SECURITY, -) +from ..const import ATTR_FORCE_SECURITY, ATTR_PWNED, FILE_HASSIO_SECURITY from ..coresys import CoreSys, CoreSysAttributes from ..exceptions import PwnedError from ..utils.common import FileConfiguration @@ -27,16 +22,6 @@ def __init__(self, coresys: CoreSys): super().__init__(FILE_HASSIO_SECURITY, SCHEMA_SECURITY_CONFIG) self.coresys = coresys - @property - def content_trust(self) -> bool: - """Return if content trust is enabled/disabled.""" - return self._data[ATTR_CONTENT_TRUST] - - @content_trust.setter - def content_trust(self, value: bool) -> None: - """Set content trust is enabled/disabled.""" - self._data[ATTR_CONTENT_TRUST] = value - @property def force(self) -> bool: """Return if force security is enabled/disabled.""" diff --git a/supervisor/validate.py b/supervisor/validate.py index 8a20ff9b070..1030d3e0676 100644 --- a/supervisor/validate.py +++ b/supervisor/validate.py @@ -12,7 +12,6 @@ ATTR_AUTO_UPDATE, ATTR_CHANNEL, ATTR_CLI, - ATTR_CONTENT_TRUST, ATTR_COUNTRY, ATTR_DEBUG, ATTR_DEBUG_BLOCK, @@ -229,7 +228,6 @@ def validate_repository(repository: str) -> str: # pylint: disable=no-value-for-parameter SCHEMA_SECURITY_CONFIG = vol.Schema( { - vol.Optional(ATTR_CONTENT_TRUST, default=True): vol.Boolean(), vol.Optional(ATTR_PWNED, default=True): vol.Boolean(), vol.Optional(ATTR_FORCE_SECURITY, default=False): vol.Boolean(), }, From 5ef1c6822f53801f8970a3286c1d5a016293b72a Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 28 Oct 2025 15:57:58 +0100 Subject: [PATCH 14/24] Drop code sign test --- .github/workflows/builder.yml | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/.github/workflows/builder.yml b/.github/workflows/builder.yml index c16f98817b5..08597763e46 100644 --- a/.github/workflows/builder.yml +++ b/.github/workflows/builder.yml @@ -291,33 +291,6 @@ jobs: exit 1 fi - - name: Check the Supervisor code sign - if: needs.init.outputs.publish == 'true' - run: | - echo "Enable Content-Trust" - test=$(docker exec hassio_cli ha security options --content-trust=true --no-progress --raw-json | jq -r '.result') - if [ "$test" != "ok" ]; then - exit 1 - fi - - echo "Run supervisor health check" - test=$(docker exec hassio_cli ha resolution healthcheck --no-progress --raw-json | jq -r '.result') - if [ "$test" != "ok" ]; then - exit 1 - fi - - echo "Check supervisor unhealthy" - test=$(docker exec hassio_cli ha resolution info --no-progress --raw-json | jq -r '.data.unhealthy[]') - if [ "$test" != "" ]; then - exit 1 - fi - - echo "Check supervisor supported" - test=$(docker exec hassio_cli ha resolution info --no-progress --raw-json | jq -r '.data.unsupported[]') - if [[ "$test" =~ source_mods ]]; then - exit 1 - fi - - name: Create full backup id: backup run: | From d01ca410fbf29dd9408b02fc07b056e6f4fd01f2 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 28 Oct 2025 15:58:23 +0100 Subject: [PATCH 15/24] Remove source_mods/content_trust evaluations --- supervisor/resolution/const.py | 1 - .../resolution/evaluations/content_trust.py | 34 -------------- .../resolution/evaluations/source_mods.py | 44 ------------------- 3 files changed, 79 deletions(-) delete mode 100644 supervisor/resolution/evaluations/content_trust.py delete mode 100644 supervisor/resolution/evaluations/source_mods.py diff --git a/supervisor/resolution/const.py b/supervisor/resolution/const.py index e75b56a9d72..95cc0c7b8c0 100644 --- a/supervisor/resolution/const.py +++ b/supervisor/resolution/const.py @@ -54,7 +54,6 @@ class UnsupportedReason(StrEnum): PRIVILEGED = "privileged" RESTART_POLICY = "restart_policy" SOFTWARE = "software" - SOURCE_MODS = "source_mods" SUPERVISOR_VERSION = "supervisor_version" SYSTEMD = "systemd" SYSTEMD_JOURNAL = "systemd_journal" diff --git a/supervisor/resolution/evaluations/content_trust.py b/supervisor/resolution/evaluations/content_trust.py deleted file mode 100644 index c5648fd0ab1..00000000000 --- a/supervisor/resolution/evaluations/content_trust.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Evaluation class for Content Trust.""" - -from ...const import CoreState -from ...coresys import CoreSys -from ..const import UnsupportedReason -from .base import EvaluateBase - - -def setup(coresys: CoreSys) -> EvaluateBase: - """Initialize evaluation-setup function.""" - return EvaluateContentTrust(coresys) - - -class EvaluateContentTrust(EvaluateBase): - """Evaluate system content trust level.""" - - @property - def reason(self) -> UnsupportedReason: - """Return a UnsupportedReason enum.""" - return UnsupportedReason.CONTENT_TRUST - - @property - def on_failure(self) -> str: - """Return a string that is printed when self.evaluate is True.""" - return "System run with disabled trusted content security." - - @property - def states(self) -> list[CoreState]: - """Return a list of valid states when this evaluation can run.""" - return [CoreState.INITIALIZE, CoreState.SETUP, CoreState.RUNNING] - - async def evaluate(self) -> bool: - """Run evaluation.""" - return not self.sys_security.content_trust diff --git a/supervisor/resolution/evaluations/source_mods.py b/supervisor/resolution/evaluations/source_mods.py deleted file mode 100644 index 680fe2d24c7..00000000000 --- a/supervisor/resolution/evaluations/source_mods.py +++ /dev/null @@ -1,44 +0,0 @@ -"""Evaluation class for Content Trust.""" - -import logging -from pathlib import Path - -from ...const import CoreState -from ...coresys import CoreSys -from ..const import UnsupportedReason -from .base import EvaluateBase - -_SUPERVISOR_SOURCE = Path("/usr/src/supervisor/supervisor") -_LOGGER: logging.Logger = logging.getLogger(__name__) - - -def setup(coresys: CoreSys) -> EvaluateBase: - """Initialize evaluation-setup function.""" - return EvaluateSourceMods(coresys) - - -class EvaluateSourceMods(EvaluateBase): - """Evaluate supervisor source modifications.""" - - @property - def reason(self) -> UnsupportedReason: - """Return a UnsupportedReason enum.""" - return UnsupportedReason.SOURCE_MODS - - @property - def on_failure(self) -> str: - """Return a string that is printed when self.evaluate is True.""" - return "System detect unauthorized source code modifications." - - @property - def states(self) -> list[CoreState]: - """Return a list of valid states when this evaluation can run.""" - return [CoreState.RUNNING] - - async def evaluate(self) -> bool: - """Run evaluation.""" - if not self.sys_security.content_trust: - _LOGGER.warning("Disabled content-trust, skipping evaluation") - return False - - return False From e53f47623400f9e374009e83eaff2bed445f989d Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 28 Oct 2025 16:03:51 +0100 Subject: [PATCH 16/24] Remove content_trust reference in bootstrap.py --- supervisor/bootstrap.py | 1 - 1 file changed, 1 deletion(-) diff --git a/supervisor/bootstrap.py b/supervisor/bootstrap.py index b9eb5201a02..0726bdb84cd 100644 --- a/supervisor/bootstrap.py +++ b/supervisor/bootstrap.py @@ -105,7 +105,6 @@ async def initialize_coresys() -> CoreSys: if coresys.dev: coresys.updater.channel = UpdateChannel.DEV - coresys.security.content_trust = False # Convert datetime logging.Formatter.converter = lambda *args: coresys.now().timetuple() From 5d80bf5c8fb9ca8b85b1c86147249d1e54da7c18 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 28 Oct 2025 16:04:17 +0100 Subject: [PATCH 17/24] Fix security tests --- tests/api/test_security.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/tests/api/test_security.py b/tests/api/test_security.py index 794ef4b2042..d0d3f28a99d 100644 --- a/tests/api/test_security.py +++ b/tests/api/test_security.py @@ -17,16 +17,6 @@ async def test_api_security_options_force_security(api_client, coresys: CoreSys) assert coresys.security.force -@pytest.mark.asyncio -async def test_api_security_options_content_trust(api_client, coresys: CoreSys): - """Test security options content trust.""" - assert coresys.security.content_trust - - await api_client.post("/security/options", json={"content_trust": False}) - - assert not coresys.security.content_trust - - @pytest.mark.asyncio async def test_api_security_options_pwned(api_client, coresys: CoreSys): """Test security options pwned.""" @@ -43,7 +33,6 @@ async def test_api_integrity_check( ): """Test security integrity check - now deprecated.""" resp = await api_client.post("/security/integrity") - result = await resp.json() - # CodeNotary integrity check has been removed - assert "error" in result["data"] + # CodeNotary integrity check has been removed, should return 410 Gone + assert resp.status == 410 From d30870970674f056ca2db44119e2a8621d854554 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 28 Oct 2025 16:22:39 +0100 Subject: [PATCH 18/24] Drop unused tests --- .../evaluation/test_evaluate_content_trust.py | 46 ---------------- .../evaluation/test_evaluate_source_mods.py | 55 ------------------- 2 files changed, 101 deletions(-) delete mode 100644 tests/resolution/evaluation/test_evaluate_content_trust.py delete mode 100644 tests/resolution/evaluation/test_evaluate_source_mods.py diff --git a/tests/resolution/evaluation/test_evaluate_content_trust.py b/tests/resolution/evaluation/test_evaluate_content_trust.py deleted file mode 100644 index 8dd67a9af74..00000000000 --- a/tests/resolution/evaluation/test_evaluate_content_trust.py +++ /dev/null @@ -1,46 +0,0 @@ -"""Test evaluation base.""" - -# pylint: disable=import-error,protected-access -from unittest.mock import patch - -from supervisor.const import CoreState -from supervisor.coresys import CoreSys -from supervisor.resolution.evaluations.content_trust import EvaluateContentTrust - - -async def test_evaluation(coresys: CoreSys): - """Test evaluation.""" - job_conditions = EvaluateContentTrust(coresys) - await coresys.core.set_state(CoreState.SETUP) - - await job_conditions() - assert job_conditions.reason not in coresys.resolution.unsupported - - coresys.security.content_trust = False - await job_conditions() - assert job_conditions.reason in coresys.resolution.unsupported - - -async def test_did_run(coresys: CoreSys): - """Test that the evaluation ran as expected.""" - job_conditions = EvaluateContentTrust(coresys) - should_run = job_conditions.states - should_not_run = [state for state in CoreState if state not in should_run] - assert len(should_run) != 0 - assert len(should_not_run) != 0 - - with patch( - "supervisor.resolution.evaluations.content_trust.EvaluateContentTrust.evaluate", - return_value=None, - ) as evaluate: - for state in should_run: - await coresys.core.set_state(state) - await job_conditions() - evaluate.assert_called_once() - evaluate.reset_mock() - - for state in should_not_run: - await coresys.core.set_state(state) - await job_conditions() - evaluate.assert_not_called() - evaluate.reset_mock() diff --git a/tests/resolution/evaluation/test_evaluate_source_mods.py b/tests/resolution/evaluation/test_evaluate_source_mods.py deleted file mode 100644 index 37dbe1c2ff6..00000000000 --- a/tests/resolution/evaluation/test_evaluate_source_mods.py +++ /dev/null @@ -1,55 +0,0 @@ -"""Test evaluation base.""" - -# pylint: disable=import-error,protected-access -from unittest.mock import patch - -from supervisor.const import CoreState -from supervisor.coresys import CoreSys -from supervisor.resolution.evaluations.source_mods import EvaluateSourceMods - - -async def test_evaluation(coresys: CoreSys): - """Test evaluation - CodeNotary removed.""" - sourcemods = EvaluateSourceMods(coresys) - await coresys.core.set_state(CoreState.RUNNING) - - # CodeNotary checking removed, evaluation always returns False now - assert sourcemods.reason not in coresys.resolution.unsupported - await sourcemods() - assert sourcemods.reason not in coresys.resolution.unsupported - - -async def test_did_run(coresys: CoreSys): - """Test that the evaluation ran as expected.""" - sourcemods = EvaluateSourceMods(coresys) - should_run = sourcemods.states - should_not_run = [state for state in CoreState if state not in should_run] - assert len(should_run) != 0 - assert len(should_not_run) != 0 - - with patch( - "supervisor.resolution.evaluations.source_mods.EvaluateSourceMods.evaluate", - return_value=None, - ) as evaluate: - for state in should_run: - await coresys.core.set_state(state) - await sourcemods() - evaluate.assert_called_once() - evaluate.reset_mock() - - for state in should_not_run: - await coresys.core.set_state(state) - await sourcemods() - evaluate.assert_not_called() - evaluate.reset_mock() - - -async def test_evaluation_error(coresys: CoreSys): - """Test error reading file during evaluation - CodeNotary removed.""" - sourcemods = EvaluateSourceMods(coresys) - await coresys.core.set_state(CoreState.RUNNING) - - # CodeNotary checking removed, evaluation always returns False now - assert sourcemods.reason not in coresys.resolution.unsupported - await sourcemods() - assert sourcemods.reason not in coresys.resolution.unsupported From a0890aca65b0523f9f716218f1f981535f298118 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 28 Oct 2025 17:42:18 +0100 Subject: [PATCH 19/24] Drop codenotary from schema Since we have "remove extra" in voluptuous, we can remove the codenotary field from the addon schema. --- supervisor/addons/model.py | 10 ++-------- supervisor/addons/validate.py | 1 - 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/supervisor/addons/model.py b/supervisor/addons/model.py index b9ccc427b2c..b7f8c8bc607 100644 --- a/supervisor/addons/model.py +++ b/supervisor/addons/model.py @@ -103,7 +103,6 @@ from .const import ( ATTR_BACKUP, ATTR_BREAKING_VERSIONS, - ATTR_CODENOTARY, ATTR_PATH, ATTR_READ_ONLY, AddonBackupMode, @@ -632,13 +631,8 @@ def with_journald(self) -> bool: @property def signed(self) -> bool: - """Return True if the image is signed.""" - return ATTR_CODENOTARY in self.data - - @property - def codenotary(self) -> str | None: - """Return Signer email address for CAS.""" - return self.data.get(ATTR_CODENOTARY) + """Currently no signing support.""" + return False @property def breaking_versions(self) -> list[AwesomeVersion]: diff --git a/supervisor/addons/validate.py b/supervisor/addons/validate.py index b14f0539a05..e51754c3132 100644 --- a/supervisor/addons/validate.py +++ b/supervisor/addons/validate.py @@ -423,7 +423,6 @@ def _migrate(config: dict[str, Any]): vol.Optional(ATTR_BACKUP, default=AddonBackupMode.HOT): vol.Coerce( AddonBackupMode ), - vol.Optional(ATTR_CODENOTARY): vol.Email(), vol.Optional(ATTR_OPTIONS, default={}): dict, vol.Optional(ATTR_SCHEMA, default={}): vol.Any( vol.Schema({str: SCHEMA_ELEMENT}), From 3412efad6614d22b88bcf3cfe4cb9ef55cdf8cc6 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 28 Oct 2025 18:18:02 +0100 Subject: [PATCH 20/24] Remove content_trust from tests --- tests/fixtures/bla/backup.json | 1 + tests/fixtures/bla/ssl.tar.gz | Bin 0 -> 256 bytes tests/misc/test_tasks.py | 1 - tests/resolution/check/test_check.py | 2 - .../check/test_docker_config_volume_fix.py | 141 ++++++++++++++++++ 5 files changed, 142 insertions(+), 3 deletions(-) create mode 100644 tests/fixtures/bla/backup.json create mode 100644 tests/fixtures/bla/ssl.tar.gz create mode 100644 tests/resolution/check/test_docker_config_volume_fix.py diff --git a/tests/fixtures/bla/backup.json b/tests/fixtures/bla/backup.json new file mode 100644 index 00000000000..7c33db96b0a --- /dev/null +++ b/tests/fixtures/bla/backup.json @@ -0,0 +1 @@ +{"slug":"d9c48f8b","version":2,"name":"test_consolidate","date":"2025-01-22T18:09:28.196333+00:00","type":"partial","supervisor_version":"2025.01.1.dev2104","extra":{},"homeassistant":null,"compressed":true,"addons":[],"docker":{"registries":{}},"protected":true,"crypto":"aes128","repositories":["https://github.com/hassio-addons/repository","https://github.com/music-assistant/home-assistant-addon","core","https://github.com/home-assistant/addons-development","https://github.com/esphome/home-assistant-addon","local"],"folders":["ssl"]} \ No newline at end of file diff --git a/tests/fixtures/bla/ssl.tar.gz b/tests/fixtures/bla/ssl.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..d4020865ffa8631358d3d414edf5f0444042189b GIT binary patch literal 256 zcmWGeO)f1;4M{9wVn7DRVH~d2{j+|z&ht^L;?(}R%HJbUepzvj{UmNOEE zAAXv*cJ-o`6&k8L4!>pB;mBC^cH+YZX@|)b+8l{#E{oP5%ea1}S2uk!r!4ble$RM` y-A`U^<@Ts#igr0`Hf1Rfi|6ForXgG1x15_9)66_sQ|^o2|F~FF-&u|ma~l8*wQp|# literal 0 HcmV?d00001 diff --git a/tests/misc/test_tasks.py b/tests/misc/test_tasks.py index ff787b9bebd..32a0e844577 100644 --- a/tests/misc/test_tasks.py +++ b/tests/misc/test_tasks.py @@ -181,7 +181,6 @@ async def test_reload_updater_triggers_supervisor_update( """Test an updater reload triggers a supervisor update if there is one.""" coresys.hardware.disk.get_disk_free_space = lambda x: 5000 await coresys.core.set_state(CoreState.RUNNING) - coresys.security.content_trust = False with ( patch.object( diff --git a/tests/resolution/check/test_check.py b/tests/resolution/check/test_check.py index c582f732e21..9793eb4dd81 100644 --- a/tests/resolution/check/test_check.py +++ b/tests/resolution/check/test_check.py @@ -51,7 +51,6 @@ async def test_if_check_make_issue(coresys: CoreSys): """Test check for setup.""" free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM) await coresys.core.set_state(CoreState.RUNNING) - coresys.security.content_trust = False with patch("shutil.disk_usage", return_value=(1, 1, 1)): await coresys.resolution.check.check_system() @@ -63,7 +62,6 @@ async def test_if_check_cleanup_issue(coresys: CoreSys): """Test check for setup.""" free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM) await coresys.core.set_state(CoreState.RUNNING) - coresys.security.content_trust = False with patch("shutil.disk_usage", return_value=(1, 1, 1)): await coresys.resolution.check.check_system() diff --git a/tests/resolution/check/test_docker_config_volume_fix.py b/tests/resolution/check/test_docker_config_volume_fix.py new file mode 100644 index 00000000000..b7e4e7309ea --- /dev/null +++ b/tests/resolution/check/test_docker_config_volume_fix.py @@ -0,0 +1,141 @@ +"""Test docker config check fix for VOLUME mounts.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from supervisor.addons.addon import Addon +from supervisor.const import CoreState +from supervisor.coresys import CoreSys +from supervisor.docker.interface import DockerInterface +from supervisor.docker.manager import DockerAPI +from supervisor.resolution.checks.docker_config import CheckDockerConfig +from supervisor.resolution.const import ContextType + + +def _make_mock_container_get_with_volume_mount( + bad_config_names: list[str], folder: str = "media" +): + """Make mock of container get with VOLUME mount (not managed by supervisor).""" + # This simulates a Docker VOLUME mount with wrong propagation + # but NOT created by supervisor configuration + mount = { + "Type": "bind", + "Source": f"/var/lib/docker/volumes/something_{folder}/_data", # Docker volume source + "Destination": f"/{folder}", + "Mode": "rw", + "RW": True, + "Propagation": "rprivate", # Wrong propagation, but not our mount + } + + def mock_container_get(name): + out = MagicMock() + out.status = "running" + out.attrs = {"State": {}, "Mounts": []} + if name in bad_config_names: + out.attrs["Mounts"].append(mount) + + return out + + return mock_container_get + + +@pytest.mark.parametrize("folder", ["media", "share"]) +async def test_addon_volume_mount_not_flagged( + docker: DockerAPI, coresys: CoreSys, install_addon_ssh: Addon, folder: str +): + """Test that add-on with VOLUME mount to media/share but not in config is not flagged.""" + # Create an add-on that doesn't have media/share in its mapping configuration + # Remove the mapping from the addon configuration + install_addon_ssh.data["map"] = [ + {"type": "config", "read_only": False}, + {"type": "ssl", "read_only": True}, + ] # No media/share + + # Mock container that has VOLUME mount to media/share with wrong propagation + docker.containers.get = _make_mock_container_get_with_volume_mount( + ["addon_local_ssh"], folder + ) + + await coresys.core.set_state(CoreState.SETUP) + with patch.object(DockerInterface, "is_running", return_value=True): + await coresys.plugins.load() + await coresys.homeassistant.load() + await coresys.addons.load() + + docker_config = CheckDockerConfig(coresys) + assert not coresys.resolution.issues + assert not coresys.resolution.suggestions + + # Run check - should NOT create issue for add-on since mount wasn't requested + await docker_config.run_check() + + # Should only have system issue, not addon issue + addon_issues = [ + issue + for issue in coresys.resolution.issues + if issue.context == ContextType.ADDON and issue.reference == "local_ssh" + ] + assert len(addon_issues) == 0, ( + "Add-on should not be flagged for VOLUME mounts not in config" + ) + + # No system issue should be created either if no container has issues + system_issues = [ + issue + for issue in coresys.resolution.issues + if issue.context == ContextType.SYSTEM + ] + # Update expectation - if no containers have issues, no system issue should be created + assert len(system_issues) == 0 + + +@pytest.mark.parametrize("folder", ["media", "share"]) +async def test_addon_configured_mount_still_flagged( + docker: DockerAPI, coresys: CoreSys, install_addon_ssh: Addon, folder: str +): + """Test that add-on with configured media/share mount is still flagged when propagation wrong.""" + # Keep the original configuration which includes media/share + # SSH addon config already has media:rw and share:rw + + # Mock container that has supervisor-managed mount with wrong propagation + mount = { + "Type": "bind", + "Source": f"/mnt/data/supervisor/{folder}", # Supervisor-managed source + "Destination": f"/{folder}", + "Mode": "rw", + "RW": True, + "Propagation": "rprivate", # Wrong propagation + } + + def mock_container_get(name): + out = MagicMock() + out.status = "running" + out.attrs = {"State": {}, "Mounts": []} + if name == "addon_local_ssh": + out.attrs["Mounts"].append(mount) + return out + + docker.containers.get = mock_container_get + + await coresys.core.set_state(CoreState.SETUP) + with patch.object(DockerInterface, "is_running", return_value=True): + await coresys.plugins.load() + await coresys.homeassistant.load() + await coresys.addons.load() + + docker_config = CheckDockerConfig(coresys) + assert not coresys.resolution.issues + + # Run check - should create issue for add-on since mount was requested in config + await docker_config.run_check() + + # Should have addon issue since the mount was configured + addon_issues = [ + issue + for issue in coresys.resolution.issues + if issue.context == ContextType.ADDON and issue.reference == "local_ssh" + ] + assert len(addon_issues) == 1, ( + "Add-on should be flagged for configured mounts with wrong propagation" + ) From 9dfb02f2444c6e0aa88dd3047536afb87a5e8be8 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 28 Oct 2025 18:18:59 +0100 Subject: [PATCH 21/24] Remove content_trust unsupported reason --- supervisor/resolution/const.py | 1 - 1 file changed, 1 deletion(-) diff --git a/supervisor/resolution/const.py b/supervisor/resolution/const.py index 95cc0c7b8c0..206c5622398 100644 --- a/supervisor/resolution/const.py +++ b/supervisor/resolution/const.py @@ -39,7 +39,6 @@ class UnsupportedReason(StrEnum): APPARMOR = "apparmor" CGROUP_VERSION = "cgroup_version" CONNECTIVITY_CHECK = "connectivity_check" - CONTENT_TRUST = "content_trust" DBUS = "dbus" DNS_SERVER = "dns_server" DOCKER_CONFIGURATION = "docker_configuration" From 1fa2f33d4d8d170c43179439b08d401e0e1d7331 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Wed, 29 Oct 2025 14:08:49 +0100 Subject: [PATCH 22/24] Remove unnecessary comment --- supervisor/docker/interface.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/supervisor/docker/interface.py b/supervisor/docker/interface.py index df75a2b44e1..b70e4ea4785 100644 --- a/supervisor/docker/interface.py +++ b/supervisor/docker/interface.py @@ -422,8 +422,6 @@ async def process_pull_image_log(reference: PullLogEntry) -> None: platform=MAP_ARCH[image_arch], ) - # CodeNotary content trust validation has been removed - # Tag latest if latest: _LOGGER.info( From e20154670b080e53b67690ffd189ef25ac4f82e1 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Wed, 29 Oct 2025 14:12:55 +0100 Subject: [PATCH 23/24] Remove unrelated pytest --- .../check/test_docker_config_volume_fix.py | 141 ------------------ 1 file changed, 141 deletions(-) delete mode 100644 tests/resolution/check/test_docker_config_volume_fix.py diff --git a/tests/resolution/check/test_docker_config_volume_fix.py b/tests/resolution/check/test_docker_config_volume_fix.py deleted file mode 100644 index b7e4e7309ea..00000000000 --- a/tests/resolution/check/test_docker_config_volume_fix.py +++ /dev/null @@ -1,141 +0,0 @@ -"""Test docker config check fix for VOLUME mounts.""" - -from unittest.mock import MagicMock, patch - -import pytest - -from supervisor.addons.addon import Addon -from supervisor.const import CoreState -from supervisor.coresys import CoreSys -from supervisor.docker.interface import DockerInterface -from supervisor.docker.manager import DockerAPI -from supervisor.resolution.checks.docker_config import CheckDockerConfig -from supervisor.resolution.const import ContextType - - -def _make_mock_container_get_with_volume_mount( - bad_config_names: list[str], folder: str = "media" -): - """Make mock of container get with VOLUME mount (not managed by supervisor).""" - # This simulates a Docker VOLUME mount with wrong propagation - # but NOT created by supervisor configuration - mount = { - "Type": "bind", - "Source": f"/var/lib/docker/volumes/something_{folder}/_data", # Docker volume source - "Destination": f"/{folder}", - "Mode": "rw", - "RW": True, - "Propagation": "rprivate", # Wrong propagation, but not our mount - } - - def mock_container_get(name): - out = MagicMock() - out.status = "running" - out.attrs = {"State": {}, "Mounts": []} - if name in bad_config_names: - out.attrs["Mounts"].append(mount) - - return out - - return mock_container_get - - -@pytest.mark.parametrize("folder", ["media", "share"]) -async def test_addon_volume_mount_not_flagged( - docker: DockerAPI, coresys: CoreSys, install_addon_ssh: Addon, folder: str -): - """Test that add-on with VOLUME mount to media/share but not in config is not flagged.""" - # Create an add-on that doesn't have media/share in its mapping configuration - # Remove the mapping from the addon configuration - install_addon_ssh.data["map"] = [ - {"type": "config", "read_only": False}, - {"type": "ssl", "read_only": True}, - ] # No media/share - - # Mock container that has VOLUME mount to media/share with wrong propagation - docker.containers.get = _make_mock_container_get_with_volume_mount( - ["addon_local_ssh"], folder - ) - - await coresys.core.set_state(CoreState.SETUP) - with patch.object(DockerInterface, "is_running", return_value=True): - await coresys.plugins.load() - await coresys.homeassistant.load() - await coresys.addons.load() - - docker_config = CheckDockerConfig(coresys) - assert not coresys.resolution.issues - assert not coresys.resolution.suggestions - - # Run check - should NOT create issue for add-on since mount wasn't requested - await docker_config.run_check() - - # Should only have system issue, not addon issue - addon_issues = [ - issue - for issue in coresys.resolution.issues - if issue.context == ContextType.ADDON and issue.reference == "local_ssh" - ] - assert len(addon_issues) == 0, ( - "Add-on should not be flagged for VOLUME mounts not in config" - ) - - # No system issue should be created either if no container has issues - system_issues = [ - issue - for issue in coresys.resolution.issues - if issue.context == ContextType.SYSTEM - ] - # Update expectation - if no containers have issues, no system issue should be created - assert len(system_issues) == 0 - - -@pytest.mark.parametrize("folder", ["media", "share"]) -async def test_addon_configured_mount_still_flagged( - docker: DockerAPI, coresys: CoreSys, install_addon_ssh: Addon, folder: str -): - """Test that add-on with configured media/share mount is still flagged when propagation wrong.""" - # Keep the original configuration which includes media/share - # SSH addon config already has media:rw and share:rw - - # Mock container that has supervisor-managed mount with wrong propagation - mount = { - "Type": "bind", - "Source": f"/mnt/data/supervisor/{folder}", # Supervisor-managed source - "Destination": f"/{folder}", - "Mode": "rw", - "RW": True, - "Propagation": "rprivate", # Wrong propagation - } - - def mock_container_get(name): - out = MagicMock() - out.status = "running" - out.attrs = {"State": {}, "Mounts": []} - if name == "addon_local_ssh": - out.attrs["Mounts"].append(mount) - return out - - docker.containers.get = mock_container_get - - await coresys.core.set_state(CoreState.SETUP) - with patch.object(DockerInterface, "is_running", return_value=True): - await coresys.plugins.load() - await coresys.homeassistant.load() - await coresys.addons.load() - - docker_config = CheckDockerConfig(coresys) - assert not coresys.resolution.issues - - # Run check - should create issue for add-on since mount was requested in config - await docker_config.run_check() - - # Should have addon issue since the mount was configured - addon_issues = [ - issue - for issue in coresys.resolution.issues - if issue.context == ContextType.ADDON and issue.reference == "local_ssh" - ] - assert len(addon_issues) == 1, ( - "Add-on should be flagged for configured mounts with wrong propagation" - ) From 89c2ff40cd00a186456c02a53d58f59e0fe4c69f Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Mon, 3 Nov 2025 19:45:18 +0100 Subject: [PATCH 24/24] Remove unrelated fixtures --- tests/fixtures/bla/backup.json | 1 - tests/fixtures/bla/ssl.tar.gz | Bin 256 -> 0 bytes 2 files changed, 1 deletion(-) delete mode 100644 tests/fixtures/bla/backup.json delete mode 100644 tests/fixtures/bla/ssl.tar.gz diff --git a/tests/fixtures/bla/backup.json b/tests/fixtures/bla/backup.json deleted file mode 100644 index 7c33db96b0a..00000000000 --- a/tests/fixtures/bla/backup.json +++ /dev/null @@ -1 +0,0 @@ -{"slug":"d9c48f8b","version":2,"name":"test_consolidate","date":"2025-01-22T18:09:28.196333+00:00","type":"partial","supervisor_version":"2025.01.1.dev2104","extra":{},"homeassistant":null,"compressed":true,"addons":[],"docker":{"registries":{}},"protected":true,"crypto":"aes128","repositories":["https://github.com/hassio-addons/repository","https://github.com/music-assistant/home-assistant-addon","core","https://github.com/home-assistant/addons-development","https://github.com/esphome/home-assistant-addon","local"],"folders":["ssl"]} \ No newline at end of file diff --git a/tests/fixtures/bla/ssl.tar.gz b/tests/fixtures/bla/ssl.tar.gz deleted file mode 100644 index d4020865ffa8631358d3d414edf5f0444042189b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 256 zcmWGeO)f1;4M{9wVn7DRVH~d2{j+|z&ht^L;?(}R%HJbUepzvj{UmNOEE zAAXv*cJ-o`6&k8L4!>pB;mBC^cH+YZX@|)b+8l{#E{oP5%ea1}S2uk!r!4ble$RM` y-A`U^<@Ts#igr0`Hf1Rfi|6ForXgG1x15_9)66_sQ|^o2|F~FF-&u|ma~l8*wQp|#