|
3 | 3 | import pytest |
4 | 4 | import os |
5 | 5 | import shutil |
| 6 | +import tarfile |
| 7 | +import tempfile |
| 8 | +from unittest.mock import patch |
6 | 9 |
|
7 | 10 | from click.testing import CliRunner |
8 | 11 |
|
9 | | -from hepdata_cli.api import Client, mkdir |
| 12 | +from hepdata_cli.api import Client, download_url, mkdir |
10 | 13 | from hepdata_cli.cli import cli |
11 | 14 |
|
12 | 15 |
|
@@ -76,3 +79,71 @@ def test_cli_download(id_list, file_format, ids, table): |
76 | 79 | assert result.exit_code == 0 |
77 | 80 | assert len(os.listdir(test_download_dir)) > 0 |
78 | 81 | cleanup(test_download_dir) |
| 82 | + |
| 83 | + |
| 84 | +# utility function testing |
| 85 | + |
| 86 | +@pytest.mark.parametrize("files_raises", [{"file": "test.txt", "raises": False}, |
| 87 | + {"file": "../test.txt", "raises": True}, |
| 88 | + {"file": None, "raises": True}]) |
| 89 | +def test_tar_unpack(files_raises): |
| 90 | + """ |
| 91 | + Test the unpacking of a tarfile |
| 92 | + """ |
| 93 | + filename = files_raises["file"] |
| 94 | + raises = files_raises["raises"] |
| 95 | + if filename is None: # To hit FileNotFoundError branch |
| 96 | + filename = 'test.txt' |
| 97 | + real_exists = os.path.exists |
| 98 | + def mock_exists(path): |
| 99 | + if path.endswith(filename): |
| 100 | + return False |
| 101 | + return real_exists(path) |
| 102 | + exists_patcher = patch('os.path.exists', mock_exists) |
| 103 | + exists_patcher.start() |
| 104 | + |
| 105 | + # Create a some tarfile with known content |
| 106 | + with tempfile.NamedTemporaryFile(delete=False, suffix='.tar.gz') as tmp: |
| 107 | + tar_path = tmp.name |
| 108 | + with tarfile.open(tar_path, "w:gz") as tar: |
| 109 | + info = tarfile.TarInfo(name=filename) |
| 110 | + content = b"Hello, World!" |
| 111 | + info.size = len(content) |
| 112 | + temp_content_file = tempfile.NamedTemporaryFile(delete=False) |
| 113 | + try: |
| 114 | + temp_content_file.write(content) |
| 115 | + temp_content_file.close() |
| 116 | + tar.add(temp_content_file.name, arcname=filename) |
| 117 | + finally: |
| 118 | + os.remove(temp_content_file.name) |
| 119 | + |
| 120 | + test_download_dir = './.pytest_downloads/' |
| 121 | + mkdir(test_download_dir) |
| 122 | + assert len(os.listdir(test_download_dir)) == 0 |
| 123 | + |
| 124 | + # Mock the requests part to return our tarfile |
| 125 | + with patch('hepdata_cli.api.is_downloadable', return_value=True), \ |
| 126 | + patch('hepdata_cli.api.resilient_requests') as mock_requests, \ |
| 127 | + patch('hepdata_cli.api.getFilename_fromCd', return_value='test.tar.gz'): |
| 128 | + |
| 129 | + mock_response = mock_requests.return_value |
| 130 | + mock_response.content = open(tar_path, 'rb').read() |
| 131 | + mock_response.headers = {'content-disposition': 'filename=test.tar.gz'} |
| 132 | + |
| 133 | + # Test the download_url function |
| 134 | + try: |
| 135 | + if raises: |
| 136 | + with pytest.raises(Exception): |
| 137 | + files = download_url('http://example.com/test.tar.gz', test_download_dir) |
| 138 | + else: |
| 139 | + files = download_url('http://example.com/test.tar.gz', test_download_dir) |
| 140 | + assert len(files) == 1 |
| 141 | + for f in files: |
| 142 | + assert os.path.exists(f) |
| 143 | + with open(f, 'rb') as fr: |
| 144 | + assert fr.read() == b"Hello, World!" |
| 145 | + finally: |
| 146 | + exists_patcher.stop() if filename is None else None |
| 147 | + if os.path.exists(tar_path): |
| 148 | + os.remove(tar_path) |
| 149 | + cleanup(test_download_dir) |
0 commit comments