| 1 | +# JetStream durable stream sourcing/mirroring |
| 2 | + |
| 3 | +| Metadata | Value | |
| 4 | +|----------|---------------------------------| |
| 5 | +| Date | 2025-11-06 | |
| 6 | +| Author | @MauriceVanVeen | |
| 7 | +| Status | Proposed | |
| 8 | +| Tags | jetstream, client, server, 2.14 | |
| 9 | + |
| 10 | +| Revision | Date | Author | Info | |
| 11 | +|----------|------------|-----------------|-----------------------------------------| |
| 12 | +| 1 | 2025-11-06 | @MauriceVanVeen | Initial design | |
| 13 | +| 2 | 2025-12-05 | @MauriceVanVeen | Refinement after initial implementation | |
| 14 | + |
| 15 | +## Context and Problem Statement |
| 16 | + |
| 17 | +JetStream streams can be mirrored or sourced from another stream. Usually this is done on separate servers, for example, |
| 18 | +loosely connected as a leaf node. This is achieved by the server creating an ephemeral ordered push consumer using |
| 19 | +`AckNone`. This is reliable if the stream that's being mirrored/sourced is a Limits stream. If the server detects |
| 20 | +a gap, it recreates the consumer at the sequence it missed. Because the stream is a Limits stream, the missed messages |
| 21 | +are still in the stream, so the server can recover from the gap. |
| 22 | + |
| 23 | +However, if the stream is a WorkQueue or Interest stream, then the use of an ephemeral `AckNone` consumer is problematic |
| 24 | +for two reasons: |
| 25 | + |
| 26 | +- For both WorkQueue and Interest streams, any message that is sent is immediately acknowledged and removed. If the |
| 27 | + message is not received on the other end, it is lost. |
| 28 | +- Additionally, for an Interest stream, since the consumer is ephemeral, interest is lost while there's no active |
| 29 | + connection between the two servers. This also results in messages being lost. |
| 30 | + |
| 31 | +Reliable stream mirroring/sourcing is required for use cases where WorkQueue or Interest streams are used or desired. |
| 32 | + |
| 33 | +## Design |
| 34 | + |
| 35 | +### Pre-created durable consumer |
| 36 | + |
| 37 | +Instead of the server creating and managing ephemeral consumers for stream sourcing, the user creates a durable consumer |
| 38 | +that the server will use. |
| 39 | + |
| 40 | +The benefit of using a durable consumer is that it is visible to the user and can be monitored. It also makes the |
| 41 | +consumer configuration (and its security implications) easier to control, as the consumer is created manually on the |
| 42 | +server containing the data that will be sourced. Additionally, the consumer can be paused and resumed, allowing the |
| 43 | +sourcing to temporarily stop if desired. |
| 44 | + |
| 45 | +WorkQueue streams don't allow multiple consumers with overlapping filter subjects. This means that a durable |
| 46 | +consumer used for mirroring/sourcing a WorkQueue stream prevents another consumer with an overlapping filter from being |
| 47 | +created for a different purpose. If that is needed, an Interest or Limits stream should be used. |
| 48 | + |
| 49 | +Some additional tooling will be required to create the durable consumer with the proper configuration. But through the |
| 50 | +use of a new `AckPolicy=AckFlowControl` setting, the server will be able to help enforce the correct configuration. |
| 51 | + |
| 52 | +### Performance / Consumer configuration |
| 53 | + |
| 54 | +The durable consumer used for stream sourcing/mirroring will need to be just as performant as the current ephemeral |
| 55 | +variant. The current ephemeral consumer configuration uses `AckNone` which is problematic for WorkQueue and Interest |
| 56 | +streams. A different `AckPolicy` (`AckFlowControl`) will need to be used to ensure that messages are not lost. |
| 57 | + |
| 58 | +The consumer configuration will closely resemble the ephemeral push consumer variant: |
| 59 | + |
| 60 | +- The consumer will still act as an "ordered push consumer" but it will be durable. |
| 61 | +- Requires `FlowControl` and `Heartbeat` to be set. |
| 62 | +- Uses `AckPolicy=AckFlowControl` instead of `AckNone`. |
| 63 | +- `AckPolicy=AckFlowControl` will function like `AckAll`, although the receiving server will not acknowledge |
| 64 | + individual messages using the current ack reply format. |
| 65 | +- The receiving server responds to the flow control messages. The response includes the stream sequence |
| 66 | + (`Nats-Last-Stream`) and delivery sequence (`Nats-Last-Consumer`) as headers to signal which messages have been |
| 67 | + successfully stored. |
| 68 | +- The server receiving the flow control response will ack messages based on these stream/delivery sequences. For |
| 69 | + WorkQueue and Interest streams this may result in message deletion. |
| 69 | +- Acknowledgements happen based on flow control limits, usually a data window size. But if the stream is idle the |
| 70 | + `Heartbeat` will also trigger a flow control message to move the acknowledgement floor up. |
| 71 | +- Flow control messages will happen automatically after a certain data size is reached, but can be controlled using the |
| 72 | + `MaxAckPending` setting. `MaxAckPending` determines the maximum number of pending messages that can be sent before the |
| 73 | + sourcing pauses. A flow control message will be automatically sent (no need to wait for a `Heartbeat`) so these |
| 74 | + messages are acknowledged and new messages can be sent as soon as possible. |
| 75 | +- Since acknowledgements happen based on dynamic flow control, which is driven by data size, `MaxAckPending` |
| 76 | + or `Heartbeat`, the consumer cannot have an `AckWait` or `BackOff` setting. These fields need to be unset. |
| 77 | +- Additionally, `MaxDeliver` must be set to `-1` (infinite) to ensure that messages lost in transit can still be |
| 78 | + reliably redelivered. |
| 79 | + |
| 80 | +The stream configuration will be extended to include the consumer name as well as the delivery subject used for stream |
| 81 | +sourcing/mirroring. |
| 82 | + |
| 83 | +```go |
| 84 | +type StreamSource struct { |
| 85 | + Name string `json:"name"` |
| 86 | + ConsumerName string `json:"consumer_name,omitempty"` |
| 87 | + ConsumerDeliverSubject string `json:"consumer_deliver_subject,omitempty"` |
| 88 | +} |
| 89 | +``` |
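To illustrate how the two new fields would appear on the wire, the sketch below marshals a local copy of the struct above. The stream, consumer, and subject names are made up for the example, and `encodeSource` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StreamSource is a local copy of the struct shown above, limited to the
// fields relevant to this ADR.
type StreamSource struct {
	Name                   string `json:"name"`
	ConsumerName           string `json:"consumer_name,omitempty"`
	ConsumerDeliverSubject string `json:"consumer_deliver_subject,omitempty"`
}

// encodeSource returns the JSON form of a source, or an empty string if
// marshaling fails.
func encodeSource(s StreamSource) string {
	b, err := json.Marshal(s)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	// Without the new fields the encoding is unchanged, because both are omitempty.
	fmt.Println(encodeSource(StreamSource{Name: "ORDERS"}))

	// With a pre-created durable consumer (names are illustrative only).
	fmt.Println(encodeSource(StreamSource{
		Name:                   "ORDERS",
		ConsumerName:           "orders-mirror",
		ConsumerDeliverSubject: "deliver.orders-mirror",
	}))
}
```

Since both fields use `omitempty`, existing stream configurations that rely on the ephemeral consumer encode exactly as before.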
| 90 | + |
| 91 | +### Consumer delivery state reset API |
| 92 | + |
| 93 | +The ordered consumer implementation relies on the consumer's delivery sequence to start at 1 and increment by 1 for |
| 94 | +every delivered message. Gaps are detected by checking that this delivery sequence increments by exactly 1. If a gap is |
| 95 | +detected, the consumer delivery state will need to be reset such that the delivery sequence starts at 1 again. |
| 96 | + |
| 97 | +This is a non-issue for the ephemeral ordered push consumer variant as it creates a new consumer starting at the |
| 98 | +expected sequence if a gap is detected. However, the durable consumer must not be deleted and recreated, since that will |
| 99 | +result in losing interest on an Interest stream and subsequently losing messages. Waiting for re-delivery is also not an |
| 100 | +option as this will result in out-of-order delivery. |
| 101 | + |
| 102 | +Therefore, the server will provide an API to reset the consumer delivery state. When the server detects a gap, it will |
| 103 | +call this API to reset the consumer delivery state. The consumer delivery sequence restarts at 1 and pending |
| 104 | +messages are re-delivered. Optionally, re-delivery can start from a specified stream sequence. |
| 105 | + |
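The gap detection described above can be sketched as a tiny tracker on the receiving side. This is a minimal illustration, not the server implementation: `gapDetector` and its methods are hypothetical names, and the call to the reset API is represented only by a comment.

```go
package main

import "fmt"

// gapDetector tracks the last delivery (consumer) sequence that was accepted.
// It is a hypothetical helper used only to illustrate the check.
type gapDetector struct {
	last uint64
}

// observe accepts dseq only if it is exactly one above the last accepted
// delivery sequence. It returns false when a gap (or replay) is detected.
func (g *gapDetector) observe(dseq uint64) bool {
	if dseq != g.last+1 {
		return false
	}
	g.last = dseq
	return true
}

// reset is called once the consumer delivery state has been reset, after
// which the delivery sequence is expected to start at 1 again.
func (g *gapDetector) reset() {
	g.last = 0
}

func main() {
	g := &gapDetector{}
	for _, dseq := range []uint64{1, 2, 4} {
		if !g.observe(dseq) {
			fmt.Printf("gap: expected %d, got %d\n", g.last+1, dseq)
			// Here the server would call the consumer reset API and,
			// once it succeeds, expect delivery to restart at 1.
			g.reset()
			continue
		}
		fmt.Printf("accepted delivery sequence %d\n", dseq)
	}
}
```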
| 106 | +This reset API, `$JS.API.CONSUMER.RESET.<STREAM>.<CONSUMER>`, will have the following functionality: |
| 107 | + |
| 108 | +- The consumer will be reset, resembling the delivery state of creating a new consumer with `opt_start_seq` set to the |
| 109 | + specified sequence. |
| 110 | +- The pending and redelivered messages will always be reset. |
| 111 | +- The delivered stream and consumer sequences will always be reset. |
| 112 | +- The ack floor consumer sequence will always be reset. |
| 113 | +- The ack floor stream sequence will be updated depending on the payload. The next message to be delivered is above this |
| 114 | + new ack floor. |
| 115 | +- An empty payload will reset the consumer's state, but the ack floor stream sequence will remain the same. (This will |
| 116 | + be used for the durable sourcing consumer after detecting a gap.) |
| 117 | +- A payload of `{"seq":<seq>}` (with `seq>0`) will update the ack floor stream sequence to be one below the provided |
| 118 | + sequence. The next message to be delivered has a sequence of `msg.seq >= reset.seq`. A zero-sequence is invalid. |
| 119 | +- Resetting a consumer to a specific sequence will only be allowed on specific consumer configurations. |
| 120 | + - Only allowed on `DeliverPolicy=all,by_start_sequence,by_start_time`. |
| 121 | + - If `DeliverPolicy=all`, the reset will always be successful and allows moving forward or backward arbitrarily. |
| 122 | + - If `DeliverPolicy=by_start_sequence,by_start_time`, the reset will only be successful if |
| 123 | + `reset.seq >= opt_start_seq`, or if `loadNextMsg(reset.seq).start_time >= opt_start_time`, respectively. This is a safety |
| 124 | + measure to prevent the consumer from being reset to a sequence before what was allowed by the consumer |
| 125 | + configuration. |
| 126 | +- The response to the reset API call will follow standard JS API conventions. Specifically, a "consumer reset" |
| 127 | + response not only confirms the reset, but also exposes the current configuration and updated delivery state, |
| 128 | + like a "consumer create" response. This is useful for the durable sourcing consumer to confirm the proper |
| 129 | + configuration is used before allowing the sourcing to happen. Because the response looks as if the consumer was |
| 130 | + recreated, clients that keep a cached consumer response can replace it with this one. |
| 131 | +- Additionally, the response will contain the `ResetSeq` that the consumer is reset to. |
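The sequence rules above can be sketched as a function that computes the new ack floor stream sequence for a reset request. This is a simplified illustration under stated assumptions: `newAckFloor` and the `DeliverPolicy` constants are hypothetical local definitions, a `nil` sequence models an empty payload, and the `by_start_time` check is left out because it needs access to message timestamps in the stream.

```go
package main

import (
	"errors"
	"fmt"
)

// DeliverPolicy is a simplified, hypothetical stand-in for the consumer's
// deliver policy.
type DeliverPolicy string

const (
	DeliverAll             DeliverPolicy = "all"
	DeliverByStartSequence DeliverPolicy = "by_start_sequence"
	DeliverNew             DeliverPolicy = "new"
)

// newAckFloor returns the ack floor stream sequence after a reset.
// resetSeq == nil models an empty payload, which keeps the current floor.
// Otherwise the floor becomes one below the requested sequence, provided the
// deliver policy allows resetting to that sequence.
func newAckFloor(policy DeliverPolicy, optStartSeq, curFloor uint64, resetSeq *uint64) (uint64, error) {
	if resetSeq == nil {
		return curFloor, nil
	}
	if *resetSeq == 0 {
		return 0, errors.New("a zero sequence is invalid")
	}
	switch policy {
	case DeliverAll:
		// Moving forward or backward arbitrarily is allowed.
	case DeliverByStartSequence:
		if *resetSeq < optStartSeq {
			return 0, errors.New("reset sequence is below opt_start_seq")
		}
	default:
		return 0, errors.New("reset to a sequence is not allowed for this deliver policy")
	}
	return *resetSeq - 1, nil
}

func main() {
	seq := uint64(10)

	floor, err := newAckFloor(DeliverAll, 0, 42, &seq)
	fmt.Println(floor, err) // next delivered message has seq >= 10

	floor, err = newAckFloor(DeliverAll, 0, 42, nil)
	fmt.Println(floor, err) // empty payload: floor stays at 42

	_, err = newAckFloor(DeliverByStartSequence, 20, 42, &seq)
	fmt.Println(err) // rejected: 10 is below opt_start_seq 20
}
```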
| 132 | + |
| 133 | +Importantly, the server should also handle the case where a user manually resets the consumer that's used for sourcing. |
| 134 | +The server should handle this gracefully and ensure no messages are lost. However, the user could also reset the |
| 135 | +consumer such that it moves ahead in the stream. The server should also handle this by properly skipping over those |
| 136 | +messages. If instead the user manually resets the consumer to go backward, the server should guarantee that mirrored |
| 137 | +messages are not duplicated. |
| 138 | + |
| 139 | +## Consequences |
| 140 | + |
| 141 | +Clients should support the `$JS.API.CONSUMER.RESET.<STREAM>.<CONSUMER>` reset API. Clients should not assume that this |
| 142 | +call is initiated by the client process itself; it may be called by another process, for example by the CLI. |
| 143 | +Importantly, clients should not fail when the consumer delivery sequence is not monotonic, except when needed for the |
| 144 | +"ordered consumer" implementations. If the reset API is called for an ordered consumer, the client should detect a gap |
| 145 | +as it would normally and simply recreate the consumer. |