Description
Summary
Introduces an API for publishing messages to a YDB topic within the same ACID transaction that modifies tables.
All messages written via TxWriter are bound to the current transaction and become visible atomically together with table changes after a successful commit. Message sending is performed in the background while the application continues to work with tables or other topics. Before committing, the YdbConnection waits for acknowledgements of all pending messages, and then commits.
Motivation
- Today, it is difficult to guarantee atomicity between updates to tables and publications on topics. Failures in the middle of the process cause divergence.
 - Developers often implement an outbox table and a background worker to approximate atomicity; this is complex and adds operational burden.
 - We want a simple, built-in way to publish to a topic as part of a DB transaction, with background sending and an explicit flush-before-commit step.
 
Proposed API (names are indicative)
Interfaces
public interface ITxTopicWriter<T>
{
    // Enqueue a message for this transaction.
    void Write(T value);
}
Maybe unused interface:
internal interface IBufferedTxTopicWriter<T> : ITxTopicWriter<T>
{
    // Wait for all enqueued messages to be durably accepted in the context of the current transaction.
    // CommitAsync will call this automatically.
    Task FlushAsync(CancellationToken cancellationToken = default);
}
YdbConnection
public sealed class YdbConnection : DbConnection
{
    // Validates an active transaction, acquires a WriterSession from the pool that is compatible
    // with the current QuerySession, and binds all writes to this transaction's ID.
    public ITxTopicWriter<T> CreateTxWriter<T>(string topicName, TxWriterOptions? options = null);
}
Usage example
await ydbDataSource.ExecuteInTransactionAsync(async (ydbConnection, ct) =>
{
    var txWriter1 = ydbConnection.CreateTxWriter<string>("topic_1");
    var txWriter2 = ydbConnection.CreateTxWriter<string>("topic_another");
    for (var i = 0; i < selectedCount; i++)
        txWriter1.Write("Example payload: " + i);
    for (var i = 0; i < selectedCount; i++)
        txWriter2.Write("Example payload: " + i);
    await ydbConnection.ExecuteAsync(
        "INSERT INTO Users(Id, Name, Email) VALUES (@Id, @Name, @Email)",
        new User { Id = 1, Name = "Name", Email = "Email" }, ct);
    await ydbConnection.ExecuteAsync(
        "INSERT INTO Users(Id, Name, Email) VALUES (@Id, @Name, @Email)",
        new User { Id = 2, Name = "New_Name", Email = "New_Email" }, ct);
    // No explicit Flush needed; the transaction will Flush both writers, then Commit.
});
Reference
- Go and Python topic SDKs.
 - The Java SDK's current solution is not stable; a new API is planned for upcoming versions.
 
Core Concepts
- Transaction source: The TopicService has no native transactions. Transactional publishing uses a QueryService transaction, which is created by a QuerySession and has a transaction ID.
 - Prerequisite for Write: transactional topic Write requires an active QueryService transaction. If none exists, BeginTransaction must be executed before sending (by the library).
 - Placement: CreateTxWriter is on YdbConnection (it owns QuerySession and the transaction context) and binds the writer to the current txId.
 - Lazy start: Prefer starting the transaction on CreateTxWriter if none is active; starting on first Write is acceptable but must remain single and idempotent under concurrency.
 - Attach if exists: If a transaction is already active (e.g., after table reads/writes), CreateTxWriter attaches to it; no new transaction is started.
 - Non-blocking writes: Write enqueues to a buffer and triggers background sending bound to the txId; messages stay invisible until commit and are dropped on rollback.
 - Flush before commit: YdbConnection tracks all TxWriters in the scope and Flushes them before Commit. If any Flush fails or is canceled, commit is not attempted and the tx is rolled back.
 - Thread-safety: One active transaction per connection; concurrent CreateTxWriter/Write calls must coalesce to a single BeginTransaction.
 - WriterSession pool: Rent WriterSession from the writer pool, rebind to a new txId as needed, clear buffers after aborted tx, and quarantine unhealthy sessions. A WriterSession is returned to the pool only after a successful commit; on rollback it is cleaned and either quarantined or disposed per policy.
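
The "single BeginTransaction under concurrency" and "flush before commit" rules above can be sketched as follows. This is a minimal illustration, not the SDK's implementation: `TxContextSketch` and the `beginTransaction`, `commit`, and `rollback` delegates are hypothetical stand-ins for whatever YdbConnection does internally.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the transaction context owned by YdbConnection.
public sealed class TxContextSketch
{
    private readonly Func<CancellationToken, Task<string>> _beginTransaction;
    private readonly object _gate = new object();
    private readonly List<Func<CancellationToken, Task>> _flushers =
        new List<Func<CancellationToken, Task>>();
    private Task<string> _txIdTask; // null until the transaction is started

    public TxContextSketch(Func<CancellationToken, Task<string>> beginTransaction)
    {
        _beginTransaction = beginTransaction;
    }

    // Returns the transaction ID. The begin delegate is invoked at most once,
    // even when several CreateTxWriter/Write callers race here; later callers
    // attach to the same task. (Simplification: the first caller's token is used.)
    public Task<string> EnsureTransactionAsync(CancellationToken ct = default)
    {
        lock (_gate)
        {
            if (_txIdTask == null)
                _txIdTask = _beginTransaction(ct);
            return _txIdTask;
        }
    }

    // Each transactional writer registers its flush callback when it is created.
    public void RegisterWriter(Func<CancellationToken, Task> flushAsync)
    {
        lock (_gate) _flushers.Add(flushAsync);
    }

    // Flush every registered writer first. Commit only if all flushes succeed;
    // if any flush fails or is canceled, roll back and rethrow.
    public async Task CommitAsync(
        Func<string, CancellationToken, Task> commit,
        Func<string, CancellationToken, Task> rollback,
        CancellationToken ct = default)
    {
        var txId = await EnsureTransactionAsync(ct);
        Func<CancellationToken, Task>[] flushers;
        lock (_gate) flushers = _flushers.ToArray();
        try
        {
            await Task.WhenAll(flushers.Select(f => f(ct)).ToArray());
        }
        catch
        {
            await rollback(txId, CancellationToken.None);
            throw;
        }
        await commit(txId, ct);
    }
}
```

Caching the `Task<string>` rather than the ID itself is what makes the start idempotent: a caller that arrives while BeginTransaction is still in flight awaits the same task instead of issuing a second request.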
 
TxWriter: API design with void Write and buffer backpressure/overflow handling
Goal and context
- Provide a simple API: void Write(T value) with background sending; Commit waits for confirmed delivery.
 - Transactional topic publishing requires an active QueryService transaction; if none exists, BeginTransaction must occur before the first send.
 
Constraints and risks
- Send buffer has a default cap (e.g., ~64 MiB per writer).
 - Blocking when full (as in Go/Python) is risky for web/thread-pooled apps: latency spikes, thread starvation.
 - Removing limits blindly can cause OOM under load.
 
Overflow strategies
- Block/sleep (Go/Python-style)
  - Pros: simple; natural throttling.
  - Cons: hidden blocking; harmful in server scenarios.
- Throw OverflowException (strongly recommended default)
  - Pros: predictable for servers; keeps memory bounded; caller can Flush/retry/back off.
  - Cons: requires exception handling and retry logic.
- TryWrite (non-throwing)
  - Pros: returns bool; caller decides to Flush/back off/drop; cheap on hot path.
  - Cons: adds caller-side control flow.
- WriteAsync
  - Pros: async backpressure; ideal for ASP.NET and async pipelines; no thread blocking.
  - Cons: less ergonomic than void; introduces async into producer path.
- “Unlimited” buffer (opt-in only)
  - Pros: if data is already in heap, client can stream it out in batches.
  - Cons: high OOM risk; only with explicit opt-in and global safety caps.
Blocking or sleeping the producer is not an acceptable way to handle buffer pressure. In .NET it starves the thread pool, causes unbounded latency and timeouts, and can stall the whole application. It does not even save memory: the message payloads are already allocated on the managed heap, so blocking only holds them longer.
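
To make the non-blocking strategies concrete, here is a rough sketch of one byte-capped buffer exposing the throwing Write, TryWrite, and WriteAsync variants side by side. All names are hypothetical (`BoundedSendBufferSketch`, `TxWriterBufferOverflowException`, `AcknowledgeOldest`), payloads are simplified to `byte[]`, and the sketch assumes .NET 6 or later for `Task.WaitAsync`; a real writer would release space when the server acknowledges a message.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical exception type for the "throw on overflow" strategy.
public sealed class TxWriterBufferOverflowException : Exception
{
    public TxWriterBufferOverflowException(string message) : base(message) { }
}

// Hypothetical byte-capped send buffer; not part of any existing SDK.
public sealed class BoundedSendBufferSketch
{
    private readonly long _capacityBytes;
    private readonly object _gate = new object();
    private readonly Queue<byte[]> _pending = new Queue<byte[]>();
    private long _usedBytes;
    private TaskCompletionSource _spaceFreed =
        new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    public BoundedSendBufferSketch(long capacityBytes) { _capacityBytes = capacityBytes; }

    public long UsedBytes { get { lock (_gate) return _usedBytes; } }

    // Non-throwing: returns false when the payload does not fit right now.
    public bool TryWrite(byte[] payload)
    {
        lock (_gate)
        {
            if (_usedBytes + payload.Length > _capacityBytes) return false;
            _usedBytes += payload.Length;
            _pending.Enqueue(payload);
            return true;
        }
    }

    // Throwing: memory stays bounded and the caller decides how to back off.
    public void Write(byte[] payload)
    {
        if (!TryWrite(payload))
            throw new TxWriterBufferOverflowException("Send buffer is full.");
    }

    // Async backpressure: waits for free space without blocking a thread.
    public async Task WriteAsync(byte[] payload, CancellationToken ct = default)
    {
        if (payload.Length > _capacityBytes)
            throw new ArgumentException("Payload exceeds buffer capacity.", nameof(payload));
        while (true)
        {
            Task spaceFreed;
            lock (_gate)
            {
                if (TryWrite(payload)) return; // Monitor locks are reentrant
                spaceFreed = _spaceFreed.Task;
            }
            await spaceFreed.WaitAsync(ct);
        }
    }

    // Simulates an acknowledgement for the oldest message: frees its bytes
    // and wakes any WriteAsync callers so they can re-check the capacity.
    public void AcknowledgeOldest()
    {
        TaskCompletionSource toSignal;
        lock (_gate)
        {
            if (_pending.Count == 0) return;
            _usedBytes -= _pending.Dequeue().Length;
            toSignal = _spaceFreed;
            _spaceFreed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        }
        toSignal.TrySetResult();
    }
}
```

In this shape the three options are layers over one capacity check, so offering `void Write` as the default does not rule out adding TryWrite or WriteAsync later for callers that want explicit control.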