-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[Cosmos] cross partition continuation token #35511
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…tly across components
… enhance query iterator logging
…ing and refactor related components
…into feature/QueryContinuationToken
…y iterator recreation with continuation token
- Implement comprehensive tests for the _enableQueryControlFetchMoreImplementation method, covering various scenarios including: - Processing existing buffer with items and unprocessed ranges. - Fetching from endpoint when buffer is empty. - Handling cases where endpoint returns no data or an empty buffer. - Managing partial buffer processing and endpoint responses with orderByItemsArray. Enhance functional query tests to log continuation token details and verify query execution phases.
…ling, and improve partition key range mapping in query execution context.
…ling, and improve partition key range mapping in query execution context.
…ate ContinuationTokenManager to handle these values
…nt for partition splits and merges, and validate continuation token usage for unsupported query types.
…rameter and the rangesOverlap method for improved clarity and maintainability.
…en handling - Implemented a test class for ParallelQueryExecutionContextBase to validate continuation token filtering logic. - Added tests for detecting query types (parallel and OrderBy) based on query information. - Included tests for EPK value extraction and document producer creation with EPK values. - Developed integration scenarios to validate the complete continuation token filtering workflow for both parallel and OrderBy queries. - Implemented tests for handling partition splits and merges, ensuring correct behavior of continuation token management.
// Pass shared continuation token manager via options | ||
const optionsWithSharedManager = { | ||
...this.options, | ||
continuationTokenManager: sharedContinuationTokenManager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
create an extended feed option for passing continuation token manager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or pass continuationTokenManager directly to the EndpointComponents.
// 1. There are items in the fetch buffer, OR | ||
// 2. There are unprocessed ranges in the partition key range map, OR | ||
// 3. The endpoint has more results | ||
if (this.options.enableQueryControl) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw error in older flow
sdk/cosmosdb/cosmos/src/queryExecutionContext/pipelinedQueryExecutionContext.ts
Show resolved
Hide resolved
sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContextBase.ts
Outdated
Show resolved
Hide resolved
sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContextBase.ts
Outdated
Show resolved
Hide resolved
sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContextBase.ts
Outdated
Show resolved
Hide resolved
sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContextBase.ts
Show resolved
Hide resolved
…rove continuation token management - Updated GroupByEndpointComponent, GroupByValueEndpointComponent, NonStreamingOrderByDistinctEndpointComponent, NonStreamingOrderByEndpointComponent, and OffsetLimitEndpointComponent to eliminate buffer references and streamline response handling. - Introduced continuation token management in OrderByEndpointComponent and OrderedDistinctEndpointComponent to enhance query execution efficiency. - Removed SimplifiedTargetPartitionRangeManager and integrated its functionality into TargetPartitionRangeManager for better code organization. - Adjusted test cases in parallelQueryExecutionContextBase.spec.ts and targetPartitionRangeManager.spec.ts to reflect changes in response structure and ensure accurate validation of filtered ranges. - Enhanced partition range filtering strategies to support direct return types instead of promises, simplifying the interface and improving performance.
@@ -147,6 +147,10 @@ export class Items { | |||
*/ | |||
public query<T>(query: string | SqlQuerySpec, options?: FeedOptions): QueryIterator<T>; | |||
public query<T>(query: string | SqlQuerySpec, options: FeedOptions = {}): QueryIterator<T> { | |||
console.log("=========================================="); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add logger wherever needed.
…ngeStrategy - Improve validation logic to check for null, undefined, or empty string continuation tokens. - Ensure all range mappings in continuation tokens have valid partitionKeyRange. - Update filterPartitionRanges method to handle empty and null target ranges gracefully. - Refactor tests to remove async/await where unnecessary and ensure proper handling of continuation tokens.
* @returns True if EPK boundaries are defined | ||
*/ | ||
export function hasEpkBoundaries(partitionKeyRange: ExtendedPartitionKeyRange): boolean { | ||
return !!(partitionKeyRange.epkMin && partitionKeyRange.epkMax); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are there two !!
?
sdk/cosmosdb/cosmos/src/queryExecutionContext/QueryRangeMapping.ts
Outdated
Show resolved
Hide resolved
sdk/cosmosdb/cosmos/src/queryExecutionContext/QueryRangeMapping.ts
Outdated
Show resolved
Hide resolved
sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContextBase.ts
Outdated
Show resolved
Hide resolved
* @param originalDocumentProducer - The document producer for the original partition that was split/merged | ||
* @param replacementPartitionKeyRanges - The new partition ranges after the split/merge | ||
*/ | ||
private _updateContinuationTokenForPartitionSplit( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update name to include merge too
sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContextBase.ts
Outdated
Show resolved
Hide resolved
sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContextBase.ts
Show resolved
Hide resolved
sdk/cosmosdb/cosmos/src/documents/ContinuationToken/OrderByQueryContinuationToken.ts
Outdated
Show resolved
Hide resolved
sdk/cosmosdb/cosmos/src/documents/ContinuationToken/OrderByQueryContinuationToken.ts
Outdated
Show resolved
Hide resolved
sdk/cosmosdb/cosmos/src/queryExecutionContext/QueryRangeMapping.ts
Outdated
Show resolved
Hide resolved
sdk/cosmosdb/cosmos/src/queryExecutionContext/QueryRangeMapping.ts
Outdated
Show resolved
Hide resolved
sdk/cosmosdb/cosmos/src/queryExecutionContext/QueryRangeMapping.ts
Outdated
Show resolved
Hide resolved
sdk/cosmosdb/cosmos/src/queryExecutionContext/ContinuationTokenManager.ts
Outdated
Show resolved
Hide resolved
…erage - Updated test cases to reject tokens with empty orderByItems array. - Changed async tests to synchronous where applicable for better performance. - Introduced new test scenarios for handling continuation tokens, including edge cases and exhausted tokens. - Enhanced validation checks for malformed and empty composite tokens. - Improved assertions to ensure accurate validation of filtering conditions and continuation tokens. - Organized tests into logical groups for better readability and maintainability.
…thod and streamline offset/limit handling in query execution
…tinuationTokenManager to use new token creation and serialization functions
…functions related to partition key ranges
…andling and enhance error reporting; simplify condition in HybridQueryExecutionContext for result validation
options?: FeedOptions, | ||
) { | ||
// Get the continuation token manager from options if available | ||
this.continuationTokenManager = options.continuationTokenManager; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: options is optional
* Shared continuation token manager for handling query pagination state. | ||
* This is used internally to coordinate continuation tokens across query execution contexts. | ||
*/ | ||
continuationTokenManager?: ContinuationTokenManager; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is FeedOptions best place to keep this?
@@ -62,6 +62,9 @@ export class QueryIterator<T> { | |||
private resourceLink?: string, | |||
private resourceType?: ResourceType, | |||
) { | |||
console.log("=========================================="); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will be removed?
sdk/cosmosdb/cosmos/src/queryExecutionContext/CompositeQueryContinuationToken.ts
Outdated
Show resolved
Hide resolved
@@ -98,36 +148,47 @@ export class PipelinedQueryExecutionContext implements ExecutionContext { | |||
); | |||
} | |||
} else { | |||
// Create shared continuation token manager for streaming execution contexts | |||
const sharedContinuationTokenManager = new ContinuationTokenManager( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is confusing. Why do we have 2 continuationTokenManager?
} | ||
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: revert
`Failed to parse continuation token: ${error.message}, initializing empty token`, | ||
); | ||
// Fallback to empty continuation token if parsing fails | ||
this.compositeContinuationToken = new CompositeQueryContinuationTokenClass( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't fallback. Better to fail the request.
Implementation should be transparent and deterministic for users.
sdk/cosmosdb/cosmos/src/queryExecutionContext/ContinuationTokenManager.ts
Show resolved
Hide resolved
// If offset+limit then add that to the pipeline | ||
const limit = partitionedQueryExecutionInfo.queryInfo.limit; | ||
const offset = partitionedQueryExecutionInfo.queryInfo.offset; | ||
if (typeof limit === "number" && typeof offset === "number") { | ||
this.endpoint = new OffsetLimitEndpointComponent(this.endpoint, offset, limit); | ||
this.endpoint = new OffsetLimitEndpointComponent(this.endpoint, offset, limit, optionsWithSharedManager); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can a user pass offset and limit with continuation token?
|
||
console.log("hasBufferedItems:", hasBufferedItems); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are not planning to keep these console.log?
- Changed CompositeQueryContinuationToken from a class to an interface and introduced factory functions for creation and serialization. - Updated ContinuationTokenManager to utilize the new interface and functions for managing composite continuation tokens. - Introduced PartitionRangeManager to encapsulate partition key range mapping logic, improving separation of concerns. - Refactored OrderByQueryRangeStrategy and ParallelQueryRangeStrategy to use the new composite token handling methods. - Updated unit tests to reflect changes in the CompositeQueryContinuationToken structure and ensure proper functionality.
sdk/cosmosdb/cosmos/src/queryExecutionContext/parallelQueryExecutionContextBase.ts
Outdated
Show resolved
Hide resolved
// Check continuation token for offset/limit values during initialization | ||
if (options?.continuationToken) { | ||
try { | ||
const parsedToken = JSON.parse(options.continuationToken); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pass it from QEC
sdk/cosmosdb/cosmos/src/queryExecutionContext/CompositeQueryContinuationToken.ts
Outdated
Show resolved
Hide resolved
// Initialize continuation token manager early so it's available for OffsetLimitEndpointComponent | ||
const sortOrders = partitionedQueryExecutionInfo.queryInfo.orderBy; | ||
const isOrderByQuery = Array.isArray(sortOrders) && sortOrders.length > 0; | ||
this.continuationTokenManager = new ContinuationTokenManager( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a check for validating that continuation token belongs to this container only using rid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
continuation token manager is getting created in this class? why do we need rid check?
sdk/cosmosdb/cosmos/src/documents/ContinuationToken/OrderByQueryContinuationToken.ts
Outdated
Show resolved
Hide resolved
sdk/cosmosdb/cosmos/src/queryExecutionContext/CompositeQueryContinuationToken.ts
Outdated
Show resolved
Hide resolved
console.warn( | ||
`Failed to parse continuation token: ${error.message}, initializing empty token`, | ||
); | ||
// Fallback to empty continuation token if parsing fails |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Parsing could also fail if someone tampers with the continuation token string. We should fail the request here with an error.
* @param rangeId - Unique identifier for the partition range | ||
* @param mapping - The QueryRangeMapping to add | ||
*/ | ||
public updatePartitionRangeMapping(rangeId: string, mapping: QueryRangeMapping): void { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be renamed to addPartitionRangeMapping? update is misleading here
* @param rangeMappings - Array of range mappings to filter | ||
* @returns Filtered array without exhausted ranges | ||
*/ | ||
public removeExhaustedRanges(rangeMappings: QueryRangeMapping[]): QueryRangeMapping[] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
function name is misleading, IMO it should not return anything and just remove the ranges. Or we should update the name accordingly.
// ORDER BY queries require orderByItemsArray to be present and non-empty | ||
if (!orderByItemsArray || orderByItemsArray.length === 0) { | ||
throw new Error( | ||
"ORDER BY query processing failed: orderByItemsArray is required but was not provided or is empty" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add what should Cx do if such a case arises.
console.log("=== Processing ORDER BY Query (Sequential Mode) ==="); | ||
|
||
// ORDER BY queries require orderByItemsArray to be present and non-empty | ||
if (!orderByItemsArray || orderByItemsArray.length === 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is orderByItemsArray passed here just for checking? I don't see any other use of it in this function.
* @param initialLimit - Initial limit value | ||
* @returns Updated partition key range map with offset/limit values for each range | ||
*/ | ||
public calculateOffsetLimitForEachPartitionRange( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this getting used outside this class? If not, make it private
…prove clarity and support for enableQueryControl option
- Updated OrderByQueryRangeStrategy and ParallelQueryRangeStrategy to utilize PartitionRangeWithContinuationToken for improved handling of continuation tokens. - Removed legacy continuation token validation and parsing methods, simplifying the filterPartitionRanges method. - Enhanced TargetPartitionRangeManager to accept range-token pairs and additional query information. - Implemented partition split/merge detection in ParallelQueryExecutionContextBase, allowing for dynamic updates to continuation tokens based on partition topology changes. - Adjusted related interfaces and types to accommodate new structure for handling partition ranges and continuation tokens. - Updated unit tests to reflect changes in continuation token handling and ensure correct functionality.
- Introduced a new `ParallelQueryResult` interface to standardize the structure of results returned from parallel queries. - Updated `GroupByValueEndpointComponent`, `NonStreamingOrderByDistinctEndpointComponent`, and `NonStreamingOrderByEndpointComponent` to utilize the new `ParallelQueryResult` structure. - Enhanced error handling for undefined or empty results in query components. - Created `TargetPartitionRangeManager` and associated strategies (`ParallelQueryRangeStrategy`, `OrderByQueryRangeStrategy`) to manage partition range filtering based on query type and continuation tokens. - Added unit tests for the new filtering strategies and refactored existing tests to accommodate changes in import paths. - Ensured backward compatibility by maintaining existing functionality while introducing new structures and strategies.
…prove continuation token handling
- Introduced CompositeQueryContinuationToken to handle parallel query execution across multiple partition ranges. - Updated OrderByQueryContinuationToken to utilize rangeMappings instead of a composite token. - Enhanced ContinuationTokenManager to manage both composite and order by continuation tokens, streamlining offset and limit handling. - Removed deprecated continuation token parsing logic and integrated new serialization/deserialization methods. - Updated various endpoint components to align with the new continuation token structure, ensuring compatibility with offset and limit queries. - Cleaned up FeedOptions by removing the continuationTokenManager property, as it is now managed internally. - Added unit tests to validate the new continuation token management logic.
…g ORDER BY processing
…t structure and improve offset/limit handling
Packages impacted by this PR
@azure/cosmos
Issues associated with this PR
Describe the problem that is addressed by this PR
Adds support for continuation token for cross partition queries
What are the possible designs available to address the problem? If there are more than one possible design, why was the one in this PR chosen?
DOC Link
Are there test cases added in this PR? (If not, why?)
Yes
Provide a list of related PRs (if any)
Command used to generate this PR:**(Applicable only to SDK release request PRs)
Checklists