From fbbef5abcfa4ddfa64c7989a734869314310ba6b Mon Sep 17 00:00:00 2001 From: Scott Norton Date: Wed, 20 Aug 2025 19:36:03 +0000 Subject: [PATCH 1/7] make taskmanager less optimistic --- packages/dds/task-manager/src/interfaces.ts | 5 +- packages/dds/task-manager/src/taskManager.ts | 155 ++++++++++-------- .../task-manager/src/test/taskManager.spec.ts | 13 +- 3 files changed, 92 insertions(+), 81 deletions(-) diff --git a/packages/dds/task-manager/src/interfaces.ts b/packages/dds/task-manager/src/interfaces.ts index 0534175b9a96..2c196ae1009a 100644 --- a/packages/dds/task-manager/src/interfaces.ts +++ b/packages/dds/task-manager/src/interfaces.ts @@ -24,7 +24,7 @@ export interface ITaskManagerEvents extends ISharedObjectEvents { /** * Fires when a task has been exclusively assigned to the client. * - * @remarks Does not account for known pending ops, but instead only reflects the current state. + * @remarks Does not account for known pending ops, but instead only reflects the current consensus state. * * @eventProperty */ @@ -161,8 +161,7 @@ export interface ITaskManager extends ISharedObject { abandon(taskId: string): void; /** - * Check whether this client is the current assignee for the task and there is no outstanding abandon op that - * would abandon the assignment. + * Check whether this client is the current assignee for the task based on the consensus state. * @param taskId - Identifier for the task */ assigned(taskId: string): boolean; diff --git a/packages/dds/task-manager/src/taskManager.ts b/packages/dds/task-manager/src/taskManager.ts index e0f3e0271744..cce5c2ce76dd 100644 --- a/packages/dds/task-manager/src/taskManager.ts +++ b/packages/dds/task-manager/src/taskManager.ts @@ -160,6 +160,7 @@ export class TaskManagerClass assert(pendingOp.type === "abandon", 0x07e /* "Unexpected op type" */); // Delete the pending, because we no longer have an outstanding op this.latestPendingOps.delete(taskId); + this.abandonWatcher.emit("abandon", taskId); } } @@ -190,12 +191,9 @@ export class TaskManagerClass assert(removed === messageId, 0x403 /* Removed complete op id does not match */); } - // For clients in queue, we need to remove them from the queue and raise the proper events. - if (!local) { - this.taskQueues.delete(taskId); - this.completedWatcher.emit("completed", taskId); - this.emit("completed", taskId); - } + this.taskQueues.delete(taskId); + this.completedWatcher.emit("completed", taskId); + this.emit("completed", taskId); }, ); @@ -293,8 +291,10 @@ export class TaskManagerClass * {@inheritDoc ITaskManager.volunteerForTask} */ public async volunteerForTask(taskId: string): Promise { - // If we have the lock, resolve immediately - if (this.assigned(taskId)) { + // If we are both queued and assigned, then we have the lock and do not + // have any pending abandon/complete ops. In this case we can resolve + // true immediately. + if (this.queued(taskId) && this.assigned(taskId)) { return true; } @@ -319,19 +319,28 @@ export class TaskManagerClass // This promise works even if we already have an outstanding volunteer op. const lockAcquireP = new Promise((resolve, reject) => { + const setupListeners = (): void => { + this.queueWatcher.on("queueChange", checkIfAcquiredLock); + this.abandonWatcher.on("abandon", checkIfAbandoned); + this.connectionWatcher.on("disconnect", rejectOnDisconnect); + this.completedWatcher.on("completed", checkIfCompleted); + }; + const removeListeners = (): void => { + this.queueWatcher.off("queueChange", checkIfAcquiredLock); + this.abandonWatcher.off("abandon", checkIfAbandoned); + this.connectionWatcher.off("disconnect", rejectOnDisconnect); + this.completedWatcher.off("completed", checkIfCompleted); + }; + const checkIfAcquiredLock = (eventTaskId: string): void => { if (eventTaskId !== taskId) { return; } - // Also check pending ops here because it's possible we are currently in the queue from a previous // lock attempt, but have an outstanding abandon AND the outstanding volunteer for this lock attempt. // If we reach the head of the queue based on the previous lock attempt, we don't want to resolve. - if (this.assigned(taskId) && !this.latestPendingOps.has(taskId)) { - this.queueWatcher.off("queueChange", checkIfAcquiredLock); - this.abandonWatcher.off("abandon", checkIfAbandoned); - this.connectionWatcher.off("disconnect", rejectOnDisconnect); - this.completedWatcher.off("completed", checkIfCompleted); + if (this.assigned(taskId)) { + removeListeners(); resolve(true); } }; @@ -340,19 +349,12 @@ export class TaskManagerClass if (eventTaskId !== taskId) { return; } - - this.queueWatcher.off("queueChange", checkIfAcquiredLock); - this.abandonWatcher.off("abandon", checkIfAbandoned); - this.connectionWatcher.off("disconnect", rejectOnDisconnect); - this.completedWatcher.off("completed", checkIfCompleted); + removeListeners(); reject(new Error("Abandoned before acquiring task assignment")); }; const rejectOnDisconnect = (): void => { - this.queueWatcher.off("queueChange", checkIfAcquiredLock); - this.abandonWatcher.off("abandon", checkIfAbandoned); - this.connectionWatcher.off("disconnect", rejectOnDisconnect); - this.completedWatcher.off("completed", checkIfCompleted); + removeListeners(); reject(new Error("Disconnected before acquiring task assignment")); }; @@ -360,21 +362,15 @@ export class TaskManagerClass if (eventTaskId !== taskId) { return; } - - this.queueWatcher.off("queueChange", checkIfAcquiredLock); - this.abandonWatcher.off("abandon", checkIfAbandoned); - this.connectionWatcher.off("disconnect", rejectOnDisconnect); - this.completedWatcher.off("completed", checkIfCompleted); + removeListeners(); resolve(false); }; - this.queueWatcher.on("queueChange", checkIfAcquiredLock); - this.abandonWatcher.on("abandon", checkIfAbandoned); - this.connectionWatcher.on("disconnect", rejectOnDisconnect); - this.completedWatcher.on("completed", checkIfCompleted); + setupListeners(); }); if (!this.queued(taskId)) { + // Only send the volunteer op if we are not already queued. this.submitVolunteerOp(taskId); } return lockAcquireP; @@ -396,6 +392,18 @@ export class TaskManagerClass this.submitVolunteerOp(taskId); }; + const setupListeners = (): void => { + this.abandonWatcher.on("abandon", checkIfAbandoned); + this.connectionWatcher.on("disconnect", disconnectHandler); + this.completedWatcher.on("completed", checkIfCompleted); + }; + const removeListeners = (): void => { + this.abandonWatcher.off("abandon", checkIfAbandoned); + this.connectionWatcher.off("disconnect", disconnectHandler); + this.connectionWatcher.off("connect", submitVolunteerOp); + this.completedWatcher.off("completed", checkIfCompleted); + }; + const disconnectHandler = (): void => { // Wait to be connected again and then re-submit volunteer op this.connectionWatcher.once("connect", submitVolunteerOp); @@ -405,12 +413,7 @@ export class TaskManagerClass if (eventTaskId !== taskId) { return; } - - this.abandonWatcher.off("abandon", checkIfAbandoned); - this.connectionWatcher.off("disconnect", disconnectHandler); - this.connectionWatcher.off("connect", submitVolunteerOp); - this.completedWatcher.off("completed", checkIfCompleted); - + removeListeners(); this.subscribedTasks.delete(taskId); }; @@ -418,18 +421,11 @@ export class TaskManagerClass if (eventTaskId !== taskId) { return; } - - this.abandonWatcher.off("abandon", checkIfAbandoned); - this.connectionWatcher.off("disconnect", disconnectHandler); - this.connectionWatcher.off("connect", submitVolunteerOp); - this.completedWatcher.off("completed", checkIfCompleted); - + removeListeners(); this.subscribedTasks.delete(taskId); }; - this.abandonWatcher.on("abandon", checkIfAbandoned); - this.connectionWatcher.on("disconnect", disconnectHandler); - this.completedWatcher.on("completed", checkIfCompleted); + setupListeners(); if (!this.isAttached()) { // Simulate auto-ack in detached scenario @@ -455,7 +451,11 @@ export class TaskManagerClass // If we are disconnected (and attached), wait to be connected and submit volunteer op disconnectHandler(); } else if (!this.assigned(taskId) && !this.queued(taskId)) { - submitVolunteerOp(); + const latestPendingOp = this.latestPendingOps.get(taskId); + if (latestPendingOp === undefined || latestPendingOp.type === "volunteer") { + // We don't need to send a volunteer op unless this task is not already a volunteer op. + submitVolunteerOp(); + } } this.subscribedTasks.add(taskId); } @@ -467,7 +467,7 @@ export class TaskManagerClass // Always allow abandon if the client is subscribed to allow clients to unsubscribe while disconnected. // Otherwise, we should check to make sure the client is both connected queued for the task before sending an // abandon op. - if (!this.subscribed(taskId) && !this.queued(taskId)) { + if (!this.queued(taskId) && !this.subscribed(taskId) && !this.connected) { // Nothing to do return; } @@ -480,11 +480,19 @@ export class TaskManagerClass return; } - // If we're subscribed but not queued, we don't need to submit an abandon op (probably offline) if (this.queued(taskId)) { + if (!this.assigned(taskId)) { + // If we try to abandon when queued but not assigned then we should emit to the abandonWatcher as this is + // not allowed and will throw an error in the volunteer promise. + this.abandonWatcher.emit("abandon", taskId); + } + this.submitAbandonOp(taskId); + } else if (this.subscribed(taskId) && !this.connected) { + // If we are subscribed, not queued, and offline, then we should still submit the + // abandon op/event to ensure we abandon the subscription when reconnecting. + this.abandonWatcher.emit("abandon", taskId); this.submitAbandonOp(taskId); } - this.abandonWatcher.emit("abandon", taskId); } /** @@ -496,11 +504,7 @@ export class TaskManagerClass } const currentAssignee = this.taskQueues.get(taskId)?.[0]; - return ( - currentAssignee !== undefined && - currentAssignee === this.clientId && - !this.latestPendingOps.has(taskId) - ); + return currentAssignee !== undefined && currentAssignee === this.clientId; } /** @@ -513,13 +517,14 @@ export class TaskManagerClass assert(this.clientId !== undefined, 0x07f /* "clientId undefined" */); - const clientQueue = this.taskQueues.get(taskId); - // If we have no queue for the taskId, then no one has signed up for it. - return ( - ((clientQueue?.includes(this.clientId) ?? false) && - !this.latestPendingOps.has(taskId)) || - this.latestPendingOps.get(taskId)?.type === "volunteer" - ); + const inQueue = this.taskQueues.get(taskId)?.includes(this.clientId) ?? false; + const latestPendingOp = this.latestPendingOps.get(taskId); + const isPendingVolunteer = latestPendingOp?.type === "volunteer"; + const isPendingAbandonOrComplete = + latestPendingOp?.type === "abandon" || latestPendingOp?.type === "complete"; + // We return true if the client is either in queue already or the latest pending op for this task is a volunteer op. + // But we should always return false if the latest pending op is an abandon or complete op. + return (inQueue || isPendingVolunteer) && !isPendingAbandonOrComplete; } /** @@ -539,16 +544,17 @@ export class TaskManagerClass // If we are detached we will simulate auto-ack for the complete op. Therefore we only need to send the op if // we are attached. Additionally, we don't need to check if we are connected while detached. - if (this.isAttached()) { - if (!this.connected) { - throw new Error("Attempted to complete task in disconnected state"); - } - this.submitCompleteOp(taskId); + if (!this.isAttached()) { + this.taskQueues.delete(taskId); + this.completedWatcher.emit("completed", taskId); + this.emit("completed", taskId); + return; } - this.taskQueues.delete(taskId); - this.completedWatcher.emit("completed", taskId); - this.emit("completed", taskId); + if (!this.connected) { + throw new Error("Attempted to complete task in disconnected state"); + } + this.submitCompleteOp(taskId); } /** @@ -685,6 +691,13 @@ export class TaskManagerClass this.taskQueues.set(taskId, clientQueue); } + if (clientQueue.includes(clientId)) { + // We shouldn't re-add the client if it's already in the queue. + // This may be possible in scenarios where a client was added in + // while detached. + return; + } + const oldLockHolder = clientQueue[0]; clientQueue.push(clientId); const newLockHolder = clientQueue[0]; diff --git a/packages/dds/task-manager/src/test/taskManager.spec.ts b/packages/dds/task-manager/src/test/taskManager.spec.ts index 6570e5f78568..edfaaa895a26 100644 --- a/packages/dds/task-manager/src/test/taskManager.spec.ts +++ b/packages/dds/task-manager/src/test/taskManager.spec.ts @@ -192,10 +192,10 @@ describe("TaskManager", () => { taskManager1.abandon(taskId); assert.ok(!taskManager1.queued(taskId), "Should not be queued"); - assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); + assert.ok(taskManager1.assigned(taskId), "Should still be assigned (pending abandon)"); const revolunteerTaskP = taskManager1.volunteerForTask(taskId); assert.ok(taskManager1.queued(taskId), "Should be queued"); - assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); + assert.ok(taskManager1.assigned(taskId), "Should still be assigned (pending abandon)"); containerRuntimeFactory.processAllMessages(); const isAssigned2 = await revolunteerTaskP; assert.ok(isAssigned2, "Should resolve true"); @@ -225,7 +225,7 @@ describe("TaskManager", () => { taskManager1.abandon(taskId); assert.ok(!taskManager1.queued(taskId), "Should not be queued"); - assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); + assert.ok(taskManager1.assigned(taskId), "Should still be assigned (pending abandon)"); containerRuntimeFactory.processAllMessages(); assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); @@ -257,10 +257,10 @@ describe("TaskManager", () => { taskManager1.abandon(taskId); assert.ok(!taskManager1.queued(taskId), "Should not be queued"); - assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); + assert.ok(taskManager1.assigned(taskId), "Should still be assigned (pending abandon)"); taskManager1.abandon(taskId); assert.ok(!taskManager1.queued(taskId), "Should not be queued"); - assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); + assert.ok(taskManager1.assigned(taskId), "Should still be assigned (pending abandon)"); containerRuntimeFactory.processAllMessages(); assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); @@ -812,8 +812,7 @@ describe("TaskManager", () => { ); }); - // todo AB#7310 - it.skip("Can abandon a subscribed task after attach", async () => { + it("Can abandon a subscribed task after attach", async () => { const taskId = "taskId"; taskManager1.subscribeToTask(taskId); containerRuntimeFactory.processAllMessages(); From 7af92d851f68a0c600bad53797c1dd1144d1f630 Mon Sep 17 00:00:00 2001 From: Scott Norton Date: Wed, 20 Aug 2025 20:16:15 +0000 Subject: [PATCH 2/7] make queued consensus, use queuedOptimistically for internal logic --- packages/dds/task-manager/src/interfaces.ts | 3 +- packages/dds/task-manager/src/taskManager.ts | 69 ++++++++----------- .../src/test/taskManager.fuzz.spec.ts | 3 +- .../task-manager/src/test/taskManager.spec.ts | 34 ++++----- 4 files changed, 47 insertions(+), 62 deletions(-) diff --git a/packages/dds/task-manager/src/interfaces.ts b/packages/dds/task-manager/src/interfaces.ts index 2c196ae1009a..8f561536a0f3 100644 --- a/packages/dds/task-manager/src/interfaces.ts +++ b/packages/dds/task-manager/src/interfaces.ts @@ -167,8 +167,7 @@ export interface ITaskManager extends ISharedObject { assigned(taskId: string): boolean; /** - * Check whether this client is either the current assignee, in queue, or we expect they will be in queue after - * outstanding ops have been ack'd. + * Check whether this client is either the current assignee or in queue to become the assignee. * @param taskId - Identifier for the task */ queued(taskId: string): boolean; diff --git a/packages/dds/task-manager/src/taskManager.ts b/packages/dds/task-manager/src/taskManager.ts index cce5c2ce76dd..1642966670f8 100644 --- a/packages/dds/task-manager/src/taskManager.ts +++ b/packages/dds/task-manager/src/taskManager.ts @@ -96,11 +96,6 @@ export class TaskManagerClass */ private readonly subscribedTasks = new Set(); - /** - * Map to track tasks that have pending complete ops. - */ - private readonly pendingCompletedTasks = new Map(); - /** * Returns the clientId. Will return a placeholder if the runtime is detached and not yet assigned a clientId. */ @@ -180,15 +175,6 @@ export class TaskManagerClass // Delete the pending, because we no longer have an outstanding op this.latestPendingOps.delete(taskId); } - - // Remove complete op from this.pendingCompletedTasks - const pendingIds = this.pendingCompletedTasks.get(taskId); - assert( - pendingIds !== undefined && pendingIds.length > 0, - 0x402 /* pendingIds is empty */, - ); - const removed = pendingIds.shift(); - assert(removed === messageId, 0x403 /* Removed complete op id does not match */); } this.taskQueues.delete(taskId); @@ -277,12 +263,6 @@ export class TaskManagerClass messageId: ++this.messageId, }; - if (this.pendingCompletedTasks.has(taskId)) { - this.pendingCompletedTasks.get(taskId)?.push(pendingOp.messageId); - } else { - this.pendingCompletedTasks.set(taskId, [pendingOp.messageId]); - } - this.submitLocalMessage(op, pendingOp.messageId); this.latestPendingOps.set(taskId, pendingOp); } @@ -294,7 +274,7 @@ export class TaskManagerClass // If we are both queued and assigned, then we have the lock and do not // have any pending abandon/complete ops. In this case we can resolve // true immediately. - if (this.queued(taskId) && this.assigned(taskId)) { + if (this.queuedOptimistically(taskId) && this.assigned(taskId)) { return true; } @@ -369,7 +349,7 @@ export class TaskManagerClass setupListeners(); }); - if (!this.queued(taskId)) { + if (!this.queuedOptimistically(taskId)) { // Only send the volunteer op if we are not already queued. this.submitVolunteerOp(taskId); } @@ -435,7 +415,7 @@ export class TaskManagerClass // a real clientId. At that point we should re-enter the queue with a real volunteer op (assuming we are // connected). this.runtime.once("attached", () => { - if (this.queued(taskId)) { + if (this.queuedOptimistically(taskId)) { // If we are already queued, then we were able to replace the placeholderClientId with our real // clientId and no action is required. return; @@ -450,7 +430,7 @@ export class TaskManagerClass } else if (!this.connected) { // If we are disconnected (and attached), wait to be connected and submit volunteer op disconnectHandler(); - } else if (!this.assigned(taskId) && !this.queued(taskId)) { + } else if (!this.assigned(taskId) && !this.queuedOptimistically(taskId)) { const latestPendingOp = this.latestPendingOps.get(taskId); if (latestPendingOp === undefined || latestPendingOp.type === "volunteer") { // We don't need to send a volunteer op unless this task is not already a volunteer op. @@ -467,7 +447,7 @@ export class TaskManagerClass // Always allow abandon if the client is subscribed to allow clients to unsubscribe while disconnected. // Otherwise, we should check to make sure the client is both connected queued for the task before sending an // abandon op. - if (!this.queued(taskId) && !this.subscribed(taskId) && !this.connected) { + if (!this.queuedOptimistically(taskId) && !this.subscribed(taskId) && !this.connected) { // Nothing to do return; } @@ -480,7 +460,7 @@ export class TaskManagerClass return; } - if (this.queued(taskId)) { + if (this.queuedOptimistically(taskId)) { if (!this.assigned(taskId)) { // If we try to abandon when queued but not assigned then we should emit to the abandonWatcher as this is // not allowed and will throw an error in the volunteer promise. @@ -516,15 +496,7 @@ export class TaskManagerClass } assert(this.clientId !== undefined, 0x07f /* "clientId undefined" */); - - const inQueue = this.taskQueues.get(taskId)?.includes(this.clientId) ?? false; - const latestPendingOp = this.latestPendingOps.get(taskId); - const isPendingVolunteer = latestPendingOp?.type === "volunteer"; - const isPendingAbandonOrComplete = - latestPendingOp?.type === "abandon" || latestPendingOp?.type === "complete"; - // We return true if the client is either in queue already or the latest pending op for this task is a volunteer op. - // But we should always return false if the latest pending op is an abandon or complete op. - return (inQueue || isPendingVolunteer) && !isPendingAbandonOrComplete; + return this.taskQueues.get(taskId)?.includes(this.clientId) ?? false; } /** @@ -673,12 +645,6 @@ export class TaskManagerClass } private addClientToQueue(taskId: string, clientId: string): void { - const pendingIds = this.pendingCompletedTasks.get(taskId); - if (pendingIds !== undefined && pendingIds.length > 0) { - // Ignore the volunteer op if we know this task is about to be completed - return; - } - // Ensure that the clientId exists in the quorum, or it is placeholderClientId (detached scenario) if ( this.runtime.getQuorum().getMembers().has(clientId) || @@ -771,6 +737,27 @@ export class TaskManagerClass } } + /** + * Checks whether this client is currently assigned or in queue to become assigned, while also accounting + * for the latest pending ops. + */ + private queuedOptimistically(taskId: string): boolean { + if (this.isAttached() && !this.connected) { + return false; + } + + assert(this.clientId !== undefined, 0x07f /* "clientId undefined" */); + + const inQueue = this.taskQueues.get(taskId)?.includes(this.clientId) ?? false; + const latestPendingOp = this.latestPendingOps.get(taskId); + const isPendingVolunteer = latestPendingOp?.type === "volunteer"; + const isPendingAbandonOrComplete = + latestPendingOp?.type === "abandon" || latestPendingOp?.type === "complete"; + // We return true if the client is either in queue already or the latest pending op for this task is a volunteer op. + // But we should always return false if the latest pending op is an abandon or complete op. + return (inQueue || isPendingVolunteer) && !isPendingAbandonOrComplete; + } + protected applyStashedOp(content: unknown): void { const taskOp: ITaskManagerOperation = content as ITaskManagerOperation; switch (taskOp.type) { diff --git a/packages/dds/task-manager/src/test/taskManager.fuzz.spec.ts b/packages/dds/task-manager/src/test/taskManager.fuzz.spec.ts index 8c3c8eabd93a..91e7052cf193 100644 --- a/packages/dds/task-manager/src/test/taskManager.fuzz.spec.ts +++ b/packages/dds/task-manager/src/test/taskManager.fuzz.spec.ts @@ -297,8 +297,7 @@ describe("TaskManager fuzz testing with rebasing", () => { createDDSFuzzSuite(model, { validationStrategy: { type: "fixedInterval", interval: defaultOptions.validateInterval }, - // AB#5185: enabling rebasing indicates some unknown eventual consistency issue - skip: [0, 4, 6, 7], + skip: [], rebaseProbability: 0.15, containerRuntimeOptions: { flushMode: FlushMode.TurnBased, diff --git a/packages/dds/task-manager/src/test/taskManager.spec.ts b/packages/dds/task-manager/src/test/taskManager.spec.ts index edfaaa895a26..253c69d4c560 100644 --- a/packages/dds/task-manager/src/test/taskManager.spec.ts +++ b/packages/dds/task-manager/src/test/taskManager.spec.ts @@ -100,7 +100,7 @@ describe("TaskManager", () => { it("Can volunteer for a task", async () => { const taskId = "taskId"; const volunteerTaskP = taskManager1.volunteerForTask(taskId); - assert.ok(taskManager1.queued(taskId), "Should be queued"); + assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); containerRuntimeFactory.processAllMessages(); const isAssigned = await volunteerTaskP; @@ -114,9 +114,9 @@ describe("TaskManager", () => { const volunteerTaskP1 = taskManager1.volunteerForTask(taskId); const volunteerTaskP2 = taskManager2.volunteerForTask(taskId); - assert.ok(taskManager1.queued(taskId), "Task manager 1 should be queued"); + assert.ok(!taskManager1.queued(taskId), "Task manager 1 should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Task manager 1 should not be assigned"); - assert.ok(taskManager2.queued(taskId), "Task manager 2 should be queued"); + assert.ok(!taskManager2.queued(taskId), "Task manager 2 should not be queued"); assert.ok(!taskManager2.assigned(taskId), "Task manager 2 should not be assigned"); containerRuntimeFactory.processAllMessages(); @@ -156,9 +156,9 @@ describe("TaskManager", () => { const volunteerTaskP1 = taskManager1.volunteerForTask(taskId); const volunteerTaskP2 = taskManager2.volunteerForTask(taskId); - assert.ok(taskManager1.queued(taskId), "Task manager 1 should be queued"); + assert.ok(!taskManager1.queued(taskId), "Task manager 1 not be queued"); assert.ok(!taskManager1.assigned(taskId), "Task manager 1 should not be assigned"); - assert.ok(taskManager2.queued(taskId), "Task manager 2 should be queued"); + assert.ok(!taskManager2.queued(taskId), "Task manager 2 not be queued"); assert.ok(!taskManager2.assigned(taskId), "Task manager 2 should not be assigned"); containerRuntimeFactory.processAllMessages(); @@ -182,7 +182,7 @@ describe("TaskManager", () => { it("Can abandon and immediately attempt to reacquire a task", async () => { const taskId = "taskId"; const volunteerTaskP = taskManager1.volunteerForTask(taskId); - assert.ok(taskManager1.queued(taskId), "Should be queued"); + assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); containerRuntimeFactory.processAllMessages(); const isAssigned = await volunteerTaskP; @@ -191,7 +191,7 @@ describe("TaskManager", () => { assert.ok(taskManager1.assigned(taskId), "Should be assigned"); taskManager1.abandon(taskId); - assert.ok(!taskManager1.queued(taskId), "Should not be queued"); + assert.ok(taskManager1.queued(taskId), "Should still be queued (pending abandon)"); assert.ok(taskManager1.assigned(taskId), "Should still be assigned (pending abandon)"); const revolunteerTaskP = taskManager1.volunteerForTask(taskId); assert.ok(taskManager1.queued(taskId), "Should be queued"); @@ -206,7 +206,7 @@ describe("TaskManager", () => { it("Can attempt to volunteer for task twice and abandon twice (after ack)", async () => { const taskId = "taskId"; const volunteerTaskP1 = taskManager1.volunteerForTask(taskId); - assert.ok(taskManager1.queued(taskId), "Should be queued"); + assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); containerRuntimeFactory.processAllMessages(); const isAssigned1 = await volunteerTaskP1; @@ -224,7 +224,7 @@ describe("TaskManager", () => { assert.ok(taskManager1.assigned(taskId), "Should be assigned"); taskManager1.abandon(taskId); - assert.ok(!taskManager1.queued(taskId), "Should not be queued"); + assert.ok(taskManager1.queued(taskId), "Should still be queued (pending abandon)"); assert.ok(taskManager1.assigned(taskId), "Should still be assigned (pending abandon)"); containerRuntimeFactory.processAllMessages(); assert.ok(!taskManager1.queued(taskId), "Should not be queued"); @@ -241,11 +241,11 @@ describe("TaskManager", () => { it("Can attempt to lock task twice and abandon twice (before ack)", async () => { const taskId = "taskId"; const volunteerTaskP1 = taskManager1.volunteerForTask(taskId); - assert.ok(taskManager1.queued(taskId), "Should be queued"); + assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); const volunteerTaskP2 = taskManager1.volunteerForTask(taskId); - assert.ok(taskManager1.queued(taskId), "Should be queued"); + assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); containerRuntimeFactory.processAllMessages(); const isAssigned1 = await volunteerTaskP1; @@ -256,10 +256,10 @@ describe("TaskManager", () => { assert.ok(taskManager1.assigned(taskId), "Should be assigned"); taskManager1.abandon(taskId); - assert.ok(!taskManager1.queued(taskId), "Should not be queued"); + assert.ok(taskManager1.queued(taskId), "Should still be queued (pending abandon)"); assert.ok(taskManager1.assigned(taskId), "Should still be assigned (pending abandon)"); taskManager1.abandon(taskId); - assert.ok(!taskManager1.queued(taskId), "Should not be queued"); + assert.ok(taskManager1.queued(taskId), "Should still be queued (pending abandon)"); assert.ok(taskManager1.assigned(taskId), "Should still be assigned (pending abandon)"); containerRuntimeFactory.processAllMessages(); assert.ok(!taskManager1.queued(taskId), "Should not be queued"); @@ -287,7 +287,7 @@ describe("TaskManager", () => { const taskId = "taskId"; taskManager1.subscribeToTask(taskId); - assert.ok(taskManager1.queued(taskId), "Task manager 1 should be queued"); + assert.ok(!taskManager1.queued(taskId), "Task manager 1 should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Task manager 1 should not be assigned"); assert.ok(taskManager1.subscribed(taskId), "Task manager 1 should be subscribed"); @@ -316,9 +316,9 @@ describe("TaskManager", () => { taskManager1.subscribeToTask(taskId); taskManager2.subscribeToTask(taskId); - assert.ok(taskManager1.queued(taskId), "Task manager 1 should be queued"); + assert.ok(!taskManager1.queued(taskId), "Task manager 1 should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Task manager 1 should not be assigned"); - assert.ok(taskManager2.queued(taskId), "Task manager 2 should be queued"); + assert.ok(!taskManager2.queued(taskId), "Task manager 2 should not be queued"); assert.ok(!taskManager2.assigned(taskId), "Task manager 2 should not be assigned"); containerRuntimeFactory.processAllMessages(); @@ -1099,7 +1099,7 @@ describe("TaskManager", () => { containerRuntime1.connected = true; - assert.ok(taskManager1.queued(taskId), "Task manager 1 should be queued"); + assert.ok(!taskManager1.queued(taskId), "Task manager 1 should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Task manager 1 should not be assigned"); assert.ok(taskManager1.subscribed(taskId), "Task manager 1 should be subscribed"); From b18e86a3587255f182ac49dc129a88d5f2961d82 Mon Sep 17 00:00:00 2001 From: Scott Norton Date: Wed, 20 Aug 2025 16:23:25 -0400 Subject: [PATCH 3/7] Update packages/dds/task-manager/src/test/taskManager.spec.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- packages/dds/task-manager/src/test/taskManager.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/dds/task-manager/src/test/taskManager.spec.ts b/packages/dds/task-manager/src/test/taskManager.spec.ts index 253c69d4c560..abf39826353b 100644 --- a/packages/dds/task-manager/src/test/taskManager.spec.ts +++ b/packages/dds/task-manager/src/test/taskManager.spec.ts @@ -156,7 +156,7 @@ describe("TaskManager", () => { const volunteerTaskP1 = taskManager1.volunteerForTask(taskId); const volunteerTaskP2 = taskManager2.volunteerForTask(taskId); - assert.ok(!taskManager1.queued(taskId), "Task manager 1 not be queued"); + assert.ok(!taskManager1.queued(taskId), "Task manager 1 should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Task manager 1 should not be assigned"); assert.ok(!taskManager2.queued(taskId), "Task manager 2 not be queued"); assert.ok(!taskManager2.assigned(taskId), "Task manager 2 should not be assigned"); From 298f35035e76cd46424407c807db4490d4f36481 Mon Sep 17 00:00:00 2001 From: Scott Norton Date: Wed, 20 Aug 2025 16:23:33 -0400 Subject: [PATCH 4/7] Update packages/dds/task-manager/src/test/taskManager.spec.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- packages/dds/task-manager/src/test/taskManager.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/dds/task-manager/src/test/taskManager.spec.ts b/packages/dds/task-manager/src/test/taskManager.spec.ts index abf39826353b..3e15a8b193d8 100644 --- a/packages/dds/task-manager/src/test/taskManager.spec.ts +++ b/packages/dds/task-manager/src/test/taskManager.spec.ts @@ -158,7 +158,7 @@ describe("TaskManager", () => { assert.ok(!taskManager1.queued(taskId), "Task manager 1 should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Task manager 1 should not be assigned"); - assert.ok(!taskManager2.queued(taskId), "Task manager 2 not be queued"); + assert.ok(!taskManager2.queued(taskId), "Task manager 2 should not be queued"); assert.ok(!taskManager2.assigned(taskId), "Task manager 2 should not be assigned"); containerRuntimeFactory.processAllMessages(); From df0e85c8bb4fdfb979a80410340bde93e85326cc Mon Sep 17 00:00:00 2001 From: Scott Norton Date: Thu, 21 Aug 2025 11:49:14 -0400 Subject: [PATCH 5/7] Update packages/dds/task-manager/src/taskManager.ts Co-authored-by: Matt Rakow --- packages/dds/task-manager/src/taskManager.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/dds/task-manager/src/taskManager.ts b/packages/dds/task-manager/src/taskManager.ts index 1642966670f8..55995c4f231d 100644 --- a/packages/dds/task-manager/src/taskManager.ts +++ b/packages/dds/task-manager/src/taskManager.ts @@ -432,8 +432,8 @@ export class TaskManagerClass disconnectHandler(); } else if (!this.assigned(taskId) && !this.queuedOptimistically(taskId)) { const latestPendingOp = this.latestPendingOps.get(taskId); - if (latestPendingOp === undefined || latestPendingOp.type === "volunteer") { - // We don't need to send a volunteer op unless this task is not already a volunteer op. + if (latestPendingOp?.type !== "volunteer") { + // We don't need to send a second volunteer op if we just sent one. submitVolunteerOp(); } } From ea070c889f63e3dbcabe9dca6b9fdc21cd9caf4a Mon Sep 17 00:00:00 2001 From: Scott Norton Date: Thu, 21 Aug 2025 15:53:22 +0000 Subject: [PATCH 6/7] PR feedback --- packages/dds/task-manager/src/taskManager.ts | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/dds/task-manager/src/taskManager.ts b/packages/dds/task-manager/src/taskManager.ts index 55995c4f231d..5cd175a9c9cf 100644 --- a/packages/dds/task-manager/src/taskManager.ts +++ b/packages/dds/task-manager/src/taskManager.ts @@ -445,9 +445,8 @@ export class TaskManagerClass */ public abandon(taskId: string): void { // Always allow abandon if the client is subscribed to allow clients to unsubscribe while disconnected. - // Otherwise, we should check to make sure the client is both connected queued for the task before sending an - // abandon op. - if (!this.queuedOptimistically(taskId) && !this.subscribed(taskId) && !this.connected) { + // Otherwise, we should check to make sure the client is optimistically queued for the task before trying to abandon. + if (!this.queuedOptimistically(taskId) && !this.subscribed(taskId)) { // Nothing to do return; } @@ -461,17 +460,17 @@ export class TaskManagerClass } if (this.queuedOptimistically(taskId)) { + this.submitAbandonOp(taskId); if (!this.assigned(taskId)) { - // If we try to abandon when queued but not assigned then we should emit to the abandonWatcher as this is - // not allowed and will throw an error in the volunteer promise. + // If we try to abandon when queued but not assigned then we should emit to the abandonWatcher + // to ensure that the volunteer promise is rejected. this.abandonWatcher.emit("abandon", taskId); } - this.submitAbandonOp(taskId); } else if (this.subscribed(taskId) && !this.connected) { // If we are subscribed, not queued, and offline, then we should still submit the // abandon op/event to ensure we abandon the subscription when reconnecting. - this.abandonWatcher.emit("abandon", taskId); this.submitAbandonOp(taskId); + this.abandonWatcher.emit("abandon", taskId); } } From 840d89c311d88c1b86df719533761f3d9bb24fe9 Mon Sep 17 00:00:00 2001 From: Scott Norton Date: Fri, 22 Aug 2025 15:18:28 +0000 Subject: [PATCH 7/7] fix placeholder/duplicate client in queue scenarios --- packages/dds/task-manager/src/taskManager.ts | 21 ++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/packages/dds/task-manager/src/taskManager.ts b/packages/dds/task-manager/src/taskManager.ts index 5cd175a9c9cf..fe47732abb8e 100644 --- a/packages/dds/task-manager/src/taskManager.ts +++ b/packages/dds/task-manager/src/taskManager.ts @@ -212,16 +212,17 @@ export class TaskManagerClass this.connectionWatcher.on("disconnect", () => { assert(this.clientId !== undefined, 0x1d3 /* "Missing client id on disconnect" */); - // We don't modify the taskQueues on disconnect (they still reflect the latest known consensus state). - // After reconnect these will get cleaned up by observing the clientLeaves. - // However we do need to recognize that we lost the lock if we had it. Calls to .queued() and - // .assigned() are also connection-state-aware to be consistent. + // Emit "lost" for any tasks we were assigned to. for (const [taskId, clientQueue] of this.taskQueues.entries()) { if (this.isAttached() && clientQueue[0] === this.clientId) { this.emit("lost", taskId); } } + // Remove this client from all queues to reflect the new state, since being disconnected automatically removes + // this client from all queues. + this.removeClientFromAllQueues(this.clientId); + // All of our outstanding ops will be for the old clientId even if they get ack'd this.latestPendingOps.clear(); }); @@ -664,7 +665,10 @@ export class TaskManagerClass } const oldLockHolder = clientQueue[0]; - clientQueue.push(clientId); + if (!clientQueue.includes(clientId)) { + // Ensure a client is not in the queue twice. + clientQueue.push(clientId); + } const newLockHolder = clientQueue[0]; if (newLockHolder !== oldLockHolder) { this.queueWatcher.emit("queueChange", taskId, oldLockHolder, newLockHolder); @@ -712,7 +716,12 @@ export class TaskManagerClass for (const clientQueue of this.taskQueues.values()) { const clientIdIndex = clientQueue.indexOf(placeholderClientId); if (clientIdIndex !== -1) { - clientQueue[clientIdIndex] = this.runtime.clientId; + if (clientQueue.includes(this.runtime.clientId)) { + // If the real clientId is already in the queue, just remove the placeholder. + clientQueue.splice(clientIdIndex, 1); + } else { + clientQueue[clientIdIndex] = this.runtime.clientId; + } } } }