Skip to content

Commit 7a4ee24

Browse files
Users/bambriz/feedrangehotfix0 (#41270)
* Fixing gaps for Containers with HashV1 * Hotfix for Hash V1 PartitionKeys This fixes an issue where querychangefeed doesn't return items that have PartitionKey Values that use Hash V1. This also fixes issue where one couldn't query changefeed for specific HPK PK values that weren't prefix pks. * Update sdk/cosmos/azure-cosmos/CHANGELOG.md * Update CHANGELOG.md * pylint and mypy fixes * remove some comments * additional tests * update tests --------- Co-authored-by: Fabian Meiswinkel <[email protected]> Co-authored-by: Fabian Meiswinkel <[email protected]>
1 parent cc62a57 commit 7a4ee24

10 files changed

+768
-28
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@
44

55
#### Features Added
66

7-
#### Breaking Changes
8-
97
#### Bugs Fixed
8+
* Fixed issue where Query Change Feed did not return items if the container uses legacy Hash V1 Partition Keys. This also fixes issues with not being able to change feed query for Specific Partition Key Values for HPK. See [PR 41270](https://github.com/Azure/azure-sdk-for-python/pull/41270/)
109

1110
#### Other Changes
1211

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3118,12 +3118,15 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:
31183118

31193119
# check if query has prefix partition key
31203120
isPrefixPartitionQuery = kwargs.pop("isPrefixPartitionQuery", None)
3121-
if isPrefixPartitionQuery:
3121+
if isPrefixPartitionQuery and "partitionKeyDefinition" in kwargs:
31223122
last_response_headers = CaseInsensitiveDict()
31233123
# here get the over lapping ranges
3124-
partition_key_definition = kwargs.pop("partitionKeyDefinition", None)
3125-
pk_properties = partition_key_definition
3126-
partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"])
3124+
# Default to empty Dictionary, but unlikely to be empty as we first check if we have it in kwargs
3125+
pk_properties: Union[PartitionKey, Dict] = kwargs.pop("partitionKeyDefinition", {})
3126+
partition_key_definition = PartitionKey(
3127+
path=pk_properties["paths"],
3128+
kind=pk_properties["kind"],
3129+
version=pk_properties["version"])
31273130
partition_key_value = pk_properties["partition_key"]
31283131
feedrangeEPK = partition_key_definition._get_epk_range_for_prefix_partition_key(
31293132
partition_key_value

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_integers.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,97 @@
2222
from typing import NoReturn, Tuple, Union
2323

2424

25+
class _UInt32:
26+
def __init__(self, value: int) -> None:
27+
self._value: int = value & 0xFFFFFFFF
28+
29+
@property
30+
def value(self) -> int:
31+
return self._value
32+
33+
@value.setter
34+
def value(self, new_value: int) -> None:
35+
self._value = new_value & 0xFFFFFFFF
36+
37+
def __add__(self, other: Union[int, '_UInt32']) -> '_UInt32':
38+
result = self.value + (other.value if isinstance(other, _UInt32) else other)
39+
return _UInt32(result & 0xFFFFFFFF)
40+
41+
def __sub__(self, other: Union[int, '_UInt32']) -> '_UInt32':
42+
result = self.value - (other.value if isinstance(other, _UInt32) else other)
43+
return _UInt32(result & 0xFFFFFFFF)
44+
45+
def __mul__(self, other: Union[int, '_UInt32']) -> '_UInt32':
46+
result = self.value * (other.value if isinstance(other, _UInt32) else other)
47+
return _UInt32(result & 0xFFFFFFFF)
48+
49+
def __xor__(self, other: Union[int, '_UInt32']) -> '_UInt32':
50+
result = self.value ^ (other.value if isinstance(other, _UInt32) else other)
51+
return _UInt32(result & 0xFFFFFFFF)
52+
53+
def __lshift__(self, other: Union[int, '_UInt32']) -> '_UInt32':
54+
result = self.value << (other.value if isinstance(other, _UInt32) else other)
55+
return _UInt32(result & 0xFFFFFFFF)
56+
57+
def __ilshift__(self, other: Union[int, '_UInt32']) -> '_UInt32':
58+
self._value = (self.value << (other.value if isinstance(other, _UInt32) else other)) & 0xFFFFFFFF
59+
return self
60+
61+
def __rshift__(self, other: Union[int, '_UInt32']) -> '_UInt32':
62+
result = self.value >> (other.value if isinstance(other, _UInt32) else other)
63+
return _UInt32(result & 0xFFFFFFFF)
64+
65+
def __irshift__(self, other: Union[int, '_UInt32']) -> '_UInt32':
66+
self._value = (self.value >> (other.value if isinstance(other, _UInt32) else other)) & 0xFFFFFFFF
67+
return self
68+
69+
def __and__(self, other: Union[int, '_UInt32']) -> '_UInt32':
70+
result = self.value & (other.value if isinstance(other, _UInt32) else other)
71+
return _UInt32(result & 0xFFFFFFFF)
72+
73+
def __or__(self, other: Union[int, '_UInt32']) -> '_UInt32':
74+
if isinstance(other, _UInt32):
75+
return _UInt32(self.value | other.value)
76+
if isinstance(other, int):
77+
return _UInt32(self.value | other)
78+
raise TypeError("Unsupported type for OR operation")
79+
80+
def __invert__(self) -> '_UInt32':
81+
return _UInt32(~self.value & 0xFFFFFFFF)
82+
83+
def __eq__(self, other: Union[int, '_UInt32', object]) -> bool:
84+
return self.value == (other.value if isinstance(other, _UInt32) else other)
85+
86+
def __ne__(self, other: Union[int, '_UInt32', object]) -> bool:
87+
return not self.__eq__(other)
88+
89+
def __lt__(self, other: Union[int, '_UInt32']) -> bool:
90+
return self.value < (other.value if isinstance(other, _UInt32) else other)
91+
92+
def __gt__(self, other: Union[int, '_UInt32']) -> bool:
93+
return self.value > (other.value if isinstance(other, _UInt32) else other)
94+
95+
def __le__(self, other: Union[int, '_UInt32']) -> bool:
96+
return self.value <= (other.value if isinstance(other, _UInt32) else other)
97+
98+
def __ge__(self, other: Union[int, '_UInt32']) -> bool:
99+
return self.value >= (other.value if isinstance(other, _UInt32) else other)
100+
101+
@staticmethod
102+
def encode_double_as_uint32(value: float) -> int:
103+
value_in_uint32 = struct.unpack('<I', struct.pack('<f', value))[0]
104+
mask = 0x80000000
105+
return (value_in_uint32 ^ mask) if value_in_uint32 < mask else (~value_in_uint32) + 1
106+
107+
@staticmethod
108+
def decode_double_from_uint32(value: int) -> int:
109+
mask = 0x80000000
110+
value = ~(value - 1) if value < mask else value ^ mask
111+
return struct.unpack('<f', struct.pack('<I', value))[0]
112+
113+
def __int__(self) -> int:
114+
return self.value
115+
25116
class _UInt64:
26117
def __init__(self, value: int) -> None:
27118
self._value: int = value & 0xFFFFFFFFFFFFFFFF
@@ -72,6 +163,32 @@ def __or__(self, other: Union[int, '_UInt64']) -> '_UInt64':
72163
def __invert__(self) -> '_UInt64':
73164
return _UInt64(~self.value & 0xFFFFFFFFFFFFFFFF)
74165

166+
def __eq__(self, other: Union[int, '_UInt64', object]) -> bool:
167+
return self.value == (other.value if isinstance(other, _UInt64) else other)
168+
169+
def __ne__(self, other: Union[int, '_UInt64', object]) -> bool:
170+
return not self.__eq__(other)
171+
172+
def __irshift__(self, other: Union[int, '_UInt64']) -> '_UInt64':
173+
self._value = (self.value >> (other.value if isinstance(other, _UInt64) else other)) & 0xFFFFFFFFFFFFFFFF
174+
return self
175+
176+
def __ilshift__(self, other: Union[int, '_UInt64']) -> '_UInt64':
177+
self._value = (self.value << (other.value if isinstance(other, _UInt64) else other)) & 0xFFFFFFFFFFFFFFFF
178+
return self
179+
180+
def __lt__(self, other: Union[int, '_UInt64']) -> bool:
181+
return self.value < (other.value if isinstance(other, _UInt64) else other)
182+
183+
def __gt__(self, other: Union[int, '_UInt64']) -> bool:
184+
return self.value > (other.value if isinstance(other, _UInt64) else other)
185+
186+
def __le__(self, other: Union[int, '_UInt64']) -> bool:
187+
return self.value <= (other.value if isinstance(other, _UInt64) else other)
188+
189+
def __ge__(self, other: Union[int, '_UInt64']) -> bool:
190+
return self.value >= (other.value if isinstance(other, _UInt64) else other)
191+
75192
@staticmethod
76193
def encode_double_as_uint64(value: float) -> int:
77194
value_in_uint64 = struct.unpack('<Q', struct.pack('<d', value))[0]

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_murmurhash3.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
# This is public domain code with no copyrights. From home page of
3131
# <a href="https://github.com/aappleby/smhasher">SMHasher</a>:
3232
# "All MurmurHash versions are public domain software, and the author disclaims all copyright to their code."
33-
from ._cosmos_integers import _UInt128, _UInt64
33+
from ._cosmos_integers import _UInt128, _UInt64, _UInt32
3434

3535

3636
def rotate_left_64(val: int, shift: int) -> int:
@@ -147,3 +147,47 @@ def murmurhash3_128(span: bytearray, seed: _UInt128) -> _UInt128: # pylint: dis
147147
h2 += h1
148148

149149
return _UInt128(int(h1.value), int(h2.value))
150+
151+
152+
def murmurhash3_32(data: bytearray, seed: int) -> _UInt32:
153+
c1: _UInt32 = _UInt32(0xcc9e2d51)
154+
c2: _UInt32 = _UInt32(0x1b873593)
155+
length: _UInt32 = _UInt32(len(data))
156+
h1: _UInt32 = _UInt32(seed)
157+
rounded_end: _UInt32 = _UInt32(length.value & 0xfffffffc) # round down to 4 byte block
158+
159+
for i in range(0, rounded_end.value, 4):
160+
# little endian load order
161+
k1: _UInt32 = _UInt32(
162+
(data[i] & 0xff) | ((data[i + 1] & 0xff) << 8) | ((data[i + 2] & 0xff) << 16) | (data[i + 3] << 24)
163+
)
164+
k1 *= c1
165+
k1.value = (k1.value << 15) | (k1.value >> 17) # ROTL32(k1,15)
166+
k1 *= c2
167+
168+
h1 ^= k1
169+
h1.value = (h1.value << 13) | (h1.value >> 19) # ROTL32(h1,13)
170+
h1 = h1 * _UInt32(5) + _UInt32(0xe6546b64)
171+
172+
# tail
173+
k1 = _UInt32(0)
174+
if length.value & 0x03 == 3:
175+
k1 ^= _UInt32((data[rounded_end.value + 2] & 0xff) << 16)
176+
if length.value & 0x03 >= 2:
177+
k1 ^= _UInt32((data[rounded_end.value + 1] & 0xff) << 8)
178+
if length.value & 0x03 >= 1:
179+
k1 ^= _UInt32(data[rounded_end.value] & 0xff)
180+
k1 *= c1
181+
k1.value = (k1.value << 15) | (k1.value >> 17)
182+
k1 *= c2
183+
h1 ^= k1
184+
185+
# finalization
186+
h1 ^= length
187+
h1.value ^= h1.value >> 16
188+
h1 *= _UInt32(0x85ebca6b)
189+
h1.value ^= h1.value >> 13
190+
h1 *= _UInt32(0xc2b2ae35)
191+
h1.value ^= h1.value >> 16
192+
193+
return h1

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,10 @@ async def _get_epk_range_for_partition_key(self, partition_key_value: PartitionK
144144

145145
container_properties = await self._get_properties()
146146
partition_key_definition = container_properties["partitionKey"]
147-
partition_key = PartitionKey(path=partition_key_definition["paths"], kind=partition_key_definition["kind"])
147+
partition_key = PartitionKey(
148+
path=partition_key_definition["paths"],
149+
kind=partition_key_definition["kind"],
150+
version=partition_key_definition["version"])
148151

149152
return partition_key._get_epk_range_for_partition_key(partition_key_value)
150153

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2914,7 +2914,10 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:
29142914
if cont_prop:
29152915
cont_prop = await cont_prop()
29162916
pk_properties = cont_prop["partitionKey"]
2917-
partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"])
2917+
partition_key_definition = PartitionKey(
2918+
path=pk_properties["paths"],
2919+
kind=pk_properties["kind"],
2920+
version=pk_properties["version"])
29182921
if partition_key_definition.kind == "MultiHash" and \
29192922
(isinstance(partition_key, List) and \
29202923
len(partition_key_definition['paths']) != len(partition_key)):

sdk/cosmos/azure-cosmos/azure/cosmos/container.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,10 @@ def _set_partition_key(
138138
def _get_epk_range_for_partition_key( self, partition_key_value: PartitionKeyType) -> Range:
139139
container_properties = self._get_properties()
140140
partition_key_definition = container_properties["partitionKey"]
141-
partition_key = PartitionKey(path=partition_key_definition["paths"], kind=partition_key_definition["kind"])
141+
partition_key = PartitionKey(
142+
path=partition_key_definition["paths"],
143+
kind=partition_key_definition["kind"],
144+
version=partition_key_definition["version"])
142145

143146
return partition_key._get_epk_range_for_partition_key(partition_key_value)
144147

@@ -715,7 +718,10 @@ def __is_prefix_partitionkey(
715718
self, partition_key: PartitionKeyType) -> bool:
716719
properties = self._get_properties()
717720
pk_properties = properties["partitionKey"]
718-
partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"])
721+
partition_key_definition = PartitionKey(
722+
path=pk_properties["paths"],
723+
kind=pk_properties["kind"],
724+
version=pk_properties["version"])
719725
return partition_key_definition._is_prefix_partition_key(partition_key)
720726

721727

0 commit comments

Comments
 (0)