Skip to content

Commit ea8fccd

Browse files
authored
Adoption of returnDocumentResponses in insert_many (no changes to user) (#335)
* collection insert_many uses returnDocumentResponses * collection, thorough testing of the switch to retDocResps * switched to returnDocumentResponses in table insert many + tests
1 parent f31c565 commit ea8fccd

11 files changed

+377
-54
lines changed

CHANGES

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Support for the `indexType` field to describe table indexes (as not mandatory fo
55
DataAPITime: support for "hh:mm" no-seconds format
66
DataAPIDuration: improved parse performance by caching regexpes
77
DataAPIDuration: support for "P4W"-type strings and for zeroes such as "P", "-PR"
8+
Collection and Table `insert_many` methods employ returnDocumentResponses under the hood
89
maintenance: switch to DSE6.9 for local non-Astra testing
910

1011
v 2.0.0rc1

astrapy/data/collection.py

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -787,7 +787,7 @@ def insert_many(
787787
timeout_label=_gmt_label,
788788
)
789789
if ordered:
790-
options = {"ordered": True}
790+
options = {"ordered": True, "returnDocumentResponses": True}
791791
inserted_ids: list[Any] = []
792792
for i in range(0, len(_documents), _chunk_size):
793793
im_payload = {
@@ -807,9 +807,13 @@ def insert_many(
807807
)
808808
logger.info(f"finished insertMany(chunk) on '{self.name}'")
809809
# accumulate the results in this call
810-
chunk_inserted_ids = (chunk_response.get("status") or {}).get(
811-
"insertedIds", []
812-
)
810+
chunk_inserted_ids = [
811+
doc_resp["_id"]
812+
for doc_resp in (chunk_response.get("status") or {}).get(
813+
"documentResponses", []
814+
)
815+
if doc_resp["status"] == "OK"
816+
]
813817
inserted_ids += chunk_inserted_ids
814818
raw_results += [chunk_response]
815819
# if errors, quit early
@@ -836,7 +840,7 @@ def insert_many(
836840

837841
else:
838842
# unordered: concurrent or not, do all of them and parse the results
839-
options = {"ordered": False}
843+
options = {"ordered": False, "returnDocumentResponses": True}
840844
if _concurrency > 1:
841845
with ThreadPoolExecutor(max_workers=_concurrency) as executor:
842846

@@ -891,11 +895,12 @@ def _chunk_insertor(
891895
raw_results.append(im_response)
892896
# recast raw_results
893897
inserted_ids = [
894-
inserted_id
898+
doc_resp["_id"]
895899
for chunk_response in raw_results
896-
for inserted_id in (chunk_response.get("status") or {}).get(
897-
"insertedIds", []
900+
for doc_resp in (chunk_response.get("status") or {}).get(
901+
"documentResponses", []
898902
)
903+
if doc_resp["status"] == "OK"
899904
]
900905

901906
# check-raise
@@ -3335,7 +3340,7 @@ async def insert_many(
33353340
timeout_label=_gmt_label,
33363341
)
33373342
if ordered:
3338-
options = {"ordered": True}
3343+
options = {"ordered": True, "returnDocumentResponses": True}
33393344
inserted_ids: list[Any] = []
33403345
for i in range(0, len(_documents), _chunk_size):
33413346
im_payload = {
@@ -3355,9 +3360,13 @@ async def insert_many(
33553360
)
33563361
logger.info(f"finished insertMany(chunk) on '{self.name}'")
33573362
# accumulate the results in this call
3358-
chunk_inserted_ids = (chunk_response.get("status") or {}).get(
3359-
"insertedIds", []
3360-
)
3363+
chunk_inserted_ids = [
3364+
doc_resp["_id"]
3365+
for doc_resp in (chunk_response.get("status") or {}).get(
3366+
"documentResponses", []
3367+
)
3368+
if doc_resp["status"] == "OK"
3369+
]
33613370
inserted_ids += chunk_inserted_ids
33623371
raw_results += [chunk_response]
33633372
# if errors, quit early
@@ -3384,7 +3393,7 @@ async def insert_many(
33843393

33853394
else:
33863395
# unordered: concurrent or not, do all of them and parse the results
3387-
options = {"ordered": False}
3396+
options = {"ordered": False, "returnDocumentResponses": True}
33883397

33893398
sem = asyncio.Semaphore(_concurrency)
33903399

@@ -3426,11 +3435,12 @@ async def concurrent_insert_chunk(
34263435

34273436
# recast raw_results
34283437
inserted_ids = [
3429-
inserted_id
3438+
doc_resp["_id"]
34303439
for chunk_response in raw_results
3431-
for inserted_id in (chunk_response.get("status") or {}).get(
3432-
"insertedIds", []
3440+
for doc_resp in (chunk_response.get("status") or {}).get(
3441+
"documentResponses", []
34333442
)
3443+
if doc_resp["status"] == "OK"
34343444
]
34353445

34363446
# check-raise

astrapy/data/table.py

Lines changed: 52 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1183,29 +1183,37 @@ def _prepare_keys_from_status(
11831183
ids = []
11841184
id_tuples = []
11851185
else:
1186-
if "primaryKeySchema" not in status:
1186+
if "documentResponses" not in status:
11871187
raise UnexpectedDataAPIResponseException(
11881188
text=(
1189-
"received a 'status' without 'primaryKeySchema' "
1189+
"received a 'status' without 'documentResponses' "
11901190
f"in API response (received: {status})"
11911191
),
11921192
raw_response=None,
11931193
)
1194-
if "insertedIds" not in status:
1195-
raise UnexpectedDataAPIResponseException(
1196-
text=(
1197-
"received a 'status' without 'insertedIds' "
1198-
f"in API response (received: {status})"
1199-
),
1200-
raw_response=None,
1194+
raw_inserted_ids = [
1195+
row_resp["_id"]
1196+
for row_resp in status["documentResponses"]
1197+
if row_resp["status"] == "OK"
1198+
]
1199+
if raw_inserted_ids:
1200+
if "primaryKeySchema" not in status:
1201+
raise UnexpectedDataAPIResponseException(
1202+
text=(
1203+
"received a 'status' without 'primaryKeySchema' "
1204+
f"in API response (received: {status})"
1205+
),
1206+
raw_response=None,
1207+
)
1208+
id_tuples_and_ids = self._converter_agent.postprocess_keys(
1209+
raw_inserted_ids,
1210+
primary_key_schema_dict=status["primaryKeySchema"],
12011211
)
1202-
primary_key_schema = status["primaryKeySchema"]
1203-
id_tuples_and_ids = self._converter_agent.postprocess_keys(
1204-
status["insertedIds"],
1205-
primary_key_schema_dict=primary_key_schema,
1206-
)
1207-
id_tuples = [tpl for tpl, _ in id_tuples_and_ids]
1208-
ids = [id for _, id in id_tuples_and_ids]
1212+
id_tuples = [tpl for tpl, _ in id_tuples_and_ids]
1213+
ids = [id for _, id in id_tuples_and_ids]
1214+
else:
1215+
ids = []
1216+
id_tuples = []
12091217
return ids, id_tuples
12101218

12111219
def insert_many(
@@ -1384,7 +1392,7 @@ def insert_many(
13841392
timeout_label=_gmt_label,
13851393
)
13861394
if ordered:
1387-
options = {"ordered": True}
1395+
options = {"ordered": True, "returnDocumentResponses": True}
13881396
inserted_ids: list[Any] = []
13891397
inserted_id_tuples: list[Any] = []
13901398
for i in range(0, len(_rows), _chunk_size):
@@ -1437,7 +1445,7 @@ def insert_many(
14371445

14381446
else:
14391447
# unordered: concurrent or not, do all of them and parse the results
1440-
options = {"ordered": False}
1448+
options = {"ordered": False, "returnDocumentResponses": True}
14411449
if _concurrency > 1:
14421450
with ThreadPoolExecutor(max_workers=_concurrency) as executor:
14431451

@@ -3900,29 +3908,37 @@ def _prepare_keys_from_status(
39003908
ids = []
39013909
id_tuples = []
39023910
else:
3903-
if "primaryKeySchema" not in status:
3911+
if "documentResponses" not in status:
39043912
raise UnexpectedDataAPIResponseException(
39053913
text=(
3906-
"received a 'status' without 'primaryKeySchema' "
3914+
"received a 'status' without 'documentResponses' "
39073915
f"in API response (received: {status})"
39083916
),
39093917
raw_response=None,
39103918
)
3911-
if "insertedIds" not in status:
3912-
raise UnexpectedDataAPIResponseException(
3913-
text=(
3914-
"received a 'status' without 'insertedIds' "
3915-
f"in API response (received: {status})"
3916-
),
3917-
raw_response=None,
3919+
raw_inserted_ids = [
3920+
row_resp["_id"]
3921+
for row_resp in status["documentResponses"]
3922+
if row_resp["status"] == "OK"
3923+
]
3924+
if raw_inserted_ids:
3925+
if "primaryKeySchema" not in status:
3926+
raise UnexpectedDataAPIResponseException(
3927+
text=(
3928+
"received a 'status' without 'primaryKeySchema' "
3929+
f"in API response (received: {status})"
3930+
),
3931+
raw_response=None,
3932+
)
3933+
id_tuples_and_ids = self._converter_agent.postprocess_keys(
3934+
raw_inserted_ids,
3935+
primary_key_schema_dict=status["primaryKeySchema"],
39183936
)
3919-
primary_key_schema = status["primaryKeySchema"]
3920-
id_tuples_and_ids = self._converter_agent.postprocess_keys(
3921-
status["insertedIds"],
3922-
primary_key_schema_dict=primary_key_schema,
3923-
)
3924-
id_tuples = [tpl for tpl, _ in id_tuples_and_ids]
3925-
ids = [id for _, id in id_tuples_and_ids]
3937+
id_tuples = [tpl for tpl, _ in id_tuples_and_ids]
3938+
ids = [id for _, id in id_tuples_and_ids]
3939+
else:
3940+
ids = []
3941+
id_tuples = []
39263942
return ids, id_tuples
39273943

39283944
async def insert_many(
@@ -4103,7 +4119,7 @@ async def insert_many(
41034119
timeout_label=_gmt_label,
41044120
)
41054121
if ordered:
4106-
options = {"ordered": True}
4122+
options = {"ordered": True, "returnDocumentResponses": True}
41074123
inserted_ids: list[Any] = []
41084124
inserted_id_tuples: list[Any] = []
41094125
for i in range(0, len(_rows), _chunk_size):
@@ -4156,7 +4172,7 @@ async def insert_many(
41564172

41574173
else:
41584174
# unordered: concurrent or not, do all of them and parse the results
4159-
options = {"ordered": False}
4175+
options = {"ordered": False, "returnDocumentResponses": True}
41604176

41614177
sem = asyncio.Semaphore(_concurrency)
41624178

tests/base/integration/collections/test_collection_dml_async.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,6 +1061,48 @@ async def test_collection_insert_many_vectors_async(
10611061
vectors = [doc["$vector"] async for doc in acol.find({}, projection={"*": 1})]
10621062
assert all(len(vec) == 2 for vec in vectors)
10631063

1064+
@pytest.mark.describe("test of collection insert_many, failures, async")
1065+
async def test_collection_insert_many_failures_async(
1066+
self,
1067+
async_empty_collection: DefaultAsyncCollection,
1068+
) -> None:
1069+
# The main goal here is to keep the switch to returnDocumentResponses in check.
1070+
N = 110
1071+
ins_res0 = await async_empty_collection.insert_many(
1072+
[{"_id": i} for i in range(N)],
1073+
concurrency=1,
1074+
)
1075+
assert set(ins_res0.inserted_ids) == set(range(N))
1076+
1077+
ins_res1 = await async_empty_collection.insert_many(
1078+
[{"_id": N + i} for i in range(N)],
1079+
concurrency=20,
1080+
)
1081+
assert set(ins_res1.inserted_ids) == {N + i for i in range(N)}
1082+
1083+
# unordered insertion [good, bad]
1084+
err2: CollectionInsertManyException | None = None
1085+
try:
1086+
await async_empty_collection.insert_many([{"_id": 2 * N}, {"_id": 0}])
1087+
except CollectionInsertManyException as e:
1088+
err2 = e
1089+
assert err2 is not None
1090+
assert len(err2.error_descriptors) == 1
1091+
assert err2.partial_result.inserted_ids == [2 * N]
1092+
1093+
# ordered insertion [good, bad, good_skipped]
1094+
err3: CollectionInsertManyException | None = None
1095+
try:
1096+
await async_empty_collection.insert_many(
1097+
[{"_id": 2 * N + 1}, {"_id": 0}, {"_id": 2 * N + 2}],
1098+
ordered=True,
1099+
)
1100+
except CollectionInsertManyException as e:
1101+
err3 = e
1102+
assert err3 is not None
1103+
assert len(err3.error_descriptors) == 1
1104+
assert err3.partial_result.inserted_ids == [2 * N + 1]
1105+
10641106
@pytest.mark.describe("test of collection find_one, async")
10651107
async def test_collection_find_one_async(
10661108
self,

tests/base/integration/collections/test_collection_dml_sync.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -997,6 +997,48 @@ def test_collection_insert_many_vectors_sync(
997997
len(doc["$vector"]) == 2 for doc in col.find({}, projection={"*": 1})
998998
)
999999

1000+
@pytest.mark.describe("test of collection insert_many, failures, sync")
1001+
def test_collection_insert_many_failures_sync(
1002+
self,
1003+
sync_empty_collection: DefaultCollection,
1004+
) -> None:
1005+
# The main goal here is to keep the switch to returnDocumentResponses in check.
1006+
N = 110
1007+
ins_res0 = sync_empty_collection.insert_many(
1008+
[{"_id": i} for i in range(N)],
1009+
concurrency=1,
1010+
)
1011+
assert set(ins_res0.inserted_ids) == set(range(N))
1012+
1013+
ins_res1 = sync_empty_collection.insert_many(
1014+
[{"_id": N + i} for i in range(N)],
1015+
concurrency=20,
1016+
)
1017+
assert set(ins_res1.inserted_ids) == {N + i for i in range(N)}
1018+
1019+
# unordered insertion [good, bad]
1020+
err2: CollectionInsertManyException | None = None
1021+
try:
1022+
sync_empty_collection.insert_many([{"_id": 2 * N}, {"_id": 0}])
1023+
except CollectionInsertManyException as e:
1024+
err2 = e
1025+
assert err2 is not None
1026+
assert len(err2.error_descriptors) == 1
1027+
assert err2.partial_result.inserted_ids == [2 * N]
1028+
1029+
# ordered insertion [good, bad, good_skipped]
1030+
err3: CollectionInsertManyException | None = None
1031+
try:
1032+
sync_empty_collection.insert_many(
1033+
[{"_id": 2 * N + 1}, {"_id": 0}, {"_id": 2 * N + 2}],
1034+
ordered=True,
1035+
)
1036+
except CollectionInsertManyException as e:
1037+
err3 = e
1038+
assert err3 is not None
1039+
assert len(err3.error_descriptors) == 1
1040+
assert err3.partial_result.inserted_ids == [2 * N + 1]
1041+
10001042
@pytest.mark.describe("test of collection find_one, sync")
10011043
def test_collection_find_one_sync(
10021044
self,

tests/base/integration/collections/test_collection_vectorize_methods_async.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from astrapy.constants import DefaultDocumentType
2323
from astrapy.cursors import AsyncCollectionFindCursor
2424
from astrapy.data_types import DataAPIVector
25-
from astrapy.exceptions import DataAPIResponseException
25+
from astrapy.exceptions import CollectionInsertManyException, DataAPIResponseException
2626
from astrapy.info import CollectionDefinition
2727

2828
from ..conftest import HEADER_EMBEDDING_API_KEY_OPENAI, DefaultAsyncCollection
@@ -239,6 +239,28 @@ async def _alist(
239239
else:
240240
assert (await this_ite_2.get_sort_vector()) is None
241241

242+
@pytest.mark.describe("test of vectorize-based insert many grand failures, async")
243+
async def test_collection_methods_grandfailure_vectorize_async(
244+
self,
245+
async_empty_service_collection: DefaultAsyncCollection,
246+
) -> None:
247+
await async_empty_service_collection.insert_many(
248+
[{"_id": "Z", "$vectorize": "Text."}]
249+
)
250+
251+
bad_collection = async_empty_service_collection.with_options(
252+
embedding_api_key="BadKey",
253+
)
254+
err: CollectionInsertManyException | None = None
255+
try:
256+
await bad_collection.insert_many(
257+
[{"_id": "A"}, {"_id": "B", "$vectorize": "Text."}]
258+
)
259+
except CollectionInsertManyException as e:
260+
err = e
261+
assert err is not None
262+
assert err.partial_result.inserted_ids == []
263+
242264
@pytest.mark.describe(
243265
"test of database create_collection dimension-mismatch failure, async"
244266
)

0 commit comments

Comments
 (0)