Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 78 additions & 153 deletions modal-js/src/function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,11 @@

import { createHash } from "node:crypto";

import type {
FunctionGetOutputsItem,
FunctionPutInputsItem,
GenericResult,
ModalClientClient,
} from "../proto/modal_proto/api";
import type { GenericResult } from "../proto/modal_proto/api";
import {
DataFormat,
DeploymentNamespace,
FunctionCallInvocationType,
FunctionCallType,
GeneratorDone,
GenericResult_GenericStatus,
} from "../proto/modal_proto/api";
Expand All @@ -29,16 +23,19 @@ import {
} from "./errors";
import { dumps, loads } from "./pickle";
import { ClientError, Status } from "nice-grpc";
import {
ControlPlaneStrategy,
InputPlaneStrategy,
InputStrategy,
} from "./input_strategy";

// From: modal/_utils/blob_utils.py
const maxObjectSizeBytes = 2 * 1024 * 1024; // 2 MiB

// From: modal-client/modal/_utils/function_utils.py
const outputsTimeout = 55 * 1000;
export const outputsTimeoutMillis = 55 * 1000;

function timeNowSeconds() {
return Date.now() / 1e3;
}
const maxSystemRetries = 8;

/** Represents a deployed Modal Function, which can be invoked remotely. */
export class Function_ {
Expand Down Expand Up @@ -82,32 +79,51 @@ export class Function_ {
args: any[] = [],
kwargs: Record<string, any> = {},
): Promise<any> {
const functionOutputPoller = await this.#execFunctionCall(
// InputStrategy sends inputs to either the control plane or the input plane,
// depending on how the function is configured.
const inputStrategy = await this.#createInputStrategy(
args,
kwargs,
FunctionCallInvocationType.FUNCTION_CALL_INVOCATION_TYPE_SYNC,
);
return await functionOutputPoller.poll();
await inputStrategy.attemptStart();
// TODO(ryan): Write tests for retry logic
let retryCount = 0;
while (true) {
try {
return await pollFunctionOutput(inputStrategy);
} catch (err) {
if (err instanceof InternalFailure && retryCount <= maxSystemRetries) {
await inputStrategy.attemptRetry();
retryCount++;
} else {
throw err;
}
}
}
}

// Spawn a single input into a remote function.
async spawn(
args: any[] = [],
kwargs: Record<string, any> = {},
): Promise<FunctionCall> {
const functionOutputPoller = await this.#execFunctionCall(
const inputStrategy = await this.#createInputStrategy(
args,
kwargs,
FunctionCallInvocationType.FUNCTION_CALL_INVOCATION_TYPE_SYNC,
);
return FunctionCall.fromPoller(functionOutputPoller);
await inputStrategy.attemptStart();
return FunctionCall.fromInputStrategy(
inputStrategy as ControlPlaneStrategy,
);
}

async #execFunctionCall(
async #createInputStrategy(
args: any[] = [],
kwargs: Record<string, any> = {},
invocationType: FunctionCallInvocationType = FunctionCallInvocationType.FUNCTION_CALL_INVOCATION_TYPE_SYNC,
): Promise<FunctionOutputPoller> {
): Promise<InputStrategy> {
const payload = dumps([args, kwargs]);

let argsBlobId: string | undefined = undefined;
Expand All @@ -116,156 +132,65 @@ export class Function_ {
}

// Single input sync invocation
const functionInputs = [
{
idx: 0,
input: {
args: argsBlobId ? undefined : payload,
argsBlobId,
dataFormat: DataFormat.DATA_FORMAT_PICKLE,
methodName: this.methodName,
finalInput: false, // This field isn't specified in the Python client, so it defaults to false.
},
const functionInput = {
idx: 0,
input: {
args: argsBlobId ? undefined : payload,
argsBlobId,
dataFormat: DataFormat.DATA_FORMAT_PICKLE,
methodName: this.methodName,
finalInput: false, // This field isn't specified in the Python client, so it defaults to false.
},
];

if (this.inputPlaneUrl !== undefined) {
return this.remoteInputPlane(functionInputs);
};

if (this.inputPlaneUrl === undefined) {
return new ControlPlaneStrategy(
client,
this.functionId,
functionInput,
invocationType,
);
}
return this.remoteControlPlane(functionInputs, invocationType);
}

private async remoteInputPlane(
functionInputs: FunctionPutInputsItem[],
): Promise<any> {
if (!this.inputPlaneUrl) {
throw new Error("Input plane URL is not set");
// Input plane does not support ASYNC inputs
if (
invocationType !==
FunctionCallInvocationType.FUNCTION_CALL_INVOCATION_TYPE_SYNC
) {
throw new Error("Only SYNC invocations types are supported");
}
const client = getOrCreateClient(this.inputPlaneUrl);

const attemptStartResponse = await client.attemptStart({
functionId: this.functionId,
input: functionInputs[0],
});
return FunctionOutputPoller.fromAttemptToken(
client,
attemptStartResponse.attemptToken,
);
}

private async remoteControlPlane(
functionInputs: FunctionPutInputsItem[],
invocationType: FunctionCallInvocationType = FunctionCallInvocationType.FUNCTION_CALL_INVOCATION_TYPE_SYNC,
): Promise<any> {
const functionMapResponse = await client.functionMap({
functionId: this.functionId,
functionCallType: FunctionCallType.FUNCTION_CALL_TYPE_UNARY,
functionCallInvocationType: invocationType,
pipelinedInputs: functionInputs,
});

return FunctionOutputPoller.fromFunctionCallId(
client,
functionMapResponse.functionCallId,
return new InputPlaneStrategy(
getOrCreateClient(this.inputPlaneUrl),
this.functionId,
functionInput,
);
}
}

/**
* The `FunctionOutputPoller` class is responsible for polling the outputs of a remote function call.
* When an instance is created using one of the static factory methods, it is configured to poll either
* the input plane or the control plane.
*/
export class FunctionOutputPoller {
functionCallId?: string;
attemptToken?: string;
client: ModalClientClient;

static fromFunctionCallId(
client: ModalClientClient,
functionCallId: string,
): FunctionOutputPoller {
return new FunctionOutputPoller(client, functionCallId, undefined);
export async function pollFunctionOutput(
inputStrategy: InputStrategy,
timeoutMillis?: number,
): Promise<any> {
const startTime = Date.now();
let pollTimeout = outputsTimeoutMillis;
if (timeoutMillis !== undefined) {
pollTimeout = Math.min(timeoutMillis, outputsTimeoutMillis);
}

static fromAttemptToken(
client: ModalClientClient,
attemptToken: string,
): FunctionOutputPoller {
return new FunctionOutputPoller(client, undefined, attemptToken);
}

private constructor(
client: ModalClientClient,
functionCallId?: string,
attemptToken?: string,
) {
if (!functionCallId && !attemptToken) {
throw new Error("Either functionCallId or attemptToken must be provided");
}
this.client = client;
this.functionCallId = functionCallId;
this.attemptToken = attemptToken;
}

async poll(
timeout?: number, // in milliseconds
): Promise<any> {
const startTime = Date.now();
let pollTimeout = outputsTimeout;
if (timeout !== undefined) {
pollTimeout = Math.min(timeout, outputsTimeout);
while (true) {
const outputs = await inputStrategy.attemptAwait(pollTimeout);
if (outputs.length > 0) {
return await processResult(outputs[0].result, outputs[0].dataFormat);
}

while (true) {
const outputs = this.attemptToken
? await this.#pollInputPlane(pollTimeout)
: await this.#pollControlPlane(pollTimeout);
if (outputs.length > 0) {
return await processResult(outputs[0].result, outputs[0].dataFormat);
if (timeoutMillis !== undefined) {
const remainingTime = timeoutMillis - (Date.now() - startTime);
if (remainingTime <= 0) {
const message = `Timeout exceeded: ${(timeoutMillis / 1000).toFixed(1)}s`;
throw new FunctionTimeoutError(message);
}

if (timeout !== undefined) {
const remainingTime = timeout - (Date.now() - startTime);
if (remainingTime <= 0) {
const message = `Timeout exceeded: ${(timeout / 1000).toFixed(1)}s`;
throw new FunctionTimeoutError(message);
}
pollTimeout = Math.min(outputsTimeout, remainingTime);
}
}
}

async #pollControlPlane(
timeout: number, // in milliseconds
): Promise<FunctionGetOutputsItem[]> {
try {
const response = await this.client.functionGetOutputs({
functionCallId: this.functionCallId,
maxValues: 1,
timeout: timeout / 1000, // Backend needs seconds
lastEntryId: "0-0",
clearOnSuccess: true,
requestedAt: timeNowSeconds(),
});
return response.outputs;
} catch (err) {
throw new Error(`FunctionGetOutputs failed: ${err}`);
}
}

async #pollInputPlane(
timeout: number, // in milliseconds
): Promise<FunctionGetOutputsItem[]> {
try {
const response = await this.client.attemptAwait({
attemptToken: this.attemptToken,
requestedAt: timeNowSeconds(),
timeoutSecs: timeout / 1000, // Convert to seconds
});
return response.output ? [response.output] : [];
} catch (err) {
throw new Error(`AttemptAwait failed: ${err}`);
pollTimeout = Math.min(outputsTimeoutMillis, remainingTime);
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions modal-js/src/function_call.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Manage existing Function Calls (look-ups, polling for output, cancellation).

import { client } from "./client";
import type { FunctionOutputPoller } from "./function";
import { pollFunctionOutput } from "./function";
import { ControlPlaneStrategy } from "./input_strategy";

/** Options for `FunctionCall.get()`. */
export type FunctionCallGetOptions = {
Expand All @@ -19,28 +20,28 @@ export type FunctionCallCancelOptions = {
* (see `cancel()`).
*/
export class FunctionCall {
readonly functionOutputPoller: FunctionOutputPoller;
readonly inputStrategy: ControlPlaneStrategy;

/** @ignore */
constructor(functionOutputPoller: FunctionOutputPoller) {
this.functionOutputPoller = functionOutputPoller;
constructor(inputStrategy: ControlPlaneStrategy) {
this.inputStrategy = inputStrategy;
}

/** Create a new function call from ID. */
static fromPoller(functionOutputPoller: FunctionOutputPoller): FunctionCall {
return new FunctionCall(functionOutputPoller);
static fromInputStrategy(inputStrategy: ControlPlaneStrategy): FunctionCall {
return new FunctionCall(inputStrategy);
}

/** Get the result of a function call, optionally waiting with a timeout. */
async get(options: FunctionCallGetOptions = {}): Promise<any> {
const timeout = options.timeout;
return await this.functionOutputPoller.poll(timeout);
return await pollFunctionOutput(this.inputStrategy, timeout);
}

/** Cancel a running function call. */
async cancel(options: FunctionCallCancelOptions = {}) {
await client.functionCallCancel({
functionCallId: this.functionOutputPoller.functionCallId,
functionCallId: this.inputStrategy.functionCallId,
terminateContainers: options.terminateContainers,
});
}
Expand Down
Loading
Loading