Commit f1d3238

Merge pull request #644 from sanders41/task-group
Use task groups in Python 3.11+
2 parents: 0c5f41e + 8ec9aab

File tree: 2 files changed, 153 additions & 64 deletions

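The change applies one pattern across the indexing helpers: every method that previously fanned requests out with asyncio.gather now consults a new use_task_groups() helper and, on Python 3.11+, runs the same coroutines inside an asyncio.TaskGroup instead. A minimal sketch of that shape, with a hypothetical send_batch coroutine standing in for methods like Index.add_documents:

```python
import asyncio
import sys


def use_task_groups() -> bool:
    # Simplified version of the helper added in _utils.py; asyncio.TaskGroup
    # only exists on Python 3.11 and newer.
    return sys.version_info >= (3, 11)


async def send_batch(batch: list[dict]) -> list[dict]:
    # Hypothetical placeholder for an HTTP call such as Index.add_documents.
    await asyncio.sleep(0)
    return batch


async def send_all(batches: list[list[dict]]) -> list[list[dict]]:
    if not use_task_groups():
        # Pre-3.11 path: schedule every coroutine and wait with asyncio.gather.
        return await asyncio.gather(*(send_batch(b) for b in batches))

    # 3.11+ path: the TaskGroup waits for all of its tasks when the async with
    # block exits, so results can be read back with Task.result() afterwards.
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(send_batch(b)) for b in batches]
    return [t.result() for t in tasks]


print(asyncio.run(send_all([[{"id": 1}], [{"id": 2}]])))
```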

meilisearch_python_async/_utils.py

Lines changed: 6 additions & 0 deletions

@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import sys
 from datetime import datetime
 from functools import lru_cache
 
@@ -42,3 +43,8 @@ def iso_to_date_time(iso_date: datetime | str | None) -> datetime | None:
         reduce = len(split[1]) - 6
         reduced = f"{split[0]}.{split[1][:-reduce]}Z"
         return datetime.strptime(reduced, "%Y-%m-%dT%H:%M:%S.%fZ")
+
+
+@lru_cache(maxsize=1)
+def use_task_groups() -> bool:
+    return True if sys.version_info >= (3, 11) else False
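
The new helper is wrapped in @lru_cache(maxsize=1), so the interpreter-version comparison runs once and every later call returns the cached boolean. A small illustrative sketch of that behavior; the calls below are examples, not taken from the repository's test suite:

```python
from meilisearch_python_async._utils import use_task_groups

use_task_groups()  # evaluates sys.version_info >= (3, 11)
use_task_groups()  # answered from the cache without re-evaluating

# functools.lru_cache exposes cache introspection and management, which can be
# useful in tests that monkeypatch sys.version_info and need a fresh result.
print(use_task_groups.cache_info())  # e.g. CacheInfo(hits=1, misses=1, maxsize=1, currsize=1)
use_task_groups.cache_clear()
```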

meilisearch_python_async/index.py

Lines changed: 147 additions & 64 deletions

@@ -1,7 +1,7 @@
 from __future__ import annotations
 
+import asyncio
 import json
-from asyncio import gather, get_running_loop
 from csv import DictReader
 from datetime import datetime
 from functools import partial
@@ -13,7 +13,7 @@
 from httpx import AsyncClient
 
 from meilisearch_python_async._http_requests import HttpRequests
-from meilisearch_python_async._utils import is_pydantic_2, iso_to_date_time
+from meilisearch_python_async._utils import is_pydantic_2, iso_to_date_time, use_task_groups
 from meilisearch_python_async.errors import InvalidDocumentError, MeilisearchError
 from meilisearch_python_async.models.documents import DocumentsInfo
 from meilisearch_python_async.models.index import IndexStats
@@ -174,7 +174,7 @@ async def fetch_info(self) -> Index:
         response = await self._http_requests.get(url)
         index_dict = response.json()
         self.primary_key = index_dict["primaryKey"]
-        loop = get_running_loop()
+        loop = asyncio.get_running_loop()
         self.created_at = await loop.run_in_executor(
             None, partial(iso_to_date_time, index_dict["createdAt"])
         )

@@ -682,8 +682,17 @@ async def add_documents_in_batches(
             >>> index = client.index("movies")
             >>> await index.add_documents_in_batches(documents)
         """
-        batches = [self.add_documents(x, primary_key) for x in _batch(documents, batch_size)]
-        return await gather(*batches)
+        if not use_task_groups():
+            batches = [self.add_documents(x, primary_key) for x in _batch(documents, batch_size)]
+            return await asyncio.gather(*batches)
+
+        async with asyncio.TaskGroup() as tg:  # type: ignore[attr-defined]
+            tasks = [
+                tg.create_task(self.add_documents(x, primary_key))
+                for x in _batch(documents, batch_size)
+            ]
+
+        return [x.result() for x in tasks]
 
     async def add_documents_from_directory(
         self,
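
One behavioral difference between the two branches is error handling: asyncio.gather propagates the first exception it sees while the remaining awaitables keep running, whereas a failing task inside a TaskGroup cancels its siblings and the block raises an ExceptionGroup. A rough sketch of handling that on the 3.11+ path; the wrapper function is hypothetical and MeilisearchError is only an example of what the group might contain:

```python
from meilisearch_python_async.errors import MeilisearchError


async def add_in_batches_or_report(index, documents, primary_key=None):
    # Hypothetical wrapper around Index.add_documents_in_batches showing how a
    # TaskGroup failure surfaces on Python 3.11+: as an ExceptionGroup whose
    # sub-exceptions are the errors raised by the individual tasks.
    try:
        return await index.add_documents_in_batches(documents, primary_key=primary_key)
    except* MeilisearchError as eg:
        for exc in eg.exceptions:
            print(f"batch failed: {exc!r}")
        raise
```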

@@ -740,31 +749,49 @@ async def add_documents_from_directory(
 
             _raise_on_no_documents(all_documents, document_type, directory_path)
 
-            loop = get_running_loop()
+            loop = asyncio.get_running_loop()
             combined = await loop.run_in_executor(None, partial(_combine_documents, all_documents))
 
             response = await self.add_documents(combined, primary_key)
 
             return [response]
 
-        add_documents = []
-        for path in directory.iterdir():
-            if path.suffix == f".{document_type}":
-                documents = await _load_documents_from_file(path, csv_delimiter)
-                add_documents.append(self.add_documents(documents, primary_key))
+        if not use_task_groups():
+            add_documents = []
+            for path in directory.iterdir():
+                if path.suffix == f".{document_type}":
+                    documents = await _load_documents_from_file(path, csv_delimiter)
+                    add_documents.append(self.add_documents(documents, primary_key))
 
-        _raise_on_no_documents(add_documents, document_type, directory_path)
+            _raise_on_no_documents(add_documents, document_type, directory_path)
 
-        if len(add_documents) > 1:
-            # Send the first document on its own before starting the gather. Otherwise Meilisearch
-            # returns an error because it thinks all entries are trying to create the same index.
-            first_response = [await add_documents.pop()]
-            responses = await gather(*add_documents)
-            responses = [*first_response, *responses]
-        else:
-            responses = [await add_documents[0]]
+            if len(add_documents) > 1:
+                # Send the first document on its own before starting the gather. Otherwise Meilisearch
+                # returns an error because it thinks all entries are trying to create the same index.
+                first_response = [await add_documents.pop()]
 
-        return responses
+                responses = await asyncio.gather(*add_documents)
+                responses = [*first_response, *responses]
+            else:
+                responses = [await add_documents[0]]
+
+            return responses
+
+        async with asyncio.TaskGroup() as tg:  # type: ignore[attr-defined]
+            tasks = []
+            all_results = []
+            for i, path in enumerate(directory.iterdir()):
+                if path.suffix == f".{document_type}":
+                    documents = await _load_documents_from_file(path, csv_delimiter)
+                    if i == 0:
+                        all_results = [await self.add_documents(documents)]
+                    else:
+                        tasks.append(tg.create_task(self.add_documents(documents, primary_key)))
+
+        results = [x.result() for x in tasks]
+        all_results = [*all_results, *results]
+        _raise_on_no_documents(all_results, document_type, directory_path)
+        return all_results
 
     async def add_documents_from_directory_in_batches(
         self,
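
Both code paths keep the earlier workaround of sending the first file's documents on their own before fanning out the rest, so Meilisearch creates the index once instead of several concurrent requests racing to create it; on the task-group path that is the i == 0 case awaited before any tg.create_task calls. Stripped down to its shape, with a hypothetical upload coroutine and payloads list:

```python
import asyncio


async def upload(payload: dict) -> dict:
    # Placeholder for a call such as Index.add_documents on one file's documents.
    await asyncio.sleep(0)
    return payload


async def upload_first_then_rest(payloads: list[dict]) -> list[dict]:
    # Await the first payload alone so the index is created exactly once, then
    # let a TaskGroup drive the remaining uploads concurrently (Python 3.11+).
    results = [await upload(payloads[0])]
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(upload(p)) for p in payloads[1:]]
    return [*results, *(t.result() for t in tasks)]
```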

@@ -824,7 +851,7 @@ async def add_documents_from_directory_in_batches(
 
             _raise_on_no_documents(all_documents, document_type, directory_path)
 
-            loop = get_running_loop()
+            loop = asyncio.get_running_loop()
             combined = await loop.run_in_executor(None, partial(_combine_documents, all_documents))
 
             return await self.add_documents_in_batches(
@@ -849,7 +876,7 @@ async def add_documents_from_directory_in_batches(
             # Send the first document on its own before starting the gather. Otherwise Meilisearch
             # returns an error because it thinks all entries are trying to create the same index.
             first_response = await add_documents.pop()
-            responses_gather = await gather(*add_documents)
+            responses_gather = await asyncio.gather(*add_documents)
             responses = [*first_response, *[x for y in responses_gather for x in y]]
         else:
             responses = await add_documents[0]
@@ -1095,8 +1122,16 @@ async def update_documents_in_batches(
             >>> index = client.index("movies")
             >>> await index.update_documents_in_batches(documents)
         """
-        batches = [self.update_documents(x, primary_key) for x in _batch(documents, batch_size)]
-        return await gather(*batches)
+        if not use_task_groups():
+            batches = [self.update_documents(x, primary_key) for x in _batch(documents, batch_size)]
+            return await asyncio.gather(*batches)
+
+        async with asyncio.TaskGroup() as tg:  # type: ignore[attr-defined]
+            tasks = [
+                tg.create_task(self.update_documents(x, primary_key))
+                for x in _batch(documents, batch_size)
+            ]
+        return [x.result() for x in tasks]
 
     async def update_documents_from_directory(
         self,

@@ -1153,30 +1188,46 @@ async def update_documents_from_directory(
 
             _raise_on_no_documents(all_documents, document_type, directory_path)
 
-            loop = get_running_loop()
+            loop = asyncio.get_running_loop()
             combined = await loop.run_in_executor(None, partial(_combine_documents, all_documents))
 
             response = await self.update_documents(combined, primary_key)
             return [response]
 
-        update_documents = []
-        for path in directory.iterdir():
-            if path.suffix == f".{document_type}":
-                documents = await _load_documents_from_file(path, csv_delimiter)
-                update_documents.append(self.update_documents(documents, primary_key))
+        if not use_task_groups():
+            update_documents = []
+            for path in directory.iterdir():
+                if path.suffix == f".{document_type}":
+                    documents = await _load_documents_from_file(path, csv_delimiter)
+                    update_documents.append(self.update_documents(documents, primary_key))
 
-        _raise_on_no_documents(update_documents, document_type, directory_path)
+            _raise_on_no_documents(update_documents, document_type, directory_path)
 
-        if len(update_documents) > 1:
-            # Send the first document on its own before starting the gather. Otherwise Meilisearch
-            # returns an error because it thinks all entries are trying to create the same index.
-            first_response = [await update_documents.pop()]
-            responses = await gather(*update_documents)
-            responses = [*first_response, *responses]
-        else:
-            responses = [await update_documents[0]]
+            if len(update_documents) > 1:
+                # Send the first document on its own before starting the gather. Otherwise Meilisearch
+                # returns an error because it thinks all entries are trying to create the same index.
+                first_response = [await update_documents.pop()]
+                responses = await asyncio.gather(*update_documents)
+                responses = [*first_response, *responses]
+            else:
+                responses = [await update_documents[0]]
 
-        return responses
+            return responses
+
+        async with asyncio.TaskGroup() as tg:  # type: ignore[attr-defined]
+            tasks = []
+            results = []
+            for i, path in enumerate(directory.iterdir()):
+                if path.suffix == f".{document_type}":
+                    documents = await _load_documents_from_file(path, csv_delimiter)
+                    if i == 0:
+                        results = [await self.update_documents(documents, primary_key)]
+                    else:
+                        tasks.append(tg.create_task(self.update_documents(documents, primary_key)))
+
+        results = [*results, *[x.result() for x in tasks]]
+        _raise_on_no_documents(results, document_type, directory_path)
+        return results
 
     async def update_documents_from_directory_in_batches(
         self,

@@ -1236,37 +1287,61 @@ async def update_documents_from_directory_in_batches(
 
             _raise_on_no_documents(all_documents, document_type, directory_path)
 
-            loop = get_running_loop()
+            loop = asyncio.get_running_loop()
             combined = await loop.run_in_executor(None, partial(_combine_documents, all_documents))
 
             return await self.update_documents_in_batches(
                 combined, batch_size=batch_size, primary_key=primary_key
             )
 
-        responses: list[TaskInfo] = []
+        if not use_task_groups():
+            responses: list[TaskInfo] = []
 
-        update_documents = []
-        for path in directory.iterdir():
-            if path.suffix == f".{document_type}":
-                documents = await _load_documents_from_file(path, csv_delimiter)
-                update_documents.append(
-                    self.update_documents_in_batches(
-                        documents, batch_size=batch_size, primary_key=primary_key
+            update_documents = []
+            for path in directory.iterdir():
+                if path.suffix == f".{document_type}":
+                    documents = await _load_documents_from_file(path, csv_delimiter)
+                    update_documents.append(
+                        self.update_documents_in_batches(
+                            documents, batch_size=batch_size, primary_key=primary_key
+                        )
                     )
-                )
 
-        _raise_on_no_documents(update_documents, document_type, directory_path)
+            _raise_on_no_documents(update_documents, document_type, directory_path)
 
-        if len(update_documents) > 1:
-            # Send the first document on its own before starting the gather. Otherwise Meilisearch
-            # returns an error because it thinks all entries are trying to create the same index.
-            first_response = await update_documents.pop()
-            responses_gather = await gather(*update_documents)
-            responses = [*first_response, *[x for y in responses_gather for x in y]]
-        else:
-            responses = await update_documents[0]
+            if len(update_documents) > 1:
+                # Send the first document on its own before starting the gather. Otherwise Meilisearch
+                # returns an error because it thinks all entries are trying to create the same index.
+                first_response = await update_documents.pop()
+                responses_gather = await asyncio.gather(*update_documents)
+                responses = [*first_response, *[x for y in responses_gather for x in y]]
+            else:
+                responses = await update_documents[0]
 
-        return responses
+            return responses
+
+        async with asyncio.TaskGroup() as tg:  # type: ignore[attr-defined]
+            results = []
+            tasks = []
+            for i, path in enumerate(directory.iterdir()):
+                if path.suffix == f".{document_type}":
+                    documents = await _load_documents_from_file(path, csv_delimiter)
+                    if i == 0:
+                        results = await self.update_documents_in_batches(
+                            documents, batch_size=batch_size, primary_key=primary_key
+                        )
+                    else:
+                        tasks.append(
+                            tg.create_task(
+                                self.update_documents_in_batches(
+                                    documents, batch_size=batch_size, primary_key=primary_key
+                                )
+                            )
+                        )
+
+        results = [*results, *[x for y in tasks for x in y.result()]]
+        _raise_on_no_documents(results, document_type, directory_path)
+        return results
 
     async def update_documents_from_file(
         self,

@@ -1535,8 +1610,16 @@ async def delete_documents_in_batches_by_filter(
             >>> ]
             >>> )
         """
-        tasks = [self.delete_documents_by_filter(filter) for filter in filters]
-        return await gather(*tasks)
+        if not use_task_groups():
+            tasks = [self.delete_documents_by_filter(filter) for filter in filters]
+            return await asyncio.gather(*tasks)
+
+        async with asyncio.TaskGroup() as tg:  # type: ignore[attr-defined]
+            tg_tasks = [
+                tg.create_task(self.delete_documents_by_filter(filter)) for filter in filters
+            ]
+
+        return [x.result() for x in tg_tasks]
 
     async def delete_all_documents(self) -> TaskInfo:
         """Delete all documents from the index.
@@ -2549,7 +2632,7 @@ async def _load_documents_from_file(
     if isinstance(file_path, str):
        file_path = Path(file_path)
 
-    loop = get_running_loop()
+    loop = asyncio.get_running_loop()
     await loop.run_in_executor(None, partial(_validate_file_type, file_path))
 
     if file_path.suffix == ".csv":
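
From the caller's point of view the public API is unchanged: the batched helpers still return a list of TaskInfo whichever concurrency path runs underneath. A hedged usage sketch; the URL, API key, and documents are placeholders:

```python
import asyncio

from meilisearch_python_async import Client


async def main() -> None:
    async with Client("http://localhost:7700", "masterKey") as client:
        index = client.index("movies")
        documents = [{"id": i, "title": f"Movie {i}"} for i in range(5000)]
        # Runs through asyncio.gather on Python < 3.11 and asyncio.TaskGroup on
        # 3.11 and newer, returning one TaskInfo per submitted batch either way.
        task_infos = await index.add_documents_in_batches(documents, batch_size=1000)
        print(len(task_infos))


asyncio.run(main())
```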
