diff --git a/packages/dds/task-manager/src/interfaces.ts b/packages/dds/task-manager/src/interfaces.ts index 0534175b9a96..8f561536a0f3 100644 --- a/packages/dds/task-manager/src/interfaces.ts +++ b/packages/dds/task-manager/src/interfaces.ts @@ -24,7 +24,7 @@ export interface ITaskManagerEvents extends ISharedObjectEvents { /** * Fires when a task has been exclusively assigned to the client. * - * @remarks Does not account for known pending ops, but instead only reflects the current state. + * @remarks Does not account for known pending ops, but instead only reflects the current consensus state. * * @eventProperty */ @@ -161,15 +161,13 @@ export interface ITaskManager extends ISharedObject { abandon(taskId: string): void; /** - * Check whether this client is the current assignee for the task and there is no outstanding abandon op that - * would abandon the assignment. + * Check whether this client is the current assignee for the task based on the consensus state. * @param taskId - Identifier for the task */ assigned(taskId: string): boolean; /** - * Check whether this client is either the current assignee, in queue, or we expect they will be in queue after - * outstanding ops have been ack'd. + * Check whether this client is either the current assignee or in queue to become the assignee. * @param taskId - Identifier for the task */ queued(taskId: string): boolean; diff --git a/packages/dds/task-manager/src/taskManager.ts b/packages/dds/task-manager/src/taskManager.ts index e0f3e0271744..fe47732abb8e 100644 --- a/packages/dds/task-manager/src/taskManager.ts +++ b/packages/dds/task-manager/src/taskManager.ts @@ -96,11 +96,6 @@ export class TaskManagerClass */ private readonly subscribedTasks = new Set(); - /** - * Map to track tasks that have pending complete ops. - */ - private readonly pendingCompletedTasks = new Map(); - /** * Returns the clientId. Will return a placeholder if the runtime is detached and not yet assigned a clientId. */ @@ -160,6 +155,7 @@ export class TaskManagerClass assert(pendingOp.type === "abandon", 0x07e /* "Unexpected op type" */); // Delete the pending, because we no longer have an outstanding op this.latestPendingOps.delete(taskId); + this.abandonWatcher.emit("abandon", taskId); } } @@ -179,23 +175,11 @@ export class TaskManagerClass // Delete the pending, because we no longer have an outstanding op this.latestPendingOps.delete(taskId); } - - // Remove complete op from this.pendingCompletedTasks - const pendingIds = this.pendingCompletedTasks.get(taskId); - assert( - pendingIds !== undefined && pendingIds.length > 0, - 0x402 /* pendingIds is empty */, - ); - const removed = pendingIds.shift(); - assert(removed === messageId, 0x403 /* Removed complete op id does not match */); } - // For clients in queue, we need to remove them from the queue and raise the proper events. - if (!local) { - this.taskQueues.delete(taskId); - this.completedWatcher.emit("completed", taskId); - this.emit("completed", taskId); - } + this.taskQueues.delete(taskId); + this.completedWatcher.emit("completed", taskId); + this.emit("completed", taskId); }, ); @@ -228,16 +212,17 @@ export class TaskManagerClass this.connectionWatcher.on("disconnect", () => { assert(this.clientId !== undefined, 0x1d3 /* "Missing client id on disconnect" */); - // We don't modify the taskQueues on disconnect (they still reflect the latest known consensus state). - // After reconnect these will get cleaned up by observing the clientLeaves. - // However we do need to recognize that we lost the lock if we had it. Calls to .queued() and - // .assigned() are also connection-state-aware to be consistent. + // Emit "lost" for any tasks we were assigned to. for (const [taskId, clientQueue] of this.taskQueues.entries()) { if (this.isAttached() && clientQueue[0] === this.clientId) { this.emit("lost", taskId); } } + // Remove this client from all queues to reflect the new state, since being disconnected automatically removes + // this client from all queues. + this.removeClientFromAllQueues(this.clientId); + // All of our outstanding ops will be for the old clientId even if they get ack'd this.latestPendingOps.clear(); }); @@ -279,12 +264,6 @@ export class TaskManagerClass messageId: ++this.messageId, }; - if (this.pendingCompletedTasks.has(taskId)) { - this.pendingCompletedTasks.get(taskId)?.push(pendingOp.messageId); - } else { - this.pendingCompletedTasks.set(taskId, [pendingOp.messageId]); - } - this.submitLocalMessage(op, pendingOp.messageId); this.latestPendingOps.set(taskId, pendingOp); } @@ -293,8 +272,10 @@ export class TaskManagerClass * {@inheritDoc ITaskManager.volunteerForTask} */ public async volunteerForTask(taskId: string): Promise { - // If we have the lock, resolve immediately - if (this.assigned(taskId)) { + // If we are both queued and assigned, then we have the lock and do not + // have any pending abandon/complete ops. In this case we can resolve + // true immediately. + if (this.queuedOptimistically(taskId) && this.assigned(taskId)) { return true; } @@ -319,19 +300,28 @@ export class TaskManagerClass // This promise works even if we already have an outstanding volunteer op. const lockAcquireP = new Promise((resolve, reject) => { + const setupListeners = (): void => { + this.queueWatcher.on("queueChange", checkIfAcquiredLock); + this.abandonWatcher.on("abandon", checkIfAbandoned); + this.connectionWatcher.on("disconnect", rejectOnDisconnect); + this.completedWatcher.on("completed", checkIfCompleted); + }; + const removeListeners = (): void => { + this.queueWatcher.off("queueChange", checkIfAcquiredLock); + this.abandonWatcher.off("abandon", checkIfAbandoned); + this.connectionWatcher.off("disconnect", rejectOnDisconnect); + this.completedWatcher.off("completed", checkIfCompleted); + }; + const checkIfAcquiredLock = (eventTaskId: string): void => { if (eventTaskId !== taskId) { return; } - // Also check pending ops here because it's possible we are currently in the queue from a previous // lock attempt, but have an outstanding abandon AND the outstanding volunteer for this lock attempt. // If we reach the head of the queue based on the previous lock attempt, we don't want to resolve. - if (this.assigned(taskId) && !this.latestPendingOps.has(taskId)) { - this.queueWatcher.off("queueChange", checkIfAcquiredLock); - this.abandonWatcher.off("abandon", checkIfAbandoned); - this.connectionWatcher.off("disconnect", rejectOnDisconnect); - this.completedWatcher.off("completed", checkIfCompleted); + if (this.assigned(taskId)) { + removeListeners(); resolve(true); } }; @@ -340,19 +330,12 @@ export class TaskManagerClass if (eventTaskId !== taskId) { return; } - - this.queueWatcher.off("queueChange", checkIfAcquiredLock); - this.abandonWatcher.off("abandon", checkIfAbandoned); - this.connectionWatcher.off("disconnect", rejectOnDisconnect); - this.completedWatcher.off("completed", checkIfCompleted); + removeListeners(); reject(new Error("Abandoned before acquiring task assignment")); }; const rejectOnDisconnect = (): void => { - this.queueWatcher.off("queueChange", checkIfAcquiredLock); - this.abandonWatcher.off("abandon", checkIfAbandoned); - this.connectionWatcher.off("disconnect", rejectOnDisconnect); - this.completedWatcher.off("completed", checkIfCompleted); + removeListeners(); reject(new Error("Disconnected before acquiring task assignment")); }; @@ -360,21 +343,15 @@ export class TaskManagerClass if (eventTaskId !== taskId) { return; } - - this.queueWatcher.off("queueChange", checkIfAcquiredLock); - this.abandonWatcher.off("abandon", checkIfAbandoned); - this.connectionWatcher.off("disconnect", rejectOnDisconnect); - this.completedWatcher.off("completed", checkIfCompleted); + removeListeners(); resolve(false); }; - this.queueWatcher.on("queueChange", checkIfAcquiredLock); - this.abandonWatcher.on("abandon", checkIfAbandoned); - this.connectionWatcher.on("disconnect", rejectOnDisconnect); - this.completedWatcher.on("completed", checkIfCompleted); + setupListeners(); }); - if (!this.queued(taskId)) { + if (!this.queuedOptimistically(taskId)) { + // Only send the volunteer op if we are not already queued. this.submitVolunteerOp(taskId); } return lockAcquireP; @@ -396,6 +373,18 @@ export class TaskManagerClass this.submitVolunteerOp(taskId); }; + const setupListeners = (): void => { + this.abandonWatcher.on("abandon", checkIfAbandoned); + this.connectionWatcher.on("disconnect", disconnectHandler); + this.completedWatcher.on("completed", checkIfCompleted); + }; + const removeListeners = (): void => { + this.abandonWatcher.off("abandon", checkIfAbandoned); + this.connectionWatcher.off("disconnect", disconnectHandler); + this.connectionWatcher.off("connect", submitVolunteerOp); + this.completedWatcher.off("completed", checkIfCompleted); + }; + const disconnectHandler = (): void => { // Wait to be connected again and then re-submit volunteer op this.connectionWatcher.once("connect", submitVolunteerOp); @@ -405,12 +394,7 @@ export class TaskManagerClass if (eventTaskId !== taskId) { return; } - - this.abandonWatcher.off("abandon", checkIfAbandoned); - this.connectionWatcher.off("disconnect", disconnectHandler); - this.connectionWatcher.off("connect", submitVolunteerOp); - this.completedWatcher.off("completed", checkIfCompleted); - + removeListeners(); this.subscribedTasks.delete(taskId); }; @@ -418,18 +402,11 @@ export class TaskManagerClass if (eventTaskId !== taskId) { return; } - - this.abandonWatcher.off("abandon", checkIfAbandoned); - this.connectionWatcher.off("disconnect", disconnectHandler); - this.connectionWatcher.off("connect", submitVolunteerOp); - this.completedWatcher.off("completed", checkIfCompleted); - + removeListeners(); this.subscribedTasks.delete(taskId); }; - this.abandonWatcher.on("abandon", checkIfAbandoned); - this.connectionWatcher.on("disconnect", disconnectHandler); - this.completedWatcher.on("completed", checkIfCompleted); + setupListeners(); if (!this.isAttached()) { // Simulate auto-ack in detached scenario @@ -439,7 +416,7 @@ export class TaskManagerClass // a real clientId. At that point we should re-enter the queue with a real volunteer op (assuming we are // connected). this.runtime.once("attached", () => { - if (this.queued(taskId)) { + if (this.queuedOptimistically(taskId)) { // If we are already queued, then we were able to replace the placeholderClientId with our real // clientId and no action is required. return; @@ -454,8 +431,12 @@ export class TaskManagerClass } else if (!this.connected) { // If we are disconnected (and attached), wait to be connected and submit volunteer op disconnectHandler(); - } else if (!this.assigned(taskId) && !this.queued(taskId)) { - submitVolunteerOp(); + } else if (!this.assigned(taskId) && !this.queuedOptimistically(taskId)) { + const latestPendingOp = this.latestPendingOps.get(taskId); + if (latestPendingOp?.type !== "volunteer") { + // We don't need to send a second volunteer op if we just sent one. + submitVolunteerOp(); + } } this.subscribedTasks.add(taskId); } @@ -465,9 +446,8 @@ export class TaskManagerClass */ public abandon(taskId: string): void { // Always allow abandon if the client is subscribed to allow clients to unsubscribe while disconnected. - // Otherwise, we should check to make sure the client is both connected queued for the task before sending an - // abandon op. - if (!this.subscribed(taskId) && !this.queued(taskId)) { + // Otherwise, we should check to make sure the client is optimistically queued for the task before trying to abandon. + if (!this.queuedOptimistically(taskId) && !this.subscribed(taskId)) { // Nothing to do return; } @@ -480,11 +460,19 @@ export class TaskManagerClass return; } - // If we're subscribed but not queued, we don't need to submit an abandon op (probably offline) - if (this.queued(taskId)) { + if (this.queuedOptimistically(taskId)) { + this.submitAbandonOp(taskId); + if (!this.assigned(taskId)) { + // If we try to abandon when queued but not assigned then we should emit to the abandonWatcher + // to ensure that the volunteer promise is rejected. + this.abandonWatcher.emit("abandon", taskId); + } + } else if (this.subscribed(taskId) && !this.connected) { + // If we are subscribed, not queued, and offline, then we should still submit the + // abandon op/event to ensure we abandon the subscription when reconnecting. this.submitAbandonOp(taskId); + this.abandonWatcher.emit("abandon", taskId); } - this.abandonWatcher.emit("abandon", taskId); } /** @@ -496,11 +484,7 @@ export class TaskManagerClass } const currentAssignee = this.taskQueues.get(taskId)?.[0]; - return ( - currentAssignee !== undefined && - currentAssignee === this.clientId && - !this.latestPendingOps.has(taskId) - ); + return currentAssignee !== undefined && currentAssignee === this.clientId; } /** @@ -512,14 +496,7 @@ export class TaskManagerClass } assert(this.clientId !== undefined, 0x07f /* "clientId undefined" */); - - const clientQueue = this.taskQueues.get(taskId); - // If we have no queue for the taskId, then no one has signed up for it. - return ( - ((clientQueue?.includes(this.clientId) ?? false) && - !this.latestPendingOps.has(taskId)) || - this.latestPendingOps.get(taskId)?.type === "volunteer" - ); + return this.taskQueues.get(taskId)?.includes(this.clientId) ?? false; } /** @@ -539,16 +516,17 @@ export class TaskManagerClass // If we are detached we will simulate auto-ack for the complete op. Therefore we only need to send the op if // we are attached. Additionally, we don't need to check if we are connected while detached. - if (this.isAttached()) { - if (!this.connected) { - throw new Error("Attempted to complete task in disconnected state"); - } - this.submitCompleteOp(taskId); + if (!this.isAttached()) { + this.taskQueues.delete(taskId); + this.completedWatcher.emit("completed", taskId); + this.emit("completed", taskId); + return; } - this.taskQueues.delete(taskId); - this.completedWatcher.emit("completed", taskId); - this.emit("completed", taskId); + if (!this.connected) { + throw new Error("Attempted to complete task in disconnected state"); + } + this.submitCompleteOp(taskId); } /** @@ -667,12 +645,6 @@ export class TaskManagerClass } private addClientToQueue(taskId: string, clientId: string): void { - const pendingIds = this.pendingCompletedTasks.get(taskId); - if (pendingIds !== undefined && pendingIds.length > 0) { - // Ignore the volunteer op if we know this task is about to be completed - return; - } - // Ensure that the clientId exists in the quorum, or it is placeholderClientId (detached scenario) if ( this.runtime.getQuorum().getMembers().has(clientId) || @@ -685,8 +657,18 @@ export class TaskManagerClass this.taskQueues.set(taskId, clientQueue); } + if (clientQueue.includes(clientId)) { + // We shouldn't re-add the client if it's already in the queue. + // This may be possible in scenarios where a client was added in + // while detached. + return; + } + const oldLockHolder = clientQueue[0]; - clientQueue.push(clientId); + if (!clientQueue.includes(clientId)) { + // Ensure a client is not in the queue twice. + clientQueue.push(clientId); + } const newLockHolder = clientQueue[0]; if (newLockHolder !== oldLockHolder) { this.queueWatcher.emit("queueChange", taskId, oldLockHolder, newLockHolder); @@ -734,7 +716,12 @@ export class TaskManagerClass for (const clientQueue of this.taskQueues.values()) { const clientIdIndex = clientQueue.indexOf(placeholderClientId); if (clientIdIndex !== -1) { - clientQueue[clientIdIndex] = this.runtime.clientId; + if (clientQueue.includes(this.runtime.clientId)) { + // If the real clientId is already in the queue, just remove the placeholder. + clientQueue.splice(clientIdIndex, 1); + } else { + clientQueue[clientIdIndex] = this.runtime.clientId; + } } } } @@ -758,6 +745,27 @@ export class TaskManagerClass } } + /** + * Checks whether this client is currently assigned or in queue to become assigned, while also accounting + * for the latest pending ops. + */ + private queuedOptimistically(taskId: string): boolean { + if (this.isAttached() && !this.connected) { + return false; + } + + assert(this.clientId !== undefined, 0x07f /* "clientId undefined" */); + + const inQueue = this.taskQueues.get(taskId)?.includes(this.clientId) ?? false; + const latestPendingOp = this.latestPendingOps.get(taskId); + const isPendingVolunteer = latestPendingOp?.type === "volunteer"; + const isPendingAbandonOrComplete = + latestPendingOp?.type === "abandon" || latestPendingOp?.type === "complete"; + // We return true if the client is either in queue already or the latest pending op for this task is a volunteer op. + // But we should always return false if the latest pending op is an abandon or complete op. + return (inQueue || isPendingVolunteer) && !isPendingAbandonOrComplete; + } + protected applyStashedOp(content: unknown): void { const taskOp: ITaskManagerOperation = content as ITaskManagerOperation; switch (taskOp.type) { diff --git a/packages/dds/task-manager/src/test/taskManager.fuzz.spec.ts b/packages/dds/task-manager/src/test/taskManager.fuzz.spec.ts index 8c3c8eabd93a..91e7052cf193 100644 --- a/packages/dds/task-manager/src/test/taskManager.fuzz.spec.ts +++ b/packages/dds/task-manager/src/test/taskManager.fuzz.spec.ts @@ -297,8 +297,7 @@ describe("TaskManager fuzz testing with rebasing", () => { createDDSFuzzSuite(model, { validationStrategy: { type: "fixedInterval", interval: defaultOptions.validateInterval }, - // AB#5185: enabling rebasing indicates some unknown eventual consistency issue - skip: [0, 4, 6, 7], + skip: [], rebaseProbability: 0.15, containerRuntimeOptions: { flushMode: FlushMode.TurnBased, diff --git a/packages/dds/task-manager/src/test/taskManager.spec.ts b/packages/dds/task-manager/src/test/taskManager.spec.ts index 6570e5f78568..3e15a8b193d8 100644 --- a/packages/dds/task-manager/src/test/taskManager.spec.ts +++ b/packages/dds/task-manager/src/test/taskManager.spec.ts @@ -100,7 +100,7 @@ describe("TaskManager", () => { it("Can volunteer for a task", async () => { const taskId = "taskId"; const volunteerTaskP = taskManager1.volunteerForTask(taskId); - assert.ok(taskManager1.queued(taskId), "Should be queued"); + assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); containerRuntimeFactory.processAllMessages(); const isAssigned = await volunteerTaskP; @@ -114,9 +114,9 @@ describe("TaskManager", () => { const volunteerTaskP1 = taskManager1.volunteerForTask(taskId); const volunteerTaskP2 = taskManager2.volunteerForTask(taskId); - assert.ok(taskManager1.queued(taskId), "Task manager 1 should be queued"); + assert.ok(!taskManager1.queued(taskId), "Task manager 1 should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Task manager 1 should not be assigned"); - assert.ok(taskManager2.queued(taskId), "Task manager 2 should be queued"); + assert.ok(!taskManager2.queued(taskId), "Task manager 2 should not be queued"); assert.ok(!taskManager2.assigned(taskId), "Task manager 2 should not be assigned"); containerRuntimeFactory.processAllMessages(); @@ -156,9 +156,9 @@ describe("TaskManager", () => { const volunteerTaskP1 = taskManager1.volunteerForTask(taskId); const volunteerTaskP2 = taskManager2.volunteerForTask(taskId); - assert.ok(taskManager1.queued(taskId), "Task manager 1 should be queued"); + assert.ok(!taskManager1.queued(taskId), "Task manager 1 should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Task manager 1 should not be assigned"); - assert.ok(taskManager2.queued(taskId), "Task manager 2 should be queued"); + assert.ok(!taskManager2.queued(taskId), "Task manager 2 should not be queued"); assert.ok(!taskManager2.assigned(taskId), "Task manager 2 should not be assigned"); containerRuntimeFactory.processAllMessages(); @@ -182,7 +182,7 @@ describe("TaskManager", () => { it("Can abandon and immediately attempt to reacquire a task", async () => { const taskId = "taskId"; const volunteerTaskP = taskManager1.volunteerForTask(taskId); - assert.ok(taskManager1.queued(taskId), "Should be queued"); + assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); containerRuntimeFactory.processAllMessages(); const isAssigned = await volunteerTaskP; @@ -191,11 +191,11 @@ describe("TaskManager", () => { assert.ok(taskManager1.assigned(taskId), "Should be assigned"); taskManager1.abandon(taskId); - assert.ok(!taskManager1.queued(taskId), "Should not be queued"); - assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); + assert.ok(taskManager1.queued(taskId), "Should still be queued (pending abandon)"); + assert.ok(taskManager1.assigned(taskId), "Should still be assigned (pending abandon)"); const revolunteerTaskP = taskManager1.volunteerForTask(taskId); assert.ok(taskManager1.queued(taskId), "Should be queued"); - assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); + assert.ok(taskManager1.assigned(taskId), "Should still be assigned (pending abandon)"); containerRuntimeFactory.processAllMessages(); const isAssigned2 = await revolunteerTaskP; assert.ok(isAssigned2, "Should resolve true"); @@ -206,7 +206,7 @@ describe("TaskManager", () => { it("Can attempt to volunteer for task twice and abandon twice (after ack)", async () => { const taskId = "taskId"; const volunteerTaskP1 = taskManager1.volunteerForTask(taskId); - assert.ok(taskManager1.queued(taskId), "Should be queued"); + assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); containerRuntimeFactory.processAllMessages(); const isAssigned1 = await volunteerTaskP1; @@ -224,8 +224,8 @@ describe("TaskManager", () => { assert.ok(taskManager1.assigned(taskId), "Should be assigned"); taskManager1.abandon(taskId); - assert.ok(!taskManager1.queued(taskId), "Should not be queued"); - assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); + assert.ok(taskManager1.queued(taskId), "Should still be queued (pending abandon)"); + assert.ok(taskManager1.assigned(taskId), "Should still be assigned (pending abandon)"); containerRuntimeFactory.processAllMessages(); assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); @@ -241,11 +241,11 @@ describe("TaskManager", () => { it("Can attempt to lock task twice and abandon twice (before ack)", async () => { const taskId = "taskId"; const volunteerTaskP1 = taskManager1.volunteerForTask(taskId); - assert.ok(taskManager1.queued(taskId), "Should be queued"); + assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); const volunteerTaskP2 = taskManager1.volunteerForTask(taskId); - assert.ok(taskManager1.queued(taskId), "Should be queued"); + assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); containerRuntimeFactory.processAllMessages(); const isAssigned1 = await volunteerTaskP1; @@ -256,11 +256,11 @@ describe("TaskManager", () => { assert.ok(taskManager1.assigned(taskId), "Should be assigned"); taskManager1.abandon(taskId); - assert.ok(!taskManager1.queued(taskId), "Should not be queued"); - assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); + assert.ok(taskManager1.queued(taskId), "Should still be queued (pending abandon)"); + assert.ok(taskManager1.assigned(taskId), "Should still be assigned (pending abandon)"); taskManager1.abandon(taskId); - assert.ok(!taskManager1.queued(taskId), "Should not be queued"); - assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); + assert.ok(taskManager1.queued(taskId), "Should still be queued (pending abandon)"); + assert.ok(taskManager1.assigned(taskId), "Should still be assigned (pending abandon)"); containerRuntimeFactory.processAllMessages(); assert.ok(!taskManager1.queued(taskId), "Should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Should not be assigned"); @@ -287,7 +287,7 @@ describe("TaskManager", () => { const taskId = "taskId"; taskManager1.subscribeToTask(taskId); - assert.ok(taskManager1.queued(taskId), "Task manager 1 should be queued"); + assert.ok(!taskManager1.queued(taskId), "Task manager 1 should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Task manager 1 should not be assigned"); assert.ok(taskManager1.subscribed(taskId), "Task manager 1 should be subscribed"); @@ -316,9 +316,9 @@ describe("TaskManager", () => { taskManager1.subscribeToTask(taskId); taskManager2.subscribeToTask(taskId); - assert.ok(taskManager1.queued(taskId), "Task manager 1 should be queued"); + assert.ok(!taskManager1.queued(taskId), "Task manager 1 should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Task manager 1 should not be assigned"); - assert.ok(taskManager2.queued(taskId), "Task manager 2 should be queued"); + assert.ok(!taskManager2.queued(taskId), "Task manager 2 should not be queued"); assert.ok(!taskManager2.assigned(taskId), "Task manager 2 should not be assigned"); containerRuntimeFactory.processAllMessages(); @@ -812,8 +812,7 @@ describe("TaskManager", () => { ); }); - // todo AB#7310 - it.skip("Can abandon a subscribed task after attach", async () => { + it("Can abandon a subscribed task after attach", async () => { const taskId = "taskId"; taskManager1.subscribeToTask(taskId); containerRuntimeFactory.processAllMessages(); @@ -1100,7 +1099,7 @@ describe("TaskManager", () => { containerRuntime1.connected = true; - assert.ok(taskManager1.queued(taskId), "Task manager 1 should be queued"); + assert.ok(!taskManager1.queued(taskId), "Task manager 1 should not be queued"); assert.ok(!taskManager1.assigned(taskId), "Task manager 1 should not be assigned"); assert.ok(taskManager1.subscribed(taskId), "Task manager 1 should be subscribed");