
Commit b9e0c0b

tvaron3 and annie-mac authored
Session Token Management APIs (Azure#36971)
* merge from main and resolve conflicts
* remove async keyword from changeFeed query in aio package
* refactor
* refactor
* fix pylint
* added public surface methods
* pylint fix
* fix
* added functionality for merging session tokens from logical pk
* fix mypy
* added tests for basic merge and split
* resolve comments
* resolve comments
* resolve comments
* resolve comments
* fix pylint
* fix mypy
* fix tests
* add tests
* fix pylint
* fix and resolve comments
* fix and resolve comments
* Added isSubsetFeedRange logic
* Added request context to crud operations, session token helpers
* revert unnecessary change
* Added more tests
* Added more tests
* Changed tests to use new public feed range and more test coverage for request context
* Added more tests
* Fix tests and add changelog
* fix spell checks
* Added tests and pushed request context to client level
* Added async methods and removed feed range from request context
* fix tests
* fix tests and pylint
* Reacting to comments
* Reacting to comments
* pylint and added hpk tests
* reacting to comments
* fix tests and mypy
* fix mypy
* fix mypy
* reacting to comments
* reacting to comments
* reacting to comments
* fix cspell
* rename method to get_latest_session_token
* reacting to reverted feed range
* change based on the api review
* Reacting to API review and adding samples.
* Fixed pylint
* Reacting to comments
* Reacting to comments
* Reacting to comments
* Reacting to comments
* Fix pydoc
* Fix pydoc
* reacting to comments
* reacting to comments

---------

Co-authored-by: annie-mac <[email protected]>
1 parent 35d19de commit b9e0c0b

15 files changed: +1494 -10 lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -10,6 +10,8 @@ This version and all future versions will support Python 3.13.
 * Added option to disable write payload on writes. See [PR 37365](https://github.com/Azure/azure-sdk-for-python/pull/37365)
 * Added get feed ranges API. See [PR 37687](https://github.com/Azure/azure-sdk-for-python/pull/37687)
 * Added feed range support in `query_items_change_feed`. See [PR 37687](https://github.com/Azure/azure-sdk-for-python/pull/37687)
+* Added **provisional** helper APIs for managing session tokens. See [PR 36971](https://github.com/Azure/azure-sdk-for-python/pull/36971)
+* Added ability to get feed range for a partition key. See [PR 36971](https://github.com/Azure/azure-sdk-for-python/pull/36971)
 
 #### Breaking Changes
 * Item-level point operations will now return `CosmosDict` and `CosmosList` response types.

sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_range.py

Lines changed: 26 additions & 2 deletions
@@ -186,7 +186,6 @@ def _compare_helper(a, b):
 
     @staticmethod
     def overlaps(range1, range2):
-
         if range1 is None or range2 is None:
             return False
         if range1.isEmpty() or range2.isEmpty():
@@ -195,10 +194,35 @@ def overlaps(range1, range2):
         cmp1 = Range._compare_helper(range1.min, range2.max)
         cmp2 = Range._compare_helper(range2.min, range1.max)
 
-        if cmp1 <= 0 or cmp2 <= 0:
+        if cmp1 <= 0 and cmp2 <= 0:
             if (cmp1 == 0 and not (range1.isMinInclusive and range2.isMaxInclusive)) or (
                 cmp2 == 0 and not (range2.isMinInclusive and range1.isMaxInclusive)
             ):
                 return False
             return True
         return False
+
+    def can_merge(self, other: 'Range') -> bool:
+        if self.isSingleValue() and other.isSingleValue():
+            return self.min == other.min
+        # if share the same boundary, they can merge
+        overlap_boundary1 = self.max == other.min and self.isMaxInclusive or other.isMinInclusive
+        overlap_boundary2 = other.max == self.min and other.isMaxInclusive or self.isMinInclusive
+        if overlap_boundary1 or overlap_boundary2:
+            return True
+        return self.overlaps(self, other)
+
+    def merge(self, other: 'Range') -> 'Range':
+        if not self.can_merge(other):
+            raise ValueError("Ranges do not overlap")
+        min_val = self.min if self.min < other.min else other.min
+        max_val = self.max if self.max > other.max else other.max
+        is_min_inclusive = self.isMinInclusive if self.min < other.min else other.isMinInclusive
+        is_max_inclusive = self.isMaxInclusive if self.max > other.max else other.isMaxInclusive
+        return Range(min_val, max_val, is_min_inclusive, is_max_inclusive)
+
+    def is_subset(self, parent_range: 'Range') -> bool:
+        normalized_parent_range = parent_range.to_normalized_range()
+        normalized_child_range = self.to_normalized_range()
+        return (normalized_parent_range.min <= normalized_child_range.min and
+                normalized_parent_range.max >= normalized_child_range.max)
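
The new Range helpers are the building blocks used by the session token merging logic in the new helper module below. The following is a minimal sketch, not part of the commit, of how they compose; it assumes a build of azure-cosmos that contains this change and imports the private azure.cosmos._routing.routing_range module directly, as the new helper module itself does.

# Illustrative only: exercises can_merge/merge/is_subset on EPK-style string boundaries,
# using the positional constructor Range(min, max, isMinInclusive, isMaxInclusive) seen above.
from azure.cosmos._routing.routing_range import Range

left = Range("AA", "BB", True, False)    # ["AA", "BB")
right = Range("BB", "DD", True, False)   # ["BB", "DD")
parent = Range("AA", "DD", True, False)  # ["AA", "DD")

# Adjacent ranges that share the "BB" boundary can be merged back into the parent range.
assert left.can_merge(right)
merged = left.merge(right)
assert merged.min == "AA" and merged.max == "DD"

# Child EPK ranges produced by a partition split are subsets of the parent range,
# but not the other way around.
assert left.is_subset(parent) and right.is_subset(parent)
assert not parent.is_subset(left)
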
sdk/cosmos/azure-cosmos/azure/cosmos/_session_token_helpers.py

Lines changed: 228 additions & 0 deletions
@@ -0,0 +1,228 @@
# The MIT License (MIT)
# Copyright (c) 2014 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Internal Helper functions for manipulating session tokens.
"""
from typing import Tuple, List, Dict, Any

from azure.cosmos._routing.routing_range import Range
from azure.cosmos._vector_session_token import VectorSessionToken
from ._change_feed.feed_range_internal import FeedRangeInternalEpk

# pylint: disable=protected-access


# ex inputs and outputs:
# 1. "1:1#51", "1:1#55" -> "1:1#55"
# 2. "0:1#57", "1:1#52" -> "0:1#57"
# 3. "1:1#57#3=54", "2:1#52#3=51" -> "1:1#57#3=54"
# 4. "1:1#57#3=54", "1:1#58#3=53" -> "1:1#58#3=54"
def merge_session_tokens_with_same_range(session_token1: str, session_token2: str) -> str:
    pk_range_id1, vector_session_token1 = parse_session_token(session_token1)
    pk_range_id2, vector_session_token2 = parse_session_token(session_token2)
    pk_range_id = pk_range_id1
    # The partition key range id could be different in this scenario
    #
    # Ex. get_updated_session_token([(("AA", "BB"), "1:1#51")], ("AA", "DD")) -> "1:1#51"
    # Then we input this back into get_updated_session_token after a merge happened
    # get_updated_session_token([(("AA", "DD"), "1:1#51"), (("AA", "DD"), "0:1#55")], ("AA", "DD")) -> "0:1#55"
    if pk_range_id1 != pk_range_id2:
        pk_range_id = pk_range_id1 \
            if vector_session_token1.global_lsn > vector_session_token2.global_lsn else pk_range_id2
    vector_session_token = vector_session_token1.merge(vector_session_token2)
    return pk_range_id + ":" + vector_session_token.session_token

def is_compound_session_token(session_token: str) -> bool:
    return "," in session_token

def parse_session_token(session_token: str) -> Tuple[str, VectorSessionToken]:
    tokens = session_token.split(":")
    return tokens[0], VectorSessionToken.create(tokens[1])

def split_compound_session_tokens(compound_session_tokens: List[Tuple[Range, str]]) -> List[str]:
    session_tokens = []
    for _, session_token in compound_session_tokens:
        if is_compound_session_token(session_token):
            tokens = session_token.split(",")
            for token in tokens:
                session_tokens.append(token)
        else:
            session_tokens.append(session_token)
    return session_tokens

# ex inputs:
# ["1:1#51", "1:1#55", "1:1#57", "2:1#42", "2:1#45", "2:1#47"] -> ["1:1#57", "2:1#47"]
def merge_session_tokens_for_same_partition(session_tokens: List[str]) -> List[str]:
    pk_session_tokens: Dict[str, List[str]] = {}
    for session_token in session_tokens:
        pk_range_id, _ = parse_session_token(session_token)
        if pk_range_id in pk_session_tokens:
            pk_session_tokens[pk_range_id].append(session_token)
        else:
            pk_session_tokens[pk_range_id] = [session_token]

    processed_session_tokens = []
    for session_tokens_same_pk in pk_session_tokens.values():
        pk_range_id, vector_session_token = parse_session_token(session_tokens_same_pk[0])
        for session_token in session_tokens_same_pk[1:]:
            _, vector_session_token_1 = parse_session_token(session_token)
            vector_session_token = vector_session_token.merge(vector_session_token_1)
        processed_session_tokens.append(pk_range_id + ":" + vector_session_token.session_token)

    return processed_session_tokens

# ex inputs:
# merge scenario
# 1. [(("AA", "BB"), "1:1#51"), (("BB", "DD"), "2:1#51"), (("AA", "DD"), "3:1#55")] ->
#    [("AA", "DD"), "3:1#55"]
# split scenario
# 2. [(("AA", "BB"), "1:1#57"), (("BB", "DD"), "2:1#58"), (("AA", "DD"), "0:1#55")] ->
#    [("AA", "DD"), "1:1#57,2:1#58"]
# 3. [(("AA", "BB"), "4:1#57"), (("BB", "DD"), "1:1#52"), (("AA", "DD"), "3:1#55")] ->
#    [("AA", "DD"), "4:1#57,1:1#52,3:1#55"]
# goal here is to detect any obvious merges or splits that happened
# compound session tokens are not considered will just pass them along
def merge_ranges_with_subsets(overlapping_ranges: List[Tuple[Range, str]]) -> List[Tuple[Range, str]]:
    processed_ranges = []
    while len(overlapping_ranges) != 0:  # pylint: disable=too-many-nested-blocks
        feed_range_cmp, session_token_cmp = overlapping_ranges[0]
        # compound session tokens are not considered for merging
        if is_compound_session_token(session_token_cmp):
            processed_ranges.append(overlapping_ranges[0])
            overlapping_ranges.remove(overlapping_ranges[0])
            continue
        _, vector_session_token_cmp = parse_session_token(session_token_cmp)
        subsets = []
        # finding the subset feed ranges of the current feed range
        for j in range(1, len(overlapping_ranges)):
            feed_range = overlapping_ranges[j][0]
            if not is_compound_session_token(overlapping_ranges[j][1]) and \
                    feed_range.is_subset(feed_range_cmp):
                subsets.append(overlapping_ranges[j] + (j,))

        # go through subsets to see if can create current feed range from the subsets
        not_found = True
        j = 0
        while not_found and j < len(subsets):
            merged_range = subsets[j][0]
            session_tokens = [subsets[j][1]]
            merged_indices = [subsets[j][2]]
            if len(subsets) == 1:
                _, vector_session_token = parse_session_token(session_tokens[0])
                if vector_session_token_cmp.global_lsn > vector_session_token.global_lsn:
                    overlapping_ranges.remove(overlapping_ranges[merged_indices[0]])
            else:
                for k, subset in enumerate(subsets):
                    if j == k:
                        continue
                    if merged_range.can_merge(subset[0]):
                        merged_range = merged_range.merge(subset[0])
                        session_tokens.append(subset[1])
                        merged_indices.append(subset[2])
                if feed_range_cmp == merged_range:
                    # if feed range can be created from the subsets
                    # take the subsets if their global lsn is larger
                    # else take the current feed range
                    children_more_updated = True
                    parent_more_updated = True
                    for session_token in session_tokens:
                        _, vector_session_token = parse_session_token(session_token)
                        if vector_session_token_cmp.global_lsn > vector_session_token.global_lsn:
                            children_more_updated = False
                        else:
                            parent_more_updated = False
                    feed_ranges_to_remove = [overlapping_ranges[i] for i in merged_indices]
                    for feed_range_to_remove in feed_ranges_to_remove:
                        overlapping_ranges.remove(feed_range_to_remove)
                    if children_more_updated:
                        overlapping_ranges.append((merged_range, ','.join(map(str, session_tokens))))
                        overlapping_ranges.remove(overlapping_ranges[0])
                    elif not parent_more_updated and not children_more_updated:
                        session_tokens.append(session_token_cmp)
                        overlapping_ranges.append((merged_range, ','.join(map(str, session_tokens))))
                    not_found = False
                    break

            j += 1

        processed_ranges.append(overlapping_ranges[0])
        overlapping_ranges.remove(overlapping_ranges[0])
    return processed_ranges

def get_latest_session_token(feed_ranges_to_session_tokens: List[Tuple[Dict[str, Any], str]],
                             target_feed_range: Dict[str, Any]):

    target_feed_range_epk = FeedRangeInternalEpk.from_json(target_feed_range)
    target_feed_range_normalized = target_feed_range_epk.get_normalized_range()
    # filter out tuples that overlap with target_feed_range and normalizes all the ranges
    overlapping_ranges = []
    for feed_range_to_session_token in feed_ranges_to_session_tokens:
        feed_range_epk = FeedRangeInternalEpk.from_json(feed_range_to_session_token[0])
        if Range.overlaps(target_feed_range_normalized,
                          feed_range_epk.get_normalized_range()):
            overlapping_ranges.append((feed_range_epk.get_normalized_range(),
                                       feed_range_to_session_token[1]))

    if len(overlapping_ranges) == 0:
        raise ValueError('There were no overlapping feed ranges with the target.')

    # merge any session tokens that are the same exact feed range
    i = 0
    j = 1
    while i < len(overlapping_ranges) and j < len(overlapping_ranges):
        cur_feed_range = overlapping_ranges[i][0]
        session_token = overlapping_ranges[i][1]
        session_token_1 = overlapping_ranges[j][1]
        if (not is_compound_session_token(session_token) and
                not is_compound_session_token(session_token_1) and
                cur_feed_range == overlapping_ranges[j][0]):
            session_token = merge_session_tokens_with_same_range(session_token, session_token_1)
            feed_ranges_to_remove = [overlapping_ranges[i], overlapping_ranges[j]]
            for feed_range_to_remove in feed_ranges_to_remove:
                overlapping_ranges.remove(feed_range_to_remove)
            overlapping_ranges.append((cur_feed_range, session_token))
            i, j = 0, 1
        else:
            j += 1
            if j == len(overlapping_ranges):
                i += 1
                j = i + 1

    # checking for merging of feed ranges that can be created from other feed ranges
    processed_ranges = merge_ranges_with_subsets(overlapping_ranges)

    # break up session tokens that are compound
    remaining_session_tokens = split_compound_session_tokens(processed_ranges)

    if len(remaining_session_tokens) == 1:
        return remaining_session_tokens[0]
    # merging any session tokens with same physical partition key range id
    remaining_session_tokens = merge_session_tokens_for_same_partition(remaining_session_tokens)

    updated_session_token = ""
    # compound the remaining session tokens
    for i, remaining_session_token in enumerate(remaining_session_tokens):
        if i == len(remaining_session_tokens) - 1:
            updated_session_token += remaining_session_token
        else:
            updated_session_token += remaining_session_token + ","

    return updated_session_token
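
For reference, a small sketch (not part of the commit) of the session token format these helpers operate on, reusing example 4 from the comments in the new module above; the private import path is the module added in this diff, and the printed values are the expected results stated in those comments.

# Illustrative only. A session token roughly has the shape
#   "<pkRangeId>:<version>#<globalLsn>(#<regionId>=<regionLsn>)*"
from azure.cosmos._session_token_helpers import (
    merge_session_tokens_with_same_range,
    parse_session_token,
)

pk_range_id, vector = parse_session_token("1:1#57#3=54")
print(pk_range_id, vector.global_lsn)  # expected: 1 57

# Merging two tokens for the same feed range keeps the most advanced LSN per component
# (example 4 in the comments above), so this is expected to print "1:1#58#3=54".
print(merge_session_tokens_with_same_range("1:1#57#3=54", "1:1#58#3=53"))
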

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py

Lines changed: 59 additions & 4 deletions
@@ -44,6 +44,7 @@
     _set_properties_cache
 )
 from .._routing.routing_range import Range
+from .._session_token_helpers import get_latest_session_token
 from ..offer import ThroughputProperties
 from ..partition_key import (
     NonePartitionKeyValue,
@@ -1304,9 +1305,9 @@ async def read_feed_ranges(
         :returns: A list representing the feed ranges in base64 encoded string
         :rtype: Iterable[Dict[str, Any]]
 
-        .. note::
-            For each feed range, even through a Dict has been returned, but in the future, the structure may change.
-            Please just treat it as opaque and do not take any dependent on it.
+        .. warning::
+            The structure of the dict representation of a feed range may vary, including which keys
+            are present. It therefore should only be treated as an opaque value.
 
         """
         if force_refresh is True:
@@ -1321,5 +1322,59 @@
 
         feed_ranges = [FeedRangeInternalEpk(Range.PartitionKeyRangeToRange(partitionKeyRange)).to_dict()
                        for partitionKeyRange in partition_key_ranges]
-
         return (feed_range for feed_range in feed_ranges)
+
+    async def get_latest_session_token(
+            self,
+            feed_ranges_to_session_tokens: List[Tuple[Dict[str, Any], str]],
+            target_feed_range: Dict[str, Any]
+    ) -> str:
+        """ **provisional** This method is still in preview and may be subject to breaking changes.
+
+        Gets the most up-to-date session token from the list of session token and feed
+        range tuples for a specific target feed range. The feed range can be obtained from a partition key
+        or by reading the container feed ranges. This should only be used if you are maintaining your own
+        session tokens; otherwise, the CosmosClient instance will keep track of session tokens for you.
+        Session tokens and feed ranges are scoped to a container, so only pass in session tokens and
+        feed ranges obtained from the same container.
+        :param feed_ranges_to_session_tokens: List of feed range and session token tuples.
+        :type feed_ranges_to_session_tokens: List[Tuple[Dict[str, Any], str]]
+        :param target_feed_range: feed range to get the most up-to-date session token for.
+        :type target_feed_range: Dict[str, Any]
+        :returns: a session token
+        :rtype: str
+        """
+        return get_latest_session_token(feed_ranges_to_session_tokens, target_feed_range)
+
+    async def feed_range_from_partition_key(self, partition_key: PartitionKeyType) -> Dict[str, Any]:
+        """ Gets the feed range for a given partition key.
+        :param partition_key: partition key to get the feed range for.
+        :type partition_key: PartitionKeyType
+        :returns: a feed range
+        :rtype: Dict[str, Any]
+
+        .. warning::
+            The structure of the dict representation of a feed range may vary, including which keys
+            are present. It therefore should only be treated as an opaque value.
+
+        """
+        return FeedRangeInternalEpk(await self._get_epk_range_for_partition_key(partition_key)).to_dict()
+
+    async def is_feed_range_subset(self, parent_feed_range: Dict[str, Any],
+                                   child_feed_range: Dict[str, Any]) -> bool:
+        """Checks if the child feed range is a subset of the parent feed range.
+        :param parent_feed_range: left feed range
+        :type parent_feed_range: Dict[str, Any]
+        :param child_feed_range: right feed range
+        :type child_feed_range: Dict[str, Any]
+        :returns: a boolean indicating whether the child feed range is a subset of the parent feed range
+        :rtype: bool
+
+        .. warning::
+            The structure of the dict representation of a feed range may vary, including which keys
+            are present. It therefore should only be treated as an opaque value.
+
+        """
+        parent_feed_range_epk = FeedRangeInternalEpk.from_json(parent_feed_range)
+        child_feed_range_epk = FeedRangeInternalEpk.from_json(child_feed_range)
+        return child_feed_range_epk.get_normalized_range().is_subset(
+            parent_feed_range_epk.get_normalized_range())
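
Taken together, here is a hedged usage sketch (not part of the commit) of how an application that manages its own session tokens might combine these new async container APIs. The endpoint, credential, database and container names, item shape, and the way the session token is read from the response headers ('x-ms-session-token') are illustrative assumptions rather than code from this PR.

import asyncio
from azure.cosmos.aio import CosmosClient

async def main():
    # Hypothetical account, database, and container.
    async with CosmosClient("https://<account>.documents.azure.com:443/", credential="<key>") as client:
        container = client.get_database_client("<db>").get_container_client("<container>")

        # Write an item and keep the session token the service returned
        # (reading it from the response headers is an assumption here).
        created = await container.create_item({"id": "item-1", "pk": "tenant-1"})
        token = created.get_response_headers().get("x-ms-session-token")

        # Feed ranges are scoped to the container; one can be derived from a partition key.
        pk_feed_range = await container.feed_range_from_partition_key("tenant-1")

        # Given (feed range, session token) pairs, ask for the most up-to-date token
        # that applies to a target feed range.
        latest = await container.get_latest_session_token([(pk_feed_range, token)], pk_feed_range)
        print(latest)

        # Feed ranges can also be compared for containment.
        for feed_range in await container.read_feed_ranges():
            print(await container.is_feed_range_subset(feed_range, pk_feed_range))

asyncio.run(main())
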
