diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index 02e59483b6..e2a7ba3259 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -536,7 +536,11 @@ class TaskCoordinator { socket.on("TEST", (message, callback) => { logger.log("Handling TEST", { eventName: "TEST", ...getSocketMetadata(), ...message }); - callback(); + try { + callback(); + } catch (error) { + logger.error("TEST error", { error }); + } }); // Deprecated: Only workers without support for lazy attempts use this @@ -669,13 +673,25 @@ class TaskCoordinator { log.log("Handling READY_FOR_RESUME"); - updateAttemptFriendlyId(message.attemptFriendlyId); + try { + updateAttemptFriendlyId(message.attemptFriendlyId); - if (message.version === "v2") { - updateAttemptNumber(message.attemptNumber); - } + if (message.version === "v2") { + updateAttemptNumber(message.attemptNumber); + } + + this.#platformSocket?.send("READY_FOR_RESUME", { ...message, version: "v1" }); + } catch (error) { + log.error("READY_FOR_RESUME error", { error }); - this.#platformSocket?.send("READY_FOR_RESUME", { ...message, version: "v1" }); + await crashRun({ + name: "ReadyForResumeError", + message: + error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error", + }); + + return; + } }); // MARK: RUN COMPLETED @@ -692,100 +708,112 @@ class TaskCoordinator { log.log("Handling TASK_RUN_COMPLETED"); - const { completion, execution } = message; + try { + const { completion, execution } = message; - // Cancel all in-progress checkpoints (if any) - this.#cancelCheckpoint(socket.data.runId); + // Cancel all in-progress checkpoints (if any) + this.#cancelCheckpoint(socket.data.runId); - await chaosMonkey.call({ throwErrors: false }); + await chaosMonkey.call({ throwErrors: false }); - const completeWithoutCheckpoint = (shouldExit: boolean) => { - const supportsRetryCheckpoints = message.version === "v1"; + const completeWithoutCheckpoint = (shouldExit: boolean) => { + const supportsRetryCheckpoints = message.version === "v1"; - this.#platformSocket?.send("TASK_RUN_COMPLETED", { - version: supportsRetryCheckpoints ? "v1" : "v2", - execution, - completion, - }); - callback({ willCheckpointAndRestore: false, shouldExit }); - }; + this.#platformSocket?.send("TASK_RUN_COMPLETED", { + version: supportsRetryCheckpoints ? "v1" : "v2", + execution, + completion, + }); + callback({ willCheckpointAndRestore: false, shouldExit }); + }; - if (completion.ok) { - completeWithoutCheckpoint(true); - return; - } + if (completion.ok) { + completeWithoutCheckpoint(true); + return; + } - if ( - completion.error.type === "INTERNAL_ERROR" && - completion.error.code === "TASK_RUN_CANCELLED" - ) { - completeWithoutCheckpoint(true); - return; - } + if ( + completion.error.type === "INTERNAL_ERROR" && + completion.error.code === "TASK_RUN_CANCELLED" + ) { + completeWithoutCheckpoint(true); + return; + } - if (completion.retry === undefined) { - completeWithoutCheckpoint(true); - return; - } + if (completion.retry === undefined) { + completeWithoutCheckpoint(true); + return; + } - if (completion.retry.delay < this.#delayThresholdInMs) { - completeWithoutCheckpoint(false); + if (completion.retry.delay < this.#delayThresholdInMs) { + completeWithoutCheckpoint(false); - // Prevents runs that fail fast from never sending a heartbeat - this.#sendRunHeartbeat(socket.data.runId); + // Prevents runs that fail fast from never sending a heartbeat + this.#sendRunHeartbeat(socket.data.runId); - return; - } + return; + } - if (message.version === "v2") { - completeWithoutCheckpoint(true); - return; - } + if (message.version === "v2") { + completeWithoutCheckpoint(true); + return; + } - const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); + const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); - const willCheckpointAndRestore = canCheckpoint || willSimulate; + const willCheckpointAndRestore = canCheckpoint || willSimulate; - if (!willCheckpointAndRestore) { - completeWithoutCheckpoint(false); - return; - } + if (!willCheckpointAndRestore) { + completeWithoutCheckpoint(false); + return; + } - // The worker will then put itself in a checkpointable state - callback({ willCheckpointAndRestore: true, shouldExit: false }); + // The worker will then put itself in a checkpointable state + callback({ willCheckpointAndRestore: true, shouldExit: false }); - const ready = await readyToCheckpoint("RETRY"); + const ready = await readyToCheckpoint("RETRY"); - if (!ready.success) { - log.error("Failed to become checkpointable", { reason: ready.reason }); + if (!ready.success) { + log.error("Failed to become checkpointable", { reason: ready.reason }); - return; - } + return; + } - const checkpoint = await this.#checkpointer.checkpointAndPush({ - runId: socket.data.runId, - projectRef: socket.data.projectRef, - deploymentVersion: socket.data.deploymentVersion, - shouldHeartbeat: true, - }); + const checkpoint = await this.#checkpointer.checkpointAndPush({ + runId: socket.data.runId, + projectRef: socket.data.projectRef, + deploymentVersion: socket.data.deploymentVersion, + shouldHeartbeat: true, + }); - if (!checkpoint) { - log.error("Failed to checkpoint"); - completeWithoutCheckpoint(false); - return; - } + if (!checkpoint) { + log.error("Failed to checkpoint"); + completeWithoutCheckpoint(false); + return; + } - log.addFields({ checkpoint }); + log.addFields({ checkpoint }); - this.#platformSocket?.send("TASK_RUN_COMPLETED", { - version: "v1", - execution, - completion, - checkpoint, - }); + this.#platformSocket?.send("TASK_RUN_COMPLETED", { + version: "v1", + execution, + completion, + checkpoint, + }); - if (!checkpoint.docker || !willSimulate) { - exitRun(); + if (!checkpoint.docker || !willSimulate) { + exitRun(); + } + } catch (error) { + log.error("TASK_RUN_COMPLETED error", { error }); + + await crashRun({ + name: "TaskRunCompletedError", + message: + error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error", + }); + + return; } }); @@ -802,15 +830,21 @@ class TaskCoordinator { log.log("Handling TASK_RUN_FAILED_TO_RUN"); - // Cancel all in-progress checkpoints (if any) - this.#cancelCheckpoint(socket.data.runId); + try { + // Cancel all in-progress checkpoints (if any) + this.#cancelCheckpoint(socket.data.runId); + + this.#platformSocket?.send("TASK_RUN_FAILED_TO_RUN", { + version: "v1", + completion, + }); - this.#platformSocket?.send("TASK_RUN_FAILED_TO_RUN", { - version: "v1", - completion, - }); + exitRun(); + } catch (error) { + log.error("TASK_RUN_FAILED_TO_RUN error", { error }); - exitRun(); + return; + } }); // MARK: CHECKPOINT @@ -823,14 +857,20 @@ class TaskCoordinator { log.log("Handling READY_FOR_CHECKPOINT"); - const checkpointable = this.#checkpointableTasks.get(socket.data.runId); + try { + const checkpointable = this.#checkpointableTasks.get(socket.data.runId); + + if (!checkpointable) { + log.error("No checkpoint scheduled"); + return; + } + + checkpointable.resolve(); + } catch (error) { + log.error("READY_FOR_CHECKPOINT error", { error }); - if (!checkpointable) { - log.error("No checkpoint scheduled"); return; } - - checkpointable.resolve(); }); // MARK: CXX CHECKPOINT @@ -843,15 +883,19 @@ class TaskCoordinator { log.log("Handling CANCEL_CHECKPOINT"); - if (message.version === "v1") { - this.#cancelCheckpoint(socket.data.runId); - // v1 has no callback - return; - } + try { + if (message.version === "v1") { + this.#cancelCheckpoint(socket.data.runId); + // v1 has no callback + return; + } - const checkpointCanceled = this.#cancelCheckpoint(socket.data.runId); + const checkpointCanceled = this.#cancelCheckpoint(socket.data.runId); - callback({ version: "v2", checkpointCanceled }); + callback({ version: "v2", checkpointCanceled }); + } catch (error) { + log.error("CANCEL_CHECKPOINT error", { error }); + } }); // MARK: DURATION WAIT @@ -864,66 +908,78 @@ class TaskCoordinator { log.log("Handling WAIT_FOR_DURATION"); - await chaosMonkey.call({ throwErrors: false }); + try { + await chaosMonkey.call({ throwErrors: false }); - if (checkpointInProgress()) { - log.error("Checkpoint already in progress"); - callback({ willCheckpointAndRestore: false }); - return; - } + if (checkpointInProgress()) { + log.error("Checkpoint already in progress"); + callback({ willCheckpointAndRestore: false }); + return; + } - const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); + const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); - const willCheckpointAndRestore = canCheckpoint || willSimulate; + const willCheckpointAndRestore = canCheckpoint || willSimulate; - callback({ willCheckpointAndRestore }); + callback({ willCheckpointAndRestore }); - if (!willCheckpointAndRestore) { - return; - } + if (!willCheckpointAndRestore) { + return; + } - const ready = await readyToCheckpoint("WAIT_FOR_DURATION"); + const ready = await readyToCheckpoint("WAIT_FOR_DURATION"); - if (!ready.success) { - log.error("Failed to become checkpointable", { reason: ready.reason }); - return; - } + if (!ready.success) { + log.error("Failed to become checkpointable", { reason: ready.reason }); + return; + } - const checkpoint = await this.#checkpointer.checkpointAndPush({ - runId: socket.data.runId, - projectRef: socket.data.projectRef, - deploymentVersion: socket.data.deploymentVersion, - attemptNumber: getAttemptNumber(), - }); + const checkpoint = await this.#checkpointer.checkpointAndPush({ + runId: socket.data.runId, + projectRef: socket.data.projectRef, + deploymentVersion: socket.data.deploymentVersion, + attemptNumber: getAttemptNumber(), + }); - if (!checkpoint) { - // The task container will keep running until the wait duration has elapsed - log.error("Failed to checkpoint"); - return; - } + if (!checkpoint) { + // The task container will keep running until the wait duration has elapsed + log.error("Failed to checkpoint"); + return; + } - log.addFields({ checkpoint }); + log.addFields({ checkpoint }); - const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { - version: "v1", - runId: socket.data.runId, - attemptFriendlyId: message.attemptFriendlyId, - docker: checkpoint.docker, - location: checkpoint.location, - reason: { - type: "WAIT_FOR_DURATION", - ms: message.ms, - now: message.now, - }, - }); + const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { + version: "v1", + runId: socket.data.runId, + attemptFriendlyId: message.attemptFriendlyId, + docker: checkpoint.docker, + location: checkpoint.location, + reason: { + type: "WAIT_FOR_DURATION", + ms: message.ms, + now: message.now, + }, + }); - if (ack?.keepRunAlive) { - log.log("keeping run alive after duration checkpoint"); - return; - } + if (ack?.keepRunAlive) { + log.log("keeping run alive after duration checkpoint"); + return; + } - if (!checkpoint.docker || !willSimulate) { - exitRun(); + if (!checkpoint.docker || !willSimulate) { + exitRun(); + } + } catch (error) { + log.error("WAIT_FOR_DURATION error", { error }); + + await crashRun({ + name: "WaitForDurationError", + message: + error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error", + }); + + return; } }); @@ -937,74 +993,86 @@ class TaskCoordinator { log.log("Handling WAIT_FOR_TASK"); - await chaosMonkey.call({ throwErrors: false }); + try { + await chaosMonkey.call({ throwErrors: false }); - if (checkpointInProgress()) { - log.error("Checkpoint already in progress"); - callback({ willCheckpointAndRestore: false }); - return; - } + if (checkpointInProgress()) { + log.error("Checkpoint already in progress"); + callback({ willCheckpointAndRestore: false }); + return; + } - const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); + const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); - const willCheckpointAndRestore = canCheckpoint || willSimulate; + const willCheckpointAndRestore = canCheckpoint || willSimulate; - callback({ willCheckpointAndRestore }); + callback({ willCheckpointAndRestore }); - if (!willCheckpointAndRestore) { - return; - } + if (!willCheckpointAndRestore) { + return; + } - // Workers with v1 schemas don't signal when they're ready to checkpoint for dependency waits - if (message.version === "v2") { - const ready = await readyToCheckpoint("WAIT_FOR_TASK"); + // Workers with v1 schemas don't signal when they're ready to checkpoint for dependency waits + if (message.version === "v2") { + const ready = await readyToCheckpoint("WAIT_FOR_TASK"); - if (!ready.success) { - log.error("Failed to become checkpointable", { reason: ready.reason }); + if (!ready.success) { + log.error("Failed to become checkpointable", { reason: ready.reason }); + return; + } + } + + const checkpoint = await this.#checkpointer.checkpointAndPush({ + runId: socket.data.runId, + projectRef: socket.data.projectRef, + deploymentVersion: socket.data.deploymentVersion, + attemptNumber: getAttemptNumber(), + }); + + if (!checkpoint) { + log.error("Failed to checkpoint"); return; } - } - const checkpoint = await this.#checkpointer.checkpointAndPush({ - runId: socket.data.runId, - projectRef: socket.data.projectRef, - deploymentVersion: socket.data.deploymentVersion, - attemptNumber: getAttemptNumber(), - }); + log.addFields({ checkpoint }); - if (!checkpoint) { - log.error("Failed to checkpoint"); - return; - } + log.log("WAIT_FOR_TASK checkpoint created"); - log.addFields({ checkpoint }); + //setting this means we can only resume from a checkpoint + socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; + log.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage"); - log.log("WAIT_FOR_TASK checkpoint created"); + const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { + version: "v1", + runId: socket.data.runId, + attemptFriendlyId: message.attemptFriendlyId, + docker: checkpoint.docker, + location: checkpoint.location, + reason: { + type: "WAIT_FOR_TASK", + friendlyId: message.friendlyId, + }, + }); - //setting this means we can only resume from a checkpoint - socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; - log.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage"); + if (ack?.keepRunAlive) { + socket.data.requiresCheckpointResumeWithMessage = undefined; + log.log("keeping run alive after task checkpoint"); + return; + } - const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { - version: "v1", - runId: socket.data.runId, - attemptFriendlyId: message.attemptFriendlyId, - docker: checkpoint.docker, - location: checkpoint.location, - reason: { - type: "WAIT_FOR_TASK", - friendlyId: message.friendlyId, - }, - }); + if (!checkpoint.docker || !willSimulate) { + exitRun(); + } + } catch (error) { + log.error("WAIT_FOR_TASK error", { error }); - if (ack?.keepRunAlive) { - socket.data.requiresCheckpointResumeWithMessage = undefined; - log.log("keeping run alive after task checkpoint"); - return; - } + await crashRun({ + name: "WaitForTaskError", + message: + error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error", + }); - if (!checkpoint.docker || !willSimulate) { - exitRun(); + return; } }); @@ -1018,75 +1086,87 @@ class TaskCoordinator { log.log("Handling WAIT_FOR_BATCH", message); - await chaosMonkey.call({ throwErrors: false }); + try { + await chaosMonkey.call({ throwErrors: false }); - if (checkpointInProgress()) { - log.error("Checkpoint already in progress"); - callback({ willCheckpointAndRestore: false }); - return; - } + if (checkpointInProgress()) { + log.error("Checkpoint already in progress"); + callback({ willCheckpointAndRestore: false }); + return; + } - const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); + const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); - const willCheckpointAndRestore = canCheckpoint || willSimulate; + const willCheckpointAndRestore = canCheckpoint || willSimulate; - callback({ willCheckpointAndRestore }); + callback({ willCheckpointAndRestore }); - if (!willCheckpointAndRestore) { - return; - } + if (!willCheckpointAndRestore) { + return; + } - // Workers with v1 schemas don't signal when they're ready to checkpoint for dependency waits - if (message.version === "v2") { - const ready = await readyToCheckpoint("WAIT_FOR_BATCH"); + // Workers with v1 schemas don't signal when they're ready to checkpoint for dependency waits + if (message.version === "v2") { + const ready = await readyToCheckpoint("WAIT_FOR_BATCH"); - if (!ready.success) { - log.error("Failed to become checkpointable", { reason: ready.reason }); + if (!ready.success) { + log.error("Failed to become checkpointable", { reason: ready.reason }); + return; + } + } + + const checkpoint = await this.#checkpointer.checkpointAndPush({ + runId: socket.data.runId, + projectRef: socket.data.projectRef, + deploymentVersion: socket.data.deploymentVersion, + attemptNumber: getAttemptNumber(), + }); + + if (!checkpoint) { + log.error("Failed to checkpoint"); return; } - } - const checkpoint = await this.#checkpointer.checkpointAndPush({ - runId: socket.data.runId, - projectRef: socket.data.projectRef, - deploymentVersion: socket.data.deploymentVersion, - attemptNumber: getAttemptNumber(), - }); + log.addFields({ checkpoint }); - if (!checkpoint) { - log.error("Failed to checkpoint"); - return; - } + log.log("WAIT_FOR_BATCH checkpoint created"); - log.addFields({ checkpoint }); + //setting this means we can only resume from a checkpoint + socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; + log.log("WAIT_FOR_BATCH set checkpoint"); - log.log("WAIT_FOR_BATCH checkpoint created"); + const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { + version: "v1", + runId: socket.data.runId, + attemptFriendlyId: message.attemptFriendlyId, + docker: checkpoint.docker, + location: checkpoint.location, + reason: { + type: "WAIT_FOR_BATCH", + batchFriendlyId: message.batchFriendlyId, + runFriendlyIds: message.runFriendlyIds, + }, + }); - //setting this means we can only resume from a checkpoint - socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; - log.log("WAIT_FOR_BATCH set checkpoint"); + if (ack?.keepRunAlive) { + socket.data.requiresCheckpointResumeWithMessage = undefined; + log.log("keeping run alive after batch checkpoint"); + return; + } - const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { - version: "v1", - runId: socket.data.runId, - attemptFriendlyId: message.attemptFriendlyId, - docker: checkpoint.docker, - location: checkpoint.location, - reason: { - type: "WAIT_FOR_BATCH", - batchFriendlyId: message.batchFriendlyId, - runFriendlyIds: message.runFriendlyIds, - }, - }); + if (!checkpoint.docker || !willSimulate) { + exitRun(); + } + } catch (error) { + log.error("WAIT_FOR_BATCH error", { error }); - if (ack?.keepRunAlive) { - socket.data.requiresCheckpointResumeWithMessage = undefined; - log.log("keeping run alive after batch checkpoint"); - return; - } + await crashRun({ + name: "WaitForBatchError", + message: + error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error", + }); - if (!checkpoint.docker || !willSimulate) { - exitRun(); + return; } }); @@ -1100,24 +1180,29 @@ class TaskCoordinator { log.log("Handling INDEX_TASKS"); - const workerAck = await this.#platformSocket?.sendWithAck("CREATE_WORKER", { - version: "v2", - projectRef: socket.data.projectRef, - envId: socket.data.envId, - deploymentId: message.deploymentId, - metadata: { - contentHash: socket.data.contentHash, - packageVersion: message.packageVersion, - tasks: message.tasks, - }, - supportsLazyAttempts: message.version !== "v1" && message.supportsLazyAttempts, - }); + try { + const workerAck = await this.#platformSocket?.sendWithAck("CREATE_WORKER", { + version: "v2", + projectRef: socket.data.projectRef, + envId: socket.data.envId, + deploymentId: message.deploymentId, + metadata: { + contentHash: socket.data.contentHash, + packageVersion: message.packageVersion, + tasks: message.tasks, + }, + supportsLazyAttempts: message.version !== "v1" && message.supportsLazyAttempts, + }); - if (!workerAck) { - log.debug("no worker ack while indexing"); - } + if (!workerAck) { + log.debug("no worker ack while indexing"); + } - callback({ success: !!workerAck?.success }); + callback({ success: !!workerAck?.success }); + } catch (error) { + log.error("INDEX_TASKS error", { error }); + callback({ success: false }); + } }); // MARK: INDEX FAILED @@ -1130,11 +1215,15 @@ class TaskCoordinator { log.log("Handling INDEXING_FAILED"); - this.#platformSocket?.send("INDEXING_FAILED", { - version: "v1", - deploymentId: message.deploymentId, - error: message.error, - }); + try { + this.#platformSocket?.send("INDEXING_FAILED", { + version: "v1", + deploymentId: message.deploymentId, + error: message.error, + }); + } catch (error) { + log.error("INDEXING_FAILED error", { error }); + } }); // MARK: CREATE ATTEMPT @@ -1147,26 +1236,38 @@ class TaskCoordinator { log.log("Handling CREATE_TASK_RUN_ATTEMPT"); - await chaosMonkey.call({ throwErrors: false }); - - const createAttempt = await this.#platformSocket?.sendWithAck("CREATE_TASK_RUN_ATTEMPT", { - runId: message.runId, - envId: socket.data.envId, - }); + try { + await chaosMonkey.call({ throwErrors: false }); + + const createAttempt = await this.#platformSocket?.sendWithAck( + "CREATE_TASK_RUN_ATTEMPT", + { + runId: message.runId, + envId: socket.data.envId, + } + ); - if (!createAttempt?.success) { - log.debug("no ack while creating attempt", { reason: createAttempt?.reason }); - callback({ success: false, reason: createAttempt?.reason }); - return; - } + if (!createAttempt?.success) { + log.debug("no ack while creating attempt", { reason: createAttempt?.reason }); + callback({ success: false, reason: createAttempt?.reason }); + return; + } - updateAttemptFriendlyId(createAttempt.executionPayload.execution.attempt.id); - updateAttemptNumber(createAttempt.executionPayload.execution.attempt.number); + updateAttemptFriendlyId(createAttempt.executionPayload.execution.attempt.id); + updateAttemptNumber(createAttempt.executionPayload.execution.attempt.number); - callback({ - success: true, - executionPayload: createAttempt.executionPayload, - }); + callback({ + success: true, + executionPayload: createAttempt.executionPayload, + }); + } catch (error) { + log.error("CREATE_TASK_RUN_ATTEMPT error", { error }); + callback({ + success: false, + reason: + error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error", + }); + } }); socket.on("UNRECOVERABLE_ERROR", async (message) => { @@ -1178,7 +1279,11 @@ class TaskCoordinator { log.log("Handling UNRECOVERABLE_ERROR"); - await crashRun(message.error); + try { + await crashRun(message.error); + } catch (error) { + log.error("UNRECOVERABLE_ERROR error", { error }); + } }); socket.on("SET_STATE", async (message) => { @@ -1190,20 +1295,28 @@ class TaskCoordinator { log.log("Handling SET_STATE"); - if (message.attemptFriendlyId) { - updateAttemptFriendlyId(message.attemptFriendlyId); - } + try { + if (message.attemptFriendlyId) { + updateAttemptFriendlyId(message.attemptFriendlyId); + } - if (message.attemptNumber) { - updateAttemptNumber(message.attemptNumber); + if (message.attemptNumber) { + updateAttemptNumber(message.attemptNumber); + } + } catch (error) { + log.error("SET_STATE error", { error }); } }); }, onDisconnect: async (socket, handler, sender, logger) => { - this.#platformSocket?.send("LOG", { - metadata: socket.data, - text: "disconnect", - }); + try { + this.#platformSocket?.send("LOG", { + metadata: socket.data, + text: "disconnect", + }); + } catch (error) { + logger.error("onDisconnect error", { error }); + } }, handlers: { TASK_HEARTBEAT: async (message) => {