Skip to content

Commit e669b3b

Browse files
Merge pull request #34 from praekeltfoundation/manage_large_data_volumes
Manage large data volumes
2 parents 7de2810 + de7e82f commit e669b3b

File tree

8 files changed

+456
-422
lines changed

8 files changed

+456
-422
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<img src="https://github.com/praekeltfoundation/rdw-ingestion-tools/workflows/lint/badge.svg" width="120" />
88
<img src="https://github.com/praekeltfoundation/rdw-ingestion-tools/workflows/release/badge.svg" width="145" />
99
<img src="https://github.com/praekeltfoundation/rdw-ingestion-tools/workflows/test/badge.svg" width="120" />
10-
<img src="https://img.shields.io/badge/version-2.0.5.dev0-orange" width="145" />
10+
<img src="https://img.shields.io/badge/version-2.0.5-orange" width="145" />
1111
<img src="https://img.shields.io/badge/license-MIT-blue" width="100" />
1212
</p>
1313
</div>

examples/rapidpro/contacts.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from api.rapidpro import pyRapid
22

33
contacts = pyRapid().contacts.get_contacts(
4-
end_datetime="2023-01-01 01:00:50", start_datetime="2023-01-01 00:00:00"
4+
end_datetime="2025-10-16 00:00:00", start_datetime="2025-10-15 00:00:00"
55
)
66

77
print(contacts.collect())

examples/rapidpro/flowstarts.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from api.rapidpro import pyRapid
22

33
flowstarts = pyRapid().flow_starts.get_flowstarts(
4-
end_datetime="2023-01-02 00:00:00", start_datetime="2023-01-01 00:00:00"
4+
end_datetime="2025-10-16 00:00:00", start_datetime="2025-10-15 00:00:00"
55
)
66

77
print(flowstarts.collect())

examples/rapidpro/runs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from api.rapidpro import pyRapid
22

33
runs = pyRapid().runs.get_runs(
4-
end_datetime="2024-06-22 00:00:10", start_datetime="2024-06-22 00:00:00"
4+
end_datetime="2025-10-16 00:00:00", start_datetime="2025-10-15 00:00:00"
55
)
66

77
print(runs.collect())

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "rdw-ingestion-tools"
3-
version = "2.0.5.dev0"
3+
version = "2.0.5"
44
description = "A Python package for integrating third-party data to Reach Digital Health's AWS Data Lake."
55
authors = [
66
{name = "Schalk <schalk@reachdigitalhealth.org>"},
@@ -14,6 +14,7 @@ dependencies = [
1414
"boto3>=1.34.103",
1515
"httpx>=0.27.0",
1616
"httpx-retries>=0.4.2",
17+
"more-itertools>=10.8.0",
1718
"pandas>=2.2.2",
1819
"types-tqdm>=4.66.0.20240417",
1920
]

rdw_ingestion_tools/api/__init__.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import os
22
from collections.abc import Iterator
33

4+
from more_itertools import chunked
45
from pandas import DataFrame
56
from pandas import json_normalize as pd_json_normalize
67
from polars import (
@@ -71,9 +72,10 @@ def get_polars_schema(
7172
object_columns: list[str], data: list[dict[str, Object]]
7273
) -> dict[str, Object]:
7374
"""
74-
Creates a normalised LazyFrame and uses the schema to generate a schema
75-
dictionary using the column names.
76-
Columns that are `list` types need to be type `Object` before they can be cast
75+
Creates a normalised LazyFrame. Returns a schema dictionary using the
76+
LazyFrame's column names.
77+
78+
Note: Columns that are `list` types need to be type `Object` before they can be cast
7779
to string.
7880
All other column types can be cast directly to string using the schema generated.
7981
"""
@@ -94,17 +96,22 @@ def get_polars_schema(
9496
def concatenate_to_string_lazyframe(
9597
objs: list[dict] | dict[Never, Never] | list[Never] | Iterator,
9698
object_columns: list[str],
99+
batch_size: int = 2000,
97100
) -> LazyFrame:
98101
"""
99-
Flattens JSON data. Returns a LazyFrame with String columns.
102+
Flattens JSON data. Returns a LazyFrame with columns of type `String`.
100103
"""
101-
data = list(objs)
102-
103-
schema = get_polars_schema(data=data, object_columns=object_columns)
104-
lf = (
105-
json_normalize(data, separator="_", schema=schema)
106-
.lazy()
107-
.with_columns(col(Object).map_elements(lambda x: str(x), return_dtype=String))
108-
)
104+
lf = LazyFrame()
105+
106+
for data in chunked(objs, batch_size):
107+
schema = get_polars_schema(data=data, object_columns=object_columns)
108+
response_lf = (
109+
json_normalize(data, separator="_", schema=schema)
110+
.lazy()
111+
.with_columns(
112+
col(Object).map_elements(lambda x: str(x), return_dtype=String)
113+
)
114+
)
115+
lf = concat([lf, response_lf], how="diagonal")
109116

110117
return lf

tests/test_concatenation/test_concatenation.py

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,29 @@
1+
import pytest
12
from polars import LazyFrame, Object, String
23
from polars.testing import assert_frame_equal
34

45
from rdw_ingestion_tools.api import concatenate_to_string_lazyframe, get_polars_schema
56

67

78
def test_get_polars_schema_empty_data():
8-
"""
9-
Tests that schemas generated for empty responses are empty dictionaries.
10-
11-
"""
9+
"""Tests that schemas generated for empty responses are empty dictionaries."""
1210
schema = get_polars_schema(object_columns=[], data=[])
1311

1412
assert schema == {}
1513

1614

1715
def test_concatenate_to_string_lazyframe_empty_response():
18-
"""
19-
Tests that concatenate_to_string_lazyframe returns an empty LazyFrame for
16+
"""Tests that concatenate_to_string_lazyframe returns an empty LazyFrame for
2017
empty response data.
21-
2218
"""
2319
lf = concatenate_to_string_lazyframe(objs=[], object_columns=[])
2420

2521
assert_frame_equal(lf, LazyFrame(schema={}))
2622

2723

2824
def test_get_polars_schema_primitive_types():
29-
"""
30-
Schemas generated from response data use type `String`
25+
"""Tests that schemas generated from response data use type `String`
3126
for all primitive types.
32-
3327
"""
3428
data = [
3529
{
@@ -57,10 +51,8 @@ def test_get_polars_schema_primitive_types():
5751

5852

5953
def test_get_polars_schema_list_types():
60-
"""
61-
Tests that generated schemas from response data use type `Object`
54+
"""Tests that generated schemas from response data use type `Object`
6255
for list columns.
63-
6456
"""
6557
data = [{"col1": [1, 2, 3], "col2": [{"key": "value"}], "col3": False}]
6658

@@ -72,10 +64,8 @@ def test_get_polars_schema_list_types():
7264

7365

7466
def test_get_polars_schema_json_types():
75-
"""
76-
Tests that generated schemas from response data with JSON columns
67+
"""Tests that generated schemas from response data with JSON columns
7768
normalise the column names in the schema.
78-
7969
"""
8070
data = [{"col1": {"key": {"inner_key": "value"}}, "col2": {"key": "value"}}]
8171

@@ -86,18 +76,19 @@ def test_get_polars_schema_json_types():
8676
assert schema == expected_schema
8777

8878

89-
def test_concatenate_to_string_lazyframe():
90-
"""
91-
Tests that response data is concatenated and normalised into LazyFrames
79+
@pytest.mark.parametrize("batch_size", [1, 2])
80+
def test_concatenate_to_string_lazyframe(batch_size):
81+
"""Tests that response data is concatenated and normalised into LazyFrames
9282
with column type `String`.
93-
9483
"""
9584
data = [
9685
{"col1": 1, "col2": [1, 2, 3], "col3": {"key": "value1"}},
9786
{"col1": 2, "col2": [1, 2, 3], "col3": {"key": "value2"}},
9887
]
9988

100-
lf = concatenate_to_string_lazyframe(objs=data, object_columns=["col2"])
89+
lf = concatenate_to_string_lazyframe(
90+
objs=data, object_columns=["col2"], batch_size=batch_size
91+
)
10192

10293
expected_lf = LazyFrame(
10394
{
@@ -108,3 +99,27 @@ def test_concatenate_to_string_lazyframe():
10899
)
109100

110101
assert_frame_equal(lf, expected_lf)
102+
103+
104+
@pytest.mark.parametrize("batch_size", [1, 2, 3])
105+
def test_concatenate_to_string_lazyframe_uses_all_rows(batch_size):
106+
"""Tests that the key names in every JSON column are used."""
107+
data = [
108+
{"column1": {"key1": "1"}},
109+
{"column2": {"key1": "1"}},
110+
{"column2": {"key1": "1", "key2": "2"}},
111+
]
112+
113+
expected_lf = LazyFrame(
114+
{
115+
"column1_key1": ["1", None, None],
116+
"column2_key1": [None, "1", "1"],
117+
"column2_key2": [None, None, "2"],
118+
}
119+
)
120+
121+
lf = concatenate_to_string_lazyframe(
122+
objs=data, object_columns=[], batch_size=batch_size
123+
)
124+
125+
assert_frame_equal(lf, expected_lf)

0 commit comments

Comments (0)