@@ -2,15 +2,17 @@ import {
22 DataFormat ,
33 FunctionCallInvocationType ,
44 FunctionCallType ,
5- FunctionGetOutputsResponse ,
5+ FunctionGetOutputsItem ,
66 FunctionInput ,
77 FunctionMapResponse ,
8+ FunctionPutInputsItem ,
89 FunctionRetryInputsItem ,
910 GeneratorDone ,
1011 GenericResult ,
1112 GenericResult_GenericStatus ,
13+ ModalClientClient ,
1214} from "../proto/modal_proto/api" ;
13- import { client } from "./client" ;
15+ import { client , getOrCreateClient } from "./client" ;
1416import { FunctionTimeoutError , InternalFailure , RemoteError } from "./errors" ;
1517import { loads } from "./pickle" ;
1618
@@ -33,6 +35,14 @@ export interface Invocation {
3335 retry ( retryCount : number ) : Promise < void > ;
3436}
3537
38+ /**
39+ * Signature of a function that fetches a single output. Used by `pollForOutputs` to fetch from either
40+ * the control plane or the input plane, depending on the implementation.
41+ */
42+ type GetOutput = (
43+ timeoutMillis : number ,
44+ ) => Promise < FunctionGetOutputsItem | undefined > ;
45+
3646/**
3747 * Implementation of Invocation which sends inputs to the control plane.
3848 */
@@ -77,7 +87,28 @@ export class ControlPlaneInvocation implements Invocation {
7787 }
7888
7989 async await ( timeout ?: number ) : Promise < any > {
80- return await pollControlPlaneForOutput ( this . functionCallId , timeout ) ;
90+ return await pollControlPlaneForOutput (
91+ ( timeoutMillis : number ) => this . #getOutput( timeoutMillis ) ,
92+ timeout ,
93+ ) ;
94+ }
95+
96+ async #getOutput(
97+ timeoutMillis : number ,
98+ ) : Promise < FunctionGetOutputsItem | undefined > {
99+ try {
100+ const response = await client . functionGetOutputs ( {
101+ functionCallId : this . functionCallId ,
102+ maxValues : 1 ,
103+ timeout : timeoutMillis / 1000 , // Backend needs seconds
104+ lastEntryId : "0-0" ,
105+ clearOnSuccess : true ,
106+ requestedAt : timeNowSeconds ( ) ,
107+ } ) ;
108+ return response . outputs ? response . outputs [ 0 ] : undefined ;
109+ } catch ( err ) {
110+ throw new Error ( `FunctionGetOutputs failed: ${ err } ` ) ;
111+ }
81112 }
82113
83114 async retry ( retryCount : number ) : Promise < void > {
@@ -118,12 +149,88 @@ export class ControlPlaneInvocation implements Invocation {
118149 }
119150}
120151
152+ /**
153+ * Implementation of Invocation which sends inputs to the input plane.
154+ */
155+ export class InputPlaneInvocation implements Invocation {
156+ private readonly client : ModalClientClient ;
157+ private readonly functionId : string ;
158+ private readonly input : FunctionPutInputsItem ;
159+ private attemptToken : string ;
160+
161+ constructor (
162+ client : ModalClientClient ,
163+ functionId : string ,
164+ input : FunctionPutInputsItem ,
165+ attemptToken : string ,
166+ ) {
167+ this . client = client ;
168+ this . functionId = functionId ;
169+ this . input = input ;
170+ this . attemptToken = attemptToken ;
171+ }
172+
173+ static async create (
174+ inputPlaneUrl : string ,
175+ functionId : string ,
176+ input : FunctionInput ,
177+ ) {
178+ const functionPutInputsItem = {
179+ idx : 0 ,
180+ input : input ,
181+ } ;
182+ const client = getOrCreateClient ( inputPlaneUrl ) ;
183+ // Single input sync invocation
184+ const attemptStartResponse = await client . attemptStart ( {
185+ functionId : functionId ,
186+ input : functionPutInputsItem ,
187+ } ) ;
188+ return new InputPlaneInvocation (
189+ client ,
190+ functionId ,
191+ functionPutInputsItem ,
192+ attemptStartResponse . attemptToken ,
193+ ) ;
194+ }
195+
196+ async await ( timeout ?: number ) : Promise < any > {
197+ return await pollControlPlaneForOutput (
198+ ( timeoutMillis : number ) => this . #getOutput( timeoutMillis ) ,
199+ timeout ,
200+ ) ;
201+ }
202+
203+ async #getOutput(
204+ timeoutMillis : number ,
205+ ) : Promise < FunctionGetOutputsItem | undefined > {
206+ try {
207+ const response = await this . client . attemptAwait ( {
208+ attemptToken : this . attemptToken ,
209+ requestedAt : timeNowSeconds ( ) ,
210+ timeoutSecs : timeoutMillis / 1000 ,
211+ } ) ;
212+ return response . output ;
213+ } catch ( err ) {
214+ throw new Error ( `AttemptAwait failed: ${ err } ` ) ;
215+ }
216+ }
217+
218+ async retry ( _retryCount : number ) : Promise < void > {
219+ const attemptRetryResponse = await this . client . attemptRetry ( {
220+ functionId : this . functionId ,
221+ input : this . input ,
222+ attemptToken : this . attemptToken ,
223+ } ) ;
224+ this . attemptToken = attemptRetryResponse . attemptToken ;
225+ }
226+ }
227+
121228function timeNowSeconds ( ) {
122229 return Date . now ( ) / 1e3 ;
123230}
124231
125- export async function pollControlPlaneForOutput (
126- functionCallId : string ,
232+ async function pollControlPlaneForOutput (
233+ getOutput : GetOutput ,
127234 timeout ?: number , // in milliseconds
128235) : Promise < any > {
129236 const startTime = Date . now ( ) ;
@@ -133,23 +240,9 @@ export async function pollControlPlaneForOutput(
133240 }
134241
135242 while ( true ) {
136- let response : FunctionGetOutputsResponse ;
137- try {
138- response = await client . functionGetOutputs ( {
139- functionCallId : functionCallId ,
140- maxValues : 1 ,
141- timeout : pollTimeout / 1000 , // Backend needs seconds
142- lastEntryId : "0-0" ,
143- clearOnSuccess : true ,
144- requestedAt : timeNowSeconds ( ) ,
145- } ) ;
146- } catch ( err ) {
147- throw new Error ( `FunctionGetOutputs failed: ${ err } ` ) ;
148- }
149-
150- const outputs = response . outputs ;
151- if ( outputs . length > 0 ) {
152- return await processResult ( outputs [ 0 ] . result , outputs [ 0 ] . dataFormat ) ;
243+ const output = await getOutput ( pollTimeout ) ;
244+ if ( output ) {
245+ return await processResult ( output . result , output . dataFormat ) ;
153246 }
154247
155248 if ( timeout !== undefined ) {
0 commit comments