|
| 1 | +--- |
| 2 | +title: Change feed pull model |
| 3 | +description: Learn how to use the Azure Cosmos DB change feed pull model to read the change feed and the differences between the pull model and Change Feed Processor |
| 4 | +author: timsander1 |
| 5 | +ms.author: tisande |
| 6 | +ms.service: cosmos-db |
| 7 | +ms.devlang: dotnet |
| 8 | +ms.topic: conceptual |
| 9 | +ms.date: 05/06/2020 |
| 10 | +ms.reviewer: sngun |
| 11 | +--- |
| 12 | + |
| 13 | +# Change feed pull model in Azure Cosmos DB |
| 14 | + |
| 15 | +The change feed pull model is part of the [Azure Cosmos DB SDK V3](https://github.com/Azure/azure-cosmos-dotnet-v3). You can use the change feed pull model to parallelize processing of changes across multiple change feed consumers. |
| 16 | + |
| 17 | +> [!NOTE] |
| 18 | +> The change feed pull model is currently in [preview in the .NET SDK](https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.9.0-preview) only. The preview is not yet available for other SDK versions. |
| 19 | +
|
| 20 | +## Using FeedTokens for parallelization |
| 21 | + |
| 22 | +In the change feed pull model, you can use the `FeedRange` to parallelize the processing of the change feed. A `FeedRange` represents a range of partition key values. This range could match a complete physical partition, a smaller range, or a single partition key value. |
| 23 | + |
| 24 | +Here's an example showing how to obtain a list of ranges for your container. |
| 25 | + |
| 26 | +```csharp |
| 27 | +IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync(); |
| 28 | +``` |
| 29 | + |
| 30 | +Using a `FeedRange`, you can then create a `FeedIterator` to parallelize the processing of change feed across multiple machines or threads. When you initially obtain a `FeedIterator`, you can specify an optional `StartTime` within the `ChangeFeedRequestOptions`. When left unspecified, the `StartTime` will be the current time. |
| 31 | + |
| 32 | +The `FeedIterator` comes in two flavors. In addition to the examples below that return entity objects, you can also obtain the response with `Stream` support. |
| 33 | + |
| 34 | +Here's an example for obtaining a `FeedIterator` that returns entity objects, in this case a `User` object: |
| 35 | + |
| 36 | +```csharp |
| 37 | +FeedIterator<User> iteratorWithPOCOS = container.GetChangeFeedIterator<User>(); |
| 38 | +``` |
| 39 | + |
| 40 | +Here's an example for obtaining a `FeedIterator` that returns a `Stream`: |
| 41 | + |
| 42 | +```csharp |
| 43 | +FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(); |
| 44 | +``` |
| 45 | + |
| 46 | +Here's a sample that shows reading an example `User` object from the beginning of the container's change feed using two hypothetical separate machines that are reading in parallel: |
| 47 | + |
| 48 | +Machine 1: |
| 49 | + |
| 50 | +```csharp |
| 51 | +FeedIterator<User> iteratorA = container.GetChangeFeedIterator<Person>(ranges[0], new ChangeFeedRequestOptions{StartTime = DateTime.MinValue}); |
| 52 | +while (iteratorA.HasMoreResults) |
| 53 | +{ |
| 54 | + FeedResponse<User> users = await iteratorA.ReadNextAsync(); |
| 55 | + |
| 56 | + foreach (User user in users) |
| 57 | + { |
| 58 | + Console.WriteLine($"Detected change for user with id {user.id}"); |
| 59 | + } |
| 60 | +} |
| 61 | +``` |
| 62 | + |
| 63 | +Machine 2: |
| 64 | + |
| 65 | +```csharp |
| 66 | +FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ranges[1], new ChangeFeedRequestOptions{StartTime = DateTime.MinValue}); |
| 67 | +while (iteratorB.HasMoreResults) |
| 68 | +{ |
| 69 | + FeedResponse<User> users = await iteratorB.ReadNextAsync(); |
| 70 | + |
| 71 | + foreach (User user in users) |
| 72 | + { |
| 73 | + Console.WriteLine($"Detected change for user with id {user.id}"); |
| 74 | + } |
| 75 | +} |
| 76 | +``` |
| 77 | + |
| 78 | +## Saving FeedTokens |
| 79 | + |
| 80 | +You can save the position of your `FeedIterator` by creating a continuation token. A continuation token is a string value that keeps of track of your FeedIterator's last processed changes. This allows the `FeedIterator` to resume at this point later. The following code will read through the change feed since container creation. After no more changes are available, it will persist a continuation token so that change feed consumption can be later resumed. |
| 81 | + |
| 82 | +```csharp |
| 83 | +FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ranges[0], new ChangeFeedRequestOptions{StartTime = DateTime.MinValue}); |
| 84 | + |
| 85 | +string continuation = null; |
| 86 | + |
| 87 | +while (iterator.HasMoreResults) |
| 88 | +{ |
| 89 | + FeedResponse<User> users = await iterator.ReadNextAsync(); |
| 90 | + continuation = orders.ContinuationToken; |
| 91 | + |
| 92 | + foreach (User user in Users) |
| 93 | + { |
| 94 | + Console.WriteLine($"Detected change for user with id {user.id}"); |
| 95 | + } |
| 96 | +} |
| 97 | + |
| 98 | +// Some time later |
| 99 | +FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(lastProcessedToken); |
| 100 | +``` |
| 101 | + |
| 102 | +## Consuming an entire container |
| 103 | + |
| 104 | +Sometimes you might not need any parallelization when reading the change feed. By creating a `FeedIterator` without any `FeedToken` input, you can read an entire container's change feed on one machine: |
| 105 | + |
| 106 | +```csharp |
| 107 | +FeedIterator<User> iteratorForTheEntireContainer= container.GetChangeFeedIterator(new ChangeFeedRequestOptions{StartTime = DateTime.MinValue}); |
| 108 | + |
| 109 | +while (iteratorForTheEntireContainer.HasMoreResults) |
| 110 | +{ |
| 111 | + FeedResponse<User> users = await iteratorForTheEntireContainer.ReadNextAsync(); |
| 112 | + |
| 113 | + foreach (User user in users) |
| 114 | + { |
| 115 | + Console.WriteLine($"Detected change for user with id {user.id}"); |
| 116 | + } |
| 117 | +} |
| 118 | +``` |
| 119 | + |
| 120 | +If you need to stop and resume reading from the entire container's change feed, you can obtain a continuation token from the `FeedIterator`, just as you can for a `FeedRange`. |
| 121 | + |
| 122 | +## Consuming a partition key's changes |
| 123 | + |
| 124 | +In some cases, you may only want to process a specific partition key's changes. You can obtain a `FeedIterator` for a specific partition key. |
| 125 | + |
| 126 | +```csharp |
| 127 | +FeedIterator<User> iteratorForThePartitionKey = container.GetChangeFeedIterator(new PartitionKey("myPartitionKeyValueToRead"), new ChangeFeedRequestOptions{StartTime = DateTime.MinValue}); |
| 128 | + |
| 129 | +while (iteratorForThePartitionKey.HasMoreResults) |
| 130 | +{ |
| 131 | + FeedResponse<User> users = await iteratorForThePartitionKey.ReadNextAsync(); |
| 132 | + |
| 133 | + foreach (User user in users) |
| 134 | + { |
| 135 | + Console.WriteLine($"Detected change for user with id {user.id}"); |
| 136 | + } |
| 137 | +} |
| 138 | +``` |
| 139 | + |
| 140 | +If you need to stop and resume reading from the change feed for a specific partition key, you can obtain a continuation token from the `FeedIterator`, just as you can for a `FeedRange`. |
| 141 | + |
| 142 | +## Comparing with change feed processor |
| 143 | + |
| 144 | +Many scenarios can process the change feed using either the [change feed processor](change-feed-processor.md) or the pull model. The pull model's continuation tokens and the change feed processor's lease container are both "bookmarks" for the last processed item (or batch of items) in the change feed. |
| 145 | +However, you can't convert continuation tokens to a lease container (or vice versa). |
| 146 | + |
| 147 | +You should consider using the pull model in these scenarios: |
| 148 | + |
| 149 | +- You want to do a one-time read of the existing data in the change feed |
| 150 | +- You only want to read changes from a particular partition key |
| 151 | +- You don't want a push model and want to consume the change feed at your own pace |
| 152 | + |
| 153 | +Here's some key differences between the change feed processor and pull model: |
| 154 | + |
| 155 | +| | Change feed processor| Pull model | |
| 156 | +| --- | --- | --- | |
| 157 | +| Keeping track of current point in processing change feed | Lease (stored in an Azure Cosmos DB container) | Continuation token (stored in memory or manually persisted) | |
| 158 | +| Ability to replay past changes | Yes, with push model | Yes, with pull model| |
| 159 | +| Polling for future changes | Automatically checks for changes based on user-specified `WithPollInterval` | Manual | |
| 160 | +| Process changes from entire container | Yes, and automatically parallelized across multiple threads/machine consuming from the same container| Yes, and manually parallelized using FeedTokens | |
| 161 | +| Process changes from just a single partition key | Not supported | Yes| |
| 162 | +| Support level | Generally available | Preview | |
| 163 | + |
| 164 | +## Next steps |
| 165 | + |
| 166 | +* [Overview of change feed](change-feed.md) |
| 167 | +* [Using the change feed processor](change-feed-processor.md) |
| 168 | +* [Trigger Azure Functions](change-feed-functions.md) |
0 commit comments