Skip to content

Commit 32b38d9

Browse files
committed
Add debug logging to @ydbjs/coordination
- Add 'coordination' category to @ydbjs/debug loggers - Log session lifecycle: connecting, ready, reconnecting, expiry, close - Log all semaphore operations with meaningful context - Log retry when a request is interrupted by reconnect - Add Semaphore, Mutex, Election scopes for targeted DEBUG filtering - Use human-readable messages describing what is happening, not what code runs
1 parent 7f91188 commit 32b38d9

File tree

8 files changed

+141
-13
lines changed

8 files changed

+141
-13
lines changed

packages/coordination/src/client.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import type { Driver } from '@ydbjs/core'
2+
import { loggers } from '@ydbjs/debug'
23

34
import { CoordinationNodeRuntime } from './runtime/node-runtime.js'
45
import { getSessionRuntime } from './internal/session-runtime.js'
56
import type { CoordinationNodeConfig, CoordinationNodeDescription } from './runtime/node-runtime.js'
67
import { CoordinationSession } from './session.js'
78

9+
let dbg = loggers.coordination.extend('client')
10+
811
export interface SessionOptions {
912
description?: string
1013
recoveryWindow?: number
@@ -46,12 +49,15 @@ export class CoordinationClient {
4649
throw signal.reason
4750
}
4851

52+
dbg.log('creating session on %s', path)
4953
let session = new CoordinationSession(this.#driver, { path, ...options }, signal)
5054

5155
try {
5256
await getSessionRuntime(session).waitReady(signal)
57+
dbg.log('session ready on %s (id=%s)', path, session.sessionId)
5358
return session
5459
} catch (error) {
60+
dbg.log('failed to open session on %s: %O', path, error)
5561
session.destroy(error)
5662
throw error
5763
}
@@ -62,6 +68,7 @@ export class CoordinationClient {
6268
options?: SessionOptions,
6369
signal?: AbortSignal
6470
): AsyncIterable<CoordinationSession> {
71+
dbg.log('opening persistent session on %s', path)
6572
for (;;) {
6673
if (signal?.aborted) {
6774
return
@@ -76,6 +83,8 @@ export class CoordinationClient {
7683
if (!shouldOpenNext) {
7784
return
7885
}
86+
87+
dbg.log('session expired on %s, reopening', path)
7988
}
8089
}
8190

packages/coordination/src/election.ts

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
import { loggers } from '@ydbjs/debug'
12
import { getSessionRuntime } from './internal/session-runtime.js'
23
import { Lease, Semaphore } from './semaphore.js'
34
import type { CoordinationSession } from './session.js'
45

6+
let dbg = loggers.coordination.extend('election')
7+
58
export interface LeaderInfo {
69
data: Uint8Array
710
}
@@ -33,6 +36,7 @@ export class Leadership implements AsyncDisposable {
3336
async proclaim(data: Uint8Array, signal?: AbortSignal): Promise<void> {
3437
throwIfAborted(signal)
3538

39+
dbg.log('proclaiming leadership on %s (%d bytes)', this.#name, data.byteLength)
3640
return getSessionRuntime(this.#session).updateSemaphore(this.#name, data, signal)
3741
}
3842

@@ -41,6 +45,7 @@ export class Leadership implements AsyncDisposable {
4145
return
4246
}
4347

48+
dbg.log('resigning from leadership on %s', this.#name)
4449
this.#resigned = true
4550
await this.#lease.release(signal)
4651
}
@@ -64,13 +69,16 @@ export class Election {
6469
}
6570

6671
async campaign(data: Uint8Array, signal?: AbortSignal): Promise<Leadership> {
72+
dbg.log('campaigning for leadership on %s', this.#name)
6773
let semaphore = new Semaphore(this.#session, this.#name)
6874
let lease = await semaphore.acquire({ count: 1, data }, signal)
6975

76+
dbg.log('became leader on %s', this.#name)
7077
return new Leadership(lease, this.#name, this.#session)
7178
}
7279

7380
async *observe(signal?: AbortSignal): AsyncIterable<LeaderState> {
81+
dbg.log('observing leadership changes on %s', this.#name)
7482
let previousLeader: { sessionId: bigint; orderId: bigint } | null = null
7583
// Tracks the AbortController for the currently live LeaderState so we can
7684
// abort it as soon as the leader changes, before yielding the next state.
@@ -98,6 +106,7 @@ export class Election {
98106
currentController = new AbortController()
99107

100108
if (!owner) {
109+
dbg.log('no leader on %s', this.#name)
101110
yield {
102111
data: emptyBytes,
103112
isMe: false,
@@ -106,30 +115,40 @@ export class Election {
106115
continue
107116
}
108117

118+
let isMe =
119+
this.#session.sessionId !== null && owner.sessionId === this.#session.sessionId
120+
dbg.log(
121+
'leader changed on %s (sessionId=%s, isMe=%s)',
122+
this.#name,
123+
owner.sessionId,
124+
isMe
125+
)
109126
yield {
110127
data: owner.data,
111-
isMe:
112-
this.#session.sessionId !== null &&
113-
owner.sessionId === this.#session.sessionId,
128+
isMe,
114129
signal: currentController.signal,
115130
}
116131
}
117132
} finally {
118133
// Ensure the last yielded state's signal is aborted when iteration ends
119134
// so consumers relying on it for cancellation are always unblocked.
135+
dbg.log('stopped observing %s', this.#name)
120136
currentController?.abort(new Error('Election observation ended'))
121137
}
122138
}
123139

124140
async leader(signal?: AbortSignal): Promise<LeaderInfo | null> {
141+
dbg.log('reading current leader on %s', this.#name)
125142
let semaphore = new Semaphore(this.#session, this.#name)
126143
let description = await semaphore.describe({ owners: true }, signal)
127144
let owner = description.owners?.[0]
128145

129146
if (!owner) {
147+
dbg.log('no current leader on %s', this.#name)
130148
return null
131149
}
132150

151+
dbg.log('current leader on %s is session %s', this.#name, owner.sessionId)
133152
return {
134153
data: owner.data,
135154
}

packages/coordination/src/mutex.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import type { CoordinationSession } from './session.js'
2+
import { loggers } from '@ydbjs/debug'
23
import { getSessionRuntime } from './internal/session-runtime.js'
34
import { isTryAcquireMiss } from './internal/try-acquire.js'
45
import { Lease } from './semaphore.js'
56
import type { SessionRuntime } from './runtime/session-runtime.js'
67

8+
let dbg = loggers.coordination.extend('mutex')
9+
710
// Ephemeral semaphores in YDB have a server-hardcoded limit of MAX_UINT64.
811
// Mutex exclusivity is achieved by acquiring all tokens at once — no other
912
// session can acquire even a single token while they are all held.
@@ -29,26 +32,31 @@ export class Mutex {
2932
}
3033

3134
async lock(signal?: AbortSignal): Promise<Lock> {
35+
dbg.log('waiting to acquire lock on %s', this.#name)
3236
let lease = await this.#runtime.acquireSemaphore(
3337
this.#name,
3438
{ count: mutexCapacity, ephemeral: true },
3539
signal
3640
)
3741

42+
dbg.log('lock acquired on %s', this.#name)
3843
return new Lock(this.#name, lease)
3944
}
4045

4146
async tryLock(signal?: AbortSignal): Promise<Lock | null> {
47+
dbg.log('trying to acquire lock on %s without waiting', this.#name)
4248
try {
4349
let lease = await this.#runtime.acquireSemaphore(
4450
this.#name,
4551
{ count: mutexCapacity, ephemeral: true, waitTimeout: 0 },
4652
signal
4753
)
4854

55+
dbg.log('lock acquired on %s', this.#name)
4956
return new Lock(this.#name, lease)
5057
} catch (error) {
5158
if (isTryAcquireMiss(error)) {
59+
dbg.log('%s is already locked, skipping', this.#name)
5260
return null
5361
}
5462

packages/coordination/src/runtime/node-runtime.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@ import {
1616
import { StatusIds_StatusCode } from '@ydbjs/api/operation'
1717
import type { Entry } from '@ydbjs/api/scheme'
1818
import type { Driver } from '@ydbjs/core'
19+
import { loggers } from '@ydbjs/debug'
1920
import { YDBError } from '@ydbjs/error'
2021
import type { Client } from 'nice-grpc'
2122

23+
let dbg = loggers.coordination.extend('node')
24+
2225
export interface CoordinationNodeConfig {
2326
selfCheckPeriod?: number
2427
sessionGracePeriod?: number
@@ -41,6 +44,7 @@ export class CoordinationNodeRuntime {
4144
}
4245

4346
async describe(path: string, signal?: AbortSignal): Promise<CoordinationNodeDescription> {
47+
dbg.log('reading configuration of coordination node at %s', path)
4448
let request = create(DescribeNodeRequestSchema, { path })
4549
let response = signal
4650
? await this.#client.describeNode(request, { signal })
@@ -72,6 +76,7 @@ export class CoordinationNodeRuntime {
7276
}
7377

7478
async create(path: string, cfg: CoordinationNodeConfig, signal?: AbortSignal): Promise<void> {
79+
dbg.log('creating coordination node at %s', path)
7580
let request = create(CreateNodeRequestSchema, { path, config: toNodeConfigMessage(cfg) })
7681
let response = signal
7782
? await this.#client.createNode(request, { signal })
@@ -88,6 +93,7 @@ export class CoordinationNodeRuntime {
8893
}
8994

9095
async alter(path: string, cfg: CoordinationNodeConfig, signal?: AbortSignal): Promise<void> {
96+
dbg.log('updating configuration of coordination node at %s', path)
9197
let request = create(AlterNodeRequestSchema, { path, config: toNodeConfigMessage(cfg) })
9298
let response = signal
9399
? await this.#client.alterNode(request, { signal })
@@ -104,6 +110,7 @@ export class CoordinationNodeRuntime {
104110
}
105111

106112
async drop(path: string, signal?: AbortSignal): Promise<void> {
113+
dbg.log('dropping coordination node at %s', path)
107114
let request = create(DropNodeRequestSchema, { path })
108115
let response = signal
109116
? await this.#client.dropNode(request, { signal })

0 commit comments

Comments
 (0)