Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 201 additions & 18 deletions apps/webapp/app/v3/marqs/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,7 @@ export class MarQS {
public async replaceMessage(
messageId: string,
messageData: Record<string, unknown>,
timestamp?: number,
priority?: number,
inplace?: boolean
timestamp?: number
) {
return this.#trace(
"replaceMessage",
Expand All @@ -273,6 +271,57 @@ export class MarQS {
return;
}

span.setAttributes({
[SemanticAttributes.QUEUE]: oldMessage.queue,
[SemanticAttributes.MESSAGE_ID]: oldMessage.messageId,
[SemanticAttributes.CONCURRENCY_KEY]: oldMessage.concurrencyKey,
[SemanticAttributes.PARENT_QUEUE]: oldMessage.parentQueue,
});

const traceContext = {
traceparent: oldMessage.data.traceparent,
tracestate: oldMessage.data.tracestate,
};

const newMessage: MessagePayload = {
version: "1",
// preserve original trace context
data: { ...oldMessage.data, ...messageData, ...traceContext, queue: oldMessage.queue },
queue: oldMessage.queue,
concurrencyKey: oldMessage.concurrencyKey,
timestamp: timestamp ?? Date.now(),
messageId,
parentQueue: oldMessage.parentQueue,
};

await this.#callReplaceMessage(newMessage);
},
{
kind: SpanKind.CONSUMER,
attributes: {
[SEMATTRS_MESSAGING_OPERATION]: "replace",
[SEMATTRS_MESSAGE_ID]: messageId,
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
},
}
);
}

public async requeueMessage(
messageId: string,
messageData: Record<string, unknown>,
timestamp?: number,
priority?: number
) {
return this.#trace(
"requeueMessage",
async (span) => {
const oldMessage = await this.readMessage(messageId);

if (!oldMessage) {
return;
}

const queue = this.keys.queueKeyFromQueue(oldMessage.queue, priority);

span.setAttributes({
Expand All @@ -298,27 +347,16 @@ export class MarQS {
parentQueue: oldMessage.parentQueue,
};

if (inplace) {
await this.#callReplaceMessage(newMessage);
return;
}

await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId);

await this.#callAcknowledgeMessage({
parentQueue: oldMessage.parentQueue,
messageQueue: oldMessage.queue,
messageId,
});
await this.#callRequeueMessage(oldMessage.queue, newMessage);

await this.#callEnqueueMessage(newMessage);

await this.options.subscriber?.messageReplaced(newMessage);
await this.options.subscriber?.messageRequeued(oldMessage.queue, newMessage);
},
{
kind: SpanKind.CONSUMER,
attributes: {
[SEMATTRS_MESSAGING_OPERATION]: "replace",
[SEMATTRS_MESSAGING_OPERATION]: "requeue",
[SEMATTRS_MESSAGE_ID]: messageId,
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
},
Expand Down Expand Up @@ -602,7 +640,7 @@ export class MarQS {
});

if (updates) {
await this.replaceMessage(messageId, updates, retryAt, undefined, true);
await this.replaceMessage(messageId, updates, retryAt);
}

await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId);
Expand Down Expand Up @@ -1163,6 +1201,78 @@ export class MarQS {
);
}

async #callRequeueMessage(oldQueue: string, message: MessagePayload) {
const queueKey = message.queue;
const oldQueueKey = oldQueue;
const parentQueueKey = message.parentQueue;
const messageKey = this.keys.messageKey(message.messageId);
const queueCurrentConcurrencyKey = this.keys.queueCurrentConcurrencyKeyFromQueue(message.queue);
const queueReserveConcurrencyKey = this.keys.queueReserveConcurrencyKeyFromQueue(message.queue);
const envCurrentConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(message.queue);
const envReserveConcurrencyKey = this.keys.envReserveConcurrencyKeyFromQueue(message.queue);
const envQueueKey = this.keys.envQueueKeyFromQueue(message.queue);

const queueName = message.queue;
const oldQueueName = oldQueue;
const messageId = message.messageId;
const messageData = JSON.stringify(message);
const messageScore = String(message.timestamp);

logger.debug("Calling requeueMessage", {
service: this.name,
queueKey,
oldQueueKey,
parentQueueKey,
messageKey,
queueCurrentConcurrencyKey,
queueReserveConcurrencyKey,
envCurrentConcurrencyKey,
envReserveConcurrencyKey,
envQueueKey,
queueName,
oldQueueName,
messageId,
messageData,
messageScore,
});

const result = await this.redis.requeueMessage(
queueKey,
oldQueueKey,
parentQueueKey,
messageKey,
queueCurrentConcurrencyKey,
queueReserveConcurrencyKey,
envCurrentConcurrencyKey,
envReserveConcurrencyKey,
envQueueKey,
queueName,
oldQueueName,
messageId,
messageData,
messageScore
);

logger.debug("requeueMessage result", {
service: this.name,
queueKey,
parentQueueKey,
messageKey,
queueCurrentConcurrencyKey,
queueReserveConcurrencyKey,
envCurrentConcurrencyKey,
envReserveConcurrencyKey,
envQueueKey,
queueName,
messageId,
messageData,
messageScore,
result,
});

return true;
}

async #callAcknowledgeMessage({
parentQueue,
messageQueue,
Expand Down Expand Up @@ -1587,6 +1697,61 @@ redis.call('DEL', messageKey)
`,
});

this.redis.defineCommand("requeueMessage", {
numberOfKeys: 9,
lua: `
local queueKey = KEYS[1]
local oldQueueKey = KEYS[2]
local parentQueueKey = KEYS[3]
local messageKey = KEYS[4]
local queueCurrentConcurrencyKey = KEYS[5]
local queueReserveConcurrencyKey = KEYS[6]
local envCurrentConcurrencyKey = KEYS[7]
local envReserveConcurrencyKey = KEYS[8]
local envQueueKey = KEYS[9]

local queueName = ARGV[1]
local oldQueueName = ARGV[2]
local messageId = ARGV[3]
local messageData = ARGV[4]
local messageScore = ARGV[5]

-- First remove the message from the old queue
redis.call('ZREM', oldQueueKey, messageId)

-- Write the new message data
redis.call('SET', messageKey, messageData)

-- Add the message to the new queue with a new score
redis.call('ZADD', queueKey, messageScore, messageId)
redis.call('ZADD', envQueueKey, messageScore, messageId)

-- Rebalance the parent queue (for the new queue)
local earliestMessage = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES')
if #earliestMessage == 0 then
redis.call('ZREM', parentQueueKey, queueName)
else
redis.call('ZADD', parentQueueKey, earliestMessage[2], queueName)
end

-- Rebalance the parent queue (for the old queue)
local earliestMessage = redis.call('ZRANGE', oldQueueKey, 0, 0, 'WITHSCORES')
if #earliestMessage == 0 then
redis.call('ZREM', parentQueueKey, oldQueueName)
else
redis.call('ZADD', parentQueueKey, earliestMessage[2], oldQueueName)
end

-- Clear all concurrency sets (combined from both scripts)
redis.call('SREM', queueCurrentConcurrencyKey, messageId)
redis.call('SREM', queueReserveConcurrencyKey, messageId)
redis.call('SREM', envCurrentConcurrencyKey, messageId)
redis.call('SREM', envReserveConcurrencyKey, messageId)

return true
`,
});

this.redis.defineCommand("nackMessage", {
numberOfKeys: 7,
lua: `
Expand Down Expand Up @@ -1749,6 +1914,24 @@ declare module "ioredis" {
callback?: Callback<void>
): Result<void, Context>;

requeueMessage(
queueKey: string,
oldQueueKey: string,
parentQueueKey: string,
messageKey: string,
queueCurrentConcurrencyKey: string,
queueReserveConcurrencyKey: string,
envCurrentConcurrencyKey: string,
envReserveConcurrencyKey: string,
envQueueKey: string,
queueName: string,
oldQueueName: string,
messageId: string,
messageData: string,
messageScore: string,
callback?: Callback<string>
): Result<string, Context>;

acknowledgeMessage(
parentQueue: string,
messageKey: string,
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/marqs/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export interface MessageQueueSubscriber {
messageAcked(message: MessagePayload): Promise<void>;
messageNacked(message: MessagePayload): Promise<void>;
messageReplaced(message: MessagePayload): Promise<void>;
messageRequeued(oldQueue: string, message: MessagePayload): Promise<void>;
}

export interface VisibilityTimeoutStrategy {
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/v3/services/completeAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ export class CompleteAttemptService extends BaseService {
logger.debug("[CompleteAttemptService] Enqueuing retry attempt", { runId: run.id });

// We have to replace a potential RESUME with EXECUTE to correctly retry the attempt
return marqs?.replaceMessage(
return marqs?.requeueMessage(
run.id,
{
type: "EXECUTE",
Expand Down Expand Up @@ -615,7 +615,7 @@ export class CompleteAttemptService extends BaseService {
});

if (environment.type === "DEVELOPMENT") {
marqs.replaceMessage(
marqs.requeueMessage(
taskRunAttempt.taskRunId,
{},
executionRetry.timestamp,
Expand Down
14 changes: 4 additions & 10 deletions apps/webapp/app/v3/services/createCheckpoint.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ export class CreateCheckpointService extends BaseService {
});

if (checkpointEvent) {
await marqs?.replaceMessage(
await marqs.requeueMessage(
attempt.taskRunId,
{
type: "RESUME_AFTER_DURATION",
Expand Down Expand Up @@ -297,15 +297,9 @@ export class CreateCheckpointService extends BaseService {
}

//if there's a message in the queue, we make sure the checkpoint event is on it
await marqs?.replaceMessage(
attempt.taskRun.id,
{
checkpointEventId: checkpointEvent.id,
},
undefined,
undefined,
true
);
await marqs.replaceMessage(attempt.taskRun.id, {
checkpointEventId: checkpointEvent.id,
});

await ResumeBatchRunService.enqueue(
batchRun.id,
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/resumeBatchRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ export class ResumeBatchRunService extends BaseService {
hasCheckpointEvent: !!batchRun.checkpointEventId,
});

await marqs?.replaceMessage(
await marqs?.requeueMessage(
dependentRun.id,
{
type: "RESUME",
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/resumeTaskDependency.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export class ResumeTaskDependencyService extends BaseService {
return;
}

await marqs?.replaceMessage(
await marqs?.requeueMessage(
dependentRun.id,
{
type: "RESUME",
Expand Down
26 changes: 26 additions & 0 deletions apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,32 @@ class TaskRunConcurrencyTracker implements MessageQueueSubscriber {
});
}

async messageRequeued(oldQueue: string, message: MessagePayload): Promise<void> {
logger.debug("TaskRunConcurrencyTracker.messageRequeued()", {
data: message.data,
messageId: message.messageId,
oldQueue,
});

const data = this.getMessageData(message);

if (!data) {
logger.info(
`TaskRunConcurrencyTracker.messageReplaced(): could not parse message data`,
message
);
return;
}

await this.executionFinished({
projectId: data.projectId,
taskId: data.taskIdentifier,
runId: message.messageId,
environmentId: data.environmentId,
deployed: data.environmentType !== "DEVELOPMENT",
});
}
Comment on lines +124 to +148
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix incorrect error message in messageRequeued method.

The error message uses "messageReplaced" instead of "messageRequeued" which could be confusing for debugging.

Apply this diff to fix the error message:

-        `TaskRunConcurrencyTracker.messageReplaced(): could not parse message data`,
+        `TaskRunConcurrencyTracker.messageRequeued(): could not parse message data`,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async messageRequeued(oldQueue: string, message: MessagePayload): Promise<void> {
logger.debug("TaskRunConcurrencyTracker.messageRequeued()", {
data: message.data,
messageId: message.messageId,
oldQueue,
});
const data = this.getMessageData(message);
if (!data) {
logger.info(
`TaskRunConcurrencyTracker.messageReplaced(): could not parse message data`,
message
);
return;
}
await this.executionFinished({
projectId: data.projectId,
taskId: data.taskIdentifier,
runId: message.messageId,
environmentId: data.environmentId,
deployed: data.environmentType !== "DEVELOPMENT",
});
}
async messageRequeued(oldQueue: string, message: MessagePayload): Promise<void> {
logger.debug("TaskRunConcurrencyTracker.messageRequeued()", {
data: message.data,
messageId: message.messageId,
oldQueue,
});
const data = this.getMessageData(message);
if (!data) {
logger.info(
`TaskRunConcurrencyTracker.messageRequeued(): could not parse message data`,
message
);
return;
}
await this.executionFinished({
projectId: data.projectId,
taskId: data.taskIdentifier,
runId: message.messageId,
environmentId: data.environmentId,
deployed: data.environmentType !== "DEVELOPMENT",
});
}


private getMessageData(message: MessagePayload) {
const result = ConcurrentMessageData.safeParse(message.data);
if (result.success) {
Expand Down
Loading