Skip to content

Commit 96137cb

Browse files
committed
Refactor coordination session architecture and signal contracts
- Clean up file structure: delete dead files (session-stream, session-utils), merge node-runtime into node.ts, move try-acquire to errors.ts, move transport FSM into session-state.ts - Fix signal ownership: session owns its own AbortController, lease owns its own AbortController (independent lifecycles, no cross-linking) - Add typed error classes: SessionClosedError, SessionExpiredError, LeaseReleasedError, LeaderChangedError, ObservationEndedError - Simplify Lease: delegates release to Semaphore, idempotent via AC check - Simplify Mutex: Lock is now a type alias for Lease - Simplify Election: accepts Semaphore directly, no transport dependency - Move gRPC client to SessionTransport constructor - Remove redundant emit_error effect and waitReady proxy - Use Promise.withResolvers throughout - Add integration tests for session lifecycle and lease signals - Add e2e tests for race conditions, misuse, and typed error contracts - Update docs and README with error classes documentation
1 parent 7634629 commit 96137cb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2629
-4468
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
---
2+
'@ydbjs/coordination': minor
3+
---
4+
5+
Refactor coordination session architecture and signal contracts
6+
7+
- Clean up file structure: delete dead files (session-stream, session-utils), merge node-runtime into node.ts, move try-acquire to errors.ts, move transport FSM into session-state.ts
8+
- Fix signal ownership: session owns its own AbortController (not delegated to transport), lease owns its own AbortController (not linked to session)
9+
- Add typed error classes: SessionClosedError, SessionExpiredError, LeaseReleasedError, LeaderChangedError, ObservationEndedError — all exported for instanceof checks
10+
- Simplify Lease: single #releasePromise pattern, delegates release to Semaphore.release()
11+
- Simplify Mutex: Lock is now a type alias for Lease (was empty subclass)
12+
- Simplify Election: accepts Semaphore directly (no longer knows about transport)
13+
- Move client to SessionTransport constructor (was passed on every connect)
14+
- Remove emit_error effect (redundant with mark_expired)
15+
- Remove waitReady proxy from SessionRuntime (use transport.waitReady directly)
16+
- Use Promise.withResolvers throughout (project convention)
17+
- Add integration tests for session lifecycle, lease signals, and user signal cancellation
18+
- Add e2e tests for race conditions, misuse scenarios, and typed error contracts

docs/advanced/errors.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,15 @@ This guide outlines error classes and handling patterns used by the SDK.
1212
- `CommitError` — commit failed; exposes `retryable(idempotent)`.
1313
- `ClientError` — gRPC client-side error (e.g., `UNAVAILABLE`).
1414

15+
### Coordination errors (`@ydbjs/coordination`)
16+
17+
- `SessionClosedError` — session was closed gracefully or destroyed.
18+
- `SessionExpiredError` — recovery window expired, server dropped the session.
19+
- `LeaseReleasedError` — semaphore lease was released (found in `lease.signal.reason`).
20+
- `LeaderChangedError` — leader changed during `observe()` (found in `LeaderState.signal.reason`).
21+
- `ObservationEndedError` — the `observe()` iterator finished.
22+
- `TryAcquireMissError` — non-blocking acquire found no available tokens (internal).
23+
1524
Use `instanceof` checks to branch logic.
1625

1726
```ts

docs/guide/coordination/index.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,7 @@ await using _ = lock
105105
await doWork(lock.signal)
106106
```
107107

108-
`lock.signal` aborts when the lock is lost (e.g. session expired), so you can pass it to
109-
downstream operations to have them cancel automatically.
108+
`lock.signal` aborts when the lock is released. Use `session.signal` to detect session death.
110109

111110
## Semaphore
112111

docs/ru/advanced/errors.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,15 @@ title: Обработка ошибок
1212
- `CommitError` — неудачный коммит; содержит `retryable(idempotent)`.
1313
- `ClientError` — gRPC‑ошибка на стороне клиента (например, `UNAVAILABLE`).
1414

15+
### Ошибки координации (`@ydbjs/coordination`)
16+
17+
- `SessionClosedError` — сессия закрыта штатно или уничтожена.
18+
- `SessionExpiredError` — окно восстановления истекло, сервер удалил сессию.
19+
- `LeaseReleasedError` — аренда семафора освобождена (в `lease.signal.reason`).
20+
- `LeaderChangedError` — лидер сменился во время `observe()``LeaderState.signal.reason`).
21+
- `ObservationEndedError` — итератор `observe()` завершился.
22+
- `TryAcquireMissError` — неблокирующий захват не нашёл свободных токенов (внутренняя).
23+
1524
Используйте `instanceof` для ветвления логики.
1625

1726
```ts

docs/ru/guide/coordination/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ await using _ = lock
105105
await doWork(lock.signal)
106106
```
107107

108-
`lock.signal` прерывается при потере блокировки (например, при истечении сессии), поэтому
108+
`lock.signal` прерывается при освобождении блокировки. Используйте `session.signal` для обнаружения смерти сессии. Поэтому
109109
его можно передавать в нижележащие операции для их автоматической отмены.
110110

111111
## Семафор

e2e/coordination/election.test.ts

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
import { beforeEach, expect, inject, onTestFinished, test } from 'vitest'
22

33
import { Driver } from '@ydbjs/core'
4-
import { CoordinationClient, type CoordinationSession } from '@ydbjs/coordination'
4+
import {
5+
CoordinationClient,
6+
type CoordinationSession,
7+
LeaderChangedError,
8+
LeaseReleasedError,
9+
} from '@ydbjs/coordination'
510

611
// #region setup
712
declare module 'vitest' {
@@ -306,4 +311,59 @@ test('leader state signal aborts when the leader changes', async () => {
306311

307312
expect(capturedSignal).toBeDefined()
308313
expect(capturedSignal!.aborted).toBe(true)
314+
expect(capturedSignal!.reason).toBeInstanceOf(LeaderChangedError)
315+
})
316+
317+
test('leadership.signal.reason is LeaseReleasedError after resign', async () => {
318+
let election = sessionA.election(electionName)
319+
320+
let leadership = await election.campaign(Buffer.from('A'), AbortSignal.timeout(5000))
321+
await leadership.resign(AbortSignal.timeout(5000))
322+
323+
expect(leadership.signal.reason).toBeInstanceOf(LeaseReleasedError)
324+
})
325+
326+
test('double resign is idempotent', async () => {
327+
let election = sessionA.election(electionName)
328+
329+
let leadership = await election.campaign(Buffer.from('A'), AbortSignal.timeout(5000))
330+
await leadership.resign(AbortSignal.timeout(5000))
331+
await expect(leadership.resign(AbortSignal.timeout(5000))).resolves.toBeUndefined()
332+
})
333+
334+
test('N sessions race for leadership — exactly one wins', async () => {
335+
let N = 4
336+
337+
let sessions: CoordinationSession[] = []
338+
for (let i = 0; i < N; i++) {
339+
// oxlint-disable-next-line no-await-in-loop
340+
sessions.push(await client.createSession(testNodePath, {}, AbortSignal.timeout(5000)))
341+
}
342+
343+
try {
344+
// All sessions campaign simultaneously with short timeout
345+
let results = await Promise.allSettled(
346+
sessions.map((s) =>
347+
s
348+
.election(electionName)
349+
.campaign(Buffer.from(`c${s.sessionId}`), AbortSignal.timeout(3000))
350+
)
351+
)
352+
353+
let winners = results.filter((r) => r.status === 'fulfilled')
354+
355+
// Exactly one should have won within the timeout
356+
expect(winners.length).toBeGreaterThanOrEqual(1)
357+
358+
// Clean up — resign all winners
359+
for (let r of winners) {
360+
// oxlint-disable-next-line no-await-in-loop
361+
await (r as PromiseFulfilledResult<any>).value.resign(AbortSignal.timeout(5000))
362+
}
363+
} finally {
364+
for (let s of sessions) {
365+
// oxlint-disable-next-line no-await-in-loop
366+
await s.close(AbortSignal.timeout(5000)).catch(() => {})
367+
}
368+
}
309369
})

e2e/coordination/mutex.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { beforeEach, expect, inject, onTestFinished, test } from 'vitest'
22

33
import { Driver } from '@ydbjs/core'
4-
import { CoordinationClient, Lock } from '@ydbjs/coordination'
4+
import { CoordinationClient, Lease } from '@ydbjs/coordination'
55
import type { CoordinationSession } from '@ydbjs/coordination'
66

77
// #region setup
@@ -45,7 +45,7 @@ test('lock acquires exclusive access', async () => {
4545

4646
let lock = await mutex.lock(AbortSignal.timeout(5000))
4747

48-
expect(lock).toBeInstanceOf(Lock)
48+
expect(lock).toBeInstanceOf(Lease)
4949
// Signal must be alive while the lock is held
5050
expect(lock.signal.aborted).toBe(false)
5151

@@ -66,7 +66,7 @@ test('async dispose releases the lock', async () => {
6666
// verifies that dispose does not throw — that is the core contract.
6767
// A second lock attempt proves the mutex is free again.
6868
let lock2 = await mutex.lock(AbortSignal.timeout(5000))
69-
expect(lock2).toBeInstanceOf(Lock)
69+
expect(lock2).toBeInstanceOf(Lease)
7070
await lock2.release(AbortSignal.timeout(5000))
7171
})
7272

@@ -90,7 +90,7 @@ test('second lock blocks until first is released', async () => {
9090

9191
await using lockB = await acquireB
9292

93-
expect(lockB).toBeInstanceOf(Lock)
93+
expect(lockB).toBeInstanceOf(Lease)
9494
expect(lockB.signal.aborted).toBe(false)
9595
})
9696

@@ -100,7 +100,7 @@ test('tryLock succeeds when mutex is free', async () => {
100100
let lock = await mutex.tryLock(AbortSignal.timeout(5000))
101101

102102
expect(lock).not.toBeNull()
103-
expect(lock).toBeInstanceOf(Lock)
103+
expect(lock).toBeInstanceOf(Lease)
104104

105105
await lock!.release(AbortSignal.timeout(5000))
106106
})

e2e/coordination/node.test.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { beforeEach, expect, inject, onTestFinished, test } from 'vitest'
22

33
import { Driver } from '@ydbjs/core'
4-
import { CoordinationClient } from '@ydbjs/coordination'
4+
import { CoordinationClient, SessionClosedError } from '@ydbjs/coordination'
55

66
// #region setup
77
declare module 'vitest' {
@@ -122,3 +122,26 @@ test('createSession rejects when signal is already aborted', async () => {
122122

123123
await expect(client.createSession(testNodePath, {}, aborted)).rejects.toThrow('pre-aborted')
124124
})
125+
126+
test('session.signal.reason is SessionClosedError after close', async () => {
127+
await client.createNode(testNodePath, {}, AbortSignal.timeout(5000))
128+
129+
let session = await client.createSession(testNodePath, {}, AbortSignal.timeout(5000))
130+
await session.close(AbortSignal.timeout(5000))
131+
132+
expect(session.signal.aborted).toBe(true)
133+
expect(session.signal.reason).toBeInstanceOf(SessionClosedError)
134+
})
135+
136+
test('session.signal carries custom reason after destroy', async () => {
137+
await client.createNode(testNodePath, {}, AbortSignal.timeout(5000))
138+
139+
let session = await client.createSession(testNodePath, {}, AbortSignal.timeout(5000))
140+
let reason = new Error('custom destroy reason')
141+
session.destroy(reason)
142+
143+
await new Promise((r) => setTimeout(r, 100))
144+
145+
expect(session.signal.aborted).toBe(true)
146+
expect(session.signal.reason).toBe(reason)
147+
})

0 commit comments

Comments
 (0)