Skip to content

Commit 4a8ebba

Browse files
committed
Fix seqNo renumbering bug and simplify TopicWriter API
- Fix bug where messages written before session initialization were not renumbered after receiving lastSeqNo from server - Remove return value from write() method (now returns void) to simplify API - Update both legacy writer and writer2 implementations - Add comprehensive tests for seqNo renumbering logic - Update documentation and migration guide in changeset Breaking changes: write() method no longer returns seqNo (minor version bump) Signed-off-by: Vladislav Polyakov <polRk@ydb.tech>
1 parent 2f6b298 commit 4a8ebba

File tree

15 files changed

+517
-91
lines changed

15 files changed

+517
-91
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
---
2+
'@ydbjs/topic': minor
3+
---
4+
5+
Fix seqNo renumbering bug in both writer implementations and simplify TopicWriter API.
6+
7+
**Bug fix:**
8+
9+
- Fixed issue where messages written before session initialization were not renumbered after receiving `lastSeqNo` from server. Previously, auto-generated seqNo started from 0 and were not updated when server provided actual `lastSeqNo`, causing seqNo conflicts. Now messages are properly renumbered to continue from server's `lastSeqNo + 1`.
10+
- Fixed in both `writer` (legacy) and `writer2` implementations
11+
12+
**API changes:**
13+
14+
- `TopicWriter.write()` no longer returns sequence number (now returns `void`) to simplify API and prevent confusion about temporary vs final seqNo values
15+
16+
**Migration guide:**
17+
18+
- If you were storing seqNo from `write()` return value, use `flush()` instead to get final seqNo:
19+
20+
```typescript
21+
// Before
22+
let seqNo = writer.write(data)
23+
24+
// After
25+
writer.write(data)
26+
let lastSeqNo = await writer.flush() // Get final seqNo
27+
```
28+
29+
- User-provided seqNo (via `extra.seqNo`) remain final and unchanged - no migration needed for this case.

packages/topic/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
"types": "dist/index.d.ts",
3131
"exports": {
3232
".": "./dist/index.js",
33+
"./codec": "./dist/codec.js",
3334
"./reader": "./dist/reader/index.js",
3435
"./writer": "./dist/writer/index.js",
35-
"./writer2": "./dist/writer2/index.js"
36+
"./writer2": "./dist/writer2/index.js",
37+
"./message": "./dist/message.js"
3638
},
3739
"engines": {
3840
"node": ">=20.19.0",

packages/topic/src/index.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,3 @@ export function topic(driver: Driver): TopicClient {
3939
},
4040
} as TopicClient
4141
}
42-
43-
export type { TopicTxReader } from './reader/index.js'
44-
export type { TopicTxWriter } from './writer/index.js'

packages/topic/src/writer/_init_reponse.ts

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,18 @@ export const _on_init_response = function on_init_response(
1818
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight
1919
readonly lastSeqNo?: bigint // The last sequence number acknowledged by the server
2020
readonly throughputSettings: ThroughputSettings // Current throughput settings for the writer
21+
readonly isSeqNoProvided?: boolean // Whether user provided seqNo (manual mode)
2122
updateLastSeqNo: (seqNo: bigint) => void
2223
updateBufferSize: (bytes: bigint) => void // Function to update the buffer size
2324
},
2425
input: StreamWriteMessage_InitResponse
2526
) {
26-
if (!ctx.lastSeqNo) {
27-
// Store the last sequence number from the server.
28-
ctx.updateLastSeqNo(input.lastSeqNo)
29-
}
27+
let serverLastSeqNo = input.lastSeqNo || 0n
28+
let currentLastSeqNo = ctx.lastSeqNo
29+
let isFirstInit = currentLastSeqNo === undefined
30+
let lastSeqNoChanged = isFirstInit || currentLastSeqNo !== serverLastSeqNo
3031

32+
// Return inflight messages to buffer
3133
while (ctx.inflight.length > 0) {
3234
const message = ctx.inflight.pop()
3335
if (!message) {
@@ -38,5 +40,48 @@ export const _on_init_response = function on_init_response(
3840
ctx.updateBufferSize(BigInt(message.data.length))
3941
}
4042

41-
_flush(ctx) // Flush the buffer to send any pending messages.
43+
// If this is the first initialization or server provided a new lastSeqNo, and we're in auto seqNo mode,
44+
// renumber all messages in buffer to continue from serverLastSeqNo + 1
45+
// Always renumber on first init, even if currentLastSeqNo === serverLastSeqNo (messages written before init)
46+
// Also renumber if there are messages in buffer that were written before init (their seqNo start from 1, not serverLastSeqNo + 1)
47+
let finalLastSeqNo = serverLastSeqNo
48+
let shouldRenumber = false
49+
// Only renumber in auto mode (when user didn't provide seqNo)
50+
if (!ctx.isSeqNoProvided && ctx.buffer.length > 0) {
51+
if (isFirstInit) {
52+
// First initialization: always renumber messages written before init
53+
shouldRenumber = true
54+
} else if (lastSeqNoChanged) {
55+
// Reconnection: renumber if server's lastSeqNo changed
56+
shouldRenumber = true
57+
} else if (ctx.buffer.length > 0) {
58+
// Check if messages in buffer were written before init (seqNo start from 1, not serverLastSeqNo + 1)
59+
// If first message's seqNo is <= serverLastSeqNo, it was written before init and needs renumbering
60+
let firstMessageSeqNo = ctx.buffer[0]?.seqNo
61+
if (
62+
firstMessageSeqNo !== undefined &&
63+
firstMessageSeqNo <= serverLastSeqNo
64+
) {
65+
shouldRenumber = true
66+
}
67+
}
68+
}
69+
70+
if (shouldRenumber) {
71+
let nextSeqNo = serverLastSeqNo + 1n
72+
// Renumber all messages in buffer sequentially starting from serverLastSeqNo + 1
73+
for (let message of ctx.buffer) {
74+
message.seqNo = nextSeqNo
75+
nextSeqNo++
76+
}
77+
// Update lastSeqNo to the last renumbered seqNo so flush() returns correct value
78+
finalLastSeqNo = nextSeqNo - 1n
79+
ctx.updateLastSeqNo(finalLastSeqNo)
80+
} else if (lastSeqNoChanged) {
81+
// Store the last sequence number from the server if we didn't renumber
82+
ctx.updateLastSeqNo(serverLastSeqNo)
83+
}
84+
85+
// Flush the buffer to send any pending messages
86+
_flush(ctx)
4287
}

packages/topic/src/writer/_write.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,7 @@ export function _write(
3737
let seqNo = msg.seqNo ?? (ctx.lastSeqNo ?? 0n) + 1n
3838
let createdAt = timestampFromDate(msg.createdAt ?? new Date())
3939
let metadataItems = Object.entries(msg.metadataItems || {}).map(
40-
([key, value]) => ({
41-
key,
42-
value,
43-
})
40+
([key, value]) => ({ key, value })
4441
)
4542
let uncompressedSize = BigInt(data.length)
4643

@@ -54,7 +51,13 @@ export function _write(
5451

5552
ctx.buffer.push(message) // Store the message in the buffer
5653
ctx.updateBufferSize(BigInt(data.length)) // Update the buffer size
57-
ctx.updateLastSeqNo(seqNo) // Update the last sequence number
54+
55+
// Only update lastSeqNo if session is initialized (lastSeqNo is defined)
56+
// For messages written before session initialization, lastSeqNo will be updated
57+
// after renumbering in _on_init_response
58+
if (ctx.lastSeqNo !== undefined) {
59+
ctx.updateLastSeqNo(seqNo)
60+
}
5861

5962
return seqNo
6063
}

packages/topic/src/writer/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ export const createTopicWriter = function createTopicWriter(
238238
throughputSettings,
239239
updateLastSeqNo,
240240
updateBufferSize,
241+
isSeqNoProvided,
241242
...(options.tx && { tx: options.tx }),
242243
...(lastSeqNo && { lastSeqNo }),
243244
},

0 commit comments

Comments
 (0)