Skip to content

Commit 51d7883

Browse files
Add priority level support to Change Feed options and tests (#36640)
### Packages impacted by this PR @azure/cosmos ### Issues associated with this PR #36700 ### Describe the problem that is addressed by this PR ### What are the possible designs available to address the problem? If there are more than one possible design, why was the one in this PR chosen? ### Are there test cases added in this PR? _(If not, why?)_ ### Provide a list of related PRs _(if any)_ ### Command used to generate this PR:**_(Applicable only to SDK release request PRs)_ ### Checklists - [ ] Added impacted package name to the issue description - [ ] Does this PR needs any fixes in the SDK Generator?** _(If so, create an Issue in the [Autorest/typescript](https://github.com/Azure/autorest.typescript) repository and link it here)_ - [ ] Added a changelog (if necessary) --------- Co-authored-by: Manik Khandelwal <[email protected]>
1 parent a0f425f commit 51d7883

File tree

8 files changed

+1298
-0
lines changed

8 files changed

+1298
-0
lines changed

sdk/cosmosdb/cosmos/review/cosmos-node.api.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ export interface ChangeFeedIteratorOptions {
8484
changeFeedStartFrom?: ChangeFeedStartFrom;
8585
excludedLocations?: string[];
8686
maxItemCount?: number;
87+
priorityLevel?: PriorityLevel;
8788
sessionToken?: string;
8889
throughputBucket?: number;
8990
}
Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
/**
5+
* @summary Demonstrates using Priority Level support with various Cosmos DB operations
6+
* Shows how Low/High priority levels affect request throttling behavior
7+
*/
8+
9+
import "dotenv/config";
10+
import { finish, handleError, logSampleHeader, logStep } from "./Shared/handleError.js";
11+
import type {
12+
Container,
13+
ChangeFeedIteratorOptions,
14+
ChangeFeedPullModelIterator,
15+
} from "@azure/cosmos";
16+
import {
17+
CosmosClient,
18+
PartitionKeyDefinitionVersion,
19+
StatusCodes,
20+
ChangeFeedStartFrom,
21+
PriorityLevel,
22+
} from "@azure/cosmos";
23+
24+
const key = process.env.COSMOS_KEY || "<cosmos key>";
25+
const endpoint = process.env.COSMOS_ENDPOINT || "<cosmos endpoint>";
26+
const databaseId = process.env.COSMOS_DATABASE || "<cosmos database>";
27+
const containerId = process.env.COSMOS_CONTAINER || "<cosmos container>";
28+
29+
logSampleHeader("Cosmos DB Priority Level Support - Comprehensive Sample");
30+
31+
// Establish a new instance of the CosmosClient to be used throughout this demo
32+
const client = new CosmosClient({ endpoint, key });
33+
34+
async function run(): Promise<void> {
35+
const { database } = await client.databases.createIfNotExists({ id: databaseId });
36+
const containerDef = {
37+
id: containerId,
38+
partitionKey: {
39+
paths: ["/category"],
40+
version: PartitionKeyDefinitionVersion.V2,
41+
},
42+
throughput: 11000,
43+
};
44+
45+
try {
46+
const { container } = await database.containers.createIfNotExists(containerDef);
47+
console.log(`Created container with id: ${containerId}`);
48+
49+
console.log("\n========== CRUD Operations with Priority Level ==========\n");
50+
51+
await demonstrateCreateWithPriority(container);
52+
await demonstrateReadWithPriority(container);
53+
await demonstrateUpdateWithPriority(container);
54+
await demonstrateDeleteWithPriority(container);
55+
56+
console.log("\n========== Query with Priority Level ==========\n");
57+
58+
await demonstrateQueryWithPriority(container);
59+
60+
console.log("\n========== Change Feed with Priority Level ==========\n");
61+
62+
await demonstrateChangeFeedWithPriority(container);
63+
} catch (err) {
64+
console.error(err);
65+
} finally {
66+
await finish();
67+
}
68+
}
69+
70+
// ========== CRUD Operations ==========
71+
72+
async function demonstrateCreateWithPriority(container: Container): Promise<void> {
73+
logStep("Creating items with different priority levels");
74+
75+
console.log("Creating item with High Priority...");
76+
const { resource: highPriorityItem } = await container.items.create(
77+
{
78+
id: "high-priority-item",
79+
category: "important",
80+
description: "Critical operation",
81+
},
82+
{ priorityLevel: PriorityLevel.High },
83+
);
84+
console.log(`✓ Created item: ${highPriorityItem?.id} with High Priority`);
85+
86+
console.log("Creating item with Low Priority...");
87+
const { resource: lowPriorityItem } = await container.items.create(
88+
{
89+
id: "low-priority-item",
90+
category: "background",
91+
description: "Background operation",
92+
},
93+
{ priorityLevel: PriorityLevel.Low },
94+
);
95+
console.log(`✓ Created item: ${lowPriorityItem?.id} with Low Priority`);
96+
97+
console.log("Creating multiple items with mixed priorities...");
98+
for (let i = 1; i <= 3; i++) {
99+
const priority = i % 2 === 0 ? PriorityLevel.Low : PriorityLevel.High;
100+
await container.items.create(
101+
{
102+
id: `item-${i}`,
103+
category: `category-${i}`,
104+
value: i * 100,
105+
},
106+
{ priorityLevel: priority },
107+
);
108+
const priorityStr = priority === PriorityLevel.Low ? "Low" : "High";
109+
console.log(`✓ Created item-${i} with ${priorityStr} Priority`);
110+
}
111+
}
112+
113+
async function demonstrateReadWithPriority(container: Container): Promise<void> {
114+
logStep("Reading items with different priority levels");
115+
116+
console.log("Reading item with High Priority...");
117+
const { statusCode } = await container
118+
.item("high-priority-item", "important")
119+
.read({ priorityLevel: PriorityLevel.High });
120+
console.log(`✓ Read item with status code ${statusCode} using High Priority`);
121+
122+
console.log("Reading item with Low Priority...");
123+
await container
124+
.item("low-priority-item", "background")
125+
.read({ priorityLevel: PriorityLevel.Low });
126+
console.log("✓ Read item using Low Priority");
127+
128+
console.log("Reading all items with Low Priority...");
129+
const { resources: allItems } = await container.items
130+
.readAll({ priorityLevel: PriorityLevel.Low })
131+
.fetchAll();
132+
console.log(`✓ Retrieved ${allItems.length} items using Low Priority`);
133+
}
134+
135+
async function demonstrateQueryWithPriority(container: Container): Promise<void> {
136+
logStep("Querying items with different priority levels");
137+
138+
console.log("Querying items with High Priority...");
139+
const { resources: highPriorityResults } = await container.items
140+
.query(
141+
{
142+
query: "SELECT * FROM c WHERE c.category = @category",
143+
parameters: [{ name: "@category", value: "important" }],
144+
},
145+
{ priorityLevel: PriorityLevel.High },
146+
)
147+
.fetchAll();
148+
console.log(`✓ Found ${highPriorityResults.length} items using High Priority query`);
149+
150+
console.log("Querying items with Low Priority...");
151+
const { resources: lowPriorityResults } = await container.items
152+
.query(
153+
{
154+
query: "SELECT * FROM c WHERE c.category != @category",
155+
parameters: [{ name: "@category", value: "system" }],
156+
},
157+
{ priorityLevel: PriorityLevel.Low },
158+
)
159+
.fetchAll();
160+
console.log(`✓ Found ${lowPriorityResults.length} items using Low Priority query`);
161+
}
162+
163+
async function demonstrateUpdateWithPriority(container: Container): Promise<void> {
164+
logStep("Updating items with different priority levels");
165+
166+
// Read the item first
167+
const { resource: itemToUpdate } = await container
168+
.item("item-1", "category-1")
169+
.read({ priorityLevel: PriorityLevel.High });
170+
171+
if (itemToUpdate) {
172+
// Update with High Priority
173+
console.log("Updating item with High Priority...");
174+
itemToUpdate.value = 150;
175+
await container
176+
.item(itemToUpdate.id, "category-1")
177+
.replace(itemToUpdate, { priorityLevel: PriorityLevel.High });
178+
console.log("✓ Updated item with High Priority");
179+
180+
// Update with Low Priority
181+
console.log("Updating item with Low Priority...");
182+
itemToUpdate.value = 200;
183+
await container
184+
.item(itemToUpdate.id, "category-1")
185+
.replace(itemToUpdate, { priorityLevel: PriorityLevel.Low });
186+
console.log("✓ Updated item with Low Priority");
187+
}
188+
}
189+
190+
async function demonstrateDeleteWithPriority(container: Container): Promise<void> {
191+
logStep("Deleting items with different priority levels");
192+
193+
// Delete with Low Priority (cleanup operations)
194+
console.log("Deleting item with Low Priority...");
195+
await container
196+
.item("low-priority-item", "background")
197+
.delete({ priorityLevel: PriorityLevel.Low });
198+
console.log("✓ Deleted item using Low Priority");
199+
200+
// Delete with High Priority
201+
console.log("Deleting item with High Priority...");
202+
await container
203+
.item("high-priority-item", "important")
204+
.delete({ priorityLevel: PriorityLevel.High });
205+
console.log("✓ Deleted item using High Priority");
206+
}
207+
208+
// ========== Change Feed with Priority Level ==========
209+
210+
async function demonstrateChangeFeedWithPriority(container: Container): Promise<void> {
211+
logStep("Change Feed with Low Priority for entire container");
212+
213+
// Ingest initial data
214+
await ingestData(container, 1, 2);
215+
216+
// Fetch changes from beginning with Low priority
217+
let options: ChangeFeedIteratorOptions = {
218+
maxItemCount: 5,
219+
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(),
220+
priorityLevel: PriorityLevel.Low,
221+
};
222+
let continuationToken = await iterateChangeFeedFromBeginning(container, options);
223+
224+
// Ingest more data
225+
await ingestData(container, 3, 4);
226+
await iterateChangeFeedFromContinuationToken(container, continuationToken, PriorityLevel.Low);
227+
228+
logStep("Change Feed with High Priority for partition key");
229+
230+
options = {
231+
maxItemCount: 5,
232+
changeFeedStartFrom: ChangeFeedStartFrom.Beginning("category-1"),
233+
priorityLevel: PriorityLevel.High,
234+
};
235+
continuationToken = await iterateChangeFeedFromBeginning(container, options);
236+
237+
await ingestData(container, 5, 6);
238+
await iterateChangeFeedFromContinuationToken(container, continuationToken, PriorityLevel.High);
239+
240+
logStep("Change Feed with Low Priority for an EPK range");
241+
242+
const epkRanges = await container.getFeedRanges();
243+
options = {
244+
maxItemCount: 5,
245+
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(epkRanges[0]),
246+
priorityLevel: PriorityLevel.Low,
247+
};
248+
continuationToken = await iterateChangeFeedFromBeginning(container, options);
249+
250+
await ingestData(container, 7, 8);
251+
await iterateChangeFeedFromContinuationToken(container, continuationToken, PriorityLevel.Low);
252+
253+
logStep("Change Feed with High Priority starting from now");
254+
255+
options = {
256+
maxItemCount: 5,
257+
changeFeedStartFrom: ChangeFeedStartFrom.Now(epkRanges[0]),
258+
priorityLevel: PriorityLevel.High,
259+
};
260+
const iterator = container.items.getChangeFeedIterator(options);
261+
let nowContinuationToken = "";
262+
263+
for await (const result of iterator.getAsyncIterator()) {
264+
if (result.statusCode === StatusCodes.NotModified) {
265+
nowContinuationToken = result.continuationToken;
266+
console.log("✓ Established 'now' starting point with High Priority");
267+
break;
268+
}
269+
}
270+
271+
await ingestData(container, 9, 10);
272+
await iterateChangeFeedFromContinuationToken(container, nowContinuationToken, PriorityLevel.High);
273+
}
274+
275+
// ========== Helper Functions ==========
276+
277+
async function ingestData(container: Container, initialize: number, end: number): Promise<void> {
278+
for (let i = initialize; i <= end; i++) {
279+
await container.items.create({ id: `item${i}`, category: `category-${(i % 3) + 1}`, key: i });
280+
}
281+
console.log(`✓ Ingested items from item${initialize} to item${end}`);
282+
}
283+
284+
async function iterateChangeFeedFromBeginning(
285+
container: Container,
286+
options: ChangeFeedIteratorOptions,
287+
): Promise<string> {
288+
const iterator = container.items.getChangeFeedIterator(options);
289+
const priorityLevel = options.priorityLevel || "Default (High)";
290+
console.log(`✓ Starting Change Feed iteration from beginning with ${priorityLevel} priority`);
291+
return iterateChangeFeed(iterator);
292+
}
293+
294+
async function iterateChangeFeedFromContinuationToken(
295+
container: Container,
296+
continuationToken: string,
297+
priorityLevel: PriorityLevel,
298+
): Promise<void> {
299+
const options: ChangeFeedIteratorOptions = {
300+
maxItemCount: 5,
301+
changeFeedStartFrom: ChangeFeedStartFrom.Continuation(continuationToken),
302+
priorityLevel,
303+
};
304+
const iterator = container.items.getChangeFeedIterator(options);
305+
const levelStr = priorityLevel === PriorityLevel.Low ? "Low" : "High";
306+
console.log(
307+
`✓ Resuming Change Feed iteration with continuation token using ${levelStr} priority`,
308+
);
309+
await iterateChangeFeed(iterator);
310+
}
311+
312+
async function iterateChangeFeed(
313+
iterator: ChangeFeedPullModelIterator<any>,
314+
continuationToken: string = "",
315+
): Promise<string> {
316+
let itemCount = 0;
317+
318+
for await (const result of iterator.getAsyncIterator()) {
319+
try {
320+
if (result.statusCode === StatusCodes.NotModified) {
321+
continuationToken = result.continuationToken;
322+
console.log(`✓ No new results. Total items processed: ${itemCount}`);
323+
break;
324+
} else if (result.result && result.result.length > 0) {
325+
itemCount += result.result.length;
326+
console.log(` - Received ${result.result.length} item(s)`);
327+
}
328+
} catch (error) {
329+
console.error("Error occurred: ", error);
330+
}
331+
}
332+
333+
return continuationToken;
334+
}
335+
336+
run().catch(handleError);

0 commit comments

Comments
 (0)