Commit cba4289

Update stream positions
1 parent 8cd2954 commit cba4289

1 file changed

+17
-19
lines changed

DOCUMENTATION.md

Lines changed: 17 additions & 19 deletions
@@ -401,35 +401,33 @@ NOTE batching should only be used where there is a direct benefit (the batch han
401401
<a name="write-position"></a>
402402
# Stream Positions, Buffering and Deduplication
403403

404-
The `Propulsion.Streams` processing system receives events from the ChangeFeed as a chain of batches per physical partition of the CosmosDb store (approx a continuous `select * from C`). Internally events are grouped by stream. Handlers are fed all the buffered items for a single stream as an array.
404+
The `Propulsion.Streams` processing system receives events from the ChangeFeed as a chain of batches per physical partition of the CosmosDb store (think of a continuous `select * from c` where the result includes a token representing how far into the history has been read). Internally events are grouped by stream. Handlers are fed all the buffered items for a single stream as an array.
405405

406-
There should never be a gap in the events as they arrive from the feed; where this does happen (e.g. if an Item was manually deleted), the Propulsion Scheduler will refuse to dispatch the events to the handler (TODO document behavior and/or handling).
406+
There should never be a gap in the events as they arrive from the feed; where this does happen (e.g. if an Item was manually deleted), the Propulsion Scheduler will by default refuse to dispatch the events to the handler until the gap has been either approved, or filled by a later read (where there is a source feed integrity issue).
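As a rough illustration of what a 'gap' means here, the following Python sketch models the check in a language-neutral way. It is not the Propulsion Scheduler's implementation; `has_gap` and its parameters are hypothetical names, and the sketch assumes a gap is any missing `Index` between the stream's write position and the highest buffered event.

```python
def has_gap(write_position, buffered_indexes):
    """Return True if an event index is missing between the write position
    and the highest buffered index (hypothetical model, not Propulsion code)."""
    expected = write_position
    for index in sorted(buffered_indexes):
        if index > expected:
            return True  # the event at `expected` never arrived
        # Indexes below `expected` are redeliveries; they never create a gap
        expected = max(expected, index + 1)
    return False


print(has_gap(0, [0, 1, 2]))  # False: contiguous from the write position
print(has_gap(0, [1, 2]))     # True: event 0 is missing
```

Under this model, redelivered events below the write position are harmless, whereas a missing event would hold back everything buffered after it.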
407407

408-
As well as buffering events that have yet to be processed, `Propulsion.Streams` also maintains a 'write position' per stream, to manage:
408+
As well as buffering events that have yet to be processed, `Propulsion.Streams` also maintains a 'write position' per stream, to cover the following:
409409
1. (low frequency) de-duplicating redelivered events where a lease is lost and then reassigned, without the checkpoint having been advanced
410410
2. (high frequency when [Equinox configured to store Events In Tip](#events-in-tip)) discarding events from tip (or calved items) that have already been processed
411-
3. (high frequency when re-traversing events where the handler returns the `version` its state is based on as the 'next write position' result value) immediate discarding of events that are known to already have been ingested into the indexed form. In addition to the fact that the event can immediately be dropped from the buffer, this also avoids the handler potentially doing duplicate work.
411+
3. (high frequency when re-traversing events where the handler returns the `version` that its state is based on as the 'next write position' result value) immediate discarding of events that are known to already have been ingested into the handler's target storage. In addition to the fact that the event can immediately be dropped from the buffer to reduce memory consumption, this also avoids the handler doing duplicate work.
412412

413413
The write position is `0`-based, i.e. the following rules apply:
414-
- if you load an Equinox stream with two events, they should be numbered with `Index` values of `0` and `1`, and the `ISyncContext.Version` in such a state would be `2`. The `Version` value thus coincides with the notion of the _write position_ that Propulsion maintains: if the write position is `3`, then events `0`, `1` and `2` can be discarded immediately on read.
414+
- if you load an Equinox stream with two events, they should be numbered with `Index` values of `0` and `1`. In such a case, the `ISyncContext.Version` would be `2`. The `Version` value thus coincides with the notion of the _write position_ that Propulsion maintains: if the write position is `3`, then events `0`, `1` and `2` can be discarded immediately on read.
415415
- when the position is unknown, it's effectively `0`
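The two rules above can be modelled with a short Python sketch. This is a language-neutral illustration rather than Propulsion code: the `Event` type and `drop_already_written` function are hypothetical, and an unknown position is represented as `None`.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Event:
    index: int    # 0-based position of the event within its stream
    payload: str


def drop_already_written(events: List[Event], write_position: Optional[int]) -> List[Event]:
    """Keep only events at or beyond the write position.

    An unknown position (None) is treated as 0, so nothing is discarded.
    """
    position = 0 if write_position is None else write_position
    return [e for e in events if e.index >= position]


events = [Event(i, f"e{i}") for i in range(5)]
# With a write position of 3, events 0, 1 and 2 are discarded on read
print([e.index for e in drop_already_written(events, 3)])  # [3, 4]
```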
416416

417-
Given a batch of two `events` numbered `0` and `1`, the write position after they have been processed can be derived by any of the following means using helpers in `Propulsion.Streams.Sinks`:
417+
Given a batch of two `events` numbered `0` and `1`, the write position (_or version_) after they have been processed can be derived by any of the following means using helpers in `Propulsion.Streams.Sinks`:
418418
- `Index` value of the first event, plus the length, `event[0].Index + events.Length` (a handler is only ever supplied a contiguous span of events)
419419
- `Index` value of the last event, plus 1: `event[^1].Index + 1` (the 'next' index after any given event is the value plus 1)
420-
- `Events.nextIndex events` (built in helper in `Propulsion.Streams.Sinks`)
421-
422-
## `StreamResult` implications
423-
424-
Every handler invocation (that does not fail with an Exception), triggers an update to the write position based on the `StreamResult`. The net effect will be one of the following:
425-
1. Full/partial progress has been achieved: where at least one event has been declared handled, the position advances, and the events are removed from the buffer. The stream is removed from the `busy`, `active` and `failing` classifications.
426-
2. the Handler identifies that the indexed state is actually ahead of the current event delivery. This can occur for multiple reasons, e.g.:
427-
- If the handler loads the source data being monitored, it may reach a position beyond the Change Feed's read position. For example: if the handler (prompted by event 0) loads the state of the source stream and notes that the version of the state implies that it has also taken the effect of event 1 into account, it can indicate that the write position should now move to event 2 (which means events 0 and 1 will be discarded immediately on read).
428-
- If the handler records the attained event index as part of the derived state, it can use that information to avoid processing some of the events it has been supplied. For example, if events 0-3 are presented to the Handler, its first action may be to load the current derived state, and skip all events whose `Index` is less than the position that had previously been attained for that stream (by comparing the `ITimelineEvent.Index` value per event). Furthermore, if it discovers that it has actually previously ingested all information up to position 10, then events 4-9 can be dropped.
429-
- a target may be able to safely and cheaply ignore events that have already been ingested, yielding the actual attained position as a by-product (this is termed 'idempotent write' semantics). For example, `Propulsion.CosmosStore.CosmosStoreSink` uses this technique; if presented with events with `Index` values ranging `0-3`, it will prepare a `Sync` batch consisting of the 4 events, without touching the target store.
430-
- If the Sync Stored Procedure determines that two events are already present, it will only append the two that are new from its perspective. The handler can report the progress via any of the following:
431-
- `StreamResult.AllProcessed`: as the handler was supplied events `0-4`, the write position thus logically becomes `5`.
432-
- If it determines that it has all events up to number `4`, and more, then it will respond that the write was successful, but the next index / write position is something other than `5`. For instance if the sync attempt reveals that there already 10 events, then the Handler will end up yielding a `StreamResult.OverrideNextIndex 5`
420+
- `Events.next events` (built in helper in `Propulsion.Streams.Sinks`)
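The arithmetic behind the first two derivations can be checked with a small Python model. The `Event` tuple and both function names are hypothetical stand-ins; the built-in helper named in the list is not reproduced here, and the sketch assumes a non-empty, contiguous span as the text describes.

```python
from collections import namedtuple

# Hypothetical stand-in for an event; only the Index matters here
Event = namedtuple("Event", ["index", "payload"])


def next_from_first(events):
    """Index of the first event plus the number of events in the span."""
    return events[0].index + len(events)


def next_from_last(events):
    """Index of the last event plus 1."""
    return events[-1].index + 1


events = [Event(0, "a"), Event(1, "b")]
# Both derivations agree because the span is contiguous
print(next_from_first(events), next_from_last(events))  # 2 2
```

The two results only coincide for a contiguous span, which is why the guarantee that a handler is never supplied a span with holes matters.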
421+
422+
Every handler invocation (that does not fail with an Exception) is responsible for returning the write position to indicate what the Handler considers to be the next relevant event. The net effect will be one of the following:
423+
1. (rare) no progress has been achieved, but there was no error. In this case, the write position remains as it was. Note this will cause the stream to fall into the `stalled` classification.
424+
2. Full/partial progress has been achieved: where at least one event has been declared handled, the position advances, and the events are removed from the buffer. The stream is removed from the `busy`, `active` and `failing` classifications.
425+
3. the Handler yields a version that indicates that the projected state is actually ahead of the current event delivery. This can occur for diverse reasons, e.g.:
426+
- If the handler loads the source data being monitored, it may observe a position beyond the Change Feed's current read position. For example: if the handler (prompted by event 0) loads the state of the source stream and notes that the version of the state implies that it has also taken the effect of event 1 into account, it can indicate that the write position should now move to event 2 (which means events 0 and 1 will be discarded immediately on read).
427+
- If the handler records the attained version as part of the derived state, it can use that information to avoid processing some of the events it has been supplied. For example, if events 0-3 are presented to the Handler, its first action may be to load the current derived state, and skip all events whose `Index` is less than the position that had previously been attained for that stream (by comparing the `ITimelineEvent.Index` value per event). Furthermore, if it discovers that it has actually previously ingested all information up to position 10, it can return that position, and events 4-9 will be dropped.
428+
- a target may be able to safely and cheaply ignore events that have already been ingested, yielding the actual attained position as a by-product ('idempotent write' semantics). For example, `Propulsion.CosmosStore.CosmosStoreSink` uses this technique; if presented with events with `Index` values ranging `0-3`, it will prepare a `Sync` batch consisting of the 4 events, without reading from the target store.
429+
- If the Sync Stored Procedure determines that two events are already present, it will only append the two that are new from its perspective.
430+
- If it determines that it has all events up to and including that with `Index = 9`, then it will respond that the write was successful, but the version / write position returned will be `10` (as opposed to the `4` that would have been calculated by `Events.next` on a batch with events `0-3` inclusive).
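The three outcomes can be summarised in a Python sketch of how a returned position might be applied to a stream's buffer. This is a simplified model under stated assumptions, not the scheduler's actual logic: `apply_handler_result` and the outcome labels are hypothetical, events are represented by their `Index` values only, and the position is assumed never to move backwards.

```python
def apply_handler_result(buffered_indexes, write_position, returned_position):
    """Return (new_write_position, remaining_buffer, outcome) for one handler call.

    Hypothetical model: events are represented by their Index values only.
    """
    if returned_position <= write_position:
        # Outcome 1: nothing was handled, so the position and buffer are unchanged
        return write_position, list(buffered_indexes), "no progress"
    # Everything below the returned position is now known to be handled
    remaining = [i for i in buffered_indexes if i >= returned_position]
    delivered_next = max(buffered_indexes) + 1
    # Outcome 3 applies when the handler reports a position beyond what was delivered
    outcome = "ahead" if returned_position > delivered_next else "progress"
    return returned_position, remaining, outcome


# Events 0-3 delivered, but the target already holds everything up to Index 9
print(apply_handler_result([0, 1, 2, 3], 0, 10))  # (10, [], 'ahead')
```

In the 'ahead' case, events 4-9 would then be discarded as they arrive, because their `Index` values fall below the new write position of `10`.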
433431

434432
<a name="purging"></a>
435433
## Position storage / purging
