Skip to content

Commit 100fcfb

Browse files
committed
resolves #159: Add batch argument to fetch data in batches
1 parent c3315c7 commit 100fcfb

File tree

1 file changed

+71
-18
lines changed

1 file changed

+71
-18
lines changed

infrahub_sdk/client.py

Lines changed: 71 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,7 @@ async def all(
540540
fragment: bool = ...,
541541
prefetch_relationships: bool = ...,
542542
property: bool = ...,
543+
batch: bool = ...,
543544
) -> list[SchemaType]: ...
544545

545546
@overload
@@ -557,6 +558,7 @@ async def all(
557558
fragment: bool = ...,
558559
prefetch_relationships: bool = ...,
559560
property: bool = ...,
561+
batch: bool = ...,
560562
) -> list[InfrahubNode]: ...
561563

562564
async def all(
@@ -573,6 +575,7 @@ async def all(
573575
fragment: bool = False,
574576
prefetch_relationships: bool = False,
575577
property: bool = False,
578+
batch: bool = False,
576579
) -> list[InfrahubNode] | list[SchemaType]:
577580
"""Retrieve all nodes of a given kind
578581
@@ -588,6 +591,7 @@ async def all(
588591
exclude (list[str], optional): List of attributes or relationships to exclude from the query.
589592
fragment (bool, optional): Flag to use GraphQL fragments for generic schemas.
590593
prefetch_relationships (bool, optional): Flag to indicate whether to prefetch related node data.
594+
batch (bool, optional): Whether to retrieve the pages of results through a batch rather than one page at a time. Defaults to False.
591595
592596
Returns:
593597
list[InfrahubNode]: List of Nodes
@@ -605,6 +609,7 @@ async def all(
605609
fragment=fragment,
606610
prefetch_relationships=prefetch_relationships,
607611
property=property,
612+
batch=batch,
608613
)
609614

610615
@overload
@@ -623,6 +628,7 @@ async def filters(
623628
prefetch_relationships: bool = ...,
624629
partial_match: bool = ...,
625630
property: bool = ...,
631+
batch: bool = ...,
626632
**kwargs: Any,
627633
) -> list[SchemaType]: ...
628634

@@ -642,6 +648,7 @@ async def filters(
642648
prefetch_relationships: bool = ...,
643649
partial_match: bool = ...,
644650
property: bool = ...,
651+
batch: bool = ...,
645652
**kwargs: Any,
646653
) -> list[InfrahubNode]: ...
647654

@@ -660,6 +667,7 @@ async def filters(
660667
prefetch_relationships: bool = False,
661668
partial_match: bool = False,
662669
property: bool = False,
670+
batch: bool = False,
663671
**kwargs: Any,
664672
) -> list[InfrahubNode] | list[SchemaType]:
665673
"""Retrieve nodes of a given kind based on provided filters.
@@ -677,6 +685,7 @@ async def filters(
677685
fragment (bool, optional): Flag to use GraphQL fragments for generic schemas.
678686
prefetch_relationships (bool, optional): Flag to indicate whether to prefetch related node data.
679687
partial_match (bool, optional): Allow partial match of filter criteria for the query.
688+
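batch (bool, optional): Whether to queue one request per page in a batch and execute that batch, instead of requesting the pages one after another in a loop. Defaults to False.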
680689
**kwargs (Any): Additional filter criteria for the query.
681690
682691
Returns:
@@ -697,9 +706,7 @@ async def filters(
697706
has_remaining_items = True
698707
page_number = 1
699708

700-
while has_remaining_items:
701-
page_offset = (page_number - 1) * self.pagination_size
702-
709+
async def process_page(page_offset: int):
703710
query_data = await InfrahubNode(client=self, schema=schema, branch=branch).generate_query_data(
704711
offset=offset or page_offset,
705712
limit=limit or self.pagination_size,
@@ -727,14 +734,33 @@ async def filters(
727734
prefetch_relationships=prefetch_relationships,
728735
timeout=timeout,
729736
)
730-
nodes.extend(process_result["nodes"])
731-
related_nodes.extend(process_result["related_nodes"])
732737

733-
remaining_items = response[schema.kind].get("count", 0) - (page_offset + self.pagination_size)
734-
if remaining_items < 0 or offset is not None or limit is not None:
735-
has_remaining_items = False
738+
return response, process_result
739+
740+
if batch:
741+
batch_process = await self.create_batch()
742+
resp = await self.execute_graphql(query=f"query {{ {schema.kind} {{ count }} }}")
743+
count = resp[schema.kind].get("count", 0)
744+
total_pages = (count + self.pagination_size - 1) // self.pagination_size
745+
for page_number in range(1, total_pages + 1):
746+
page_offset = (page_number - 1) * self.pagination_size
747+
batch_process.add(task=process_page, node=node, page_offset=page_offset)
748+
749+
async for _, response in batch_process.execute():
750+
nodes.extend(response[1]["nodes"])
751+
related_nodes.extend(response[1]["related_nodes"])
752+
else:
753+
while has_remaining_items:
754+
page_offset = (page_number - 1) * self.pagination_size
755+
response, process_result = await process_page(page_offset)
756+
757+
nodes.extend(process_result["nodes"])
758+
related_nodes.extend(process_result["related_nodes"])
759+
remaining_items = response[schema.kind].get("count", 0) - (page_offset + self.pagination_size)
760+
if remaining_items < 0 or offset is not None or limit is not None:
761+
has_remaining_items = False
736762

737-
page_number += 1
763+
page_number += 1
738764

739765
if populate_store:
740766
for node in nodes:
@@ -1564,6 +1590,7 @@ def all(
15641590
fragment: bool = ...,
15651591
prefetch_relationships: bool = ...,
15661592
property: bool = ...,
1593+
batch: bool = ...,
15671594
) -> list[SchemaTypeSync]: ...
15681595

15691596
@overload
@@ -1581,6 +1608,7 @@ def all(
15811608
fragment: bool = ...,
15821609
prefetch_relationships: bool = ...,
15831610
property: bool = ...,
1611+
batch: bool = ...,
15841612
) -> list[InfrahubNodeSync]: ...
15851613

15861614
def all(
@@ -1597,6 +1625,7 @@ def all(
15971625
fragment: bool = False,
15981626
prefetch_relationships: bool = False,
15991627
property: bool = False,
1628+
batch: bool = False,
16001629
) -> list[InfrahubNodeSync] | list[SchemaTypeSync]:
16011630
"""Retrieve all nodes of a given kind
16021631
@@ -1629,6 +1658,7 @@ def all(
16291658
fragment=fragment,
16301659
prefetch_relationships=prefetch_relationships,
16311660
property=property,
1661+
batch=batch,
16321662
)
16331663

16341664
def _process_nodes_and_relationships(
@@ -1682,6 +1712,7 @@ def filters(
16821712
prefetch_relationships: bool = ...,
16831713
partial_match: bool = ...,
16841714
property: bool = ...,
1715+
batch: bool = ...,
16851716
**kwargs: Any,
16861717
) -> list[SchemaTypeSync]: ...
16871718

@@ -1701,6 +1732,7 @@ def filters(
17011732
prefetch_relationships: bool = ...,
17021733
partial_match: bool = ...,
17031734
property: bool = ...,
1735+
batch: bool = ...,
17041736
**kwargs: Any,
17051737
) -> list[InfrahubNodeSync]: ...
17061738

@@ -1719,6 +1751,7 @@ def filters(
17191751
prefetch_relationships: bool = False,
17201752
partial_match: bool = False,
17211753
property: bool = False,
1754+
batch: bool = False,
17221755
**kwargs: Any,
17231756
) -> list[InfrahubNodeSync] | list[SchemaTypeSync]:
17241757
"""Retrieve nodes of a given kind based on provided filters.
@@ -1736,6 +1769,7 @@ def filters(
17361769
fragment (bool, optional): Flag to use GraphQL fragments for generic schemas.
17371770
prefetch_relationships (bool, optional): Flag to indicate whether to prefetch related node data.
17381771
partial_match (bool, optional): Allow partial match of filter criteria for the query.
1772+
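batch (bool, optional): Whether to queue one request per page in a batch and execute that batch, instead of requesting the pages one after another in a loop. Defaults to False.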
17391773
**kwargs (Any): Additional filter criteria for the query.
17401774
17411775
Returns:
@@ -1756,9 +1790,7 @@ def filters(
17561790
has_remaining_items = True
17571791
page_number = 1
17581792

1759-
while has_remaining_items:
1760-
page_offset = (page_number - 1) * self.pagination_size
1761-
1793+
def process_page(page_offset: int):
17621794
query_data = InfrahubNodeSync(client=self, schema=schema, branch=branch).generate_query_data(
17631795
offset=offset or page_offset,
17641796
limit=limit or self.pagination_size,
@@ -1786,14 +1818,35 @@ def filters(
17861818
prefetch_relationships=prefetch_relationships,
17871819
timeout=timeout,
17881820
)
1789-
nodes.extend(process_result["nodes"])
1790-
related_nodes.extend(process_result["related_nodes"])
1821+
return response, process_result
1822+
1823+
if batch:
1824+
batch_process = self.create_batch()
1825+
1826+
resp = self.execute_graphql(query=f"query {{ {schema.kind} {{ count }} }}")
1827+
count = resp[schema.kind].get("count", 0)
1828+
total_pages = (count + self.pagination_size - 1) // self.pagination_size
1829+
for page_number in range(1, total_pages + 1):
1830+
page_offset = (page_number - 1) * self.pagination_size
1831+
batch_process.add(task=process_page, node=node, page_offset=page_offset)
1832+
1833+
for _, response in batch_process.execute():
1834+
nodes.extend(response[1]["nodes"])
1835+
related_nodes.extend(response[1]["related_nodes"])
1836+
1837+
else:
1838+
while has_remaining_items:
1839+
page_offset = (page_number - 1) * self.pagination_size
1840+
response, process_result = process_page(page_offset)
1841+
1842+
nodes.extend(process_result["nodes"])
1843+
related_nodes.extend(process_result["related_nodes"])
17911844

1792-
remaining_items = response[schema.kind].get("count", 0) - (page_offset + self.pagination_size)
1793-
if remaining_items < 0 or offset is not None or limit is not None:
1794-
has_remaining_items = False
1845+
remaining_items = response[schema.kind].get("count", 0) - (page_offset + self.pagination_size)
1846+
if remaining_items < 0 or offset is not None or limit is not None:
1847+
has_remaining_items = False
17951848

1796-
page_number += 1
1849+
page_number += 1
17971850

17981851
if populate_store:
17991852
for node in nodes:

0 commit comments

Comments
 (0)