Skip to content

Commit 2a839e5

Browse files
tvaron3Copilot
andauthored
Session Token False Progress (#42393)
* add session token false progress * update changelog * Apply suggestions from code review Co-authored-by: Copilot <[email protected]> * add more tests * react to comments * fix analyze --------- Co-authored-by: Copilot <[email protected]>
1 parent 2dece3e commit 2a839e5

File tree

4 files changed

+97
-4
lines changed

4 files changed

+97
-4
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#### Bugs Fixed
1010

1111
#### Other Changes
12+
* Added session token false progress merge logic. See [42393](https://github.com/Azure/azure-sdk-for-python/pull/42393)
1213

1314
### 4.14.0b2 (2025-08-12)
1415

sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ class _Constants:
5151
HS_MAX_ITEMS_CONFIG_DEFAULT: int = 1000
5252
MAX_ITEM_BUFFER_VS_CONFIG: str = "AZURE_COSMOS_MAX_ITEM_BUFFER_VECTOR_SEARCH"
5353
MAX_ITEM_BUFFER_VS_CONFIG_DEFAULT: int = 50000
54+
SESSION_TOKEN_FALSE_PROGRESS_MERGE_CONFIG: str = "AZURE_COSMOS_SESSION_TOKEN_FALSE_PROGRESS_MERGE"
55+
SESSION_TOKEN_FALSE_PROGRESS_MERGE_CONFIG_DEFAULT: str = "True"
5456
CIRCUIT_BREAKER_ENABLED_CONFIG: str = "AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER"
5557
CIRCUIT_BREAKER_ENABLED_CONFIG_DEFAULT: str = "False"
5658
AAD_SCOPE_OVERRIDE: str = "AZURE_COSMOS_AAD_SCOPE_OVERRIDE"

sdk/cosmos/azure-cosmos/azure/cosmos/_vector_session_token.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121

2222
"""Session Consistency Tracking in the Azure Cosmos database service.
2323
"""
24+
import os
2425

2526
from . import exceptions
27+
from ._constants import _Constants
2628
from .http_constants import StatusCodes as _StatusCodes
2729

2830

@@ -113,9 +115,12 @@ def equals(self, other):
113115
and self.are_region_progress_equal(other.local_lsn_by_region)
114116
)
115117

116-
def merge(self, other):
118+
def merge(self, other: "VectorSessionToken"):
117119
if other is None:
118120
raise ValueError("Invalid Session Token (should not be None)")
121+
false_progress_merge_enabled = (os.environ.get(_Constants.SESSION_TOKEN_FALSE_PROGRESS_MERGE_CONFIG,
122+
_Constants.SESSION_TOKEN_FALSE_PROGRESS_MERGE_CONFIG_DEFAULT)
123+
.lower() == "true")
119124

120125
if self.version == other.version and len(self.local_lsn_by_region) != len(other.local_lsn_by_region):
121126
raise exceptions.CosmosHttpResponseError(
@@ -152,9 +157,12 @@ def merge(self, other):
152157
)
153158
else:
154159
highest_local_lsn_by_region[region_id] = local_lsn1
160+
global_lsn = max(self.global_lsn, other.global_lsn)
161+
if false_progress_merge_enabled and self.version != other.version:
162+
global_lsn = session_token_with_higher_version.global_lsn
155163

156164
return VectorSessionToken(
157-
max(self.version, other.version), max(self.global_lsn, other.global_lsn), highest_local_lsn_by_region
165+
max(self.version, other.version), global_lsn, highest_local_lsn_by_region
158166
)
159167

160168
def convert_to_string(self):

sdk/cosmos/azure-cosmos/tests/test_session_token_unit.py

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright (c) Microsoft Corporation. All rights reserved.
33

44
import unittest
5+
import os
56

67
import pytest
78

@@ -38,7 +39,7 @@ def test_validate_session_token_parsing_from_empty_string(self):
3839
session_token = ""
3940
self.assertIsNone(VectorSessionToken.create(session_token))
4041

41-
def test_validate_session_token_comparison(self):
42+
def _different_merge_scenarios(self):
4243
# valid session token
4344
session_token1 = VectorSessionToken.create("1#100#1=20#2=5#3=30")
4445
session_token2 = VectorSessionToken.create("2#105#4=10#2=5#3=30")
@@ -73,11 +74,29 @@ def test_validate_session_token_comparison(self):
7374
self.assertIsNotNone(session_token_merged)
7475
self.assertTrue(session_token_merged.equals(session_token1.merge(session_token2)))
7576

77+
# same vector clock version with global lsn increase (no failover)
78+
session_token1 = VectorSessionToken.create("1#100#1=20#2=5")
79+
session_token2 = VectorSessionToken.create("1#197#1=20#2=5")
80+
self.assertIsNotNone(session_token1)
81+
self.assertIsNotNone(session_token2)
82+
83+
self.assertTrue(session_token1.merge(session_token2).equals(
84+
VectorSessionToken.create("1#197#1=20#2=5")))
85+
86+
# same vector clock version with global lsn increase and local lsn increase
87+
session_token1 = VectorSessionToken.create("1#100#1=20#2=5")
88+
session_token2 = VectorSessionToken.create("1#197#1=23#2=15")
89+
self.assertIsNotNone(session_token1)
90+
self.assertIsNotNone(session_token2)
91+
92+
self.assertTrue(session_token1.merge(session_token2).equals(
93+
VectorSessionToken.create("1#197#1=23#2=15")))
94+
95+
# different number of regions with same region should throw error
7696
session_token1 = VectorSessionToken.create("1#101#1=20#2=5#3=30")
7797
session_token2 = VectorSessionToken.create("1#100#1=20#2=5#3=30#4=40")
7898
self.assertIsNotNone(session_token1)
7999
self.assertIsNotNone(session_token2)
80-
81100
try:
82101
session_token1.merge(session_token2)
83102
self.fail("Region progress can not be different when version is same")
@@ -86,6 +105,69 @@ def test_validate_session_token_comparison(self):
86105
"Status code: 500\nCompared session tokens '1#101#1=20#2=5#3=30' "
87106
"and '1#100#1=20#2=5#3=30#4=40' have unexpected regions.")
88107

108+
# same version with different region progress should throw error
109+
session_token1 = VectorSessionToken.create("1#101#1=20#2=5#3=30")
110+
session_token2 = VectorSessionToken.create("1#100#4=20#2=5#3=30")
111+
self.assertIsNotNone(session_token1)
112+
self.assertIsNotNone(session_token2)
113+
114+
try:
115+
session_token1.merge(session_token2)
116+
self.fail("Region progress can not be different when version is same")
117+
except CosmosHttpResponseError as e:
118+
self.assertEqual(str(e),
119+
"Status code: 500\nCompared session tokens '1#101#1=20#2=5#3=30' "
120+
"and '1#100#4=20#2=5#3=30' have unexpected regions.")
121+
122+
def test_validate_session_token_comparison(self):
123+
self._different_merge_scenarios()
124+
os.environ["AZURE_COSMOS_SESSION_TOKEN_FALSE_PROGRESS_MERGE"] = "false"
125+
self._different_merge_scenarios()
126+
del os.environ["AZURE_COSMOS_SESSION_TOKEN_FALSE_PROGRESS_MERGE"]
127+
128+
def test_session_token_false_progress_merge(self):
129+
for false_progress_enabled in [True, False]:
130+
self.validate_different_session_token_false_progress_merge_scenarios(false_progress_enabled)
131+
132+
def validate_different_session_token_false_progress_merge_scenarios(self, false_progress_enabled: bool):
133+
# Test that false progress merge is enabled by default and that global lsn is used from higher version token
134+
# when enabled
135+
os.environ["AZURE_COSMOS_SESSION_TOKEN_FALSE_PROGRESS_MERGE"] = str(false_progress_enabled)
136+
session_token1 = VectorSessionToken.create("1#200#1=20#2=5#3=30")
137+
session_token2 = VectorSessionToken.create("2#100#1=10#2=8#3=30")
138+
self.assertIsNotNone(session_token1)
139+
self.assertIsNotNone(session_token2)
140+
if false_progress_enabled:
141+
expected_session_token = "2#100#1=20#2=8#3=30"
142+
else:
143+
expected_session_token = "2#200#1=20#2=8#3=30"
144+
self.assertTrue(session_token1.merge(session_token2).equals(
145+
VectorSessionToken.create(expected_session_token)))
146+
147+
# vector clock version increase with removed region progress should merge
148+
session_token1 = VectorSessionToken.create("1#200#1=20#2=5#3=30")
149+
session_token2 = VectorSessionToken.create("2#100#1=10#2=5")
150+
self.assertIsNotNone(session_token1)
151+
self.assertIsNotNone(session_token2)
152+
153+
if false_progress_enabled:
154+
expected_session_token = "2#100#1=20#2=5"
155+
else:
156+
expected_session_token = "2#200#1=20#2=5"
157+
self.assertTrue(session_token1.merge(session_token2).equals(
158+
VectorSessionToken.create(expected_session_token)))
159+
160+
# vector clock version increase with new region progress should merge
161+
session_token1 = VectorSessionToken.create("1#200#1=20#2=5")
162+
session_token2 = VectorSessionToken.create("2#100#1=10#2=5#3=30")
163+
self.assertIsNotNone(session_token1)
164+
self.assertIsNotNone(session_token2)
165+
if false_progress_enabled:
166+
expected_session_token = "2#100#1=20#2=5#3=30"
167+
else:
168+
expected_session_token = "2#200#1=20#2=5#3=30"
169+
self.assertTrue(session_token1.merge(session_token2).equals(
170+
VectorSessionToken.create(expected_session_token)))
89171

90172
if __name__ == '__main__':
91173
unittest.main()

0 commit comments

Comments
 (0)