Skip to content

Commit 38f274d

Browse files
authored
feat(iqb/cache): implement reading parquet files (#45)
This diff adds support for reading parquet files efficiently using filters and column projections. The resulting API is a low-level API for accessing the data, which is useful for data exploration. We will iterate on this API and build more user-friendly APIs for interacting with the available data.
1 parent 780b457 commit 38f274d

File tree

2 files changed

+248
-2
lines changed

2 files changed

+248
-2
lines changed

library/src/iqb/cache.py

Lines changed: 185 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,118 @@
2222
"""
2323

2424
import json
25+
from dataclasses import dataclass
2526
from datetime import datetime
2627
from pathlib import Path
2728

28-
from . import pipeline
29+
import pandas as pd
30+
import pyarrow.parquet as pq
31+
32+
from .pipeline import PipelineCacheManager, data_dir_or_default
33+
34+
35+
@dataclass(frozen=True)
class CacheEntry:
    """
    Entry inside the data cache.

    Attributes:
        download_data: full path to data.parquet for download
        upload_data: full path to data.parquet for upload
        download_stats: full path to stats.json for download
        upload_stats: full path to stats.json for upload
    """

    download_data: Path
    upload_data: Path
    download_stats: Path
    upload_stats: Path

    @staticmethod
    def _read_data_frame(
        filepath: Path,
        *,
        country_code: str | None,
        asn: int | None,
        city: str | None,
        columns: list[str] | None,
    ) -> pd.DataFrame:
        # Collect the row-group filters for whichever dimensions were
        # provided; PyArrow expects (column, operator, value) tuples and
        # uses them to efficiently skip entire groups of rows.
        candidates = {"asn": asn, "city": city, "country_code": country_code}
        selected = [
            (column, "=", value)
            for column, value in candidates.items()
            if value is not None
        ]

        # Load into memory, applying filters and column projection.
        # Note: PyArrow requires filters=None (not []) when there are
        # no filters at all.
        table = pq.read_table(
            filepath,
            filters=selected or None,
            columns=columns,
        )

        # Hand back the in-memory table as a pandas DataFrame.
        return table.to_pandas()

    def read_download_data_frame(
        self,
        *,
        country_code: str | None = None,
        asn: int | None = None,
        city: str | None = None,
        columns: list[str] | None = None,
    ) -> pd.DataFrame:
        """
        Load the download dataset as a dataframe.

        The arguments allow to select a subset of the entire dataset.

        Arguments:
            country_code: either None or the desired country code (e.g., "IT")
            asn: either None or the desired ASN (e.g., 137)
            city: either None or the desired city (e.g., "Boston")
            columns: either None (all columns) or list of column names to read

        Return:
            A pandas DataFrame.
        """
        return CacheEntry._read_data_frame(
            self.download_data,
            country_code=country_code,
            asn=asn,
            city=city,
            columns=columns,
        )

    def read_upload_data_frame(
        self,
        *,
        country_code: str | None = None,
        asn: int | None = None,
        city: str | None = None,
        columns: list[str] | None = None,
    ) -> pd.DataFrame:
        """
        Load the upload dataset as a dataframe.

        The arguments allow to select a subset of the entire dataset.

        Arguments:
            country_code: either None or the desired country code (e.g., "IT")
            asn: either None or the desired ASN (e.g., 137)
            city: either None or the desired city (e.g., "Boston")
            columns: either None (all columns) or list of column names to read

        Return:
            A pandas DataFrame.
        """
        return CacheEntry._read_data_frame(
            self.upload_data,
            country_code=country_code,
            asn=asn,
            city=city,
            columns=columns,
        )
29137

30138

31139
class IQBCache:
@@ -39,7 +147,79 @@ def __init__(self, data_dir: str | Path | None = None):
39147
data_dir: Path to directory containing cached data files.
40148
If None, defaults to .iqb/ in current working directory.
41149
"""
42-
self.data_dir = pipeline.data_dir_or_default(data_dir)
150+
self.data_dir = data_dir_or_default(data_dir)
151+
152+
def get_cache_entry(
    self,
    *,
    start_date: str,
    end_date: str,
    granularity: str,
) -> CacheEntry:
    """
    Get cache entry associated with given dates and granularity.

    The available granularities are:

    1. "country"
    2. "country_asn"
    3. "country_city"
    4. "country_city_asn"

    Note that this function is a low level building block allowing you to
    access and filter data very efficiently. Consider using higher-level
    user-friendly APIs when they are actually available.

    The returned CacheEntry allows you to read raw data as DataFrame.

    Arguments:
        start_date: start measurement date expressed as YYYY-MM-DD (included)
        end_date: end measurement date expressed as YYYY-MM-DD (excluded)
        granularity: the granularity to use

    Return:
        A CacheEntry instance.

    Raises:
        FileNotFoundError: when either the download or the upload
            entry is missing from the cache.

    Example:
        >>> # Returns data for October 2025
        >>> result = cache.get_cache_entry(
        ...     start_date="2025-10-01",
        ...     end_date="2025-11-01",
        ...     granularity="country",
        ... )
    """
    # 1. create a temporary cache manager instance
    manager = PipelineCacheManager(self.data_dir)

    # 2. resolve a direction ("downloads" or "uploads") to the paths of
    # its data and stats files, failing loudly when either is missing
    def resolve(prefix: str) -> tuple[Path, Path]:
        entry = manager.get_cache_entry(
            f"{prefix}_by_{granularity}",
            start_date,
            end_date,
        )
        data = entry.data_path()
        stats = entry.stats_path()
        if data is None or stats is None:
            raise FileNotFoundError(
                f"Cache entry not found for {prefix}_by_{granularity} ({start_date} to {end_date})"
            )
        return data, stats

    # 3. check whether the download and upload entries exist
    download_data, download_stats = resolve("downloads")
    upload_data, upload_stats = resolve("uploads")

    # 4. return the actual cache entry
    return CacheEntry(
        download_data=download_data,
        upload_data=upload_data,
        download_stats=download_stats,
        upload_stats=upload_stats,
    )
43223

44224
def get_data(
45225
self,
@@ -94,6 +274,9 @@ def get_data(
94274
NOTE: This creates semantics where p95 represents "typical best
95275
performance" - empirical validation will determine if appropriate.
96276
"""
277+
# TODO(bassosimone): we should convert this method to become
278+
# a wrapper of `get_cache_entry` in the future.
279+
97280
# Design Note
98281
# -----------
99282
#

library/tests/iqb/integration_test.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,66 @@ def test_with_us_data_october_2024(self, data_dir):
2626
# NOTE: With current percentile interpretation (p95 for all metrics),
2727
# scores may reach 1.0 as we're checking "top ~5% performance"
2828
assert score >= 0 and score <= 1
29+
30+
def test_cache_entry_read_dataframes(self, data_dir):
    """Test reading parquet files from v1 cache using get_cache_entry()."""
    # Build a cache rooted at the test data directory.
    cache = IQBCache(data_dir=data_dir)

    # Fetch the country-level cache entry covering October 2024.
    entry = cache.get_cache_entry(
        start_date="2024-10-01",
        end_date="2024-11-01",
        granularity="country",
    )

    # The unfiltered download dataset should contain every country
    # along with the expected measurement columns.
    dl = entry.read_download_data_frame()
    assert not dl.empty
    for column in (
        "country_code",
        "sample_count",
        "download_p95",
        "latency_p95",
        "loss_p95",
    ):
        assert column in dl.columns
    assert len(dl) > 200  # We have 236 countries

    # Likewise for the unfiltered upload dataset.
    ul = entry.read_upload_data_frame()
    assert not ul.empty
    for column in ("country_code", "sample_count", "upload_p95"):
        assert column in ul.columns
    assert len(ul) > 200  # We have 237 countries

    # Filtering by country_code should yield exactly one US row.
    us_dl = entry.read_download_data_frame(country_code="US")
    assert len(us_dl) == 1
    us_dl_row = us_dl.iloc[0]
    assert us_dl_row["country_code"] == "US"
    assert us_dl_row["sample_count"] > 0
    assert us_dl_row["download_p95"] > 0  # US has good throughput

    us_ul = entry.read_upload_data_frame(country_code="US")
    assert len(us_ul) == 1
    us_ul_row = us_ul.iloc[0]
    assert us_ul_row["country_code"] == "US"
    assert us_ul_row["sample_count"] > 0
    assert us_ul_row["upload_p95"] > 0

    # Column projection should load only the requested columns.
    projected = entry.read_download_data_frame(
        country_code="US",
        columns=["country_code", "sample_count", "download_p95", "latency_p95"],
    )
    assert len(projected.columns) == 4
    assert "sample_count" in projected.columns
    assert "download_p95" in projected.columns
    assert "latency_p95" in projected.columns
    assert "loss_p95" not in projected.columns  # Not requested

    # Spot-check a handful of countries individually.
    for cc in ["US", "DE", "BR"]:
        single = entry.read_download_data_frame(country_code=cc)
        assert len(single) == 1
        assert single.iloc[0]["country_code"] == cc
        assert single.iloc[0]["sample_count"] > 0
        assert single.iloc[0]["download_p95"] > 0

0 commit comments

Comments
 (0)