
Commit 276f550

resolves #159: Add batch argument to fetch data in batches
1 parent 100fcfb commit 276f550

14 files changed: 804 additions, 45 deletions

changelog/159.added.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -0,0 +1 @@
+Add the ability to batch API queries for `all` and `filter` functions.
```
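The new flag is exposed through the query methods changed in the diff below. A minimal usage sketch, where the server address is a placeholder and the kind `BuiltinLocation` is borrowed from the test fixtures further down:

```python
import asyncio

from infrahub_sdk import InfrahubClient


async def main() -> None:
    # Placeholder address for illustration.
    client = InfrahubClient(address="http://localhost:8000")
    # batch=True (the argument this commit adds) makes the SDK query the
    # total count first, then fetch every page concurrently in a batch.
    locations = await client.all(kind="BuiltinLocation", batch=True)
    print(f"fetched {len(locations)} nodes")


asyncio.run(main())
```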

infrahub_sdk/client.py

Lines changed: 64 additions & 45 deletions
```diff
@@ -691,25 +691,20 @@ async def filters(
         Returns:
             list[InfrahubNodeSync]: List of Nodes that match the given filters.
         """
-        schema = await self.schema.get(kind=kind, branch=branch)
-
         branch = branch or self.default_branch
+        schema = await self.schema.get(kind=kind, branch=branch)
         if at:
             at = Timestamp(at)

         node = InfrahubNode(client=self, schema=schema, branch=branch)
         filters = kwargs
+        pagination_size = self.pagination_size

-        nodes: list[InfrahubNode] = []
-        related_nodes: list[InfrahubNode] = []
-
-        has_remaining_items = True
-        page_number = 1
-
-        async def process_page(page_offset: int):
+        async def process_page(page_offset: int, page_number: int) -> tuple[dict, ProcessRelationsNode]:
+            """Process a single page of results."""
             query_data = await InfrahubNode(client=self, schema=schema, branch=branch).generate_query_data(
                 offset=offset or page_offset,
-                limit=limit or self.pagination_size,
+                limit=limit or pagination_size,
                 filters=filters,
                 include=include,
                 exclude=exclude,
@@ -734,34 +729,50 @@ async def process_page(page_offset: int):
                 prefetch_relationships=prefetch_relationships,
                 timeout=timeout,
             )
-
             return response, process_result

-        if batch:
+        async def process_batch(schema_kind: str) -> tuple[list[InfrahubNode], list[InfrahubNode]]:
+            """Process queries in batch mode."""
+            nodes = []
+            related_nodes = []
             batch_process = await self.create_batch()
-            resp = await self.execute_graphql(query=f"query {{ {schema.kind} {{ count }} }}")
-            count = resp[schema.kind].get("count", 0)
-            total_pages = (count + self.pagination_size - 1) // self.pagination_size
+            resp = await self.execute_graphql(query=f"query {{ {schema_kind} {{ count }} }}")
+            count = resp[schema_kind].get("count", 0)
+            total_pages = (count + pagination_size - 1) // pagination_size
+
             for page_number in range(1, total_pages + 1):
-                page_offset = (page_number - 1) * self.pagination_size
-                batch_process.add(task=process_page, node=node, page_offset=page_offset)
+                page_offset = (page_number - 1) * pagination_size
+                batch_process.add(task=process_page, node=node, page_offset=page_offset, page_number=page_number)

             async for _, response in batch_process.execute():
                 nodes.extend(response[1]["nodes"])
                 related_nodes.extend(response[1]["related_nodes"])
-        else:
+
+            return nodes, related_nodes
+
+        async def process_non_batch() -> tuple[list[InfrahubNode], list[InfrahubNode]]:
+            """Process queries without batch mode."""
+            nodes = []
+            related_nodes = []
+            has_remaining_items = True
+            page_number = 1
+
             while has_remaining_items:
-                page_offset = (page_number - 1) * self.pagination_size
-                response, process_result = await process_page(page_offset)
+                page_offset = (page_number - 1) * pagination_size
+                response, process_result = await process_page(page_offset, page_number)

                 nodes.extend(process_result["nodes"])
                 related_nodes.extend(process_result["related_nodes"])
-                remaining_items = response[schema.kind].get("count", 0) - (page_offset + self.pagination_size)
+                remaining_items = response[schema.kind].get("count", 0) - (page_offset + pagination_size)
                 if remaining_items < 0 or offset is not None or limit is not None:
                     has_remaining_items = False
-
                 page_number += 1

+            return nodes, related_nodes
+
+        # Select batch or non-batch processing
+        nodes, related_nodes = await (process_batch(schema.kind) if batch else process_non_batch())
+
         if populate_store:
             for node in nodes:
                 if node.id:
@@ -770,7 +781,6 @@ async def process_page(page_offset: int):
             for node in related_nodes:
                 if node.id:
                     self.store.set(key=node.id, node=node)
-
         return nodes

     def clone(self) -> InfrahubClient:
```
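The batch path sizes its page schedule up front: one GraphQL query for the total count, then an integer ceiling division by the page size. A standalone sketch of that arithmetic; the pagination size of 50 is an assumed example value, the SDK actually reads it from client configuration:

```python
# Ceiling division without floats, as in process_batch above.
count = 120
pagination_size = 50  # assumed example value
total_pages = (count + pagination_size - 1) // pagination_size  # 169 // 50 == 3
offsets = [(page - 1) * pagination_size for page in range(1, total_pages + 1)]
assert total_pages == 3 and offsets == [0, 50, 100]
```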
```diff
@@ -1775,25 +1785,19 @@ def filters(
         Returns:
             list[InfrahubNodeSync]: List of Nodes that match the given filters.
         """
-        schema = self.schema.get(kind=kind, branch=branch)
-
         branch = branch or self.default_branch
+        schema = self.schema.get(kind=kind, branch=branch)
+        node = InfrahubNodeSync(client=self, schema=schema, branch=branch)
         if at:
             at = Timestamp(at)
-
-        node = InfrahubNodeSync(client=self, schema=schema, branch=branch)
         filters = kwargs
+        pagination_size = self.pagination_size

-        nodes: list[InfrahubNodeSync] = []
-        related_nodes: list[InfrahubNodeSync] = []
-
-        has_remaining_items = True
-        page_number = 1
-
-        def process_page(page_offset: int):
+        def process_page(page_offset: int, page_number: int) -> tuple[dict, ProcessRelationsNodeSync]:
+            """Process a single page of results."""
             query_data = InfrahubNodeSync(client=self, schema=schema, branch=branch).generate_query_data(
                 offset=offset or page_offset,
-                limit=limit or self.pagination_size,
+                limit=limit or pagination_size,
                 filters=filters,
                 include=include,
                 exclude=exclude,
@@ -1820,34 +1824,50 @@ def process_page(page_offset: int):
             )
             return response, process_result

-        if batch:
+        def process_batch() -> tuple[list[InfrahubNodeSync], list[InfrahubNodeSync]]:
+            """Process queries in batch mode."""
+            nodes = []
+            related_nodes = []
             batch_process = self.create_batch()

             resp = self.execute_graphql(query=f"query {{ {schema.kind} {{ count }} }}")
             count = resp[schema.kind].get("count", 0)
-            total_pages = (count + self.pagination_size - 1) // self.pagination_size
+            total_pages = (count + pagination_size - 1) // pagination_size
+
             for page_number in range(1, total_pages + 1):
-                page_offset = (page_number - 1) * self.pagination_size
-                batch_process.add(task=process_page, node=node, page_offset=page_offset)
+                page_offset = (page_number - 1) * pagination_size
+                batch_process.add(task=process_page, node=node, page_offset=page_offset, page_number=page_number)

             for _, response in batch_process.execute():
                 nodes.extend(response[1]["nodes"])
                 related_nodes.extend(response[1]["related_nodes"])

-        else:
+            return nodes, related_nodes
+
+        def process_non_batch() -> tuple[list[InfrahubNodeSync], list[InfrahubNodeSync]]:
+            """Process queries without batch mode."""
+            nodes = []
+            related_nodes = []
+            has_remaining_items = True
+            page_number = 1
+
             while has_remaining_items:
-                page_offset = (page_number - 1) * self.pagination_size
-                response, process_result = process_page(page_offset)
+                page_offset = (page_number - 1) * pagination_size
+                response, process_result = process_page(page_offset, page_number)

                 nodes.extend(process_result["nodes"])
                 related_nodes.extend(process_result["related_nodes"])

-                remaining_items = response[schema.kind].get("count", 0) - (page_offset + self.pagination_size)
+                remaining_items = response[schema.kind].get("count", 0) - (page_offset + pagination_size)
                 if remaining_items < 0 or offset is not None or limit is not None:
                     has_remaining_items = False
-
                 page_number += 1

+            return nodes, related_nodes
+
+        # Select batch or non-batch processing
+        nodes, related_nodes = process_batch() if batch else process_non_batch()
+
         if populate_store:
             for node in nodes:
                 if node.id:
@@ -1856,7 +1876,6 @@ def process_page(page_offset: int):
             for node in related_nodes:
                 if node.id:
                     self.store.set(key=node.id, node=node)
-
         return nodes

     @overload
```
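The synchronous client gains the same flag through an identical restructuring. A minimal usage sketch, assuming a reachable Infrahub instance; the address and the `name__value` attribute filter are illustrative, not part of this diff:

```python
from infrahub_sdk import InfrahubClientSync

# Placeholder address. filters() combines ordinary attribute filters with
# the new batch flag; in batch mode the page fetches are fanned out
# through create_batch() instead of the sequential while-loop.
client = InfrahubClientSync(address="http://localhost:8000")
matches = client.filters(kind="BuiltinLocation", name__value="Location 4", batch=True)
print([node.display_label for node in matches])
```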
Lines changed: 69 additions & 0 deletions
```diff
@@ -0,0 +1,69 @@
+{
+    "data": {
+        "BuiltinLocation": {
+            "count": 30,
+            "edges": [
+                {
+                    "node": {
+                        "id": "18187157-032c-2b7b-35b6-c5132b0ec757",
+                        "hfid": [
+                            "Location 4"
+                        ],
+                        "display_label": "Location 4",
+                        "__typename": "BuiltinLocation",
+                        "name": {
+                            "value": "Location 4"
+                        },
+                        "description": {
+                            "value": null
+                        },
+                        "tags": {
+                            "count": 0,
+                            "edges": []
+                        }
+                    }
+                },
+                {
+                    "node": {
+                        "id": "18187157-057b-d717-35b5-c51d0ee87714",
+                        "hfid": [
+                            "Location 5"
+                        ],
+                        "display_label": "Location 5",
+                        "__typename": "BuiltinLocation",
+                        "name": {
+                            "value": "Location 5"
+                        },
+                        "description": {
+                            "value": null
+                        },
+                        "tags": {
+                            "count": 0,
+                            "edges": []
+                        }
+                    }
+                },
+                {
+                    "node": {
+                        "id": "18187157-078f-33e6-35bd-c51064d1da09",
+                        "hfid": [
+                            "Location 6"
+                        ],
+                        "display_label": "Location 6",
+                        "__typename": "BuiltinLocation",
+                        "name": {
+                            "value": "Location 6"
+                        },
+                        "description": {
+                            "value": null
+                        },
+                        "tags": {
+                            "count": 0,
+                            "edges": []
+                        }
+                    }
+                }
+            ]
+        }
+    }
+}
```
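This fixture follows Infrahub's GraphQL envelope: a top-level count plus an edges list that wraps each node. A short standard-library sketch of flattening one such page; the file name is hypothetical:

```python
import json
from pathlib import Path

# Hypothetical file name for the fixture shown above.
payload = json.loads(Path("locations_page1.json").read_text())
result = payload["data"]["BuiltinLocation"]
nodes = [edge["node"] for edge in result["edges"]]  # unwrap GraphQL edges
assert result["count"] == 30 and len(nodes) == 3
print([n["display_label"] for n in nodes])  # ['Location 4', 'Location 5', 'Location 6']
```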
Lines changed: 69 additions & 0 deletions
```diff
@@ -0,0 +1,69 @@
+{
+    "data": {
+        "BuiltinLocation": {
+            "count": 30,
+            "edges": [
+                {
+                    "node": {
+                        "id": "18187157-4467-a731-35b0-c51343a6de25",
+                        "hfid": [
+                            "Location 28"
+                        ],
+                        "display_label": "Location 28",
+                        "__typename": "BuiltinLocation",
+                        "name": {
+                            "value": "Location 28"
+                        },
+                        "description": {
+                            "value": null
+                        },
+                        "tags": {
+                            "count": 0,
+                            "edges": []
+                        }
+                    }
+                },
+                {
+                    "node": {
+                        "id": "18187157-460e-0a60-35b9-c51ae0bf3282",
+                        "hfid": [
+                            "Location 29"
+                        ],
+                        "display_label": "Location 29",
+                        "__typename": "BuiltinLocation",
+                        "name": {
+                            "value": "Location 29"
+                        },
+                        "description": {
+                            "value": null
+                        },
+                        "tags": {
+                            "count": 0,
+                            "edges": []
+                        }
+                    }
+                },
+                {
+                    "node": {
+                        "id": "18187157-47e7-e6f8-35b4-c5126eb24b25",
+                        "hfid": [
+                            "Location 30"
+                        ],
+                        "display_label": "Location 30",
+                        "__typename": "BuiltinLocation",
+                        "name": {
+                            "value": "Location 30"
+                        },
+                        "description": {
+                            "value": null
+                        },
+                        "tags": {
+                            "count": 0,
+                            "edges": []
+                        }
+                    }
+                }
+            ]
+        }
+    }
+}
```
