|
| 1 | +import { beforeEach, expect, inject, onTestFinished, test } from 'vitest' |
| 2 | + |
| 3 | +import { Driver } from '@ydbjs/core' |
| 4 | +import { CoordinationClient, type CoordinationSession } from '@ydbjs/coordination' |
| 5 | + |
| 6 | +// #region setup |
| 7 | +declare module 'vitest' { |
| 8 | + export interface ProvidedContext { |
| 9 | + connectionString: string |
| 10 | + } |
| 11 | +} |
| 12 | + |
| 13 | +let driver = new Driver(inject('connectionString'), { |
| 14 | + 'ydb.sdk.enable_discovery': false, |
| 15 | +}) |
| 16 | + |
| 17 | +await driver.ready() |
| 18 | + |
| 19 | +let client = new CoordinationClient(driver) |
| 20 | + |
| 21 | +let testNodePath: string |
| 22 | +let electionName: string |
| 23 | +let sessionA: CoordinationSession |
| 24 | + |
| 25 | +beforeEach(async () => { |
| 26 | + let suffix = `${Date.now()}-${Math.floor(Math.random() * 0xffff).toString(16)}` |
| 27 | + testNodePath = `/local/test-coord-elec-${suffix}` |
| 28 | + electionName = `election-${suffix}` |
| 29 | + |
| 30 | + await client.createNode(testNodePath, {}, AbortSignal.timeout(5000)) |
| 31 | + sessionA = await client.createSession(testNodePath, {}, AbortSignal.timeout(5000)) |
| 32 | + |
| 33 | + // Election is backed by a regular semaphore with limit=1. |
| 34 | + // The semaphore must exist before any campaign or observe call. |
| 35 | + await sessionA.semaphore(electionName).create({ limit: 1 }, AbortSignal.timeout(5000)) |
| 36 | + |
| 37 | + onTestFinished(async () => { |
| 38 | + await sessionA.close(AbortSignal.timeout(5000)).catch(() => {}) |
| 39 | + await client.dropNode(testNodePath, AbortSignal.timeout(5000)).catch(() => {}) |
| 40 | + }) |
| 41 | +}) |
| 42 | +// #endregion |
| 43 | + |
| 44 | +test('leader returns null when no leader exists', async () => { |
| 45 | + let election = sessionA.election(electionName) |
| 46 | + |
| 47 | + let info = await election.leader(AbortSignal.timeout(5000)) |
| 48 | + |
| 49 | + expect(info).toBeNull() |
| 50 | +}) |
| 51 | + |
| 52 | +test('campaign becomes leader', async () => { |
| 53 | + let election = sessionA.election(electionName) |
| 54 | + let payload = Buffer.from('candidate-A') |
| 55 | + |
| 56 | + await using _leadership = await election.campaign(payload, AbortSignal.timeout(5000)) |
| 57 | + |
| 58 | + let info = await election.leader(AbortSignal.timeout(5000)) |
| 59 | + |
| 60 | + expect(info).not.toBeNull() |
| 61 | + expect(Buffer.from(info!.data)).toEqual(payload) |
| 62 | +}) |
| 63 | + |
| 64 | +test('resign releases leadership', async () => { |
| 65 | + let election = sessionA.election(electionName) |
| 66 | + |
| 67 | + let leadership = await election.campaign(Buffer.from('A'), AbortSignal.timeout(5000)) |
| 68 | + await leadership.resign(AbortSignal.timeout(5000)) |
| 69 | + |
| 70 | + let info = await election.leader(AbortSignal.timeout(5000)) |
| 71 | + |
| 72 | + expect(info).toBeNull() |
| 73 | +}) |
| 74 | + |
| 75 | +test('async dispose resigns', async () => { |
| 76 | + let election = sessionA.election(electionName) |
| 77 | + |
| 78 | + { |
| 79 | + await using _leadership = await election.campaign( |
| 80 | + Buffer.from('A'), |
| 81 | + AbortSignal.timeout(5000) |
| 82 | + ) |
| 83 | + } |
| 84 | + |
| 85 | + // After dispose the semaphore is free — a second campaign must succeed immediately |
| 86 | + await using leadership2 = await election.campaign(Buffer.from('A2'), AbortSignal.timeout(5000)) |
| 87 | + |
| 88 | + expect(leadership2.signal.aborted).toBe(false) |
| 89 | +}) |
| 90 | + |
| 91 | +test('proclaim updates semaphore data', async () => { |
| 92 | + // proclaim calls updateSemaphore which writes the semaphore-level data field. |
| 93 | + // That is separate from the owner-level acquire data stored at campaign time. |
| 94 | + let election = sessionA.election(electionName) |
| 95 | + |
| 96 | + await using _leadership = await election.campaign( |
| 97 | + Buffer.from('initial'), |
| 98 | + AbortSignal.timeout(5000) |
| 99 | + ) |
| 100 | + |
| 101 | + await _leadership.proclaim(Buffer.from('updated'), AbortSignal.timeout(5000)) |
| 102 | + |
| 103 | + // Read the semaphore's top-level data field — that is what proclaim writes to. |
| 104 | + let description = await sessionA.semaphore(electionName).describe({}, AbortSignal.timeout(5000)) |
| 105 | + |
| 106 | + expect(Buffer.from(description.data)).toEqual(Buffer.from('updated')) |
| 107 | +}) |
| 108 | + |
| 109 | +test('second campaign waits until first leader resigns', async () => { |
| 110 | + await using sessionB = await client.createSession(testNodePath, {}, AbortSignal.timeout(5000)) |
| 111 | + |
| 112 | + let electionA = sessionA.election(electionName) |
| 113 | + let electionB = sessionB.election(electionName) |
| 114 | + |
| 115 | + // Session A becomes leader |
| 116 | + await using leadershipA = await electionA.campaign(Buffer.from('A'), AbortSignal.timeout(5000)) |
| 117 | + |
| 118 | + // Session B starts campaigning — it blocks until A resigns |
| 119 | + let campaignB = electionB.campaign(Buffer.from('B'), AbortSignal.timeout(10000)) |
| 120 | + |
| 121 | + // Let B's request reach the server before A resigns |
| 122 | + await new Promise((resolve) => setTimeout(resolve, 200)) |
| 123 | + |
| 124 | + await leadershipA.resign(AbortSignal.timeout(5000)) |
| 125 | + |
| 126 | + await using _leadershipB = await campaignB |
| 127 | + |
| 128 | + let info = await electionB.leader(AbortSignal.timeout(5000)) |
| 129 | + |
| 130 | + expect(info).not.toBeNull() |
| 131 | + expect(Buffer.from(info!.data)).toEqual(Buffer.from('B')) |
| 132 | +}) |
| 133 | + |
| 134 | +test('observe detects when a leader appears', async () => { |
| 135 | + // sessionA observes, sessionB campaigns — separate sessions keep request |
| 136 | + // registries independent so the watch and the acquire do not interfere. |
| 137 | + await using sessionB = await client.createSession(testNodePath, {}, AbortSignal.timeout(5000)) |
| 138 | + |
| 139 | + let ctrl = new AbortController() |
| 140 | + let observedData: Buffer[] = [] |
| 141 | + |
| 142 | + let observing = (async () => { |
| 143 | + for await (let state of sessionA |
| 144 | + .election(electionName) |
| 145 | + .observe(AbortSignal.any([ctrl.signal, AbortSignal.timeout(10000)]))) { |
| 146 | + observedData.push(Buffer.from(state.data)) |
| 147 | + ctrl.abort() |
| 148 | + break |
| 149 | + } |
| 150 | + })() |
| 151 | + |
| 152 | + // Give the watch stream time to register with the server before campaigning |
| 153 | + await new Promise((resolve) => setTimeout(resolve, 200)) |
| 154 | + |
| 155 | + await using _leadership = await sessionB |
| 156 | + .election(electionName) |
| 157 | + .campaign(Buffer.from('leader-B'), AbortSignal.timeout(5000)) |
| 158 | + |
| 159 | + await observing |
| 160 | + |
| 161 | + expect(observedData).toHaveLength(1) |
| 162 | + expect(observedData[0]).toEqual(Buffer.from('leader-B')) |
| 163 | +}) |
| 164 | + |
| 165 | +test('observe sets isMe true when the observing session is the leader', async () => { |
| 166 | + let election = sessionA.election(electionName) |
| 167 | + |
| 168 | + // Campaign first so the first watch snapshot already has an owner |
| 169 | + await using _leadership = await election.campaign( |
| 170 | + Buffer.from('leader-A'), |
| 171 | + AbortSignal.timeout(5000) |
| 172 | + ) |
| 173 | + |
| 174 | + let firstState: { isMe: boolean; data: Buffer } | undefined |
| 175 | + |
| 176 | + for await (let state of election.observe(AbortSignal.timeout(5000))) { |
| 177 | + firstState = { isMe: state.isMe, data: Buffer.from(state.data) } |
| 178 | + break |
| 179 | + } |
| 180 | + |
| 181 | + expect(firstState).toBeDefined() |
| 182 | + expect(firstState!.isMe).toBe(true) |
| 183 | + expect(firstState!.data).toEqual(Buffer.from('leader-A')) |
| 184 | +}) |
| 185 | + |
| 186 | +test('observe tracks leader change and no-leader transition', async () => { |
| 187 | + // Three sessions: A = initial leader, B = dedicated observer, C = second leader. |
| 188 | + // Keeping observe and campaign on separate sessions avoids request registry conflicts. |
| 189 | + await using sessionB = await client.createSession(testNodePath, {}, AbortSignal.timeout(5000)) |
| 190 | + await using sessionC = await client.createSession(testNodePath, {}, AbortSignal.timeout(5000)) |
| 191 | + |
| 192 | + // A becomes leader before observation starts so the first snapshot is non-empty |
| 193 | + await using leadershipA = await sessionA |
| 194 | + .election(electionName) |
| 195 | + .campaign(Buffer.from('A'), AbortSignal.timeout(5000)) |
| 196 | + |
| 197 | + let states: Array<{ data: Buffer; isMe: boolean }> = [] |
| 198 | + |
| 199 | + let observing = (async () => { |
| 200 | + for await (let state of sessionB |
| 201 | + .election(electionName) |
| 202 | + .observe(AbortSignal.timeout(15000))) { |
| 203 | + states.push({ data: Buffer.from(state.data), isMe: state.isMe }) |
| 204 | + // Collect: A is leader → no leader → C is leader |
| 205 | + if (states.length >= 3) break |
| 206 | + } |
| 207 | + })() |
| 208 | + |
| 209 | + // Give the watch stream time to register before triggering changes |
| 210 | + await new Promise((resolve) => setTimeout(resolve, 200)) |
| 211 | + |
| 212 | + // A resigns → observe yields no-leader state |
| 213 | + await leadershipA.resign(AbortSignal.timeout(5000)) |
| 214 | + |
| 215 | + // Small pause so the server sends the no-leader notification as its own event |
| 216 | + // before C acquires — otherwise the two changes may arrive in one batch. |
| 217 | + await new Promise((resolve) => setTimeout(resolve, 200)) |
| 218 | + |
| 219 | + // C campaigns → observe yields C-is-leader state |
| 220 | + await using _leadershipC = await sessionC |
| 221 | + .election(electionName) |
| 222 | + .campaign(Buffer.from('C'), AbortSignal.timeout(5000)) |
| 223 | + |
| 224 | + await observing |
| 225 | + |
| 226 | + // State 1: A is leader (observed by B, so isMe=false) |
| 227 | + expect(states[0]!.data).toEqual(Buffer.from('A')) |
| 228 | + expect(states[0]!.isMe).toBe(false) |
| 229 | + |
| 230 | + // State 2: no leader |
| 231 | + expect(states[1]!.data).toHaveLength(0) |
| 232 | + expect(states[1]!.isMe).toBe(false) |
| 233 | + |
| 234 | + // State 3: C is leader (observed by B, so isMe=false) |
| 235 | + expect(states[2]!.data).toEqual(Buffer.from('C')) |
| 236 | + expect(states[2]!.isMe).toBe(false) |
| 237 | +}) |
| 238 | + |
| 239 | +test('observe ends gracefully when signal is already aborted', async () => { |
| 240 | + let election = sessionA.election(electionName) |
| 241 | + let ctrl = new AbortController() |
| 242 | + ctrl.abort() |
| 243 | + |
| 244 | + let iterations = 0 |
| 245 | + |
| 246 | + // When the signal is already aborted the underlying watchSemaphore may throw |
| 247 | + // the abort reason before yielding. Both outcomes are acceptable — the |
| 248 | + // important contract is that no items are yielded. |
| 249 | + try { |
| 250 | + for await (let _ of election.observe(ctrl.signal)) { |
| 251 | + iterations++ |
| 252 | + } |
| 253 | + } catch { |
| 254 | + // AbortError propagated from watchSemaphore — expected and intentionally ignored |
| 255 | + } |
| 256 | + |
| 257 | + expect(iterations).toBe(0) |
| 258 | +}) |
| 259 | + |
| 260 | +test('leadership signal aborts when leader resigns', async () => { |
| 261 | + let election = sessionA.election(electionName) |
| 262 | + |
| 263 | + let leadership = await election.campaign(Buffer.from('A'), AbortSignal.timeout(5000)) |
| 264 | + |
| 265 | + // Signal must be alive while holding leadership |
| 266 | + expect(leadership.signal.aborted).toBe(false) |
| 267 | + |
| 268 | + await leadership.resign(AbortSignal.timeout(5000)) |
| 269 | + |
| 270 | + // After resign the underlying lease is released — signal must abort |
| 271 | + expect(leadership.signal.aborted).toBe(true) |
| 272 | +}) |
| 273 | + |
| 274 | +test('leader state signal aborts when the leader changes', async () => { |
| 275 | + // A campaigns. B observes and collects two states: A-is-leader → no-leader. |
| 276 | + // When observe yields the second state, it aborts the first state's signal. |
| 277 | + await using sessionB = await client.createSession(testNodePath, {}, AbortSignal.timeout(5000)) |
| 278 | + |
| 279 | + await using leadershipA = await sessionA |
| 280 | + .election(electionName) |
| 281 | + .campaign(Buffer.from('A'), AbortSignal.timeout(5000)) |
| 282 | + |
| 283 | + let capturedSignal: AbortSignal | undefined |
| 284 | + |
| 285 | + let observing = (async () => { |
| 286 | + for await (let state of sessionB |
| 287 | + .election(electionName) |
| 288 | + .observe(AbortSignal.timeout(10000))) { |
| 289 | + if (!capturedSignal) { |
| 290 | + // First state: A is leader — capture the signal and keep iterating |
| 291 | + capturedSignal = state.signal |
| 292 | + continue |
| 293 | + } |
| 294 | + // Second state arrived: first state's signal must be aborted now |
| 295 | + break |
| 296 | + } |
| 297 | + })() |
| 298 | + |
| 299 | + // Give the watch stream time to register before triggering the change |
| 300 | + await new Promise((resolve) => setTimeout(resolve, 200)) |
| 301 | + |
| 302 | + // A resigns — observe yields the second state (no leader), aborting the first state's signal |
| 303 | + await leadershipA.resign(AbortSignal.timeout(5000)) |
| 304 | + |
| 305 | + await observing |
| 306 | + |
| 307 | + expect(capturedSignal).toBeDefined() |
| 308 | + expect(capturedSignal!.aborted).toBe(true) |
| 309 | +}) |
0 commit comments