diff --git a/.changeset/nasty-cobras-wonder.md b/.changeset/nasty-cobras-wonder.md new file mode 100644 index 0000000000..0cd7c417d1 --- /dev/null +++ b/.changeset/nasty-cobras-wonder.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +Fix metadata collapsing correctness diff --git a/packages/core/src/v3/runMetadata/operations.ts b/packages/core/src/v3/runMetadata/operations.ts index 379f5a4bb9..6a64443260 100644 --- a/packages/core/src/v3/runMetadata/operations.ts +++ b/packages/core/src/v3/runMetadata/operations.ts @@ -173,65 +173,68 @@ export function collapseOperations( return operations; } - // Maps to track collapsible operations - const incrementsByKey = new Map(); - const setsByKey = new Map(); - const deletesByKey = new Set(); - const preservedOperations: RunMetadataChangeOperation[] = []; - - // Process operations in order - for (const operation of operations) { - switch (operation.type) { - case "increment": { - const currentIncrement = incrementsByKey.get(operation.key) || 0; - incrementsByKey.set(operation.key, currentIncrement + operation.value); - break; - } - case "set": { - // Keep only the last set operation for each key - setsByKey.set(operation.key, operation); - break; - } - case "delete": { - // Keep only one delete operation per key - deletesByKey.add(operation.key); - break; - } - case "append": - case "remove": - case "update": { - // Preserve these operations as-is to maintain correctness - preservedOperations.push(operation); - break; - } - default: { - // Handle any future operation types by preserving them - preservedOperations.push(operation); - break; - } + const collapsed: RunMetadataChangeOperation[] = []; + let i = 0; + while (i < operations.length) { + const op = operations[i]; + if (!op) { + i++; + continue; } - } - // Build the collapsed operations array - const collapsedOperations: RunMetadataChangeOperation[] = []; + // Collapse consecutive increments on the same key + if (op.type === "increment") { + let sum = op.value; + let j = i + 1; + while ( + j < operations.length && + operations[j]?.type === "increment" && + (operations[j] as typeof op)?.key === op.key + ) { + sum += (operations[j] as typeof op).value; + j++; + } + collapsed.push({ type: "increment", key: op.key, value: sum }); + i = j; + continue; + } - // Add collapsed increment operations - for (const [key, value] of incrementsByKey) { - collapsedOperations.push({ type: "increment", key, value }); - } + // Collapse consecutive sets on the same key (keep only the last in the sequence) + if (op.type === "set") { + let last = op; + let j = i + 1; + while ( + j < operations.length && + operations[j]?.type === "set" && + (operations[j] as typeof op)?.key === op.key + ) { + last = operations[j] as typeof op; + j++; + } + collapsed.push(last); + i = j; + continue; + } - // Add collapsed set operations - for (const operation of setsByKey.values()) { - collapsedOperations.push(operation); - } + // Collapse consecutive deletes on the same key (keep only one) + if (op.type === "delete") { + let j = i + 1; + while ( + j < operations.length && + operations[j]?.type === "delete" && + (operations[j] as typeof op)?.key === op.key + ) { + j++; + } + collapsed.push(op); + i = j; + continue; + } - // Add collapsed delete operations - for (const key of deletesByKey) { - collapsedOperations.push({ type: "delete", key }); + // For append, remove, update, and unknown types, preserve order and do not collapse + collapsed.push(op); + i++; } - // Add preserved operations - collapsedOperations.push(...preservedOperations); - - return collapsedOperations; + return collapsed; } diff --git a/packages/core/test/standardMetadataManager.test.ts b/packages/core/test/standardMetadataManager.test.ts index a96041c9f8..39ae810409 100644 --- a/packages/core/test/standardMetadataManager.test.ts +++ b/packages/core/test/standardMetadataManager.test.ts @@ -395,34 +395,36 @@ describe("StandardMetadataManager", () => { const update = metadataUpdates[0]!; const operations = update.operations!; - // Should have: 1 collapsed increment, 1 collapsed set, 2 appends - // (delete operations on non-existent keys are not queued) - expect(operations).toHaveLength(4); - - // Find each operation type - const incrementOp = operations.find((op) => op.type === "increment"); - const setOp = operations.find((op) => op.type === "set"); - const appendOps = operations.filter((op) => op.type === "append"); + // With order-preserving collapse, expect 6 operations: + // increment(counter, 5), set(status, processing), append(logs, ...), increment(counter, 3), set(status, completed), append(logs, ...) + expect(operations).toHaveLength(6); - expect(incrementOp).toEqual({ + expect(operations[0]).toEqual({ type: "increment", key: "counter", - value: 8, // 5 + 3 + value: 5, }); - - expect(setOp).toEqual({ + expect(operations[1]).toEqual({ type: "set", key: "status", - value: "completed", // Last set value + value: "processing", }); - - expect(appendOps).toHaveLength(2); - expect(appendOps[0]).toEqual({ + expect(operations[2]).toEqual({ type: "append", key: "logs", value: "Started processing", }); - expect(appendOps[1]).toEqual({ + expect(operations[3]).toEqual({ + type: "increment", + key: "counter", + value: 3, + }); + expect(operations[4]).toEqual({ + type: "set", + key: "status", + value: "completed", + }); + expect(operations[5]).toEqual({ type: "append", key: "logs", value: "Processing completed", @@ -454,42 +456,48 @@ describe("StandardMetadataManager", () => { const update = metadataUpdates[1]!; const operations = update.operations!; - // Should have: 1 collapsed increment, 1 collapsed set, 2 appends, 2 collapsed deletes - expect(operations).toHaveLength(6); - - // Find each operation type - const incrementOp = operations.find((op) => op.type === "increment"); - const setOp = operations.find((op) => op.type === "set"); - const appendOps = operations.filter((op) => op.type === "append"); - const deleteOps = operations.filter((op) => op.type === "delete"); + // With order-preserving collapse, expect 8 operations: + // increment(counter, 5), set(status, processing), append(logs, ...), increment(counter, 3), set(status, completed), append(logs, ...), delete(tempData1), delete(tempData2) + expect(operations).toHaveLength(8); - expect(incrementOp).toEqual({ + expect(operations[0]).toEqual({ type: "increment", key: "counter", - value: 8, // 5 + 3 + value: 5, }); - - expect(setOp).toEqual({ + expect(operations[1]).toEqual({ type: "set", key: "status", - value: "completed", // Last set value + value: "processing", }); - - expect(appendOps).toHaveLength(2); - expect(appendOps[0]).toEqual({ + expect(operations[2]).toEqual({ type: "append", key: "logs", value: "Started processing", }); - expect(appendOps[1]).toEqual({ + expect(operations[3]).toEqual({ + type: "increment", + key: "counter", + value: 3, + }); + expect(operations[4]).toEqual({ + type: "set", + key: "status", + value: "completed", + }); + expect(operations[5]).toEqual({ type: "append", key: "logs", value: "Processing completed", }); - - expect(deleteOps).toHaveLength(2); - const deleteKeys = deleteOps.map((op) => (op as any).key).sort(); - expect(deleteKeys).toEqual(["tempData1", "tempData2"]); + expect(operations[6]).toEqual({ + type: "delete", + key: "tempData1", + }); + expect(operations[7]).toEqual({ + type: "delete", + key: "tempData2", + }); }); test("should collapse operations across different keys independently", async () => { @@ -504,26 +512,30 @@ describe("StandardMetadataManager", () => { expect(metadataUpdates).toHaveLength(1); const update = metadataUpdates[0]!; - expect(update.operations).toHaveLength(2); - - // Should have separate collapsed increments for each key - const filesOp = update.operations!.find( - (op) => op.type === "increment" && (op as any).key === "filesProcessed" - ); - const errorsOp = update.operations!.find( - (op) => op.type === "increment" && (op as any).key === "errorsCount" - ); + const operations = update.operations!; - expect(filesOp).toEqual({ + // With order-preserving collapse, expect 4 operations: + // increment(filesProcessed, 10), increment(errorsCount, 1), increment(filesProcessed, 5), increment(errorsCount, 2) + expect(operations).toHaveLength(4); + expect(operations[0]).toEqual({ type: "increment", key: "filesProcessed", - value: 15, // 10 + 5 + value: 10, }); - - expect(errorsOp).toEqual({ + expect(operations[1]).toEqual({ + type: "increment", + key: "errorsCount", + value: 1, + }); + expect(operations[2]).toEqual({ + type: "increment", + key: "filesProcessed", + value: 5, + }); + expect(operations[3]).toEqual({ type: "increment", key: "errorsCount", - value: 3, // 1 + 2 + value: 2, }); });