diff --git a/.github/workflows/autils_migration_announcement.yml b/.github/workflows/autils_migration_announcement.yml index 18799c7b51..39f7962b33 100644 --- a/.github/workflows/autils_migration_announcement.yml +++ b/.github/workflows/autils_migration_announcement.yml @@ -6,6 +6,7 @@ on: - opened paths: - '**/ar.py' + - '**/crypto.py' - '**/path.py' - '**/data_structures.py' - '**/network/ports.py' diff --git a/.pylintrc_utils b/.pylintrc_utils index 5d7b94b171..572fb52001 100644 --- a/.pylintrc_utils +++ b/.pylintrc_utils @@ -7,7 +7,7 @@ extension-pkg-whitelist=netifaces # Add files or directories to the blacklist. They should be base names, not # paths. -ignore=CVS,archive.py,asset.py,astring.py,aurl.py,build.py,cloudinit.py,cpu.py,crypto.py,data_factory.py,datadrainer.py,debug.py,diff_validator.py,disk.py,distro.py,dmesg.py,download.py,exit_codes.py,file_utils.py,filelock.py,genio.py,git.py,iso9660.py,kernel.py,linux.py,linux_modules.py,lv_utils.py,memory.py,multipath.py,nvme.py,partition.py,pci.py,pmem.py,podman.py,service.py,softwareraid.py,ssh.py,stacktrace.py,sysinfo.py,vmimage.py,wait.py,gdbmi_parser.py,spark.py,distro_packages.py,inspector.py,main.py,manager.py,apt.py,base.py,dnf.py,dpkg.py,rpm.py,yum.py,zypper.py,deprecation.py +ignore=CVS,archive.py,asset.py,astring.py,aurl.py,build.py,cloudinit.py,cpu.py,data_factory.py,datadrainer.py,debug.py,diff_validator.py,disk.py,distro.py,dmesg.py,download.py,exit_codes.py,file_utils.py,filelock.py,genio.py,git.py,iso9660.py,kernel.py,linux.py,linux_modules.py,lv_utils.py,memory.py,multipath.py,nvme.py,partition.py,pci.py,pmem.py,podman.py,service.py,softwareraid.py,ssh.py,stacktrace.py,sysinfo.py,vmimage.py,wait.py,gdbmi_parser.py,spark.py,distro_packages.py,inspector.py,main.py,manager.py,apt.py,base.py,dnf.py,dpkg.py,rpm.py,yum.py,zypper.py,deprecation.py # regex matches against base names, not paths. ignore-patterns=.git diff --git a/avocado/utils/crypto.py b/avocado/utils/crypto.py index 1d739b8e85..d014b76bec 100644 --- a/avocado/utils/crypto.py +++ b/avocado/utils/crypto.py @@ -12,6 +12,8 @@ # Copyright: Red Hat Inc. 2013-2014 # Author: Lucas Meneghel Rodrigues +"""Cryptographic hash utilities for file verification.""" + import hashlib import io import logging @@ -21,19 +23,35 @@ def hash_file(filename, size=None, algorithm="md5"): - """ - Calculate the hash value of filename. + """Calculate the hash value of a file. - If size is not None, limit to first size bytes. - Throw exception if something is wrong with filename. - Can be also implemented with bash one-liner (assuming ``size%1024==0``):: - - dd if=filename bs=1024 count=size/1024 | sha1sum - + Computes a cryptographic hash of the specified file using the given + algorithm. Optionally limits hashing to the first N bytes of the file, + which is useful for verifying partial downloads or large files. :param filename: Path of the file that will have its hash calculated. - :param algorithm: Method used to calculate the hash (default is md5). + :type filename: str :param size: If provided, hash only the first size bytes of the file. - :return: Hash of the file, if something goes wrong, return None. + If None or 0, the entire file is hashed. If size exceeds the file + size, the entire file is hashed. + :type size: int or None + :param algorithm: Hash algorithm to use. Supported algorithms include + md5, sha1, sha256, sha512, blake2b, and others available in hashlib. + :type algorithm: str + :return: Hexadecimal digest string of the computed hash. 
Returns None
+        if an invalid algorithm is specified.
+    :rtype: str or None
+    :raises FileNotFoundError: When the specified file does not exist.
+    :raises PermissionError: When the file cannot be read due to permissions.
+
+    Example::
+
+        >>> hash_file('/path/to/file')
+        '9e107d9d372bb6826bd81d3542a419d6'
+        >>> hash_file('/path/to/file', algorithm='sha256')
+        'd7a8fbb307d7809469ca9abcb0082e4f...'
+        >>> hash_file('/path/to/large_file', size=1024)
+        'abc123...'
     """
     chunksize = io.DEFAULT_BUFFER_SIZE
     fsize = os.path.getsize(filename)
@@ -60,3 +78,9 @@ def hash_file(filename, size=None, algorithm="md5"):
             size -= len(data)
 
     return hash_obj.hexdigest()
+
+
+# pylint: disable=wrong-import-position
+from avocado.utils.deprecation import log_deprecation
+
+log_deprecation.warning("crypto")
diff --git a/selftests/check.py b/selftests/check.py
index d0bf2e2a55..d57431b41e 100755
--- a/selftests/check.py
+++ b/selftests/check.py
@@ -27,9 +27,9 @@
     "job-api-check-tmp-directory-exists": 1,
     "nrunner-interface": 90,
     "nrunner-requirement": 28,
-    "unit": 934,
+    "unit": 945,
     "jobs": 11,
-    "functional-parallel": 353,
+    "functional-parallel": 357,
     "functional-serial": 7,
     "optional-plugins": 0,
     "optional-plugins-golang": 2,
diff --git a/selftests/functional/utils/crypto.py b/selftests/functional/utils/crypto.py
new file mode 100644
index 0000000000..0fd540309e
--- /dev/null
+++ b/selftests/functional/utils/crypto.py
@@ -0,0 +1,114 @@
+import hashlib
+import os
+import shutil
+import tempfile
+import unittest
+
+from avocado.utils import crypto
+
+
+class HashFileFunctionalTest(unittest.TestCase):
+    """Functional tests for crypto.hash_file with real-world scenarios."""
+
+    def setUp(self):
+        """Create a temporary directory for test files."""
+        self.tmpdir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        """Clean up temporary files."""
+        shutil.rmtree(self.tmpdir, ignore_errors=True)
+
+    def test_download_verification_with_known_checksums(self):
+        """
+        Test verifying a downloaded file against published checksums.
+
+        Real-world scenario: Package managers and download sites publish
+        checksums that users verify after downloading. This test uses
+        a well-known test vector with pre-computed checksums.
+        """
+        # "The quick brown fox..." is a standard test vector
+        content = b"The quick brown fox jumps over the lazy dog"
+        filepath = os.path.join(self.tmpdir, "downloaded_file.bin")
+        with open(filepath, "wb") as f:
+            f.write(content)
+
+        # Verify against known checksums (as would be published on a download site)
+        self.assertEqual(
+            crypto.hash_file(filepath, algorithm="md5"),
+            "9e107d9d372bb6826bd81d3542a419d6",
+        )
+        self.assertEqual(
+            crypto.hash_file(filepath, algorithm="sha256"),
+            "d7a8fbb307d7809469ca9abcb0082e4f8d5651e46d3cdb762d02d0bf37c9e592",
+        )
+
+    def test_file_tampering_detection(self):
+        """
+        Test detecting file modification through hash comparison.
+
+        Real-world scenario: Security systems use hashes to detect if
+        files have been tampered with. This tests the complete workflow.
+ """ + filepath = os.path.join(self.tmpdir, "secure_config.conf") + + # Create original file and record its hash + with open(filepath, "wb") as f: + f.write(b"secure_setting=true\npassword_hash=abc123") + original_hash = crypto.hash_file(filepath, algorithm="sha256") + + # Simulate tampering - even a single byte change should be detected + with open(filepath, "wb") as f: + f.write(b"secure_setting=false\npassword_hash=abc123") + tampered_hash = crypto.hash_file(filepath, algorithm="sha256") + + self.assertNotEqual(original_hash, tampered_hash) + + def test_create_file_manifest(self): + """ + Test creating a manifest of file checksums for a directory. + + Real-world scenario: Build systems and package managers create + manifests listing checksums of all files for verification. + """ + # Create a project-like structure with various files + files = { + "src/main.py": b"print('hello')", + "src/utils.py": b"def helper(): pass", + "data/config.json": b'{"key": "value"}', + } + + manifest = {} + for relpath, content in files.items(): + filepath = os.path.join(self.tmpdir, relpath) + os.makedirs(os.path.dirname(filepath), exist_ok=True) + with open(filepath, "wb") as f: + f.write(content) + manifest[relpath] = crypto.hash_file(filepath, algorithm="sha256") + + # Verify manifest entries match expected hashes + for relpath, content in files.items(): + expected = hashlib.sha256(content).hexdigest() + self.assertEqual(manifest[relpath], expected) + + # All files should have unique hashes + self.assertEqual(len(set(manifest.values())), len(files)) + + def test_symlink_follows_to_target(self): + """ + Test that hashing through symlink produces same result as original. + + Real-world scenario: Linux systems use symlinks extensively; + hash verification must work regardless of access path. + """ + original = os.path.join(self.tmpdir, "original.bin") + symlink = os.path.join(self.tmpdir, "link.bin") + + with open(original, "wb") as f: + f.write(b"Linked content") + os.symlink(original, symlink) + + self.assertEqual(crypto.hash_file(original), crypto.hash_file(symlink)) + + +if __name__ == "__main__": + unittest.main() diff --git a/selftests/unit/utils/crypto.py b/selftests/unit/utils/crypto.py new file mode 100644 index 0000000000..7824bd5b46 --- /dev/null +++ b/selftests/unit/utils/crypto.py @@ -0,0 +1,123 @@ +import hashlib +import os +import shutil +import tempfile +import unittest + +from avocado.utils import crypto + + +class HashFileTest(unittest.TestCase): + """Test cases for crypto.hash_file function.""" + + def setUp(self): + """Create a temporary directory for test files.""" + self.tmpdir = tempfile.mkdtemp() + + def tearDown(self): + """Clean up temporary files.""" + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def _create_test_file(self, content, filename="testfile"): + """Helper to create a test file with given content.""" + filepath = os.path.join(self.tmpdir, filename) + with open(filepath, "wb") as f: + f.write(content) + return filepath + + # Core algorithm tests - testing the algorithm parameter code path + def test_hash_file_md5_default(self): + """Test MD5 hash calculation with default algorithm.""" + content = b"Hello, World!" 
+ filepath = self._create_test_file(content) + expected = hashlib.md5(content).hexdigest() + result = crypto.hash_file(filepath) + self.assertEqual(result, expected) + + def test_hash_file_sha256(self): + """Test SHA256 hash calculation.""" + content = b"Test content for SHA256" + filepath = self._create_test_file(content) + expected = hashlib.sha256(content).hexdigest() + result = crypto.hash_file(filepath, algorithm="sha256") + self.assertEqual(result, expected) + + # Size parameter tests - each tests a distinct code path + def test_hash_file_with_size_limit(self): + """Test hashing only the first N bytes of a file.""" + content = b"ABCDEFGHIJ" # 10 bytes + filepath = self._create_test_file(content) + # Hash only first 5 bytes - tests size < file_size path + expected = hashlib.md5(b"ABCDE").hexdigest() + result = crypto.hash_file(filepath, size=5) + self.assertEqual(result, expected) + + def test_hash_file_size_larger_than_file(self): + """Test that size larger than file hashes the whole file.""" + content = b"Small file" + filepath = self._create_test_file(content) + expected = hashlib.md5(content).hexdigest() + # Request more bytes than file contains - tests size > file_size branch + result = crypto.hash_file(filepath, size=1000000) + self.assertEqual(result, expected) + + def test_hash_file_size_falsy_hashes_whole_file(self): + """Test that falsy size values (None, 0) hash the entire file.""" + content = b"Complete file content" + filepath = self._create_test_file(content) + expected = hashlib.md5(content).hexdigest() + # Both None and 0 are falsy - tests 'not size' branch + self.assertEqual(crypto.hash_file(filepath, size=None), expected) + self.assertEqual(crypto.hash_file(filepath, size=0), expected) + + # Edge case tests - each tests unique behavior + def test_hash_file_empty_file(self): + """Test hashing an empty file.""" + filepath = self._create_test_file(b"") + expected = hashlib.md5(b"").hexdigest() + result = crypto.hash_file(filepath) + self.assertEqual(result, expected) + + def test_hash_file_binary_content(self): + """Test hashing a file with all possible byte values.""" + content = bytes(range(256)) # All byte values 0-255 + filepath = self._create_test_file(content) + expected = hashlib.md5(content).hexdigest() + result = crypto.hash_file(filepath) + self.assertEqual(result, expected) + + def test_hash_file_larger_than_chunk_size(self): + """Test hashing a file that requires multiple read iterations.""" + # Create content larger than io.DEFAULT_BUFFER_SIZE (typically 8192) + content = b"x" * 100000 + filepath = self._create_test_file(content) + expected = hashlib.md5(content).hexdigest() + result = crypto.hash_file(filepath) + self.assertEqual(result, expected) + + # Error handling tests + def test_hash_file_invalid_algorithm_returns_none(self): + """Test that invalid algorithm returns None without raising.""" + content = b"Test content" + filepath = self._create_test_file(content) + result = crypto.hash_file(filepath, algorithm="invalid_algo") + self.assertIsNone(result) + + def test_hash_file_nonexistent_file_raises(self): + """Test that non-existent file raises FileNotFoundError.""" + nonexistent = os.path.join(self.tmpdir, "nonexistent_file.txt") + with self.assertRaises(FileNotFoundError): + crypto.hash_file(nonexistent) + + # Hash uniqueness test - verifies hash function works correctly + def test_hash_file_different_content_produces_different_hash(self): + """Test that different content produces different hash values.""" + filepath1 = 
self._create_test_file(b"Content A", filename="file1.txt") + filepath2 = self._create_test_file(b"Content B", filename="file2.txt") + hash1 = crypto.hash_file(filepath1) + hash2 = crypto.hash_file(filepath2) + self.assertNotEqual(hash1, hash2) + + +if __name__ == "__main__": + unittest.main() diff --git a/spell.ignore b/spell.ignore index da5e489f4b..49a82b27c3 100644 --- a/spell.ignore +++ b/spell.ignore @@ -820,3 +820,7 @@ rwxrwxr substring truthy wb +checksums +cryptographic +hashlib +symlinked
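
For reference, a minimal usage sketch of the ``hash_file()`` behaviour documented above: verifying a downloaded artifact against a published checksum, the same scenario exercised by ``test_download_verification_with_known_checksums``. The artifact path and the expected digest below are illustrative placeholders rather than values taken from this change::

    import sys

    from avocado.utils import crypto

    # Illustrative placeholders: substitute the real artifact path and the
    # checksum published alongside it.
    ARTIFACT = "/tmp/downloaded_file.bin"
    PUBLISHED_SHA256 = (
        "d7a8fbb307d7809469ca9abcb0082e4f8d5651e46d3cdb762d02d0bf37c9e592"
    )


    def verify_download(path, expected_digest, algorithm="sha256"):
        """Return True if the file's digest matches the published checksum."""
        actual = crypto.hash_file(path, algorithm=algorithm)
        if actual is None:
            # hash_file() returns None when the algorithm name is unknown to
            # hashlib, so treat that as a hard error rather than a mismatch.
            raise ValueError(f"unsupported hash algorithm: {algorithm}")
        return actual == expected_digest


    if __name__ == "__main__":
        if verify_download(ARTIFACT, PUBLISHED_SHA256):
            print("checksum OK")
        else:
            print("checksum mismatch: file is corrupted or was tampered with")
            sys.exit(1)

Because the change also appends ``log_deprecation.warning("crypto")`` at module level, the deprecation message is emitted the first time ``avocado.utils.crypto`` is imported, so a script like the sketch above will log it once before hashing anything.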