
(2.14) Support reliable WQ/Interest stream sourcing and mirroring#7613

Open
MauriceVanVeen wants to merge 1 commit into main from maurice/durable-mirror

Conversation

@MauriceVanVeen (Member) commented Dec 4, 2025

Implements reliable stream sourcing/mirroring on WQ/Interest streams as described in this ADR: nats-io/nats-architecture-and-design#389. Built on top of the consumer reset API: #7489.

The server knows when it's sourcing from a WQ/Interest stream and "upgrades" the consumer to be durable with AckPolicy: AckFlowControl. Instead of acknowledging messages immediately (as with AckNone), messages are acknowledged by the receiving end based on flow control, or triggered by the heartbeat. Additionally, MaxAckPending specifies the upper bound on pending messages; if it is reached, a flow control message is also sent (if one isn't already in flight) to acknowledge messages.

AckPolicy: AckFlowControl is special in that it only allows infinite retries for MaxDeliver and has no AckWait or BackOff. Messages are meant to flow reliably from one place to the other. The heartbeat functions somewhat like AckWait, but acknowledgements may happen earlier or later depending on how many messages are sent, how large they are (larger messages trigger flow control sooner), or when MaxAckPending is hit.

Importantly, this durable consumer can reliably source and mirror messages from Interest and WorkQueue retention streams without the user needing to specify anything, whereas the current ephemeral consumer approach is not fully reliable for these retention policies.

Additionally, users can "bring their own consumer" to be used for mirroring/sourcing, but this is opt-in.

Resolves #4109, #7292

Signed-off-by: Maurice van Veen github@mauricevanveen.com

@MauriceVanVeen linked an issue Jan 2, 2026 that may be closed by this pull request
@MauriceVanVeen marked this pull request as ready for review January 28, 2026 13:14
@MauriceVanVeen requested review from a team as code owners January 28, 2026 13:14
@MauriceVanVeen force-pushed the maurice/consumer-reset branch 2 times, most recently from 5c11024 to eb2ce11 on February 12, 2026 06:21
Base automatically changed from maurice/consumer-reset to main February 12, 2026 10:39
@MauriceVanVeen force-pushed the maurice/durable-mirror branch from 2a04ef0 to f7816e7 on March 6, 2026 14:59
@MauriceVanVeen marked this pull request as draft March 11, 2026 11:18
@MauriceVanVeen force-pushed the maurice/durable-mirror branch from f7816e7 to 043ff5d on March 11, 2026 11:23
@MauriceVanVeen changed the title from "(2.14) Support durable and flow-controled stream sourcing/mirroring" to "(2.14) Support reliable WQ/Interest stream sourcing and mirroring" on Mar 11, 2026
@MauriceVanVeen force-pushed the maurice/durable-mirror branch from 043ff5d to 2652e78 on March 11, 2026 11:40
@MauriceVanVeen marked this pull request as ready for review March 11, 2026 16:40

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 2652e78848

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

@MauriceVanVeen force-pushed the maurice/durable-mirror branch from 2652e78 to 4422b8a on March 12, 2026 09:13

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 4422b8a44f


Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
@MauriceVanVeen force-pushed the maurice/durable-mirror branch from 4422b8a to 5c25265 on March 12, 2026 09:41

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 5c25265b61


// it to be durable without AckNone if not a Limits-based stream.
if req.Config.Direct && req.Config.Sourcing && req.Config.Name != _EMPTY_ {
if !isClustered && stream.isInterestRetention() {
req.Config.Direct = false


P2: Keep internal sourcing consumers out of public consumer listings

This branch converts server-managed sourcing consumers on non-Limits streams to Direct=false, but listing paths still treat only Direct consumers as internal (stream.getPublicConsumers() filters on !o.cfg.Direct, and consumer list/name APIs use that), so JS_SRC_*/JS_MIRROR_* consumers become visible to normal clients. That is a behavior regression for mirror/source setup on WorkQueue/Interest streams and can break tooling that expects consumer list APIs to return only user-managed consumers.


@MauriceVanVeen (Member Author) replied:

This is an intended and expected change, since these consumers need to be durable and replicated if on WQ/Interest.

neilalexander added a commit that referenced this pull request Mar 12, 2026
…erest stream (#7936)

While working on #7613 I
noticed that `TestJetStreamClusterMirrorAndSourceWorkQueues` was flaky.
This was due to only forwarding message delete proposals on consumer
leaders and not on the stream leader. This could result in a race
condition where consumer leader A wouldn't remove the message since
consumer B had yet to ack it, and similarly consumer leader B wouldn't
remove it since consumer A had yet to ack it. Two consumers acking at
roughly the same time and leaders being on different servers with
some delay could result in a message not being removed in a timely
fashion.

This PR fixes that by always letting the stream leader handle this. This
has the added benefit of less network traffic since there don't need to
be any forwarded delete proposals. But, this does come with needing to
handle a rare potential edge case where all consumers have already acked
the message but the stream leader hasn't stored the message yet (which
is possible due to our use of async queues).

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
@emresudo

I am looking forward to version 2.14 for this feature...

Why I need this feature:
I am developing a log streaming architecture where every location has its own collector and NATS server, and every log must reliably arrive at the central branch.

  • Branch Central

    • Log Collector
    • NATS Server
  • Branch A

    • Log Collector
    • NATS Server
  • Branch B

    • Log Collector
    • NATS Server

But here's where the real problem begins: until they are processed by the central branch, the logs of every branch must be stored on that branch's NATS server.

If I want to build this with the current version, which is 2.12, I need a NATS subscriber with a new stream for every branch to listen and sync to the central branch. I think that is more complex logic.

If there's another way I can do this, please let me know.

@MauriceVanVeen
Member Author

If I want to build this with the current version, which is 2.12, I need a NATS subscriber with a new stream for every branch to listen and sync to the central branch. I think that is more complex logic.

If there's another way I can do this, please let me know.

You can already source streams today: https://docs.nats.io/nats-concepts/jetstream/source_and_mirror

As long as you make sure that the stream you're sourcing from is a Limits-based stream, then it already functions reliably.

@emresudo

If I want to build this with the current version, which is 2.12, I need a NATS subscriber with a new stream for every branch to listen and sync to the central branch. I think that is more complex logic.
If there's another way I can do this, please let me know.

You can already source streams today: https://docs.nats.io/nats-concepts/jetstream/source_and_mirror

As long as you make sure that the stream you're sourcing from is a Limits-based stream, then it already functions reliably.

In my scenario, I want to ensure that the data arrives and is delivered only once. If LimitPolicy solves this, then all I need to do is set a long-term LimitPolicy to prevent this loss in case of prolonged outages, right?

@MauriceVanVeen
Member Author

In my scenario, I want to ensure that the data arrives and is delivered only once. If LimitPolicy solves this, then all I need to do is set a long-term LimitPolicy to prevent this loss in case of prolonged outages, right?

Correct, LimitPolicy already ensures a message is only sourced once.

This PR is about extending that guarantee when sourcing from WorkQueue and Interest streams.

@emresudo

In my scenario, I want to ensure that the data arrives and is delivered only once. If LimitPolicy solves this, then all I need to do is set a long-term LimitPolicy to prevent this loss in case of prolonged outages, right?

Correct, LimitPolicy already ensures a message is only sourced once.

This PR is about extending that guarantee when sourcing from WorkQueue and Interest streams.

Understood, but I have one last question. If we are using a LimitsPolicy stream, is it always consumed in order (FIFO)?

@MauriceVanVeen
Member Author

If we are using a LimitsPolicy stream, is it always consumed in order (FIFO)?

Yep, the order is preserved.
