Skip to content

Commit 5b5bddf

Browse files
committed
update change feed pull model docs based on feedback
1 parent 39b4223 commit 5b5bddf

File tree

2 files changed

+54
-50
lines changed

2 files changed

+54
-50
lines changed

articles/cosmos-db/change-feed-processor.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,5 +98,6 @@ You can now proceed to learn more about change feed processor in the following a
9898

9999
* [Overview of change feed](change-feed.md)
100100
* [Change feed pull model](change-feed-pull-model.md)
101+
* [How to migrate from the change feed processor library](how-to-migrate-from-change-feed-library.md)
101102
* [Using the change feed estimator](how-to-use-change-feed-estimator.md)
102103
* [Change feed processor start time](how-to-configure-change-feed-start-time.md)

articles/cosmos-db/change-feed-pull-model.md

Lines changed: 53 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,16 @@ ms.reviewer: sngun
1212

1313
# Change feed pull model in Azure Cosmos DB
1414

15-
The change feed pull model is part of the [Azure Cosmos DB SDK V3](https://github.com/Azure/azure-cosmos-dotnet-v3). You can use the change feed pull model to parallelize processing of changes across multiple change feed consumers.
15+
With the change feed pull model, you can consume the Azure Cosmos DB change feed at your own pace. As you can already do with the [change feed processor](change-feed-processor.md), you can use the change feed pull model to parallelize the processing of changes across multiple change feed consumers.
1616

1717
> [!NOTE]
1818
> The change feed pull model is currently in [preview in the Azure Cosmos DB .NET SDK](https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.9.0-preview) only. The preview is not yet available for other SDK versions.
1919
20-
## Using FeedRange for parallelization
21-
22-
In the change feed pull model, you can use the `FeedRange` to parallelize the processing of the change feed. A `FeedRange` represents a single [physical partition](partition-data.md#physical-partitions).
23-
24-
Here's an example showing how to obtain a list of ranges for your container.
25-
26-
```csharp
27-
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
28-
```
20+
## Consuming an entire container's changes
2921

30-
Using a `FeedRange`, you can then create a `FeedIterator` to parallelize the processing of change feed across multiple machines or threads. When you initially obtain a `FeedIterator`, you can specify an optional `StartTime` within the `ChangeFeedRequestOptions`. When left unspecified, the `StartTime` will be the current time.
22+
You can crete a `FeedIterator` to process the change feed using the pull model. When you initially create a `FeedIterator`, you can specify an optional `StartTime` within the `ChangeFeedRequestOptions`. When left unspecified, the `StartTime` will be the current time.
3123

32-
The `FeedIterator` comes in two flavors. In addition to the examples below that return entity objects, you can also obtain the response with `Stream` support.
24+
The `FeedIterator` comes in two flavors. In addition to the examples below that return entity objects, you can also obtain the response with `Stream` support. Streams allow you to read data without having it first deserialized, saving on client resources.
3325

3426
Here's an example for obtaining a `FeedIterator` that returns entity objects, in this case a `User` object:
3527

@@ -43,15 +35,14 @@ Here's an example for obtaining a `FeedIterator` that returns a `Stream`:
4335
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator();
4436
```
4537

46-
Here's a sample that shows reading an example `User` object from the beginning of the container's change feed using two hypothetical separate machines that are reading in parallel:
47-
48-
Machine 1:
38+
Using a `FeedIterator`, you can easily process an entire container's change feed at your own pace. Here's an example:
4939

5040
```csharp
51-
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<Person>(ranges[0], new ChangeFeedRequestOptions{StartTime = DateTime.MinValue});
52-
while (iteratorA.HasMoreResults)
41+
FeedIterator<User> iteratorForTheEntireContainer= container.GetChangeFeedIterator(new ChangeFeedRequestOptions{StartTime = DateTime.MinValue});
42+
43+
while (iteratorForTheEntireContainer.HasMoreResults)
5344
{
54-
FeedResponse<User> users = await iteratorA.ReadNextAsync();
45+
FeedResponse<User> users = await iteratorForTheEntireContainer.ReadNextAsync();
5546

5647
foreach (User user in users)
5748
{
@@ -60,13 +51,16 @@ while (iteratorA.HasMoreResults)
6051
}
6152
```
6253

63-
Machine 2:
54+
## Consuming a partition key's changes
55+
56+
In some cases, you may only want to process a specific partition key's changes. You can obtain a `FeedIterator` for a specific partition key and process the changes the same way that you can for an entire container:
6457

6558
```csharp
66-
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ranges[1], new ChangeFeedRequestOptions{StartTime = DateTime.MinValue});
67-
while (iteratorB.HasMoreResults)
59+
FeedIterator<User> iteratorForThePartitionKey = container.GetChangeFeedIterator(new PartitionKey("myPartitionKeyValueToRead"), new ChangeFeedRequestOptions{StartTime = DateTime.MinValue});
60+
61+
while (iteratorForThePartitionKey.HasMoreResults)
6862
{
69-
FeedResponse<User> users = await iteratorB.ReadNextAsync();
63+
FeedResponse<User> users = await iteratorForThePartitionKey.ReadNextAsync();
7064

7165
foreach (User user in users)
7266
{
@@ -75,40 +69,47 @@ while (iteratorB.HasMoreResults)
7569
}
7670
```
7771

78-
## Saving continuation tokens
72+
## Using FeedRange for parallelization
7973

80-
You can save the position of your `FeedIterator` by creating a continuation token. A continuation token is a string value that keeps of track of your FeedIterator's last processed changes. This allows the `FeedIterator` to resume at this point later. The following code will read through the change feed since container creation. After no more changes are available, it will persist a continuation token so that change feed consumption can be later resumed.
74+
In [change feed processor](change-feed-processor.md), change feed processing is automatically spread across multiple consumers. In the change feed pull model, you can use the `FeedRange` to parallelize the processing of the change feed. A `FeedRange` represents a single [physical partition](partition-data.md#physical-partitions).
75+
76+
Here's an example showing how to obtain a list of ranges for your container.
8177

8278
```csharp
83-
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ranges[0], new ChangeFeedRequestOptions{StartTime = DateTime.MinValue});
79+
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
80+
```
8481

85-
string continuation = null;
82+
Using a `FeedRange`, you can then create a `FeedIterator` to parallelize the processing of change feed across multiple machines or threads. Unlike the previous example that showed how to obtain a single `FeedIterator` for the entire container, you can use the `FeedRange` to obtain multiple FeedIterators which can process the change feed in parallel.
8683

87-
while (iterator.HasMoreResults)
84+
In the case where you want to use FeedRanges, you need to have an orchestrator process that obtains FeedRanges and distributes them to those machines. This distribution could be:
85+
86+
* Using `FeedRange.ToJsonString` and storing/distributing this string value. The consumers can use this value with `FeedRange.FromJsonString`
87+
* If the distribution is in-process, passing the `FeedRange` object reference.
88+
89+
Here's a sample that shows how to read from the beginning of the container's change feed using two hypothetical separate machines that are reading in parallel:
90+
91+
Machine 1:
92+
93+
```csharp
94+
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<Person>(ranges[0], new ChangeFeedRequestOptions{StartTime = DateTime.MinValue});
95+
while (iteratorA.HasMoreResults)
8896
{
89-
FeedResponse<User> users = await iterator.ReadNextAsync();
90-
continuation = orders.ContinuationToken;
97+
FeedResponse<User> users = await iteratorA.ReadNextAsync();
9198

92-
foreach (User user in Users)
99+
foreach (User user in users)
93100
{
94101
Console.WriteLine($"Detected change for user with id {user.id}");
95102
}
96103
}
97-
98-
// Some time later
99-
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(lastProcessedToken);
100104
```
101105

102-
## Consuming an entire container's changes
103-
104-
Sometimes you might not need any parallelization when reading the change feed. By creating a `FeedIterator` without any `FeedRange` input, you can read an entire container's change feed on one machine:
106+
Machine 2:
105107

106108
```csharp
107-
FeedIterator<User> iteratorForTheEntireContainer= container.GetChangeFeedIterator(new ChangeFeedRequestOptions{StartTime = DateTime.MinValue});
108-
109-
while (iteratorForTheEntireContainer.HasMoreResults)
109+
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ranges[1], new ChangeFeedRequestOptions{StartTime = DateTime.MinValue});
110+
while (iteratorB.HasMoreResults)
110111
{
111-
FeedResponse<User> users = await iteratorForTheEntireContainer.ReadNextAsync();
112+
FeedResponse<User> users = await iteratorB.ReadNextAsync();
112113

113114
foreach (User user in users)
114115
{
@@ -117,27 +118,29 @@ while (iteratorForTheEntireContainer.HasMoreResults)
117118
}
118119
```
119120

120-
If you need to stop and resume reading from the entire container's change feed, you can obtain a continuation token from the `FeedIterator`, just as you can for a `FeedRange`.
121-
122-
## Consuming a partition key's changes
121+
## Saving continuation tokens
123122

124-
In some cases, you may only want to process a specific partition key's changes. You can obtain a `FeedIterator` for a specific partition key.
123+
You can save the position of your `FeedIterator` by creating a continuation token. A continuation token is a string value that keeps of track of your FeedIterator's last processed changes. This allows the `FeedIterator` to resume at this point later. The following code will read through the change feed since container creation. After no more changes are available, it will persist a continuation token so that change feed consumption can be later resumed.
125124

126125
```csharp
127-
FeedIterator<User> iteratorForThePartitionKey = container.GetChangeFeedIterator(new PartitionKey("myPartitionKeyValueToRead"), new ChangeFeedRequestOptions{StartTime = DateTime.MinValue});
126+
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ranges[0], new ChangeFeedRequestOptions{StartTime = DateTime.MinValue});
128127

129-
while (iteratorForThePartitionKey.HasMoreResults)
128+
string continuation = null;
129+
130+
while (iterator.HasMoreResults)
130131
{
131-
FeedResponse<User> users = await iteratorForThePartitionKey.ReadNextAsync();
132+
FeedResponse<User> users = await iterator.ReadNextAsync();
133+
continuation = orders.ContinuationToken;
132134

133-
foreach (User user in users)
135+
foreach (User user in Users)
134136
{
135137
Console.WriteLine($"Detected change for user with id {user.id}");
136138
}
137139
}
138-
```
139140

140-
If you need to stop and resume reading from the change feed for a specific partition key, you can obtain a continuation token from the `FeedIterator`, just as you can for a `FeedRange`.
141+
// Some time later
142+
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(continuation);
143+
```
141144

142145
## Comparing with change feed processor
143146

0 commit comments

Comments
 (0)