Commit b2e500a

Add option to limit concurrency when async processing batches (#1092)
* Add option to limit concurrency
* Add tests
* Try to add coverage
* Try to add coverage
1 parent bfc5e7a commit b2e500a

File tree

2 files changed: +237 -2 lines changed

meilisearch_python_sdk/index.py

Lines changed: 135 additions & 1 deletion
@@ -1596,6 +1596,7 @@ async def add_documents_in_batches(
         batch_size: int = 1000,
         primary_key: str | None = None,
         compress: bool = False,
+        concurrency_limit: int | None = None,
     ) -> list[TaskInfo]:
         """Adds documents in batches to reduce RAM usage with indexing.

@@ -1607,6 +1608,9 @@ async def add_documents_in_batches(
             primary_key: The primary key of the documents. This will be ignored if already set.
                 Defaults to None.
             compress: If set to True the data will be sent in gzip format. Defaults to False.
+            concurrency_limit: If set this will limit the number of batches that will be sent
+                concurrently. This can be helpful if you find you are overloading the Meilisearch
+                server with requests. Defaults to None.

         Returns:

@@ -1628,6 +1632,23 @@ async def add_documents_in_batches(
             >>> index = client.index("movies")
             >>> await index.add_documents_in_batches(documents)
         """
+        if concurrency_limit:
+            async with asyncio.Semaphore(concurrency_limit):
+                if not use_task_groups():
+                    batches = [
+                        self.add_documents(x, primary_key, compress=compress)
+                        for x in _batch(documents, batch_size)
+                    ]
+                    return await asyncio.gather(*batches)
+
+                async with asyncio.TaskGroup() as tg:  # type: ignore[attr-defined]
+                    tasks = [
+                        tg.create_task(self.add_documents(x, primary_key, compress=compress))
+                        for x in _batch(documents, batch_size)
+                    ]
+
+                return [x.result() for x in tasks]
+
         if not use_task_groups():
             batches = [
                 self.add_documents(x, primary_key, compress=compress)
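For context, a minimal usage sketch of the new parameter added above (not part of the commit itself; the server URL, master key, and sample documents are placeholders):

import asyncio

from meilisearch_python_sdk import AsyncClient


async def main() -> None:
    # Placeholder data and connection details for illustration only.
    documents = [{"id": i, "title": f"Movie {i}"} for i in range(10_000)]
    async with AsyncClient("http://127.0.0.1:7700", "masterKey") as client:
        index = client.index("movies")
        # Ask the SDK to keep at most two batches in flight at a time.
        tasks = await index.add_documents_in_batches(
            documents, batch_size=1000, concurrency_limit=2
        )
        print(len(tasks))


asyncio.run(main())

When concurrency_limit is left at its default of None, the new block above is skipped and the existing behaviour is unchanged: all batches are dispatched at once.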
@@ -1652,6 +1673,7 @@ async def add_documents_from_directory(
         csv_delimiter: str | None = None,
         combine_documents: bool = True,
         compress: bool = False,
+        concurrency_limit: int | None = None,
     ) -> list[TaskInfo]:
         """Load all json files from a directory and add the documents to the index.

@@ -1668,6 +1690,9 @@ async def add_documents_from_directory(
             combine_documents: If set to True this will combine the documents from all the files
                 before indexing them. Defaults to True.
             compress: If set to True the data will be sent in gzip format. Defaults to False.
+            concurrency_limit: If set this will limit the number of batches that will be sent
+                concurrently. This can be helpful if you find you are overloading the Meilisearch
+                server with requests. Defaults to None.

         Returns:

@@ -1709,6 +1734,54 @@ async def add_documents_from_directory(

             return [response]

+        if concurrency_limit:
+            async with asyncio.Semaphore(concurrency_limit):
+                if not use_task_groups():
+                    add_documents = []
+                    for path in directory.iterdir():
+                        if path.suffix == f".{document_type}":
+                            documents = await _async_load_documents_from_file(
+                                path, csv_delimiter, json_handler=self._json_handler
+                            )
+                            add_documents.append(
+                                self.add_documents(documents, primary_key, compress=compress)
+                            )
+
+                    _raise_on_no_documents(add_documents, document_type, directory_path)
+
+                    if len(add_documents) > 1:
+                        # Send the first document on its own before starting the gather. Otherwise Meilisearch
+                        # returns an error because it thinks all entries are trying to create the same index.
+                        first_response = [await add_documents.pop()]
+
+                        responses = await asyncio.gather(*add_documents)
+                        responses = [*first_response, *responses]
+                    else:
+                        responses = [await add_documents[0]]
+
+                    return responses
+
+                async with asyncio.TaskGroup() as tg:  # type: ignore[attr-defined]
+                    tasks = []
+                    all_results = []
+                    for i, path in enumerate(directory.iterdir()):
+                        if path.suffix == f".{document_type}":
+                            documents = await _async_load_documents_from_file(
+                                path, csv_delimiter, json_handler=self._json_handler
+                            )
+                            if i == 0:
+                                all_results = [
+                                    await self.add_documents(documents, compress=compress)
+                                ]
+                            else:
+                                tasks.append(
+                                    tg.create_task(
+                                        self.add_documents(
+                                            documents, primary_key, compress=compress
+                                        )
+                                    )
+                                )
+
         if not use_task_groups():
             add_documents = []
             for path in directory.iterdir():
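A hedged sketch of how add_documents_from_directory might be called with the new option. The directory path is a placeholder, and the concurrency cap only matters on the per-file path, since the combine_documents branch above returns a single response:

import asyncio
from pathlib import Path

from meilisearch_python_sdk import AsyncClient


async def main() -> None:
    # "./documents" is an assumed directory of json files.
    async with AsyncClient("http://127.0.0.1:7700", "masterKey") as client:
        index = client.index("movies")
        # combine_documents=False sends one request per file; concurrency_limit
        # caps how many of those requests run at the same time.
        tasks = await index.add_documents_from_directory(
            Path("./documents"), combine_documents=False, concurrency_limit=2
        )
        print(len(tasks))


asyncio.run(main())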
@@ -1766,6 +1839,7 @@ async def add_documents_from_directory_in_batches(
         csv_delimiter: str | None = None,
         combine_documents: bool = True,
         compress: bool = False,
+        concurrency_limit: int | None = None,
     ) -> list[TaskInfo]:
         """Load all json files from a directory and add the documents to the index in batches.

@@ -1784,6 +1858,9 @@ async def add_documents_from_directory_in_batches(
             combine_documents: If set to True this will combine the documents from all the files
                 before indexing them. Defaults to True.
             compress: If set to True the data will be sent in gzip format. Defaults to False.
+            concurrency_limit: If set this will limit the number of batches that will be sent
+                concurrently. This can be helpful if you find you are overloading the Meilisearch
+                server with requests. Defaults to None.

         Returns:

@@ -1826,6 +1903,7 @@ async def add_documents_from_directory_in_batches(
                 batch_size=batch_size,
                 primary_key=primary_key,
                 compress=compress,
+                concurrency_limit=concurrency_limit,
             )

         responses: list[TaskInfo] = []
@@ -1842,6 +1920,7 @@ async def add_documents_from_directory_in_batches(
                             batch_size=batch_size,
                             primary_key=primary_key,
                             compress=compress,
+                            concurrency_limit=concurrency_limit,
                         )
                     )

@@ -1908,6 +1987,7 @@ async def add_documents_from_file_in_batches(
         primary_key: str | None = None,
         csv_delimiter: str | None = None,
         compress: bool = False,
+        concurrency_limit: int | None = None,
     ) -> list[TaskInfo]:
         """Adds documents form a json file in batches to reduce RAM usage with indexing.

@@ -1921,6 +2001,9 @@ async def add_documents_from_file_in_batches(
             csv_delimiter: A single ASCII character to specify the delimiter for csv files. This
                 can only be used if the file is a csv file. Defaults to comma.
             compress: If set to True the data will be sent in gzip format. Defaults to False.
+            concurrency_limit: If set this will limit the number of batches that will be sent
+                concurrently. This can be helpful if you find you are overloading the Meilisearch
+                server with requests. Defaults to None.

         Returns:

@@ -1951,6 +2034,7 @@ async def add_documents_from_file_in_batches(
             batch_size=batch_size,
             primary_key=primary_key,
             compress=compress,
+            concurrency_limit=concurrency_limit,
         )

     async def add_documents_from_raw_file(
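The *_in_batches directory and file helpers simply forward concurrency_limit to add_documents_in_batches, as the hunks above show. A placeholder sketch (assumed directory path and connection details):

import asyncio
from pathlib import Path

from meilisearch_python_sdk import AsyncClient


async def main() -> None:
    # Placeholder server, key, and directory; only concurrency_limit is new in this commit.
    async with AsyncClient("http://127.0.0.1:7700", "masterKey") as client:
        index = client.index("movies")
        # Each file is split into batches of 500 documents, with at most
        # three batches in flight against the server at once.
        tasks = await index.add_documents_from_directory_in_batches(
            Path("./documents"), batch_size=500, concurrency_limit=3
        )
        print(len(tasks))


asyncio.run(main())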
@@ -2232,6 +2316,7 @@ async def update_documents_in_batches(
         batch_size: int = 1000,
         primary_key: str | None = None,
         compress: bool = False,
+        concurrency_limit: int | None = None,
     ) -> list[TaskInfo]:
         """Update documents in batches to reduce RAM usage with indexing.

@@ -2245,6 +2330,9 @@ async def update_documents_in_batches(
             primary_key: The primary key of the documents. This will be ignored if already set.
                 Defaults to None.
             compress: If set to True the data will be sent in gzip format. Defaults to False.
+            concurrency_limit: If set this will limit the number of batches that will be sent
+                concurrently. This can be helpful if you find you are overloading the Meilisearch
+                server with requests. Defaults to None.

         Returns:

@@ -2266,6 +2354,22 @@ async def update_documents_in_batches(
             >>> index = client.index("movies")
             >>> await index.update_documents_in_batches(documents)
         """
+        if concurrency_limit:
+            async with asyncio.Semaphore(concurrency_limit):
+                if not use_task_groups():
+                    batches = [
+                        self.update_documents(x, primary_key, compress=compress)
+                        for x in _batch(documents, batch_size)
+                    ]
+                    return await asyncio.gather(*batches)
+
+                async with asyncio.TaskGroup() as tg:  # type: ignore[attr-defined]
+                    tasks = [
+                        tg.create_task(self.update_documents(x, primary_key, compress=compress))
+                        for x in _batch(documents, batch_size)
+                    ]
+                return [x.result() for x in tasks]
+
         if not use_task_groups():
             batches = [
                 self.update_documents(x, primary_key, compress=compress)
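The same pattern applies to updates. A placeholder sketch using update_documents_in_batches (sample documents and connection details are illustrative):

import asyncio

from meilisearch_python_sdk import AsyncClient


async def main() -> None:
    # Placeholder partial updates: re-send existing documents with a changed field.
    updates = [{"id": i, "watched": True} for i in range(5_000)]
    async with AsyncClient("http://127.0.0.1:7700", "masterKey") as client:
        index = client.index("movies")
        tasks = await index.update_documents_in_batches(
            updates, batch_size=1000, concurrency_limit=2
        )
        print(len(tasks))


asyncio.run(main())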
@@ -2402,6 +2506,7 @@ async def update_documents_from_directory_in_batches(
         csv_delimiter: str | None = None,
         combine_documents: bool = True,
         compress: bool = False,
+        concurrency_limit: int | None = None,
     ) -> list[TaskInfo]:
         """Load all json files from a directory and update the documents.

@@ -2420,6 +2525,9 @@ async def update_documents_from_directory_in_batches(
             combine_documents: If set to True this will combine the documents from all the files
                 before indexing them. Defaults to True.
             compress: If set to True the data will be sent in gzip format. Defaults to False.
+            concurrency_limit: If set this will limit the number of batches that will be sent
+                concurrently. This can be helpful if you find you are overloading the Meilisearch
+                server with requests. Defaults to None.

         Returns:

@@ -2462,6 +2570,7 @@ async def update_documents_from_directory_in_batches(
                 batch_size=batch_size,
                 primary_key=primary_key,
                 compress=compress,
+                concurrency_limit=concurrency_limit,
             )

         if not use_task_groups():
@@ -2479,6 +2588,7 @@ async def update_documents_from_directory_in_batches(
                             batch_size=batch_size,
                             primary_key=primary_key,
                             compress=compress,
+                            concurrency_limit=concurrency_limit,
                         )
                     )

@@ -2509,6 +2619,7 @@ async def update_documents_from_directory_in_batches(
                             batch_size=batch_size,
                             primary_key=primary_key,
                             compress=compress,
+                            concurrency_limit=concurrency_limit,
                         )
                     else:
                         tasks.append(
@@ -2518,6 +2629,7 @@ async def update_documents_from_directory_in_batches(
                                     batch_size=batch_size,
                                     primary_key=primary_key,
                                     compress=compress,
+                                    concurrency_limit=concurrency_limit,
                                 )
                             )
                         )
@@ -2576,6 +2688,7 @@ async def update_documents_from_file_in_batches(
         batch_size: int = 1000,
         primary_key: str | None = None,
         compress: bool = False,
+        concurrency_limit: int | None = None,
     ) -> list[TaskInfo]:
         """Updates documents form a json file in batches to reduce RAM usage with indexing.

@@ -2587,6 +2700,9 @@ async def update_documents_from_file_in_batches(
             primary_key: The primary key of the documents. This will be ignored if already set.
                 Defaults to None.
             compress: If set to True the data will be sent in gzip format. Defaults to False.
+            concurrency_limit: If set this will limit the number of batches that will be sent
+                concurrently. This can be helpful if you find you are overloading the Meilisearch
+                server with requests. Defaults to None.

         Returns:

@@ -2615,6 +2731,7 @@ async def update_documents_from_file_in_batches(
             batch_size=batch_size,
             primary_key=primary_key,
             compress=compress,
+            concurrency_limit=concurrency_limit,
         )

     async def update_documents_from_raw_file(
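A placeholder sketch for the file-based update helper, which forwards concurrency_limit to update_documents_in_batches (the json file path is assumed for illustration):

import asyncio
from pathlib import Path

from meilisearch_python_sdk import AsyncClient


async def main() -> None:
    # Placeholder path; the json file would hold the updated documents.
    async with AsyncClient("http://127.0.0.1:7700", "masterKey") as client:
        index = client.index("movies")
        tasks = await index.update_documents_from_file_in_batches(
            Path("./documents/movies.json"), batch_size=1000, concurrency_limit=2
        )
        print(len(tasks))


asyncio.run(main())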
@@ -2935,13 +3052,16 @@ async def delete_documents_by_filter(self, filter: Filter) -> TaskInfo:
         return result

     async def delete_documents_in_batches_by_filter(
-        self, filters: list[str | list[str | list[str]]]
+        self, filters: list[str | list[str | list[str]]], concurrency_limit: int | None = None
     ) -> list[TaskInfo]:
         """Delete batches of documents from the index by filter.

         Args:

             filters: A list of filter value information.
+            concurrency_limit: If set this will limit the number of batches that will be sent
+                concurrently. This can be helpful if you find you are overloading the Meilisearch
+                server with requests. Defaults to None.

         Returns:

@@ -2964,6 +3084,20 @@ async def delete_documents_in_batches_by_filter(
             >>> ]
             >>> )
         """
+        if concurrency_limit:
+            async with asyncio.Semaphore(concurrency_limit):
+                if not use_task_groups():
+                    tasks = [self.delete_documents_by_filter(filter) for filter in filters]
+                    return await asyncio.gather(*tasks)
+
+                async with asyncio.TaskGroup() as tg:  # type: ignore[attr-defined]
+                    tg_tasks = [
+                        tg.create_task(self.delete_documents_by_filter(filter))
+                        for filter in filters
+                    ]
+
+                return [x.result() for x in tg_tasks]
+
         if not use_task_groups():
             tasks = [self.delete_documents_by_filter(filter) for filter in filters]
             return await asyncio.gather(*tasks)
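Finally, a placeholder sketch for batched deletes by filter. The filter expressions are illustrative and assume the corresponding attributes have been configured as filterable on the index:

import asyncio

from meilisearch_python_sdk import AsyncClient


async def main() -> None:
    # Each filter string deletes one batch of matching documents.
    filters = ["genre = horror", "genre = comedy"]
    async with AsyncClient("http://127.0.0.1:7700", "masterKey") as client:
        index = client.index("movies")
        # No more than two delete-by-filter requests are sent concurrently.
        tasks = await index.delete_documents_in_batches_by_filter(
            filters, concurrency_limit=2
        )
        print(len(tasks))


asyncio.run(main())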
