Commit 2675391

[@azure/cosmos] Add throughput bucket in change feed and Samples (#36520)
### Packages impacted by this PR
@azure/cosmos

### Issues associated with this PR

### Describe the problem that is addressed by this PR
This PR makes the following changes:
1. Adds support for throughput buckets in the change feed.
2. Adds test cases to validate the scenarios.
3. Adds throughput bucket samples for different scenarios.

### What are the possible designs available to address the problem? If there are more than one possible design, why was the one in this PR chosen?

### Are there test cases added in this PR? _(If not, why?)_
Yes

### Provide a list of related PRs _(if any)_

### Command used to generate this PR _(Applicable only to SDK release request PRs)_

### Checklists
- [ ] Added impacted package name to the issue description
- [ ] Does this PR need any fixes in the SDK Generator? _(If so, create an Issue in the [Autorest/typescript](https://github.com/Azure/autorest.typescript) repository and link it here)_
- [ ] Added a changelog (if necessary)
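In short, a throughput bucket can be set once on the client and overridden per request. A minimal sketch of that pattern as used throughout the new sample (the database and container names here are placeholders, and the top-level `await` assumes an ES module):

    import { CosmosClient } from "@azure/cosmos";

    // Client-level bucket: every request from this client uses bucket 1,
    // unless an individual operation overrides it.
    const client = new CosmosClient({
      endpoint: process.env.COSMOS_ENDPOINT!,
      key: process.env.COSMOS_KEY!,
      throughputBucket: 1,
    });

    const container = client.database("SampleDB").container("SampleContainer");

    // Per-operation override: this create is served from bucket 2 instead of
    // the client's default bucket 1.
    await container.items.create(
      { id: "item-1", pk: "pk-1" },
      { throughputBucket: 2 },
    );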
1 parent ee76e18 commit 2675391

File tree

11 files changed: +1115, -85 lines


sdk/cosmosdb/cosmos/review/cosmos-node.api.md

Lines changed: 1 addition & 0 deletions
@@ -85,6 +85,7 @@ export interface ChangeFeedIteratorOptions {
   excludedLocations?: string[];
   maxItemCount?: number;
   sessionToken?: string;
+  throughputBucket?: number;
 }

 // @public
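The new `throughputBucket` option on `ChangeFeedIteratorOptions` pins change feed reads to a specific bucket. A minimal sketch against an existing `container`, mirroring the sample below (`readChangesFromBucket` is just an illustrative wrapper; the partition key value `"changefeed-pk"` matches the sample's):

    import { ChangeFeedStartFrom, type Container } from "@azure/cosmos";

    // Reads one page of changes for a partition key, served from bucket 4.
    async function readChangesFromBucket(container: Container): Promise<void> {
      const iterator = container.items.getChangeFeedIterator({
        changeFeedStartFrom: ChangeFeedStartFrom.Beginning("changefeed-pk"),
        throughputBucket: 4,
      });

      if (iterator.hasMoreResults) {
        const response = await iterator.readNext();
        console.log(`Read ${response.result.length} change(s) using bucket 4`);
      }
    }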
Lines changed: 309 additions & 0 deletions
@@ -0,0 +1,309 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

/**
 * @summary Demonstrates throughput bucket operations at the client, database, container and item levels.
 */
import "dotenv/config";
import { ChangeFeedStartFrom, CosmosClient, type Container } from "@azure/cosmos";
import { logSampleHeader, handleError, logStep } from "./Shared/handleError.js";
import { randomUUID } from "@azure/core-util";

const endpoint = process.env.COSMOS_ENDPOINT || "<cosmos endpoint>";
const key = process.env.COSMOS_KEY || "<cosmos key>";
const databaseId = process.env.COSMOS_DATABASE || "ThroughputBucketSampleDB";
const containerId = process.env.COSMOS_CONTAINER || "ThroughputBucketSampleContainer";

let globalClient: CosmosClient;
let globalContainer: Container;

logSampleHeader("Throughput Bucket Operations");

// Applies throughput bucket 1 to all requests from a client application
async function createClientWithThroughputBucket(): Promise<void> {
  logStep("Creating client with throughput bucket 1 for all operations");

  globalClient = new CosmosClient({
    endpoint,
    key,
    throughputBucket: 1,
  });

  logStep("Client created with throughput bucket 1");
}

// Creates database and container for the sample
async function setupDatabaseAndContainer(): Promise<void> {
  logStep("Setting up database and container for throughput bucket operations");

  // Create database
  const { database } = await globalClient.databases.createIfNotExists({ id: databaseId });
  logStep(`Created/found database: ${database.id}`);

  // Create container
  const { container } = await database.containers.createIfNotExists({
    id: containerId,
    partitionKey: { paths: ["/pk"] },
  });
  logStep(`Created/found container: ${container.id}`);
  globalContainer = container;
}

// Demonstrates all CRUD operations with different throughput buckets
async function demonstrateCRUDWithThroughputBuckets(): Promise<void> {
  logStep("Demonstrating CRUD operations with different throughput buckets");

  // CREATE - Create item with throughput bucket 2
  const createItemId = `create-item-${randomUUID()}`;
  const { resource: createdItem } = await globalContainer.items.create(
    {
      id: createItemId,
      pk: "crud-pk",
      name: "CRUD Test Document",
      description: "Created with throughput bucket 2",
      operation: "create",
    },
    {
      throughputBucket: 2,
    },
  );

  logStep(`Created item with id: ${createdItem?.id} using throughput bucket 2`);

  // READ - Read item with throughput bucket 3
  const { resource: readItem } = await globalContainer.item(createItemId, "crud-pk").read({
    throughputBucket: 3,
  });

  logStep(`Read item with id: ${readItem.id} using throughput bucket 3`);

  // UPDATE (Replace) - Update item with throughput bucket 4
  readItem.description = "Updated with throughput bucket 4";
  readItem.operation = "update";
  readItem.updatedAt = new Date().toISOString();

  const { resource: updatedItem } = await globalContainer
    .item(createItemId, "crud-pk")
    .replace(readItem, {
      throughputBucket: 4,
    });

  logStep(`Updated item with id: ${updatedItem.id} using throughput bucket 4`);

  // UPSERT - Upsert items with throughput bucket 2
  for (let i = 1; i <= 2; i++) {
    const upsertItemId = `upsert-item-${i}`;
    const { resource: upsertedItem } = await globalContainer.items.upsert(
      {
        id: upsertItemId,
        pk: "crud-pk",
        name: `Upserted Document ${i}`,
        description: `Upserted with bucket 2 - iteration ${i}`,
        operation: "upsert",
      },
      {
        throughputBucket: 2,
      },
    );

    logStep(`Upserted item with id: ${upsertedItem?.id} using throughput bucket 2`);
  }

  // DELETE - Delete item with throughput bucket 4
  await globalContainer.item(createItemId, "crud-pk").delete({
    throughputBucket: 4,
  });

  logStep(`Deleted item with id: ${createItemId} using throughput bucket 4`);

  // QUERY - Query items with throughput bucket 3
  const querySpec = {
    query: "SELECT * FROM c WHERE c.operation = @operation",
    parameters: [
      {
        name: "@operation",
        value: "upsert",
      },
    ],
  };

  const { resources: queryResults } = await globalContainer.items
    .query(querySpec, {
      throughputBucket: 3,
    })
    .fetchAll();

  logStep(`Queried ${queryResults.length} items using throughput bucket 3`);

  // Clean up remaining items
  for (const item of queryResults) {
    await globalContainer.item(item.id, item.pk).delete();
  }

  logStep("Cleaned up remaining CRUD test items");
}

// Demonstrates client-level vs operation-level throughput buckets
async function demonstrateClientVsOperationBuckets(): Promise<void> {
  logStep("Demonstrating client-level vs operation-level throughput buckets");

  // Create client with throughput bucket 1 for all operations
  const bucketClient = new CosmosClient({
    endpoint,
    key,
    throughputBucket: 1,
  });

  const database = bucketClient.database(databaseId);
  const clientContainer = database.container(globalContainer.id);

  // Operation 1: Uses client's default bucket (1)
  const itemId1 = `client-bucket-${randomUUID()}`;
  await clientContainer.items.create({
    id: itemId1,
    pk: "client-test",
    name: "Uses Client Bucket",
    description: "This operation uses the client's default throughput bucket 1",
  });

  logStep("Created item using client's default throughput bucket 1");

  // Operation 2: Overrides client bucket with operation-specific bucket (2)
  const itemId2 = `operation-bucket-${randomUUID()}`;
  await clientContainer.items.create(
    {
      id: itemId2,
      pk: "client-test",
      name: "Uses Operation Bucket",
      description: "This operation overrides client bucket with throughput bucket 2",
    },
    {
      throughputBucket: 2,
    },
  );

  logStep("Created item using operation-specific throughput bucket 2 (overriding client bucket 1)");

  // Clean up
  await clientContainer.item(itemId1, "client-test").delete();
  await clientContainer.item(itemId2, "client-test").delete();

  logStep("Cleaned up client vs operation bucket test items");
}

// Demonstrates bulk operations with throughput buckets
async function demonstrateBulkOperationsWithThroughputBucket(): Promise<void> {
  logStep("Demonstrating bulk operations with throughput bucket 2");

  // Prepare bulk operations
  const operations = [];
  for (let i = 1; i <= 5; i++) {
    operations.push({
      operationType: "Create" as const,
      resourceBody: {
        id: `bulk-item-${i}`,
        pk: "bulk-pk",
        name: `Bulk Document ${i}`,
        description: `Created via bulk operation with bucket 2`,
      },
    });
  }

  // Execute bulk operations with throughput bucket 2
  const res = await globalContainer.items.executeBulkOperations(operations, {
    throughputBucket: 2,
  });

  logStep(`Executed ${res.length} bulk operations using throughput bucket 2`);
}

// Demonstrates change feed operations with throughput buckets
async function demonstrateChangeFeedWithThroughputBucket(): Promise<void> {
  logStep("Demonstrating change feed operations with throughput bucket 4");

  // Create some test data for change feed
  const testItems = [];
  for (let i = 1; i <= 3; i++) {
    const itemId = `changefeed-item-${i}`;
    await globalContainer.items.create({
      id: itemId,
      pk: "changefeed-pk",
      name: `Change Feed Document ${i}`,
      description: `Document for change feed demo`,
      timestamp: new Date().toISOString(),
    });
    testItems.push(itemId);
  }

  logStep("Created test data for change feed");

  // Read change feed with throughput bucket 4
  const changeFeedIterator = globalContainer.items.getChangeFeedIterator({
    changeFeedStartFrom: ChangeFeedStartFrom.Beginning("changefeed-pk"),
    throughputBucket: 4,
  });

  let changeCount = 0;
  if (changeFeedIterator.hasMoreResults) {
    const res = await changeFeedIterator.readNext();
    changeCount = res.result.length;
  }

  logStep(`Read change feed with ${changeCount} changes using throughput bucket 4`);

  // Clean up test items
  for (const itemId of testItems) {
    await globalContainer.item(itemId, "changefeed-pk").delete();
  }

  logStep("Cleaned up change feed test items");
}

// Cleanup function to delete database and container
async function cleanup(): Promise<void> {
  logStep("Cleaning up resources");

  try {
    await globalClient.database(databaseId).delete();
    logStep(`Cleaned up database: ${databaseId}`);
  } catch (error) {
    logStep(`Could not delete database ${databaseId}. You may need to delete it manually.`);
  }
}

async function runSample(): Promise<void> {
  try {
    // Initialize client
    await createClientWithThroughputBucket();

    // Setup database and container
    await setupDatabaseAndContainer();

    // 1. Demonstrate all CRUD operations with different throughput buckets
    await demonstrateCRUDWithThroughputBuckets();

    // 2. Demonstrate client-level vs operation-level throughput buckets
    await demonstrateClientVsOperationBuckets();

    // 3. Demonstrate bulk operations with throughput buckets
    await demonstrateBulkOperationsWithThroughputBucket();

    // 4. Demonstrate change feed operations with throughput buckets
    await demonstrateChangeFeedWithThroughputBucket();

    logStep("Throughput bucket sample completed successfully!");
  } catch (error: any) {
    await handleError(error);
  } finally {
    // Clean up resources
    if (globalClient) {
      await cleanup();
    }
  }
}

// Run the sample
runSample().catch((error) => {
  console.error("Sample failed:", error);
  process.exit(1);
});
