|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +from pathlib import Path |
| 4 | + |
| 5 | +import ibis |
| 6 | +from ibis import _ |
| 7 | +import ibis.expr.types as ir |
| 8 | +import pytest |
| 9 | + |
| 10 | +from mismo.lib.geo import postal_parse_address |
| 11 | +from mismo.lib.geo._address import ADDRESS_SCHEMA |
| 12 | + |
| 13 | +try: |
| 14 | + from postal.parser import parse_address as _parse_address |
| 15 | +except ImportError: |
| 16 | + # Need to make it so that pytest can at least collect the tests on CI on windows |
| 17 | + # (or wherever postal is not available). |
| 18 | + # Of course, actually running the tests will explode things. |
| 19 | + pass |
| 20 | + |
# Placeholder address returned by the baseline UDFs: every field is None.
# NOTE(review): the keys presumably mirror the fields of ADDRESS_SCHEMA;
# confirm against mismo.lib.geo._address.
_NOOP_ADDRESS = dict.fromkeys(
    ("street1", "street2", "postal_code", "city", "state", "country")
)
| 29 | + |
# Decorator shared by every benchmarked function below: a scalar Python UDF
# taking a single string argument and returning an ADDRESS_SCHEMA value.
_UDF_SIGNATURE = ((str,), ADDRESS_SCHEMA)
udf = ibis.udf.scalar.python(signature=_UDF_SIGNATURE)
| 31 | + |
| 32 | + |
@udf
def noop(address_string: str | None) -> dict:
    """Baseline UDF that ignores its input.

    Always returns the shared all-``None`` ``_NOOP_ADDRESS`` dict, so the
    benchmark for this function does no parsing or reshaping work inside
    the UDF body.
    """
    return _NOOP_ADDRESS
| 36 | + |
| 37 | + |
@udf
def python_only(address_string: str | None) -> dict:
    """Run only the pure-Python wrapping logic, without calling pypostal.

    Mirrors the dict handling done in ``complete`` but replaces the real
    parser with a fake one that yields a single ``(value, label)`` pair.

    Parameters
    ----------
    address_string
        The raw address text (may be ``None``).

    Returns
    -------
    dict
        A dict with the keys ``city``, ``state``, ``country``, ``street1``,
        ``street2`` and ``postal_code``.
    """
    # Initially the keys are the parser labels we care about; the dict is
    # reshaped into the final address layout at the end.
    result: dict[str, str | None] = {
        "house_number": None,
        "road": None,
        "unit": None,
        "city": None,
        "state": None,
        "postcode": None,
        "country": None,
    }

    # Fake 'parse_address' output: one field labelled "street" that contains
    # the whole address. Pairs are (value, label), matching how the loop
    # below unpacks them (previously the pair was written label-first, so the
    # address text was being used as the label).
    # NOTE(review): "street" is not one of the tracked labels, so this field
    # is discarded by the loop; use "road" if the concat path should be hit.
    parsed_fields = ((address_string, "street"),)
    for value, label in parsed_fields:
        # `False` marks "label not tracked"; tracked labels hold None or str.
        current = result.get(label, False)
        if current is not False:
            # Repeated labels are joined with a single space.
            result[label] = value if current is None else f"{current} {value}"

    # Prepend "house_number" to "road".
    house_number = result.pop("house_number")
    if house_number is not None:
        road = result["road"]
        if road is None:
            result["road"] = house_number
        else:
            result["road"] = f"{house_number} {road}"

    # Rename the remaining parser labels to the output field names.
    result["street1"] = result.pop("road")
    result["street2"] = result.pop("unit")
    result["postal_code"] = result.pop("postcode")

    return result
| 71 | + |
| 72 | + |
@udf
def postal_only(address_string: str | None) -> dict:
    """Call the pypostal parser and throw its output away.

    A falsy input (``None`` or ``""``) is parsed as the empty string. The
    return value is always the shared all-``None`` ``_NOOP_ADDRESS`` dict,
    so only the parser call itself adds work compared to ``noop``.
    """
    text = address_string or ""
    _parse_address(text)  # result intentionally discarded
    return _NOOP_ADDRESS
| 77 | + |
| 78 | + |
@udf
def complete(address_string: str | None) -> dict | None:
    """Parse an address with pypostal and reshape it into an address dict.

    Parameters
    ----------
    address_string
        The raw address text, or ``None``.

    Returns
    -------
    dict | None
        ``None`` when the input is ``None``; otherwise a dict with the keys
        ``city``, ``state``, ``country``, ``street1``, ``street2`` and
        ``postal_code``.
    """
    if address_string is None:
        return None

    # The pypostal labels we keep; everything else the parser emits is ignored.
    wanted = ("house_number", "road", "unit", "city", "state", "postcode", "country")
    collected: dict[str, str | None] = dict.fromkeys(wanted)

    for token, tag in _parse_address(address_string):
        if tag not in collected:
            continue
        # A label can show up more than once; join repeats with a space.
        previous = collected[tag]
        collected[tag] = token if previous is None else f"{previous} {token}"

    # Hack: the house number is prepended to the road to form the street line.
    number = collected["house_number"]
    street = collected["road"]
    if number is not None:
        street = number if street is None else f"{number} {street}"

    # Rename the pypostal labels to the output field names.
    return {
        "city": collected["city"],
        "state": collected["state"],
        "country": collected["country"],
        "street1": street,
        "street2": collected["unit"],
        "postal_code": collected["postcode"],
    }
| 122 | + |
| 123 | + |
def download_test_data() -> ir.Table:
    """Build a table of full address strings from the apoc-data CSV release.

    Reads the ``income_{year}.csv`` files for 2011 through 2023 from
    https://github.com/NickCrews/apoc-data/releases/tag/20240717-111158
    through a fresh DuckDB connection, with ``all_varchar=True``.

    Returns
    -------
    ir.Table
        A table with a single ``full_address`` column made by joining the
        ``Address``, ``City``, ``State``, ``Zip`` and ``Country`` columns
        with ``", "``.
    """
    url_template = "https://github.com/NickCrews/apoc-data/releases/download/20240717-111158/income_{year}.csv"
    # Hand read_csv a concrete list of URLs; a single call reads them all,
    # so there is nothing left to union afterwards.
    urls = [url_template.format(year=year) for year in range(2011, 2024)]
    conn = ibis.duckdb.connect()
    raw = conn.read_csv(urls, all_varchar=True)
    return raw.select(
        full_address=_.Address
        + ", "
        + _.City
        + ", "
        + _.State
        + ", "
        + _.Zip
        + ", "
        + _.Country
    )
| 147 | + |
| 148 | + |
@pytest.fixture
def data(backend: ibis.BaseBackend) -> ir.Table:
    """Provide the benchmark address table, cached in the given backend.

    The data lives in ``apoc_addresses_1M.parquet`` next to this file; when
    that file is missing it is first produced via ``download_test_data()``.
    """
    parquet_path = Path(__file__).with_name("apoc_addresses_1M.parquet")
    if not parquet_path.exists():
        download_test_data().to_parquet(parquet_path)
    # Cache so the table is in memory; we don't want to benchmark disk IO.
    return backend.read_parquet(parquet_path).cache()
| 157 | + |
| 158 | + |
@pytest.mark.parametrize(
    "fn",
    [
        noop,
        python_only,
        postal_only,
        complete,
        postal_parse_address,
    ],
)
@pytest.mark.parametrize(
    "nrows",
    [
        pytest.param(1_000, id="1k"),
        pytest.param(10_000, id="10k"),
        pytest.param(100_000, id="100k"),
        pytest.param(1_000_000, id="1m"),
    ],
)
def test_benchmark_parse(backend, data, nrows, fn, benchmark):
    """Benchmark ``fn`` over the first ``nrows`` addresses.

    Run with e.g.::

        just bench -k test_benchmark_parse[100k-postal_parse_address]
        just bench -k 10k mismo/lib/geo/tests/test_postal_benchmark.py
        just bench -k 100k-postal_only mismo/lib/geo/tests/test_postal_benchmark.py
    """
    inp = data.head(nrows).full_address

    def run():
        t = fn(inp).lift()
        # Not sure if this is needed, but being defensive:
        # If we use .cache(), then when benchmark() runs this in a loop,
        # computation will only happen the first time, and the rest of the
        # times it will just return the cached result.
        return backend.create_table("temp", t, overwrite=True)

    result = benchmark(run)
    # Count in the backend rather than pulling every row to the client
    # just to measure its length.
    assert result.count().execute() == nrows
0 commit comments