
Commit 155db05

more tests

1 parent d8a627f

File tree: 4 files changed, +225 −88 lines changed


cdx_toolkit/filter_cdx/__init__.py

Lines changed: 60 additions & 53 deletions
@@ -4,11 +4,12 @@
 import sys
 from concurrent.futures import ProcessPoolExecutor, as_completed
 from functools import partial
+from typing import List, Tuple
 
 import fsspec
 from surt import surt
 
-from cdx_toolkit.filter_cdx.matcher import TupleMatcher, TrieMatcher
+from cdx_toolkit.filter_cdx.matcher import Matcher, TupleMatcher, TrieMatcher
 
 
 logger = logging.getLogger(__name__)
@@ -62,71 +63,77 @@ def run_filter_cdx(args, cmdline: str):
         'trie': TrieMatcher,
         'tuple': TupleMatcher,
     }
+    limit = 0 if args.limit is None else args.limit
+    logger.info(f'Loaded {len(include_surt_prefixes):,} filter entries using {args.matching_approach} approach')
 
-    matcher = matcher_classes[args.matching_approach](include_surt_prefixes)
+    # Process files in parallel
+    total_lines_n, total_included_n, total_errors_n = filter_cdx(
+        matcher=matcher_classes[args.matching_approach](include_surt_prefixes),
+        input_paths=input_paths,
+        output_paths=output_paths,
+        limit=limit,
+        n_parallel=max(1, args.parallel),
+    )
 
-    logger.info(f'Loaded {len(include_surt_prefixes):,} filter entries using {args.matching_approach} approach')
+    logger.info(
+        f'Filter statistics: {total_included_n} / {total_lines_n} lines ({total_included_n / total_lines_n:.4f})'
+    )
+    logger.info(
+        f'Errors: {total_errors_n}'
+    )
 
-    # Process files in parallel or sequentially
-    n_parallel = args.parallel
-    limit = 0 if args.limit is None else args.limit
-    total_lines_n = 0
-    total_included_n = 0
-    total_errors_n = 0
-
-    if n_parallel > 1:
-        # Parallel processing
-        logger.info('Parallel processes: %i', n_parallel)
-        with ProcessPoolExecutor(max_workers=n_parallel) as executor:
-            # Create partial function with common arguments
-            process_file_partial = partial(_process_single_file, matcher=matcher, limit=limit)
-
-            # Submit all jobs
-            future_to_paths = {
-                executor.submit(process_file_partial, input_path, output_path): (input_path, output_path)
-                for input_path, output_path in zip(input_paths, output_paths)
-            }
-
-            # Collect results
-            for future in as_completed(future_to_paths):
-                input_path, output_path = future_to_paths[future]
-                try:
-                    lines_n, included_n = future.result()
-                    logger.info(
-                        f'File statistics for {input_path}: included_n={included_n}; lines_n={lines_n}; ratio={included_n / lines_n:.4f}'
-                    )
-                    total_lines_n += lines_n
-                    total_included_n += included_n
-
-                except Exception as exc:
-                    logger.error(f'File {input_path} generated an exception: {exc}')
-                    total_errors_n += 1
-    else:
-        # Sequential processing
-        logger.info('Sequential processing')
-        for input_path, output_path in zip(input_paths, output_paths):
+    if limit > 0 and total_included_n >= limit:
+        logger.info(f'Limit reached at {limit}')
+
+    # End timing and log execution time
+    end_time = time.time()
+    execution_time = end_time - start_time
+
+    logger.info(f'Script execution time: {execution_time:.3f} seconds')
+
+
+def filter_cdx(
+    matcher: Matcher,
+    input_paths: List[str],
+    output_paths: List[str],
+    n_parallel: int = 1,
+    limit: int = 0,
+    total_lines_n: int = 0,
+    total_included_n: int = 0,
+    total_errors_n: int = 0,
+) -> Tuple[int, int, int]:
+    """Filter CDX files from input paths using a matcher to output paths."""
+
+    # Parallel processing
+    logger.info('Filtering with %i processes in parallel (limit: %i)', n_parallel, limit)
+
+    with ProcessPoolExecutor(max_workers=n_parallel) as executor:
+        # Create partial function with common arguments
+        process_file_partial = partial(_process_single_file, matcher=matcher, limit=limit)
+
+        # Submit all jobs
+        future_to_paths = {
+            executor.submit(process_file_partial, input_path, output_path): (input_path, output_path)
+            for input_path, output_path in zip(input_paths, output_paths)
+        }
+
+        # Collect results
+        for future in as_completed(future_to_paths):
+            input_path, output_path = future_to_paths[future]
             try:
-                lines_n, included_n = _process_single_file(input_path, output_path, matcher, limit)
+                lines_n, included_n = future.result()
                 logger.info(
-                    f'File statistics for {input_path}: included_n={included_n}; lines_n={lines_n}; ratio={included_n / lines_n:.4f}'
+                    f'File statistics: included {included_n} / {lines_n} lines: {input_path}'
                 )
+
                 total_lines_n += lines_n
                 total_included_n += included_n
 
             except Exception as exc:
                 logger.error(f'File {input_path} generated an exception: {exc}')
                 total_errors_n += 1
-    logger.info(
-        f'Total statistics: included_n={total_included_n}; lines_n={total_lines_n}; ratio={total_included_n / total_lines_n:.4f}'
-    )
-    if total_errors_n > 0:
-        logger.error('Processing errors: %i', total_errors_n)
-
-    # End timing and log execution time
-    end_time = time.time()
-    execution_time = end_time - start_time
 
-    logger.info(f'Script execution time: {execution_time:.3f} seconds')
+    return total_lines_n, total_included_n, total_errors_n
 
 
 def resolve_paths(input_base_path: str, input_glob: str, output_base_path: str):
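
With this refactor, the extracted filter_cdx function can be driven directly from Python rather than only through the CLI. A minimal sketch under stated assumptions: the input and output paths below are hypothetical, and TupleMatcher is constructed with a prefixes list as in the tests further down this page.

    from cdx_toolkit.filter_cdx import filter_cdx
    from cdx_toolkit.filter_cdx.matcher import TupleMatcher

    # Hypothetical paths; filter_cdx maps input to output files pairwise.
    input_paths = ['cdx-00187.gz', 'cdx-00188.gz']
    output_paths = ['out/cdx-00187.gz', 'out/cdx-00188.gz']

    total_lines_n, total_included_n, total_errors_n = filter_cdx(
        matcher=TupleMatcher(prefixes=['fr,']),  # keep .fr hosts (SURT prefix)
        input_paths=input_paths,
        output_paths=output_paths,
        n_parallel=2,  # ProcessPoolExecutor worker count
        limit=0,       # 0 disables the per-file limit
    )
    print(f'included {total_included_n} / {total_lines_n} lines, {total_errors_n} errors')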

cdx_toolkit/warcer_by_cdx/aioboto3_warcer.py

Lines changed: 23 additions & 27 deletions
@@ -177,33 +177,29 @@ async def get_range_jobs_from_index_paths(
     logger.info('Range index limit: %i', limit)
     count = 0
 
-    if not index_paths:
-        logger.error('No index paths provided!')
-
-    else:
-        # Iterate over index files
-        for index_path in index_paths:
-            # Fetch range queries from index
-            try:
-                for warc_url, offset, length in iter_cdx_index_from_path(
-                    index_path, warc_download_prefix=warc_download_prefix
-                ):
-                    # Convert the CDX record back to a RangeJob
-                    bucket, key = parse_s3_uri(warc_url)
-                    job = RangeJob(bucket=bucket, key=key, offset=offset, length=length)
-                    await key_queue.put(job)
-                    count += 1
-
-                    if limit > 0 and count >= limit:
-                        logger.warning('Index limit reached at %i', count)
-                        break
-
-            except Exception as e:
-                logger.error('Failed to read CDX index from %s: %s', index_path, e)
-
-            if limit > 0 and count >= limit:
-                logger.warning('Limit reached at %i', count)
-                break
+    # Iterate over index files
+    for index_path in index_paths:
+        # Fetch range queries from index
+        try:
+            for warc_url, offset, length in iter_cdx_index_from_path(
+                index_path, warc_download_prefix=warc_download_prefix
+            ):
+                # Convert the CDX record back to a RangeJob
+                bucket, key = parse_s3_uri(warc_url)
+                job = RangeJob(bucket=bucket, key=key, offset=offset, length=length)
+                await key_queue.put(job)
+                count += 1
+
+                if limit > 0 and count >= limit:
+                    logger.warning('Index limit reached at %i', count)
+                    break
+
+        except Exception as e:
+            logger.error('Failed to read CDX index from %s: %s', index_path, e)
+
+        if limit > 0 and count >= limit:
+            logger.warning('Limit reached at %i', count)
+            break
 
     # signal fetchers to stop
     for _ in range(num_fetchers):
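
This refactor drops the special-case branch for empty index_paths: iterating an empty list is a no-op, and the stop signals are still sent afterwards, so fetchers terminate cleanly either way. For context, a minimal sketch of a matching fetcher loop, assuming a None sentinel is what the producer enqueues as the stop signal (the actual sentinel value is not shown in this diff):

    import asyncio

    async def fetcher(key_queue: asyncio.Queue) -> None:
        # Drain RangeJobs until the producer's stop signal arrives.
        while True:
            job = await key_queue.get()
            if job is None:  # assumed sentinel; the real value may differ
                break
            # ... fetch bytes job.offset .. job.offset + job.length
            # from s3://{job.bucket}/{job.key} and write a WARC record ...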
Lines changed: 73 additions & 0 deletions (new file)
@@ -0,0 +1,73 @@
+import asyncio
+from unittest.mock import patch, AsyncMock
+
+from cdx_toolkit.warcer_by_cdx.aioboto3_warcer import filter_warc_by_cdx_via_aioboto3, get_range_jobs_from_index_paths
+
+
+def test_filter_warc_by_cdx_via_aioboto3_keyboard_interrupt(caplog):
+    """Test filter_warc_by_cdx_via_aioboto3 KeyboardInterrupt exception handling."""
+
+    # Mock the async function to raise KeyboardInterrupt
+    async def mock_async_function(*args, **kwargs):
+        raise KeyboardInterrupt('User interrupted')
+
+    with patch(
+        'cdx_toolkit.warcer_by_cdx.aioboto3_warcer.filter_warc_by_cdx_via_aioboto3_async',
+        side_effect=mock_async_function,
+    ):
+        # Call the function with minimal required parameters
+        result = filter_warc_by_cdx_via_aioboto3(
+            index_paths=['test_index.cdx'], prefix_path='s3://test-bucket/test-prefix', writer_info={'software': 'test'}
+        )
+
+    # Verify that KeyboardInterrupt was handled correctly
+    assert result == -1, 'Should return -1 when KeyboardInterrupt is caught'
+
+    # Check that the warning message was logged
+    assert 'Interrupted by user.' in caplog.text
+
+    # Verify the log level is warning
+    warning_records = [record for record in caplog.records if record.levelname == 'WARNING']
+    assert len(warning_records) == 1
+    assert warning_records[0].message == 'Interrupted by user.'
+
+
+def test_get_range_jobs_from_index_paths_exception_handling_with_logging(caplog):
+    """Test get_range_jobs_from_index_paths logs errors when iter_cdx_index_from_path raises."""
+
+    async def run_test():
+        # Create a mock queue
+        key_queue = AsyncMock(spec=asyncio.Queue)
+
+        # Test parameters
+        index_paths = ['failing_index.cdx']
+        warc_download_prefix = 'http://test-prefix'
+        num_fetchers = 1
+
+        # Mock iter_cdx_index_from_path to always raise an exception
+        def mock_iter_cdx_index_from_path(index_path, warc_download_prefix):
+            raise ValueError('Simulated CDX parsing error')
+
+        with patch(
+            'cdx_toolkit.warcer_by_cdx.aioboto3_warcer.iter_cdx_index_from_path',
+            side_effect=mock_iter_cdx_index_from_path,
+        ):
+            # Run the function
+            await get_range_jobs_from_index_paths(
+                key_queue=key_queue,
+                index_paths=index_paths,
+                warc_download_prefix=warc_download_prefix,
+                num_fetchers=num_fetchers,
+                limit=0,
+            )
+
+        # Verify the error was logged
+        assert 'Failed to read CDX index from failing_index.cdx' in caplog.text
+        assert 'Simulated CDX parsing error' in caplog.text
+
+        # Verify that only the STOP signal was sent (no jobs due to the exception)
+        assert key_queue.put.call_count == 1  # only the stop signal
+
+    # Run the test
+    asyncio.run(run_test())
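
Both tests stay synchronous: the coroutine is driven with asyncio.run inside the test body, so no pytest-asyncio plugin is required. The first test also pins down the synchronous wrapper's interrupt contract; a minimal sketch of wrapper logic consistent with what the test asserts (not the actual implementation, which lives in aioboto3_warcer.py and is unchanged by this commit):

    import asyncio
    import logging

    logger = logging.getLogger(__name__)

    def run_guarded(coro) -> int:
        # Contract asserted above: on Ctrl-C, log one WARNING and return -1.
        try:
            return asyncio.run(coro)
        except KeyboardInterrupt:
            logger.warning('Interrupted by user.')
            return -1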

tests/warc_by_cdx/test_filter_cdx.py

Lines changed: 69 additions & 8 deletions
@@ -1,7 +1,10 @@
 import pytest
 
+from unittest.mock import patch
+
 from cdx_toolkit.cli import main
-from cdx_toolkit.filter_cdx import resolve_paths, validate_resolved_paths
+from cdx_toolkit.filter_cdx import _process_single_file, resolve_paths, validate_resolved_paths, filter_cdx
+from cdx_toolkit.filter_cdx.matcher import TupleMatcher
 from tests.conftest import requires_aws_s3, TEST_DATA_PATH
 
 fixture_path = TEST_DATA_PATH / 'filter_cdx'
@@ -23,7 +26,7 @@ def test_cli_filter_cdx_with_surts(tmpdir, caplog):
             f'{str(whitelist_path)}',
             f'{tmpdir}',
             '--filter-type=surt',
-            f'--input-glob={index_glob}'
+            f'--input-glob={index_glob}',
         ]
     )
 
@@ -46,7 +49,7 @@ def test_cli_filter_cdx_with_urls(tmpdir, caplog):
             f'{str(whitelist_path)}',
             f'{tmpdir}',
             '--filter-type=url',
-            f'--input-glob={index_glob}'
+            f'--input-glob={index_glob}',
        ]
     )
 
@@ -99,7 +102,7 @@ def test_filter_cdx_nonexistent_surt_file_exits(tmpdir, caplog):
             f'{index_path}',
             f'{nonexistent_surt_file}',
             f'{tmpdir}',
-            f'--input-glob={index_glob}'
+            f'--input-glob={index_glob}',
         ]
     )
 
@@ -150,15 +153,73 @@ def test_cli_filter_cdx_with_parallel_processing(tmpdir, caplog):
             f'{tmpdir}',
             '--filter-type=surt',
             f'--input-glob={index_glob}',
-            '--parallel=2'
+            '--parallel=2',
         ]
     )
 
     # Check that multiple files were processed in parallel
     assert 'Found' in caplog.text and 'files matching pattern' in caplog.text
-    assert 'File statistics for' in caplog.text
-    assert 'Total statistics:' in caplog.text
+    assert 'File statistics' in caplog.text
+    assert 'Filter statistics' in caplog.text
 
     # Should have processed multiple files (pattern matches 2 files: cdx-00187.gz and cdx-00188.gz)
-    file_stats_count = caplog.text.count('File statistics for')
+    file_stats_count = caplog.text.count('File statistics')
     assert file_stats_count == 2, 'Should process exactly 2 files with the glob pattern'
+
+
+def test_process_single_file(tmpdir):
+    input_path = TEST_DATA_PATH / 'warc_by_cdx/filtered_CC-MAIN-2024-30_cdx-00187.gz'
+    matcher = TupleMatcher(prefixes=['fr,'])
+
+    lines_n, included_n = _process_single_file(
+        input_path=input_path,
+        output_path=tmpdir + '/filter_cdx',
+        matcher=matcher,
+        log_every_n=10,
+        limit=100,
+    )
+
+    assert included_n == 100
+    assert lines_n == 100
+
+
+def test_process_single_file_empty(tmpdir):
+    input_path = tmpdir + '/input'
+    with open(input_path, 'w') as f:
+        f.write('')
+
+    lines_n, included_n = _process_single_file(
+        input_path=input_path,
+        output_path=tmpdir + '/output',
+        matcher=None,
+    )
+    assert lines_n == 0
+    assert included_n == 0
+
+
+def test_filter_cdx_error_handling(tmpdir, caplog):
+    """Test filter_cdx function error handling when exceptions occur during processing."""
+
+    def mock_process_single_file(*args, **kwargs):
+        raise ValueError()
+
+    # Create test input and output paths
+    input_paths = [str(tmpdir / 'input1.cdx'), str(tmpdir / 'input2.cdx')]
+    output_paths = [str(tmpdir / 'output1.cdx'), str(tmpdir / 'output2.cdx')]
+
+    # Replace the _process_single_file function with our mock
+    with patch('cdx_toolkit.filter_cdx._process_single_file', side_effect=mock_process_single_file):
+        # Test the error handling
+        total_lines, total_included, total_errors = filter_cdx(
+            matcher=None,
+            input_paths=input_paths,
+            output_paths=output_paths,
+        )
+
+    # Verify error handling results
+    assert total_errors == 2, f'Should have one error per failed file, got {total_errors}'
+    assert total_lines == 0, 'Should have no lines counted since every file failed'
+    assert total_included == 0, 'Should have no included lines since every file failed'
+
+    # Check that the error was logged correctly
+    assert 'generated an exception' in caplog.text
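
For context on the fixtures above: in SURT form a host is written with reversed, comma-separated labels, so the whitelist prefix 'fr,' selects every .fr host. A self-contained sketch of the prefix check the matchers implement (an illustration of the idea, not the library's actual TupleMatcher code):

    from typing import List

    def surt_prefix_match(surt_key: str, prefixes: List[str]) -> bool:
        # Keep a CDX line whose SURT key starts with any whitelist prefix.
        return any(surt_key.startswith(p) for p in prefixes)

    assert surt_prefix_match('fr,example)/page', ['fr,'])
    assert not surt_prefix_match('com,example)/', ['fr,'])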
