Skip to content

Commit 27316b3

Browse files
simorenohannatisch
andauthored
[Cosmos] make response_hook thread safe for the sync client (#33790)
* only missing queries * pylint * upsert method typehints * Update _cosmos_client_connection.py * had to include List in return type for batch * Revert "had to include List in return type for batch" This reverts commit cacf629. * read all items is also query - still don't have this logic down * casting batch * casting again? * oops * return readItems to previous state * remove casting in client_connection Batch * update __queryFeed to be thread safe * forgot to save headers * Update _cosmos_client_connection.py * typehint for non-used method was missing * Updated User operations * make response_hook internal in cosmos_client * make response_hook internal in database * make response_hook internal container * small fixes * Update database.py * Update _cosmos_client_connection.py * fixes from comments * wrong naming on these samples pls let me squeeze it in <3 * revert changes on cosmos_client methods * Update cosmos_client.py --------- Co-authored-by: antisch <[email protected]>
1 parent 2828444 commit 27316b3

File tree

9 files changed

+216
-327
lines changed

9 files changed

+216
-327
lines changed

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 171 additions & 188 deletions
Large diffs are not rendered by default.

sdk/cosmos/azure-cosmos/azure/cosmos/container.py

Lines changed: 11 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ def read( # pylint:disable=docstring-missing-param
153153
:rtype: dict[str, Any]
154154
"""
155155
request_options = build_options(kwargs)
156-
response_hook = kwargs.pop('response_hook', None)
157156
if populate_query_metrics:
158157
warnings.warn(
159158
"the populate_query_metrics flag does not apply to this method and will be removed in the future",
@@ -167,8 +166,6 @@ def read( # pylint:disable=docstring-missing-param
167166
self._properties = self.client_connection.ReadContainer(
168167
collection_link, options=request_options, **kwargs
169168
)
170-
if response_hook:
171-
response_hook(self.client_connection.last_response_headers, self._properties)
172169
return self._properties
173170

174171
@distributed_trace
@@ -211,7 +208,6 @@ def read_item( # pylint:disable=docstring-missing-param
211208
"""
212209
doc_link = self._get_document_link(item)
213210
request_options = build_options(kwargs)
214-
response_hook = kwargs.pop('response_hook', None)
215211

216212
if partition_key is not None:
217213
request_options["partitionKey"] = self._set_partition_key(partition_key)
@@ -229,10 +225,7 @@ def read_item( # pylint:disable=docstring-missing-param
229225
validate_cache_staleness_value(max_integrated_cache_staleness_in_ms)
230226
request_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms
231227

232-
result = self.client_connection.ReadItem(document_link=doc_link, options=request_options, **kwargs)
233-
if response_hook:
234-
response_hook(self.client_connection.last_response_headers, result)
235-
return result
228+
return self.client_connection.ReadItem(document_link=doc_link, options=request_options, **kwargs)
236229

237230
@distributed_trace
238231
def read_all_items( # pylint:disable=docstring-missing-param
@@ -275,8 +268,7 @@ def read_all_items( # pylint:disable=docstring-missing-param
275268
response_hook.clear()
276269

277270
items = self.client_connection.ReadItems(
278-
collection_link=self.container_link, feed_options=feed_options, response_hook=response_hook, **kwargs
279-
)
271+
collection_link=self.container_link, feed_options=feed_options, response_hook=response_hook, **kwargs)
280272
if response_hook:
281273
response_hook(self.client_connection.last_response_headers, items)
282274
return items
@@ -494,7 +486,6 @@ def replace_item( # pylint:disable=docstring-missing-param
494486
"""
495487
item_link = self._get_document_link(item)
496488
request_options = build_options(kwargs)
497-
response_hook = kwargs.pop('response_hook', None)
498489
request_options["disableAutomaticIdGeneration"] = True
499490
if populate_query_metrics is not None:
500491
warnings.warn(
@@ -507,12 +498,9 @@ def replace_item( # pylint:disable=docstring-missing-param
507498
if post_trigger_include is not None:
508499
request_options["postTriggerInclude"] = post_trigger_include
509500

510-
result = self.client_connection.ReplaceItem(
501+
return self.client_connection.ReplaceItem(
511502
document_link=item_link, new_document=body, options=request_options, **kwargs
512503
)
513-
if response_hook:
514-
response_hook(self.client_connection.last_response_headers, result)
515-
return result
516504

517505
@distributed_trace
518506
def upsert_item( # pylint:disable=docstring-missing-param
@@ -546,7 +534,6 @@ def upsert_item( # pylint:disable=docstring-missing-param
546534
:rtype: Dict[str, Any]
547535
"""
548536
request_options = build_options(kwargs)
549-
response_hook = kwargs.pop('response_hook', None)
550537
request_options["disableAutomaticIdGeneration"] = True
551538
if populate_query_metrics is not None:
552539
warnings.warn(
@@ -559,15 +546,12 @@ def upsert_item( # pylint:disable=docstring-missing-param
559546
if post_trigger_include is not None:
560547
request_options["postTriggerInclude"] = post_trigger_include
561548

562-
result = self.client_connection.UpsertItem(
549+
return self.client_connection.UpsertItem(
563550
database_or_container_link=self.container_link,
564551
document=body,
565552
options=request_options,
566553
**kwargs
567554
)
568-
if response_hook:
569-
response_hook(self.client_connection.last_response_headers, result)
570-
return result
571555

572556
@distributed_trace
573557
def create_item( # pylint:disable=docstring-missing-param
@@ -606,7 +590,6 @@ def create_item( # pylint:disable=docstring-missing-param
606590
:rtype: Dict[str, Any]
607591
"""
608592
request_options = build_options(kwargs)
609-
response_hook = kwargs.pop('response_hook', None)
610593

611594
request_options["disableAutomaticIdGeneration"] = not kwargs.pop('enable_automatic_id_generation', False)
612595
if populate_query_metrics:
@@ -622,12 +605,8 @@ def create_item( # pylint:disable=docstring-missing-param
622605
if indexing_directive is not None:
623606
request_options["indexingDirective"] = indexing_directive
624607

625-
result = self.client_connection.CreateItem(
626-
database_or_container_link=self.container_link, document=body, options=request_options, **kwargs
627-
)
628-
if response_hook:
629-
response_hook(self.client_connection.last_response_headers, result)
630-
return result
608+
return self.client_connection.CreateItem(
609+
database_or_container_link=self.container_link, document=body, options=request_options, **kwargs)
631610

632611
@distributed_trace
633612
def patch_item(
@@ -662,19 +641,15 @@ def patch_item(
662641
:rtype: dict[str, Any]
663642
"""
664643
request_options = build_options(kwargs)
665-
response_hook = kwargs.pop('response_hook', None)
666644
request_options["disableAutomaticIdGeneration"] = True
667645
request_options["partitionKey"] = self._set_partition_key(partition_key)
668646
filter_predicate = kwargs.pop("filter_predicate", None)
669647
if filter_predicate is not None:
670648
request_options["filterPredicate"] = filter_predicate
671649

672650
item_link = self._get_document_link(item)
673-
result = self.client_connection.PatchItem(
651+
return self.client_connection.PatchItem(
674652
document_link=item_link, operations=patch_operations, options=request_options, **kwargs)
675-
if response_hook:
676-
response_hook(self.client_connection.last_response_headers, result)
677-
return result
678653

679654
@distributed_trace
680655
def execute_item_batch(
@@ -703,14 +678,10 @@ def execute_item_batch(
703678
"""
704679
request_options = build_options(kwargs)
705680
request_options["partitionKey"] = self._set_partition_key(partition_key)
706-
response_hook = kwargs.pop('response_hook', None)
707681
request_options["disableAutomaticIdGeneration"] = True
708682

709-
result = self.client_connection.Batch(
683+
return self.client_connection.Batch(
710684
collection_link=self.container_link, batch_operations=batch_operations, options=request_options, **kwargs)
711-
if response_hook:
712-
response_hook(self.client_connection.last_response_headers, result)
713-
return result
714685

715686
@distributed_trace
716687
def delete_item( # pylint:disable=docstring-missing-param
@@ -743,7 +714,6 @@ def delete_item( # pylint:disable=docstring-missing-param
743714
:rtype: None
744715
"""
745716
request_options = build_options(kwargs)
746-
response_hook = kwargs.pop('response_hook', None)
747717
if partition_key is not None:
748718
request_options["partitionKey"] = self._set_partition_key(partition_key)
749719
if populate_query_metrics is not None:
@@ -759,8 +729,6 @@ def delete_item( # pylint:disable=docstring-missing-param
759729

760730
document_link = self._get_document_link(item)
761731
self.client_connection.DeleteItem(document_link=document_link, options=request_options, **kwargs)
762-
if response_hook:
763-
response_hook(self.client_connection.last_response_headers, None)
764732

765733
@distributed_trace
766734
def read_offer(self, **kwargs: Any) -> Offer:
@@ -827,7 +795,6 @@ def replace_throughput(
827795
or the throughput properties could not be updated.
828796
:rtype: ~azure.cosmos.ThroughputProperties
829797
"""
830-
response_hook = kwargs.pop('response_hook', None)
831798
properties = self._get_properties()
832799
link = properties["_self"]
833800
query_spec = {
@@ -844,8 +811,6 @@ def replace_throughput(
844811
data = self.client_connection.ReplaceOffer(
845812
offer_link=throughput_properties[0]["_self"], offer=throughput_properties[0], **kwargs)
846813

847-
if response_hook:
848-
response_hook(self.client_connection.last_response_headers, data)
849814
return ThroughputProperties(offer_throughput=data["content"]["offerThroughput"], properties=data)
850815

851816
@distributed_trace
@@ -936,16 +901,12 @@ def get_conflict(
936901
:rtype: Dict[str, Any]
937902
"""
938903
request_options = build_options(kwargs)
939-
response_hook = kwargs.pop('response_hook', None)
940904
if partition_key is not None:
941905
request_options["partitionKey"] = self._set_partition_key(partition_key)
942906

943-
result = self.client_connection.ReadConflict(
907+
return self.client_connection.ReadConflict(
944908
conflict_link=self._get_conflict_link(conflict), options=request_options, **kwargs
945909
)
946-
if response_hook:
947-
response_hook(self.client_connection.last_response_headers, result)
948-
return result
949910

950911
@distributed_trace
951912
def delete_conflict(
@@ -968,15 +929,12 @@ def delete_conflict(
968929
:rtype: None
969930
"""
970931
request_options = build_options(kwargs)
971-
response_hook = kwargs.pop('response_hook', None)
972932
if partition_key is not None:
973933
request_options["partitionKey"] = self._set_partition_key(partition_key)
974934

975935
self.client_connection.DeleteConflict(
976936
conflict_link=self._get_conflict_link(conflict), options=request_options, **kwargs
977937
)
978-
if response_hook:
979-
response_hook(self.client_connection.last_response_headers, None)
980938

981939
@distributed_trace
982940
def delete_all_items_by_partition_key(
@@ -1002,11 +960,8 @@ def delete_all_items_by_partition_key(
1002960
:rtype: None
1003961
"""
1004962
request_options = build_options(kwargs)
1005-
response_hook = kwargs.pop('response_hook', None)
1006963
# regardless if partition key is valid we set it as invalid partition keys are set to a default empty value
1007964
request_options["partitionKey"] = self._set_partition_key(partition_key)
1008965

1009-
self.client_connection.DeleteAllItemsByPartitionKey(collection_link=self.container_link,
1010-
options=request_options, **kwargs)
1011-
if response_hook:
1012-
response_hook(self.client_connection.last_response_headers, None)
966+
self.client_connection.DeleteAllItemsByPartitionKey(
967+
collection_link=self.container_link, options=request_options, **kwargs)

0 commit comments

Comments
 (0)