Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const { action } = createActionApiRoute(
{
params: ParamsSchema,
body: UpdateMetadataRequestBody,
maxContentLength: 1024 * 1024, // 1MB
maxContentLength: 1024 * 1024 * 2, // 3MB
method: "PUT",
},
async ({ authentication, body, params }) => {
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/services/metadata/updateMetadata.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ export class UpdateMetadataService extends BaseService {
`[UpdateMetadataService][updateRunMetadataWithOperations] Updated metadata for run ${runId}`,
{
metadata: applyResults.newMetadata,
operations: operations,
}
);
}
Expand Down
30 changes: 21 additions & 9 deletions packages/core/src/v3/runMetadata/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ApiClient } from "../apiClient/index.js";
import { FlushedRunMetadata, RunMetadataChangeOperation } from "../schemas/common.js";
import { ApiRequestOptions } from "../zodfetch.js";
import { MetadataStream } from "./metadataStream.js";
import { applyMetadataOperations } from "./operations.js";
import { applyMetadataOperations, collapseOperations } from "./operations.js";
import { RunMetadataManager, RunMetadataUpdater } from "./types.js";
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";

Expand Down Expand Up @@ -33,7 +33,7 @@ export class StandardMetadataManager implements RunMetadataManager {
get parent(): RunMetadataUpdater {
// Store a reference to 'this' to ensure proper context
const self = this;

// Create the updater object and store it in a local variable
const parentUpdater: RunMetadataUpdater = {
set: (key, value) => {
Expand Down Expand Up @@ -66,14 +66,14 @@ export class StandardMetadataManager implements RunMetadataManager {
},
stream: (key, value, signal) => self.doStream(key, value, "parent", parentUpdater, signal),
};

return parentUpdater;
}

get root(): RunMetadataUpdater {
// Store a reference to 'this' to ensure proper context
const self = this;

// Create the updater object and store it in a local variable
const rootUpdater: RunMetadataUpdater = {
set: (key, value) => {
Expand Down Expand Up @@ -106,7 +106,7 @@ export class StandardMetadataManager implements RunMetadataManager {
},
stream: (key, value, signal) => self.doStream(key, value, "root", rootUpdater, signal),
};

return rootUpdater;
}

Expand Down Expand Up @@ -353,9 +353,17 @@ export class StandardMetadataManager implements RunMetadataManager {
this.queuedRootOperations.clear();

try {
const collapsedOperations = collapseOperations(operations);
const collapsedParentOperations = collapseOperations(parentOperations);
const collapsedRootOperations = collapseOperations(rootOperations);

const response = await this.apiClient.updateRunMetadata(
this.runId,
{ operations, parentOperations, rootOperations },
{
operations: collapsedOperations,
parentOperations: collapsedParentOperations,
rootOperations: collapsedRootOperations,
},
requestOptions
);

Expand Down Expand Up @@ -406,10 +414,14 @@ export class StandardMetadataManager implements RunMetadataManager {
return;
}

const operations = Array.from(this.queuedOperations);
const parentOperations = Array.from(this.queuedParentOperations);
const rootOperations = Array.from(this.queuedRootOperations);

return {
operations: Array.from(this.queuedOperations),
parentOperations: Array.from(this.queuedParentOperations),
rootOperations: Array.from(this.queuedRootOperations),
operations: collapseOperations(operations),
parentOperations: collapseOperations(parentOperations),
rootOperations: collapseOperations(rootOperations),
};
}

Expand Down
106 changes: 106 additions & 0 deletions packages/core/src/v3/runMetadata/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,109 @@ export function applyMetadataOperations(

return { newMetadata, unappliedOperations };
}

/**
* Collapses metadata operations to reduce payload size and avoid 413 "Request Entity Too Large" errors.
*
* When there are many operations queued up (e.g., 10k increment operations), sending them all
* individually can result in request payloads exceeding the server's 1MB limit. This function
* intelligently combines operations where possible to reduce the payload size:
*
* - **Increment operations**: Multiple increments on the same key are summed into a single increment
* - Example: increment("counter", 1) + increment("counter", 2) → increment("counter", 3)
*
* - **Set operations**: Multiple sets on the same key keep only the last one (since later sets override earlier ones)
* - Example: set("status", "processing") + set("status", "done") → set("status", "done")
*
* - **Delete operations**: Multiple deletes on the same key keep only one (duplicates are redundant)
* - Example: del("temp") + del("temp") → del("temp")
*
* - **Append, remove, and update operations**: Preserved as-is to maintain correctness since order matters
*
* @param operations Array of metadata change operations to collapse
* @returns Collapsed array with fewer operations that produce the same final result
*
* @example
* ```typescript
* const operations = [
* { type: "increment", key: "counter", value: 1 },
* { type: "increment", key: "counter", value: 2 },
* { type: "set", key: "status", value: "processing" },
* { type: "set", key: "status", value: "done" }
* ];
*
* const collapsed = collapseOperations(operations);
* // Result: [
* // { type: "increment", key: "counter", value: 3 },
* // { type: "set", key: "status", value: "done" }
* // ]
* ```
*/
export function collapseOperations(
operations: RunMetadataChangeOperation[]
): RunMetadataChangeOperation[] {
if (operations.length === 0) {
return operations;
}

// Maps to track collapsible operations
const incrementsByKey = new Map<string, number>();
const setsByKey = new Map<string, RunMetadataChangeOperation>();
const deletesByKey = new Set<string>();
const preservedOperations: RunMetadataChangeOperation[] = [];

// Process operations in order
for (const operation of operations) {
switch (operation.type) {
case "increment":
const currentIncrement = incrementsByKey.get(operation.key) || 0;
incrementsByKey.set(operation.key, currentIncrement + operation.value);
break;

case "set":
// Keep only the last set operation for each key
setsByKey.set(operation.key, operation);
break;

case "delete":
// Keep only one delete operation per key
deletesByKey.add(operation.key);
break;

case "append":
case "remove":
case "update":
// Preserve these operations as-is to maintain correctness
preservedOperations.push(operation);
break;

default:
// Handle any future operation types by preserving them
preservedOperations.push(operation);
break;
}
}

// Build the collapsed operations array
const collapsedOperations: RunMetadataChangeOperation[] = [];

// Add collapsed increment operations
for (const [key, value] of incrementsByKey) {
collapsedOperations.push({ type: "increment", key, value });
}

// Add collapsed set operations
for (const operation of setsByKey.values()) {
collapsedOperations.push(operation);
}

// Add collapsed delete operations
for (const key of deletesByKey) {
collapsedOperations.push({ type: "delete", key });
}

// Add preserved operations
collapsedOperations.push(...preservedOperations);

return collapsedOperations;
}
Loading
Loading