You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: DOCUMENTATION.md
+26-50Lines changed: 26 additions & 50 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -438,9 +438,9 @@ A stream that does not have events buffered is represented by a 16 byte `struct`
438
438
439
439
# Checkpoints
440
440
441
-
The ChangeFeedProcessor mechanism guarantees that all events written to the store will be observed at least once per Consumer Group Name (referred to as a Processor Name in the MS documentation).
441
+
The ChangeFeedProcessor mechanism guarantees that all events written to the store will be observed at least once per Consumer Group.
442
442
443
-
Note that there is no relationship between the checkpoint position and any derived state; the sole control is that checkpoints can only ever advance when all events across all streams within a given batch of items from the feed have been (successfully) processed by the handler.
443
+
Note that there is no relationship between the checkpoint position and any derived state; the sole control is that checkpoints can only ever advance when all events across all streams within a given batch of items from the feed have been successfully reported as processed by the handler.
444
444
445
445
<aname="resetting-checkpoints"></a>
446
446
## Resetting checkpoints
@@ -452,11 +452,7 @@ The MS CFP implementation [does not presently implement a facility to rewind/ret
452
452
<aname="handling-malformed"></a>
453
453
# Malformed streams handling
454
454
455
-
Production has a number of streams that have had mutations applied that result in the ChangeFeed presenting the events out of order. The Propulsion scheduler component will refuse to process streams that have gaps within them, which results in processing stalling (TODO include screenshot of how this manifests)
456
-
457
-
For the data presently in the store, it's possible to circumvent this hurdle by supplying arguments that have the reader obtain the missing events 'from the future' in order to ensure that processing can complete despite the inconsistency:
458
-
-`-r 10` (the 'batches to read ahead count)
459
-
- cosmos -b 1000 (the 'batch size count')
455
+
In the case where a store has had manual updates to non-tip batches applied, those changes can result in the ChangeFeed presenting the events out of order. The Propulsion scheduler component can be configured to refuse to process streams that have gaps within them (`requireAll` mode) as a safeguard/sanity check. One side effect of this is that the Processor can then be left in a state where the reader has not read far enough ahead to slot the out of order event back into the sequence. This stalling can be worked around be increasing the `maxReadAhead` (`-r` parameter) to increase the number of batches, along with specifying a large batch size (`-b` parameter)
460
456
461
457
# Notes
462
458
@@ -468,33 +464,13 @@ In CosmosDb, each item is subject to two general overheads: there's a set of hea
468
464
From the write perspective, a Sync operation that's writing events becomes one of:
469
465
1._append_: pushing an event to the tail of the `e` field in the Tip that holds the buffered events. The cost of such an operation involves a read and an overwrite, and is largely a function of the size of the Tip document.
470
466
2._calve_: when the events in the Tip cause the maximum JSON size (configurable, but think 32K) to be breached, the events held in the Tip, together with the pending one being appended are 'calved off' into a separate item/document in the Container. Instead of a point write, there's a transactional batch consisting of:
471
-
- inserting the calved item/document - the events being added as part of this Sync operation, combined with the ones in the Tip become a fresh document
472
-
- updating the tip - the events in tip buffer is reset to being empty, and the unfolds (snapshots) and position are updated
467
+
- inserting the calved item/document - the events being added as part of this Sync operation, combined with the ones in the Tip become a fresh document (relatively cheap in RU terms as you are not paying to overwrite data)
468
+
- updating the tip - the events in tip buffer is reset to being empty, and the unfolds (snapshots) and position are updated (note the RU coats are based on the original size, not the (lesser) final size)
473
469
474
470
In both the calve and append cases above, the events that were being held in the Tip will be re-observed by the ChangeFeed reader (if we assume it's reading from the tail; in the scenario where we are re-traversing all the events, there are no such repeats; in fact the reading is actually more efficient as there's a minimal amount of per item overhead to the read cost).
475
471
476
472
The most significant effect of storing event in tip on reactors (and projection systems based off the ChangeFeed in general) is the fact that the consumer needs to de-duplicate the events each time the document is received, ideally shedding the existing events and forwarding only the newly appended events for processing.
477
473
478
-
### Testing
479
-
480
-
#### Unit testing projections
481
-
482
-
- No Propulsion required, but MemoryStore can help
An integral part of the `Equinox.CosmosStore` value proposition is the intrinsic ability that Azure CosmosDb affords to project changes via the [ChangeFeed mechanism](https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed). Key elements involved in realizing this are:
544
-
- the [storage model needs to be designed in such a way that the aforementioned processor can do its job efficiently](https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#cosmos-storage-model)
545
-
- there needs to be one or more active ChangeFeed Processor Host Applications per Container that monitor events being written (while tracking the position of the most recently propagated events). (This could also be an Azure Function driven by a Change Feed Trigger, but the full wiring for that has not been implemented to date.)
519
+
-`Propulsion.MemoryStore`; example of deterministic waits in a test
546
520
547
-
In CosmosDb, every Item (document) belongs to a logical partition, defined by the Item's partition key (similar to the concept of a stream in EventStoreDb). `Equinox.CosmosStore`'s schema uses the stream name as the partition key, and never updates anything except the Tip document, in order to guarantee that the Change Feed will always deliver items from any given stream in `Index` order. A Container is split into [physical partitions hosting a variable number of these logical partitions](https://docs.microsoft.com/en-gb/azure/cosmos-db/partition-data), with an individually accessible endpoint node per physical partition (the multiple nodes/connections are managed internally within the `Microsoft.Azure.Cosmos` SDK).
521
+
#### Generalising MemoryStore Projector tests to hit a concrete store
548
522
549
-
In concrete terms, the ChangeFeed's consists of a long running Processor per physical partition node that repeatedly tails (think of it as a `SELECT * FROM <all documents/items for the node> WHERE lastUpdated > <checkpoint>`) across the set of documents being managed by a given partition host. The topology is subject to continual change (the SDK internally tracks the topology metadata to become aware of new nodes); Processor instances can spin up and down, with the assigned ranges shuffling to balance the load per Processor. e.g. if you allocate 30K RU/s to a container and/or store >20GB of data, it will have at least 3 processors, each handling 1/3 of the partition key space, and running a Change Feed against that involves maintaining 3 continuous queries, with a continuation token per physical being held/leased/controlled by a given Change Feed Processor. The Change Feed implementation within the SDK stores the continuation token and lease information per physical partition within a nominated Leases Container.
523
+
- Pointers to `proHotel` things
550
524
551
-
## Effect of ChangeFeed on Request Charges
525
+
#`Propulsion.CosmosStore` design overview
552
526
553
-
It should be noted that the ChangeFeed is not special-cased by CosmosDb itself in any meaningful way; an active loop somewhere is continually making CosmosDb API queries, paying Request Charges for the privilege (even a tail request based on a continuation token yielding zero documents incurs a charge). It's thus important to consider that every Event written by `Equinox.CosmosStore` into the CosmosDb Container will induce a read cost due to the fact that the freshly inserted document will be included in the next batch propagated by the Processor (each update of a document also 'moves' that document from it's present position in the change order past the the notional tail of the ChangeFeed). Thus each insert/update also induces an (unavoidable) read request charge based on the fact that the document will be included in the aggregate set of touched documents being surfaced by the ChangeFeed batch callback (charging is proportional to the size of the affected item per KiB or part thereof; reads are cheaper than writes). **_The effect of this cost is reads-triggered-by-writes is multiplied by the number of Processors (consumer groups) one is running._**
527
+
(This section duplicates some earlier information, with some lower level details covering nuances of how Equinox.CosmosStore's storage schema, the CosmosDb ChangeFeed mechanism and Propulsion converge - it also covers some basics, as a secondary goal is to dispel some common misconceptions of what a ChangeFeed might be, given that 'there is no feed' as such, and that it differs significantly from how e.g. DynamoDB Streams operates)
554
528
555
-
## Change Feed Processors
529
+
In CosmosDb, every Item (document) belongs to a logical partition, defined by the Item's partition key (similar to the concept of a stream in EventStoreDb). `Equinox.CosmosStore`'s schema uses the stream name as the partition key, and never updates anything except the Tip document, in order to guarantee that the Change Feed will always deliver items from any given stream in `Index` order.
556
530
557
-
As outlined above, the CosmosDb ChangeFeed's real world manifestation is as a continuous query per CosmosDb Container _node_ ("physical partition").
531
+
An integral part of the `Equinox.CosmosStore` value proposition is the intrinsic ability that Azure CosmosDb affords to project changes via the [ChangeFeed mechanism](https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed). The integral parts of this are:
532
+
- the [storage model needs to be designed in such a way that the aforementioned processor can do its job efficiently](https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#cosmos-storage-model)
533
+
- there needs to be one or more active ChangeFeed Processor Host Applications per Container that monitor events being written (while tracking the position of the most recently propagated events). (ASIDE: This could also be an Azure Function driven by a Change Feed Trigger, but the full wiring for that has not been implemented to date.)
534
+
- the position/bookmark representing the progress that ahs been attained per physical partition needs to be maintained durably in some form of checkpoint store (aka the 'Leases Container'). By convention this is maintained in an ancillary container alongside the source one, e.g. a given `<monitoredContainer>` will typically have a `<monitoredContainer>-aux` Leases Container sitting alongside it.
535
+
536
+
A Container is split into [physical partitions hosting a variable number of these logical partitions](https://docs.microsoft.com/en-gb/azure/cosmos-db/partition-data), with an individually accessible endpoint node per physical partition (the multiple nodes/connections are managed internally within the `Microsoft.Azure.Cosmos` SDK).
558
537
559
-
For .NET, this is wrapped in a set of APIs within the `Microsoft.Azure.Cosmos` package.
538
+
In concrete terms, the ChangeFeed consists of a long running Processor per physical partition node that continually tails (think of it as a `SELECT * FROM <all documents/items for the node> WHERE lastUpdated > <checkpoint>`) across the set of documents being managed by a given partition host. The topology of a given Container's physical partition splits is subject to continual change (the SDK internally tracks the topology metadata to become aware of new nodes); Processor instances can spin up and down, with the assigned ranges shuffling to balance the load per Processor. e.g. if you allocate 30K RU/s to a container (and/or store >100GB of data), it will have at least 3 processors, each handling 1/3 of the partition key space, and running a Change Feed against that involves maintaining 3 continuous queries, with a continuation token per physical being held/leased/controlled by a given Change Feed Processor. The Change Feed implementation within the SDK stores the continuation token and lease information per physical partition within a nominated Leases Container.
560
539
561
-
A ChangeFeed _Processor_ consists of (per CosmosDb processor/range) the following elements:
562
-
- a _host_ process running somewhere that will run the query and then do something with the results before marking off progress (the instances need to coordinate to distribute the consumption processing load fairly, typically via a Leases Container)
563
-
- a continuous query across the set of items/documents that fall within the partition key range hosted by a given physical partition host
564
-
- that progress then needs to be maintained durably in some form of checkpoint store (aka a 'Leases Container'), which by convention are maintained in an ancillary container alongside the source one, e.g. a given `<monitoredContainer>` will typically have a `<monitoredContainer>-aux` Leases Container sitting alongside it.
540
+
See the [the Equinox QuickStart](https://github.com/jet/equinox/blob/master/README.md#quickstart) for instructions / walkthrough on what's involved end to end.
565
541
566
-
The `Propulsion.CosmosStore` in this repo uses the evolution of the [original `ChangeFeedProcessor` implementation: `Microsoft.Azure.DocumentDB.ChangeFeedProcessor`](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet), that is integrated into the [OSS `Microsoft.Azure.Cosmos` impl](https://github.com/Azure/azure-cosmos-dotnet-v3). NOTE the necessary explicit checkpointing support was not exposed in the `Microsoft.Azure.Cosmos` package until version `3.21.0`.
542
+
## Effect of ChangeFeed on Request Charges
567
543
568
-
See the [PR that added the initial support for CosmosDb Projections](https://github.com/jet/equinox/pull/87) and [the Equinox QuickStart](https://github.com/jet/equinox/blob/master/README.md#quickstart) for instructions.
544
+
It should be noted that the ChangeFeed is not special-cased by CosmosDb itself in any meaningful way; an active loop somewhere is continually making CosmosDb API queries, paying Request Charges for the privilege (even a tail request based on a continuation token yielding zero documents incurs a charge). It's thus important to consider that every Event written by `Equinox.CosmosStore` into the CosmosDb Container will induce a read cost due to the fact that the freshly inserted document will be included in the next batch propagated by the Processor (each update of a document also 'moves' that document from it's present position in the change order past the the notional tail of the ChangeFeed). Thus each insert/update also induces an (unavoidable) read request charge based on the fact that the document will be included in the aggregate set of touched documents being surfaced by the ChangeFeed batch callback (charging is proportional to the size of the affected item per KiB or part thereof; reads are cheaper than writes). **_The effect of this cost is reads-triggered-by-writes is multiplied by the number of Processors (consumer groups) one is running._**
0 commit comments