@@ -5,23 +5,24 @@ import { createHash } from "node:crypto";
55import {
66 DataFormat ,
77 DeploymentNamespace ,
8- FunctionPutInputsItem ,
8+ FunctionCallInvocationType ,
9+ FunctionInput ,
910} from "../proto/modal_proto/api" ;
1011import type { LookupOptions } from "./app" ;
1112import { client } from "./client" ;
1213import { FunctionCall } from "./function_call" ;
1314import { environmentName } from "./config" ;
14- import { NotFoundError } from "./errors" ;
15+ import { InternalFailure , NotFoundError } from "./errors" ;
1516import { dumps } from "./pickle" ;
1617import { ClientError , Status } from "nice-grpc" ;
17- import {
18- ControlPlaneStrategy ,
19- pollControlPlaneForOutput ,
20- } from "./invocation_strategy" ;
18+ import { ControlPlaneInvocation } from "./invocation" ;
2119
2220// From: modal/_utils/blob_utils.py
2321const maxObjectSizeBytes = 2 * 1024 * 1024 ; // 2 MiB
2422
23+ // From: client/modal/_functions.py
24+ const maxSystemRetries = 8 ;
25+
2526/** Represents a deployed Modal Function, which can be invoked remotely. */
2627export class Function_ {
2728 readonly functionId : string ;
@@ -59,8 +60,25 @@ export class Function_ {
5960 kwargs : Record < string , any > = { } ,
6061 ) : Promise < any > {
6162 const input = await this . #createInput( args , kwargs ) ;
62- const invocationStrategy = new ControlPlaneStrategy ( this . functionId , input ) ;
63- return await invocationStrategy . remote ( ) ;
63+ const invocation = await ControlPlaneInvocation . create (
64+ this . functionId ,
65+ input ,
66+ FunctionCallInvocationType . FUNCTION_CALL_INVOCATION_TYPE_SYNC ,
67+ ) ;
68+ // TODO(ryan): Add tests for retries.
69+ let retryCount = 0 ;
70+ while ( true ) {
71+ try {
72+ return await invocation . await ( ) ;
73+ } catch ( err ) {
74+ if ( err instanceof InternalFailure && retryCount <= maxSystemRetries ) {
75+ await invocation . retry ( retryCount ) ;
76+ retryCount ++ ;
77+ } else {
78+ throw err ;
79+ }
80+ }
81+ }
6482 }
6583
6684 // Spawn a single input into a remote function.
@@ -69,15 +87,18 @@ export class Function_ {
6987 kwargs : Record < string , any > = { } ,
7088 ) : Promise < FunctionCall > {
7189 const input = await this . #createInput( args , kwargs ) ;
72- const invocationStrategy = new ControlPlaneStrategy ( this . functionId , input ) ;
73- const functionCallId = await invocationStrategy . spawn ( ) ;
74- return new FunctionCall ( functionCallId ) ;
90+ const invocation = await ControlPlaneInvocation . create (
91+ this . functionId ,
92+ input ,
93+ FunctionCallInvocationType . FUNCTION_CALL_INVOCATION_TYPE_ASYNC ,
94+ ) ;
95+ return new FunctionCall ( invocation . functionCallId ) ;
7596 }
7697
7798 async #createInput(
7899 args : any [ ] = [ ] ,
79100 kwargs : Record < string , any > = { } ,
80- ) : Promise < FunctionPutInputsItem > {
101+ ) : Promise < FunctionInput > {
81102 const payload = dumps ( [ args , kwargs ] ) ;
82103
83104 let argsBlobId : string | undefined = undefined ;
@@ -87,25 +108,15 @@ export class Function_ {
87108
88109 // Single input sync invocation
89110 return {
90- idx : 0 ,
91- input : {
92- args : argsBlobId ? undefined : payload ,
93- argsBlobId,
94- dataFormat : DataFormat . DATA_FORMAT_PICKLE ,
95- methodName : this . methodName ,
96- finalInput : false , // This field isn't specified in the Python client, so it defaults to false.
97- } ,
111+ args : argsBlobId ? undefined : payload ,
112+ argsBlobId,
113+ dataFormat : DataFormat . DATA_FORMAT_PICKLE ,
114+ methodName : this . methodName ,
115+ finalInput : false , // This field isn't specified in the Python client, so it defaults to false.
98116 } ;
99117 }
100118}
101119
102- export async function pollFunctionOutput (
103- functionCallId : string ,
104- timeout ?: number , // in milliseconds
105- ) : Promise < any > {
106- return pollControlPlaneForOutput ( functionCallId , timeout ) ;
107- }
108-
109120async function blobUpload ( data : Uint8Array ) : Promise < string > {
110121 const contentMd5 = createHash ( "md5" ) . update ( data ) . digest ( "base64" ) ;
111122 const contentSha256 = createHash ( "sha256" ) . update ( data ) . digest ( "base64" ) ;
0 commit comments