Skip to content

Commit 671a0a4

Browse files
simorenohannatischFabianMeiswinkelPilchie
authored
[Cosmos] Adds support for non streaming ORDER BY (#35468)
* sync changes and sample for vector search control plane * Update index_management.py * Update index_management.py * async and samples * sync and async tests * Update CHANGELOG.md * developed typehints * skip tests * create_if_not_exists, README * Update README.md * add provisional, add dimension limit * adds sync changes, adds changelog * async changes * some comments addressed * Update CHANGELOG.md * bug fix on ordering * ordering bug fix * fix datetime * samples added * small fixes * fix some additional PQ logic * last bit of pq fixes * Update non_streaming_order_by_aggregator.py * memory optimization * Update sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/document_producer.py Co-authored-by: Anna Tisch <[email protected]> * addressing comments * test name fix, improve readme/ samples * add sync tests, improve readme * async tests * pylint * remove print * pylint * adds env variable * adds JS tests * error logic improvements * readme updates * more fixes to logic * oops * memory optimization * Update sdk/cosmos/azure-cosmos/README.md Co-authored-by: Fabian Meiswinkel <[email protected]> * update variable for naming conventions * remove/ comment out diskANN * offset + limit fix, tests fixes * add capabilities env var flag * use feature flag for existing query tests * disable emulator for query tests * missed some tests * Update test_aggregate.py * Update test-resources.bicep * forgot tests were being skipped * Update sdk/cosmos/azure-cosmos/test/test_vector_policy.py * Update sdk/cosmos/azure-cosmos/test/test_vector_policy_async.py * test fixes * Update README.md * create separate db for vectors * tests * tests * more tests * small bit * final fixes hopefully * raise time limit on test so it doesnt fail * Update test_query_vector_similarity_async.py * add date for release prep * Update CHANGELOG.md --------- Co-authored-by: Anna Tisch <[email protected]> Co-authored-by: Fabian Meiswinkel <[email protected]> Co-authored-by: Kevin Pilch <[email protected]>
1 parent c68e336 commit 671a0a4

33 files changed

+1936
-30
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
## Release History
22

3-
### 4.6.1 (Unreleased)
3+
### 4.6.1 (2024-05-15)
44

55
#### Features Added
6-
* Added support for using the start time option for change feed query API. See [PR 35090](https://github.com/Azure/azure-sdk-for-python/pull/35090)
7-
8-
#### Breaking Changes
6+
* Adds vector embedding policy and vector indexing policy. See [PR 34882](https://github.com/Azure/azure-sdk-for-python/pull/34882).
7+
* Adds support for vector search non-streaming order by queries. See [PR 35468](https://github.com/Azure/azure-sdk-for-python/pull/35468).
8+
* Adds support for using the start time option for change feed query API. See [PR 35090](https://github.com/Azure/azure-sdk-for-python/pull/35090).
99

1010
#### Bugs Fixed
11-
* Fixed a bug where change feed query in Async client was not returning all pages due to case-sensitive response headers. See [PR 35090](https://github.com/Azure/azure-sdk-for-python/pull/35090)
11+
* Fixed a bug where change feed query in Async client was not returning all pages due to case-sensitive response headers. See [PR 35090](https://github.com/Azure/azure-sdk-for-python/pull/35090).
1212
* Fixed a bug when a retryable exception occurs in the first page of a query execution causing query to return 0 results. See [PR 35090](https://github.com/Azure/azure-sdk-for-python/pull/35090).
1313

14-
15-
#### Other Changes
16-
1714
### 4.6.0 (2024-03-14)
1815

1916
#### Features Added

sdk/cosmos/azure-cosmos/README.md

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,107 @@ as well as containing the list of failed responses for the failed request.
628628

629629
For more information on Transactional Batch, see [Azure Cosmos DB Transactional Batch][cosmos_transactional_batch].
630630

631+
### Public Preview - Vector Embeddings and Vector Indexes
632+
We have added new capabilities to utilize vector embeddings and vector indexing for users to leverage vector
633+
search utilizing our Cosmos SDK. These two container-level configurations have to be turned on at the account-level
634+
before you can use them.
635+
636+
Each vector embedding should have a path to the relevant vector field in your items being stored, a supported data type
637+
(float32, int8, uint8), the vector's dimensions, and the distance function being used for that embedding. Vectors indexed
638+
with the flat index type can be at most 505 dimensions. Vectors indexed with the quantizedFlat index type can be at most 4,096 dimensions.
639+
A sample vector embedding policy would look like this:
640+
```python
641+
vector_embedding_policy = {
642+
"vectorEmbeddings": [
643+
{
644+
"path": "/vector1",
645+
"dataType": "float32",
646+
"dimensions": 256,
647+
"distanceFunction": "euclidean"
648+
},
649+
{
650+
"path": "/vector2",
651+
"dataType": "int8",
652+
"dimensions": 200,
653+
"distanceFunction": "dotproduct"
654+
},
655+
{
656+
"path": "/vector3",
657+
"dataType": "uint8",
658+
"dimensions": 400,
659+
"distanceFunction": "cosine"
660+
}
661+
]
662+
}
663+
```
664+
665+
Separately, vector indexes have been added to the already existing indexing_policy and only require two fields per index:
666+
the path to the relevant field to be used, and the type of index from the possible options (flat or quantizedFlat).
667+
A sample indexing policy with vector indexes would look like this:
668+
```python
669+
indexing_policy = {
670+
"automatic": True,
671+
"indexingMode": "consistent",
672+
"compositeIndexes": [
673+
[
674+
{"path": "/numberField", "order": "ascending"},
675+
{"path": "/stringField", "order": "descending"}
676+
]
677+
],
678+
"spatialIndexes": [
679+
{"path": "/location/*", "types": [
680+
"Point",
681+
"Polygon"]}
682+
],
683+
"vectorIndexes": [
684+
{"path": "/vector1", "type": "flat"},
685+
{"path": "/vector2", "type": "quantizedFlat"}
686+
]
687+
}
688+
```
689+
You would then pass in the relevant policies to your container creation method to ensure these configurations are used by it.
690+
The operation will fail if you pass new vector indexes to your indexing policy but forget to pass in an embedding policy.
691+
```python
692+
database.create_container(id=container_id, partition_key=PartitionKey(path="/id"),
693+
indexing_policy=indexing_policy, vector_embedding_policy=vector_embedding_policy)
694+
```
695+
***Note: vector embeddings and vector indexes CANNOT be edited by container replace operations. They are only available directly through creation.***
696+
697+
### Public Preview - Vector Search
698+
699+
With the addition of the vector indexing and vector embedding capabilities, the SDK can now perform order by vector search queries.
700+
These queries specify the VectorDistance to use as a metric within the query text. These must always use a TOP or LIMIT clause within the query though,
701+
since vector search queries have to look through a lot of data otherwise and may become too expensive or long-running.
702+
Since these queries are relatively expensive, the SDK sets a default limit of 50000 max items per query - if you'd like to raise that further, you
703+
can use the `AZURE_COSMOS_MAX_ITEM_BUFFER_VECTOR_SEARCH` environment variable to do so. However, be advised that queries with too many vector results
704+
may have additional latencies associated with searching in the service.
705+
The query syntax for these operations looks like this:
706+
```python
707+
VectorDistance(<embedding1>, <embedding2>, [,<exact_search>], [,<specification>])
708+
```
709+
Embeddings 1 and 2 are the arrays of values for the relevant embeddings, `exact_search` is an optional boolean indicating whether
710+
to do an exact search vs. an approximate one (default value of false), and `specification` is an optional Json snippet with embedding
711+
specs that can include `dataType`, `dimensions` and `distanceFunction`. The specifications within the query will take precedence
712+
to any configurations previously set by a vector embedding policy.
713+
A sample vector search query would look something like this:
714+
```python
715+
query = "SELECT TOP 10 c.title,VectorDistance(c.embedding, [{}]) AS " \
716+
"SimilarityScore FROM c ORDER BY VectorDistance(c.embedding, [{}])".format(embeddings_string, embeddings_string)
717+
```
718+
Or if you'd like to add the optional parameters to the vector distance, you could do this:
719+
```python
720+
query = "SELECT TOP 10 c.title,VectorDistance(c.embedding, [{}], true, {{'dataType': 'float32' , 'distanceFunction': 'cosine'}}) AS " \
721+
"SimilarityScore FROM c ORDER BY VectorDistance(c.embedding, [{}], true, {{'dataType': " \
722+
"'float32', 'distanceFunction': 'cosine'}})".format(embeddings_string, embeddings_string)
723+
```
724+
The `embeddings_string` above would be your string made from your vector embeddings.
725+
You can find our sync samples [here][cosmos_index_sample] and our async samples [here][cosmos_index_sample_async] as well to help yourself out.
726+
727+
*Note: For a limited time, if your query operates against a region or emulator that has not yet been updated the client might run into some issues
728+
not being able to recognize the new NonStreamingOrderBy capability that makes vector search possible.
729+
If this happens, you can set the `AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY` environment variable to `"True"` to opt out of this
730+
functionality and continue operating as usual.*
731+
631732
## Troubleshooting
632733

633734
### General
@@ -762,6 +863,8 @@ For more extensive documentation on the Cosmos DB service, see the [Azure Cosmos
762863
[timeouts_document]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/docs/TimeoutAndRetriesConfig.md
763864
[cosmos_transactional_batch]: https://learn.microsoft.com/azure/cosmos-db/nosql/transactional-batch
764865
[cosmos_concurrency_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/concurrency_sample.py
866+
[cosmos_index_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/index_management.py
867+
[cosmos_index_sample_async]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/index_management_async.py
765868

766869
## Contributing
767870

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
"""Document client class for the Azure Cosmos database service.
2525
"""
26+
import os
2627
import urllib.parse
2728
from typing import Callable, Dict, Any, Iterable, List, Mapping, Optional, Sequence, Tuple, Union, cast, Type
2829
from typing_extensions import TypedDict
@@ -3107,7 +3108,16 @@ def _GetQueryPlanThroughGateway(self, query: str, resource_link: str, **kwargs:
31073108
documents._QueryFeature.MultipleOrderBy + "," +
31083109
documents._QueryFeature.OffsetAndLimit + "," +
31093110
documents._QueryFeature.OrderBy + "," +
3110-
documents._QueryFeature.Top)
3111+
documents._QueryFeature.Top + "," +
3112+
documents._QueryFeature.NonStreamingOrderBy)
3113+
if os.environ.get('AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY', False):
3114+
supported_query_features = (documents._QueryFeature.Aggregate + "," +
3115+
documents._QueryFeature.CompositeAggregate + "," +
3116+
documents._QueryFeature.Distinct + "," +
3117+
documents._QueryFeature.MultipleOrderBy + "," +
3118+
documents._QueryFeature.OffsetAndLimit + "," +
3119+
documents._QueryFeature.OrderBy + "," +
3120+
documents._QueryFeature.Top)
31113121

31123122
options = {
31133123
"contentType": runtime_constants.MediaTypes.Json,

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/document_producer.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from azure.cosmos import _base
3030
from azure.cosmos._execution_context.aio.base_execution_context import _DefaultQueryExecutionContext
3131

32+
# pylint: disable=protected-access
3233

3334
class _DocumentProducer(object):
3435
"""This class takes care of handling of the results for one single partition
@@ -271,3 +272,51 @@ def _validate_orderby_items(self, res1, res2):
271272
type2 = _OrderByHelper.getTypeStr(elt2)
272273
if type1 != type2:
273274
raise ValueError("Expected {}, but got {}.".format(type1, type2))
275+
276+
class _NonStreamingItemResultProducer:
277+
"""This class takes care of handling of the items to be sorted in a non-streaming context.
278+
One instance of this document producer goes attached to every item coming in for the priority queue to be able
279+
to properly sort items as they get inserted.
280+
"""
281+
282+
def __init__(self, item_result, sort_order):
283+
"""
284+
Constructor
285+
:param dict[str, Any] item_result: The item result extracted from the document producer
286+
:param list[str] sort_order: List of sort orders (i.e., Ascending, Descending)
287+
"""
288+
self._item_result = item_result
289+
self._doc_producer_comp = _NonStreamingOrderByComparator(sort_order)
290+
291+
292+
293+
class _NonStreamingOrderByComparator(object):
294+
"""Provide a Comparator for item results which respects orderby sort order.
295+
"""
296+
297+
def __init__(self, sort_order):
298+
"""Instantiates this class
299+
:param list sort_order:
300+
List of sort orders (i.e., Ascending, Descending)
301+
"""
302+
self._sort_order = sort_order
303+
304+
async def compare(self, doc_producer1, doc_producer2):
305+
"""Compares the given two instances of DocumentProducers.
306+
Based on the orderby query items and whether the sort order is Ascending
307+
or Descending compares the peek result of the two DocumentProducers.
308+
:param _DocumentProducer doc_producer1: first instance to be compared
309+
:param _DocumentProducer doc_producer2: second instance to be compared
310+
:return:
311+
Integer value of compare result.
312+
positive integer if doc_producers1 > doc_producers2
313+
negative integer if doc_producers1 < doc_producers2
314+
:rtype: int
315+
"""
316+
rank1 = doc_producer1._item_result["orderByItems"][0]
317+
rank2 = doc_producer2._item_result["orderByItems"][0]
318+
res = await _OrderByHelper.compare(rank1, rank2)
319+
if res != 0:
320+
if self._sort_order[0] == "Descending":
321+
return -res
322+
return res

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/endpoint_component.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ async def __anext__(self):
5757
payload = await self._execution_context.__anext__()
5858
return payload["payload"]
5959

60+
class _QueryExecutionNonStreamingEndpointComponent(_QueryExecutionEndpointComponent):
61+
"""Represents an endpoint in handling a non-streaming order by query results.
62+
For each processed orderby result it returns the item result.
63+
"""
64+
async def __anext__(self):
65+
payload = await self._execution_context.__anext__()
66+
return payload._item_result["payload"]
6067

6168
class _QueryExecutionTopEndpointComponent(_QueryExecutionEndpointComponent):
6269
"""Represents an endpoint in handling top query.

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/execution_dispatcher.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323
Cosmos database service.
2424
"""
2525

26-
from azure.cosmos._execution_context.aio import endpoint_component
27-
from azure.cosmos._execution_context.aio import multi_execution_aggregator
26+
import os
27+
from azure.cosmos._execution_context.aio import endpoint_component, multi_execution_aggregator
28+
from azure.cosmos._execution_context.aio import non_streaming_order_by_aggregator
2829
from azure.cosmos._execution_context.aio.base_execution_context import _QueryExecutionContextBase
2930
from azure.cosmos._execution_context.aio.base_execution_context import _DefaultQueryExecutionContext
3031
from azure.cosmos._execution_context.execution_dispatcher import _is_partitioned_execution_info
@@ -107,12 +108,31 @@ async def _create_pipelined_execution_context(self, query_execution_info):
107108
raise CosmosHttpResponseError(StatusCodes.BAD_REQUEST,
108109
"Cross partition query only supports 'VALUE <AggregateFunc>' for aggregates")
109110

110-
execution_context_aggregator = multi_execution_aggregator._MultiExecutionContextAggregator(self._client,
111+
# throw exception here for vector search query without limit filter or limit > max_limit
112+
if query_execution_info.get_non_streaming_order_by():
113+
total_item_buffer = query_execution_info.get_top() or\
114+
query_execution_info.get_limit() + query_execution_info.get_offset()
115+
if total_item_buffer is None:
116+
raise ValueError("Executing a vector search query without TOP or LIMIT can consume many" +
117+
" RUs very fast and have long runtimes. Please ensure you are using one" +
118+
" of the two filters with your vector search query.")
119+
if total_item_buffer > os.environ.get('AZURE_COSMOS_MAX_ITEM_BUFFER_VECTOR_SEARCH', 50000):
120+
raise ValueError("Executing a vector search query with more items than the max is not allowed." +
121+
"Please ensure you are using a limit smaller than the max, or change the max.")
122+
execution_context_aggregator =\
123+
non_streaming_order_by_aggregator._NonStreamingOrderByContextAggregator(self._client,
124+
self._resource_link,
125+
self._query,
126+
self._options,
127+
query_execution_info)
128+
await execution_context_aggregator._configure_partition_ranges()
129+
else:
130+
execution_context_aggregator = multi_execution_aggregator._MultiExecutionContextAggregator(self._client,
111131
self._resource_link,
112132
self._query,
113133
self._options,
114134
query_execution_info)
115-
await execution_context_aggregator._configure_partition_ranges()
135+
await execution_context_aggregator._configure_partition_ranges()
116136
return _PipelineExecutionContext(self._client, self._options, execution_context_aggregator,
117137
query_execution_info)
118138

@@ -134,7 +154,9 @@ def __init__(self, client, options, execution_context, query_execution_info):
134154
self._endpoint = endpoint_component._QueryExecutionEndpointComponent(execution_context)
135155

136156
order_by = query_execution_info.get_order_by()
137-
if order_by:
157+
if query_execution_info.get_non_streaming_order_by():
158+
self._endpoint = endpoint_component._QueryExecutionNonStreamingEndpointComponent(self._endpoint)
159+
elif order_by:
138160
self._endpoint = endpoint_component._QueryExecutionOrderByEndpointComponent(self._endpoint)
139161

140162
aggregates = query_execution_info.get_aggregates()

0 commit comments

Comments
 (0)