Skip to content

Commit 9123c13

Browse files
committed
Fix topic reader/writer disconnecting after Discovery.listEndpoints
When the driver refreshed its endpoint pool on a periodic discovery round, it closed and recreated gRPC channels for all known nodes. This caused active bidirectional streams (topic reader / writer) to receive a `CANCELLED` gRPC status, which was not treated as a retryable error — so the streams terminated instead of reconnecting.
1 parent 79ea6df commit 9123c13

File tree

11 files changed

+286
-38
lines changed

11 files changed

+286
-38
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
---
2+
"@ydbjs/retry": minor
3+
"@ydbjs/topic": patch
4+
---
5+
6+
Fix topic reader/writer disconnecting after Discovery.listEndpoints
7+
8+
When the driver refreshed its endpoint pool on a periodic discovery round,
9+
it closed and recreated gRPC channels for all known nodes. This caused active
10+
bidirectional streams (topic reader / writer) to receive a `CANCELLED` gRPC
11+
status, which was not treated as a retryable error — so the streams terminated
12+
instead of reconnecting.
13+
14+
Changes:
15+
16+
- **`@ydbjs/retry`**: added `isRetryableStreamError` and `defaultStreamRetryConfig`.
17+
Long-lived streaming RPCs should reconnect on `CANCELLED` and `UNAVAILABLE`
18+
in addition to the errors already handled by `isRetryableError`, because for
19+
streams those codes indicate a transport interruption rather than a semantic
20+
cancellation.
21+
22+
- **`@ydbjs/topic`**: reader (`_consume_stream`) and both writers (`writer`,
23+
`writer2`) now use `isRetryableStreamError` / `defaultStreamRetryConfig` so
24+
they transparently reconnect after a discovery-triggered channel replacement.
25+
Fixed a zombie-reader bug where `read()` would block forever if the retry
26+
budget was exhausted: the reader is now destroyed on unrecoverable stream
27+
errors so pending `read()` calls are unblocked immediately.

.oxlintrc.json

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,14 @@
1212
"unicorn/require-post-message-target-origin": "off",
1313
"yoda": "deny",
1414
"vitest/consistent-test-it": ["error", { "fn": "test" }],
15-
"vitest/max-nested-describe": ["error", 0]
15+
"vitest/max-nested-describe": ["error", 0],
16+
"vitest/valid-expect": [
17+
"error",
18+
{
19+
"maxArgs": 3,
20+
"minArgs": 1
21+
}
22+
]
1623
},
1724
"plugins": ["node", "import", "vitest", "promise", "typescript"],
1825
"ignorePatterns": ["**/node_modules/**", "**/dist/**"],

package-lock.json

Lines changed: 0 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
"build": "turbo run build --output-logs=errors-only",
2020
"attw": "turbo run attw",
2121
"test": "vitest --run",
22-
"lint": "oxlint",
22+
"lint": "oxlint --fix",
2323
"format": "prettier . --ignore-unknown --write",
2424
"version": "npx changeset version && npm install",
2525
"publish": "npx changeset publish",

packages/retry/src/index.ts

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { ClientError, Status } from 'nice-grpc'
88

99
import type { RetryConfig } from './config.js'
1010
import type { RetryContext } from './context.js'
11-
import { exponential, fixed } from './strategy.js'
11+
import { type RetryStrategy, exponential, fixed } from './strategy.js'
1212

1313
export * from './config.js'
1414
export * from './context.js'
@@ -137,6 +137,29 @@ export function isRetryableError(error: unknown, idempotent = false): boolean {
137137
return false
138138
}
139139

140+
/**
141+
* Determines whether an error from a long-lived gRPC stream should trigger
142+
* a reconnect attempt.
143+
*
144+
* Streaming RPCs differ from unary calls: a CANCELLED or UNAVAILABLE status
145+
* means the transport was interrupted (e.g. the server restarted, the
146+
* connection pool was refreshed after a discovery round), not that the
147+
* *operation* was semantically cancelled by the caller. We therefore always
148+
* reconnect on those codes, in addition to the errors handled by
149+
* {@link isRetryableError}.
150+
*/
151+
export function isRetryableStreamError(error: unknown): boolean {
152+
if (error instanceof ClientError) {
153+
return (
154+
error.code === Status.CANCELLED ||
155+
error.code === Status.UNAVAILABLE ||
156+
isRetryableError(error, true)
157+
)
158+
}
159+
160+
return isRetryableError(error, false)
161+
}
162+
140163
export const defaultRetryConfig: RetryConfig = {
141164
retry: isRetryableError,
142165
budget: Infinity,
@@ -179,3 +202,26 @@ export const defaultRetryConfig: RetryConfig = {
179202
return exponential(10)(ctx, cfg)
180203
},
181204
}
205+
206+
/**
207+
* Default retry configuration for long-lived gRPC streaming connections
208+
* (topic reader / writer).
209+
*
210+
* Extends {@link defaultRetryConfig} with reconnect logic for transient
211+
* transport errors ({@link isRetryableStreamError}).
212+
*/
213+
export const defaultStreamRetryConfig: RetryConfig = {
214+
...defaultRetryConfig,
215+
retry: isRetryableStreamError,
216+
strategy: (ctx, cfg) => {
217+
if (
218+
ctx.error instanceof ClientError &&
219+
(ctx.error.code === Status.CANCELLED ||
220+
ctx.error.code === Status.UNAVAILABLE)
221+
) {
222+
return exponential(10)(ctx, cfg)
223+
}
224+
225+
return (defaultRetryConfig.strategy as RetryStrategy)(ctx, cfg)
226+
},
227+
}

packages/topic/src/reader/_consume_stream.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { StatusIds_StatusCode } from '@ydbjs/api/operation'
33
import { TopicServiceDefinition } from '@ydbjs/api/topic'
44
import { loggers } from '@ydbjs/debug'
55
import { YDBError } from '@ydbjs/error'
6-
import { defaultRetryConfig, retry } from '@ydbjs/retry'
6+
import { defaultStreamRetryConfig, retry } from '@ydbjs/retry'
77

88
import { _send_init_request } from './_init_request.js'
99
import { _on_init_response } from './_init_response.js'
@@ -26,7 +26,7 @@ export let _consume_stream = async function consume_stream(
2626
let signal = state.controller.signal
2727
await state.driver.ready(signal)
2828

29-
await retry({ ...defaultRetryConfig, signal }, async (signal) => {
29+
await retry({ ...defaultStreamRetryConfig, signal }, async (signal) => {
3030
state.outgoingQueue.close()
3131
state.outgoingQueue.reset()
3232

packages/topic/src/reader/index.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,13 @@ export const createTopicReader = function createTopicReader(
6767
} catch (error) {
6868
if (!state.controller.signal.aborted) {
6969
dbg.log('error occurred while streaming: %O', error)
70+
// Stream died unexpectedly (retry budget exhausted or non-retryable
71+
// error). Destroy the reader so that any pending read() calls are
72+
// unblocked rather than polling forever.
73+
destroy(error)
7074
}
7175
} finally {
7276
dbg.log('stream closed')
73-
// Don't call destroy here - it's already being called by close/destroy
74-
// and calling it again would be a no-op anyway due to disposed check
7577
}
7678
})()
7779

packages/topic/src/writer/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
import type { Driver } from '@ydbjs/core'
1111
import { loggers } from '@ydbjs/debug'
1212
import { YDBError } from '@ydbjs/error'
13-
import { isRetryableError, retry } from '@ydbjs/retry'
13+
import { isRetryableStreamError, retry } from '@ydbjs/retry'
1414
import { backoff, combine, jitter } from '@ydbjs/retry/strategy'
1515

1616
import { type CompressionCodec, defaultCodecMap } from '../codec.js'
@@ -160,7 +160,7 @@ export const createTopicWriter = function createTopicWriter(
160160

161161
let retryConfig = options.retryConfig?.(signal)
162162
retryConfig ??= {
163-
retry: isRetryableError,
163+
retry: isRetryableStreamError,
164164
signal: signal,
165165
budget: Infinity,
166166
strategy: combine(backoff(50, 5000), jitter(50)),

packages/topic/src/writer2/machine.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import {
3333
TransactionIdentitySchema,
3434
} from '@ydbjs/api/topic'
3535
import type { Driver } from '@ydbjs/core'
36-
import { isRetryableError } from '@ydbjs/retry'
36+
import { isRetryableStreamError } from '@ydbjs/retry'
3737
import { assign, enqueueActions, sendTo, setup } from 'xstate'
3838
import { defaultCodecMap } from '../codec.js'
3939
import { WriterStream, type WriterStreamReceiveEvent } from './stream.js'
@@ -791,14 +791,14 @@ let writerMachineFactory = setup({
791791
},
792792
retryableError: ({ context }) => {
793793
if (context.lastError) {
794-
return isRetryableError(context.lastError, true)
794+
return isRetryableStreamError(context.lastError)
795795
}
796796

797797
return false
798798
},
799799
nonRetryableError: ({ context }) => {
800800
if (context.lastError) {
801-
return !isRetryableError(context.lastError, true)
801+
return !isRetryableStreamError(context.lastError)
802802
}
803803

804804
return false

0 commit comments

Comments
 (0)