37 changes: 37 additions & 0 deletions .github/workflows/python_test.yml
@@ -0,0 +1,37 @@
name: Python Unit Tests

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
  workflow_dispatch:

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        # match requires-python in pyproject.toml (>=3.12); older interpreters
        # would be silently replaced by uv during `uv sync`
        python-version: ['3.12', '3.13']
      fail-fast: false

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install uv
        run: |
          curl -LsSf https://astral.sh/uv/install.sh | sh
          echo "$HOME/.local/bin" >> $GITHUB_PATH

      - name: Install dependencies
        run: |
          uv sync

      - name: Run tests
        run: |
          uv run -m pytest src -v
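For reference, the "Run tests" step is equivalent to invoking pytest's Python entry point directly. A minimal local sketch (a hypothetical helper script, assuming the environment has been synced with uv first):

# run_tests.py -- hypothetical local equivalent of the CI "Run tests" step;
# `uv run -m pytest src -v` boils down to pytest.main() with the same arguments.
import sys

import pytest

if __name__ == "__main__":
    # pytest.main() returns the exit code (0 = all tests passed)
    sys.exit(pytest.main(["src", "-v"]))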
11 changes: 11 additions & 0 deletions pyproject.toml
@@ -0,0 +1,11 @@
[project]
name = "cc-index-table"
version = "0.1.0"
description = "Tools for working with Common Crawl index tables."
requires-python = ">=3.12"
dependencies = [
"boto3>=1.40.61",
"pyarrow>=22.0.0",
"pytest>=8.4.2",
"tqdm>=4.67.1",
]
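One way to keep the CI matrix and requires-python from drifting apart is a small consistency check. This sketch is a hypothetical helper, not part of the PR; it assumes the packaging library is available and that MATRIX_VERSIONS mirrors the workflow matrix:

# check_matrix.py -- hypothetical consistency check, not part of this PR:
# verify that every Python version tested in CI satisfies requires-python.
import tomllib  # stdlib in Python 3.11+

from packaging.specifiers import SpecifierSet

MATRIX_VERSIONS = ["3.12", "3.13"]  # keep in sync with python_test.yml

with open("pyproject.toml", "rb") as f:
    spec = SpecifierSet(tomllib.load(f)["project"]["requires-python"])

for version in MATRIX_VERSIONS:
    assert spec.contains(version), f"CI tests {version}, but pyproject requires {spec}"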
Empty file added src/util/__init__.py
Empty file.
100 changes: 100 additions & 0 deletions src/util/is_table_sorted.py
@@ -0,0 +1,100 @@
import argparse
import gzip
import sys
from collections import defaultdict
from urllib.parse import urlparse
from urllib.request import urlopen

import boto3
import pyarrow.parquet as pq
from tqdm.auto import tqdm


def are_parquet_file_row_groups_sorted(pf: pq.ParquetFile, column_name: str) -> tuple[bool, str | None, str | None]:
    # raises ValueError with a clear message if the column does not exist
    sort_column_index = pf.schema.names.index(column_name)

    # keep track of min/max in this ParquetFile
    whole_min = None
    whole_max = None
    prev_max = None
    for row_group_index in range(pf.num_row_groups):
        row_group = pf.metadata.row_group(row_group_index)
        column = row_group.column(sort_column_index)
        if prev_max is not None and prev_max > column.statistics.min:
            # internally unsorted
            print(f"row group {row_group_index} is not sorted on {column_name}: '{column.statistics.min}' < '{prev_max}'; stopping")
            return False, None, None
        whole_min = column.statistics.min if whole_min is None else min(column.statistics.min, whole_min)
        whole_max = column.statistics.max if whole_max is None else max(column.statistics.max, whole_max)
        prev_max = column.statistics.max
    return True, whole_min, whole_max
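The check above relies entirely on the per-row-group min/max statistics that Parquet writers record in the file footer, so no data pages are ever read. A self-contained sketch (illustrative file name and values, not part of the PR) showing where those statistics come from:

# demo_row_group_stats.py -- illustrative only: write a tiny Parquet file
# with two row groups and print the footer statistics the checker reads.
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"url_surtkey": ["a", "b", "c", "d"]})
pq.write_table(table, "demo.parquet", row_group_size=2)  # -> two row groups

pf = pq.ParquetFile("demo.parquet")
for i in range(pf.num_row_groups):
    stats = pf.metadata.row_group(i).column(0).statistics
    print(i, stats.min, stats.max)  # prints: 0 a b, then: 1 c d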


def is_full_table_sorted(file_or_s3_url_list_ordered: list[str], sort_column_name: str) -> bool:
    is_sorted = True
    prev_max = None
    prev_file_or_url = None
    status = defaultdict(int)
    with tqdm(file_or_s3_url_list_ordered) as pbar:
        for file_or_url in pbar:
            pf = pq.ParquetFile(file_or_url)
            this_is_sorted, pf_min, pf_max = are_parquet_file_row_groups_sorted(pf, column_name=sort_column_name)
            if not this_is_sorted:
                print(f"Row groups are *internally* not sorted in file {file_or_url}")
                is_sorted = False
                status['internally_unsorted'] += 1

            # pf_min/pf_max are None for an internally unsorted file,
            # so guard both sides before comparing across files
            if prev_max is not None and pf_min is not None and prev_max > pf_min:
                print(f"{prev_file_or_url} is not sorted with respect to {file_or_url}: '{prev_max}' > '{pf_min}'")
                status['filewise_unsorted'] += 1
                # is_sorted = False  # uncomment to fail on filewise unsortedness
            pbar.set_postfix(status)
            prev_max = pf_max
            prev_file_or_url = file_or_url
    return is_sorted
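Used directly from Python rather than through the CLI, the function could look like this (hypothetical file names; two single-row-group files written in sorted order):

# demo_full_table.py -- illustrative usage of is_full_table_sorted on local files.
import pyarrow as pa
import pyarrow.parquet as pq

from util.is_table_sorted import is_full_table_sorted

pq.write_table(pa.table({"url_surtkey": ["a", "b"]}), "part-0.parquet")
pq.write_table(pa.table({"url_surtkey": ["c", "d"]}), "part-1.parquet")

assert is_full_table_sorted(["part-0.parquet", "part-1.parquet"], "url_surtkey")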


def is_gzip(content: bytes) -> bool:
    return content[:2] == b'\x1f\x8b'


def read_file_list(path_or_url: str, prefix: str) -> list[str]:
    parsed = urlparse(path_or_url)
    if parsed.scheme == "s3":
        s3 = boto3.client("s3")
        bucket = parsed.netloc
        key = parsed.path.lstrip("/")
        obj = s3.get_object(Bucket=bucket, Key=key)
        content = obj["Body"].read()
    elif parsed.scheme in ("http", "https"):
        with urlopen(path_or_url) as f:
            content = f.read()
    else:
        # read in binary mode so the gzip byte check below works
        with open(path_or_url, "rb") as f:
            content = f.read()

    if is_gzip(content):
        content = gzip.decompress(content)
    lines = content.decode("utf-8").split("\n")
    return [prefix + line.strip() for line in lines if len(line.strip()) > 0]
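The gzip detection keys off the two magic bytes 0x1f 0x8b that prefix every gzip stream, which is why the local-file branch must read bytes rather than text. A quick in-memory check (illustrative list contents):

# demo_gzip_magic.py -- illustrative check of the gzip magic-byte convention.
import gzip

raw = b"path/to/part-00000.parquet\npath/to/part-00001.parquet\n"
compressed = gzip.compress(raw)

assert compressed[:2] == b"\x1f\x8b"   # gzip streams start with these two bytes
assert raw[:2] != b"\x1f\x8b"          # plain text does not
assert gzip.decompress(compressed) == raw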


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Check if a collection of Parquet files, considered as a whole, is sorted. Exit code is 0 if sorted, 1 if not sorted.")
    parser.add_argument("files_or_s3_urls_file", type=str, help="URI or path to a text file containing a list of paths, URLs, or S3 URLs, one per line, in the expected sorted order.")
    parser.add_argument("--prefix", type=str, default="s3://commoncrawl/", help="Prefix to prepend to entries read from the file (default: 's3://commoncrawl/')")
    parser.add_argument("--column", type=str, default="url_surtkey", help="Column name to check sorting against (default: 'url_surtkey')")

    args = parser.parse_args()

    files = read_file_list(args.files_or_s3_urls_file, prefix=args.prefix)
    is_sorted = is_full_table_sorted(files, sort_column_name=args.column)
    if is_sorted:
        print("✅ Files are sorted")
        sys.exit(0)
    else:
        print("❌ Files are NOT sorted")
        sys.exit(1)
Empty file added src/util/test/__init__.py
Empty file.
98 changes: 98 additions & 0 deletions src/util/test/test_is_table_sorted.py
@@ -0,0 +1,98 @@
import random
from unittest.mock import MagicMock, patch

from util.is_table_sorted import are_parquet_file_row_groups_sorted, is_full_table_sorted


def _create_mock_parquet_file(column_name: str, row_groups_stats: list[tuple[str, str]]):
    mock_pf = MagicMock()
    mock_pf.schema.names = [column_name]
    mock_pf.num_row_groups = len(row_groups_stats)

    mock_row_groups = []
    for min_val, max_val in row_groups_stats:
        mock_row_group = MagicMock()
        mock_column = MagicMock()
        mock_column.statistics.min = min_val
        mock_column.statistics.max = max_val
        mock_row_group.column.return_value = mock_column
        mock_row_groups.append(mock_row_group)

    mock_pf.metadata.row_group.side_effect = lambda i: mock_row_groups[i]
    return mock_pf


def test_single_row_group_sorted():
    mock_pf = _create_mock_parquet_file('url_surtkey', [('a', 'b')])
    is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, column_name='url_surtkey')
    assert is_sorted
    assert min_val == 'a'
    assert max_val == 'b'


def test_row_groups_sorted():
    all_row_groups_stats = [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h')]
    # test every prefix, including the full list
    for n in range(1, len(all_row_groups_stats) + 1):
        row_groups_stats = all_row_groups_stats[:n]
        mock_pf = _create_mock_parquet_file('url_surtkey', row_groups_stats)
        is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, column_name='url_surtkey')
        assert is_sorted
        assert min_val == row_groups_stats[0][0]
        assert max_val == row_groups_stats[-1][1]


def test_row_groups_unsorted():
    all_row_groups_stats = [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h')]
    count = 0
    while count < 100:
        # test every prefix of length >= 2, including the full list
        for n in range(2, len(all_row_groups_stats) + 1):
            row_groups_stats = all_row_groups_stats[:n].copy()
            random.shuffle(row_groups_stats)
            if row_groups_stats == all_row_groups_stats[:n]:
                # shuffle resulted in the same order, try again
                continue

            mock_pf = _create_mock_parquet_file('url_surtkey', row_groups_stats)
            is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, column_name='url_surtkey')
            assert not is_sorted

        count += 1


def test_row_groups_overlapping():
    row_groups = [('a', 'c'), ('b', 'd')]
    mock_pf = _create_mock_parquet_file('url_surtkey', row_groups)
    is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, column_name='url_surtkey')
    assert not is_sorted


def test_ordered_files_sorted():
    files_config = {
        '/data/a': [('aaa', 'bbb'), ('bbc', 'ccc')],
        '/data/b': [('ccd', 'ddd'), ('dde', 'eee')],
        '/data/c': [('eef', 'fff'), ('ffg', 'ggg')],
    }

    def mock_parquet_file(path):
        return _create_mock_parquet_file('url_surtkey', files_config[path])

    with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file):
        result = is_full_table_sorted(['/data/a', '/data/b', '/data/c'], 'url_surtkey')
        assert result


def test_ordered_files_unsorted():
    files_config = {
        '/data/a': [('aaa', 'bbb'), ('bbc', 'ccc')],
        '/data/b': [('ccd', 'ddd'), ('dde', 'eee')],
        '/data/c': [('eef', 'fff'), ('ffg', 'ggg')],
    }

    def mock_parquet_file(path):
        return _create_mock_parquet_file('url_surtkey', files_config[path])

    with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file):
        result = is_full_table_sorted(['/data/a', '/data/c', '/data/b'], 'url_surtkey')
        assert result  # file-level disorder is only warned about, not treated as failure
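A possible follow-up test (not in this PR) would cover the interaction of the two checks: an internally unsorted file returns None for its min/max, so the file-to-file comparison must skip it rather than compare a string against None:

def test_internally_unsorted_file_in_list():
    # hypothetical addition: '/data/b' is internally unsorted, so its
    # min/max come back as None and must not be compared across files
    files_config = {
        '/data/a': [('aaa', 'bbb')],
        '/data/b': [('ddd', 'eee'), ('bbb', 'ccc')],  # second group starts below the first's max
        '/data/c': [('fff', 'ggg')],
    }

    def mock_parquet_file(path):
        return _create_mock_parquet_file('url_surtkey', files_config[path])

    with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file):
        assert not is_full_table_sorted(['/data/a', '/data/b', '/data/c'], 'url_surtkey')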