@@ -11,13 +11,12 @@ import (
1111 "google.golang.org/protobuf/proto"
1212)
1313
14+ // GetOutput is a function type that takes a timeout in milliseconds and returns a FunctionGetOutputsItem or nil, and an error.
15+ type GetOutput func (timeout time.Duration ) (* pb.FunctionGetOutputsItem , error )
16+
1417type Invocation interface {
15- << << << < HEAD
1618 AwaitOutput (timeout * time.Duration ) (any , error )
17- == == == =
18- AwaitOutput (timeout ... int ) (interface {}, error )
19- >> >> >> > a38d9c2 (Add go implementation )
20- Retry (retryCount int ) error
19+ Retry (retryCount uint32 ) error
2120}
2221
2322// ControlPlaneInvocation implements the Invocation interface.
@@ -60,12 +59,8 @@ func ControlPlaneInvocationFromFunctionCallId(ctx context.Context, functionCallI
6059 return & ControlPlaneInvocation {FunctionCallId : functionCallId , ctx : ctx }
6160}
6261
63- << << << < HEAD
6462func (c * ControlPlaneInvocation ) AwaitOutput (timeout * time.Duration ) (any , error ) {
65- == == == =
66- func (c * ControlPlaneInvocation ) AwaitOutput (timeout * time .Duration ) (interface {}, error ) {
67- >> >> >> > a38d9c2 (Add go implementation )
68- return pollFunctionOutput (c .ctx , c .FunctionCallId , timeout )
63+ return pollFunctionOutput (c .ctx , c .getOutput , timeout )
6964}
7065
7166func (c * ControlPlaneInvocation ) Retry (retryCount uint32 ) error {
@@ -88,8 +83,94 @@ func (c *ControlPlaneInvocation) Retry(retryCount uint32) error {
8883 return nil
8984}
9085
91- // Poll for outputs for a given FunctionCall ID.
92- func pollFunctionOutput (ctx context.Context , functionCallId string , timeout * time.Duration ) (any , error ) {
86+ // getOutput fetches the output for the current function call with a timeout in milliseconds.
87+ func (c * ControlPlaneInvocation ) getOutput (timeout time.Duration ) (* pb.FunctionGetOutputsItem , error ) {
88+ response , err := client .FunctionGetOutputs (c .ctx , pb.FunctionGetOutputsRequest_builder {
89+ FunctionCallId : c .FunctionCallId ,
90+ MaxValues : 1 ,
91+ Timeout : float32 (timeout .Seconds ()),
92+ LastEntryId : "0-0" ,
93+ ClearOnSuccess : true ,
94+ RequestedAt : timeNowSeconds (),
95+ }.Build ())
96+ if err != nil {
97+ return nil , fmt .Errorf ("FunctionGetOutputs failed: %w" , err )
98+ }
99+ outputs := response .GetOutputs ()
100+ if len (outputs ) > 0 {
101+ return outputs [0 ], nil
102+ }
103+ return nil , nil
104+ }
105+
106+ // InputPlaneInvocation implements the Invocation interface for the input plane.
107+ type InputPlaneInvocation struct {
108+ client pb.ModalClientClient
109+ functionId string
110+ input * pb.FunctionPutInputsItem
111+ attemptToken string
112+ ctx context.Context
113+ }
114+
115+ // CreateInputPlaneInvocation creates a new InputPlaneInvocation by starting an attempt.
116+ func CreateInputPlaneInvocation (ctx context.Context , inputPlaneURL string , functionId string , input * pb.FunctionInput ) (* InputPlaneInvocation , error ) {
117+ functionPutInputsItem := pb.FunctionPutInputsItem_builder {
118+ Idx : 0 ,
119+ Input : input ,
120+ }.Build ()
121+ client , err := getOrCreateClient (inputPlaneURL )
122+ if err != nil {
123+ return nil , err
124+ }
125+ attemptStartResp , err := client .AttemptStart (ctx , pb.AttemptStartRequest_builder {
126+ FunctionId : functionId ,
127+ Input : functionPutInputsItem ,
128+ }.Build ())
129+ if err != nil {
130+ return nil , err
131+ }
132+ return & InputPlaneInvocation {
133+ client : client ,
134+ functionId : functionId ,
135+ input : functionPutInputsItem ,
136+ attemptToken : attemptStartResp .GetAttemptToken (),
137+ ctx : ctx ,
138+ }, nil
139+ }
140+
141+ // AwaitOutput waits for the output with an optional timeout.
142+ func (i * InputPlaneInvocation ) AwaitOutput (timeout * time.Duration ) (any , error ) {
143+ return pollFunctionOutput (i .ctx , i .getOutput , timeout )
144+ }
145+
146+ // getOutput fetches the output for the current attempt.
147+ func (i * InputPlaneInvocation ) getOutput (timeout time.Duration ) (* pb.FunctionGetOutputsItem , error ) {
148+ resp , err := i .client .AttemptAwait (i .ctx , pb.AttemptAwaitRequest_builder {
149+ AttemptToken : i .attemptToken ,
150+ RequestedAt : timeNowSeconds (),
151+ TimeoutSecs : float32 (timeout .Seconds ()),
152+ }.Build ())
153+ if err != nil {
154+ return nil , fmt .Errorf ("AttemptAwait failed: %w" , err )
155+ }
156+ return resp .GetOutput (), nil
157+ }
158+
159+ // Retry retries the invocation.
160+ func (i * InputPlaneInvocation ) Retry (retryCount uint32 ) error {
161+ resp , err := i .client .AttemptRetry (context .Background (), pb.AttemptRetryRequest_builder {
162+ FunctionId : i .functionId ,
163+ Input : i .input ,
164+ AttemptToken : i .attemptToken ,
165+ }.Build ())
166+ if err != nil {
167+ return err
168+ }
169+ i .attemptToken = resp .GetAttemptToken ()
170+ return nil
171+ }
172+
173+ func pollFunctionOutput (ctx context.Context , getOutput GetOutput , timeout * time.Duration ) (any , error ) {
93174 startTime := time .Now ()
94175 pollTimeout := outputsTimeout
95176 if timeout != nil {
@@ -98,23 +179,14 @@ func pollFunctionOutput(ctx context.Context, functionCallId string, timeout *tim
98179 }
99180
100181 for {
101- response , err := client .FunctionGetOutputs (ctx , pb.FunctionGetOutputsRequest_builder {
102- FunctionCallId : functionCallId ,
103- MaxValues : 1 ,
104- Timeout : float32 (pollTimeout .Seconds ()),
105- LastEntryId : "0-0" ,
106- ClearOnSuccess : true ,
107- RequestedAt : timeNowSeconds (),
108- }.Build ())
182+ output , err := getOutput (pollTimeout )
109183 if err != nil {
110- return nil , fmt . Errorf ( "FunctionGetOutputs failed: %w" , err )
184+ return nil , err
111185 }
112-
113186 // Output serialization may fail if any of the output items can't be deserialized
114187 // into a supported Go type. Users are expected to serialize outputs correctly.
115- outputs := response .GetOutputs ()
116- if len (outputs ) > 0 {
117- return processResult (ctx , outputs [0 ].GetResult (), outputs [0 ].GetDataFormat ())
188+ if output != nil {
189+ return processResult (ctx , output .GetResult (), output .GetDataFormat ())
118190 }
119191
120192 if timeout != nil {
0 commit comments