Skip to content

Commit 47f9631

Browse files
committed
implementation of sending progress events
1 parent 8fdf71d commit 47f9631

File tree

10 files changed

+583
-28
lines changed

10 files changed

+583
-28
lines changed

action/invoke.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,18 @@ type InvokeResponse struct {
2828
// generated.
2929
Diagnostics diag.Diagnostics
3030

31+
// SendProgress will immediately send a progress update to Terraform core during action invocation.
32+
// This function is provided by the framework and can be called multiple times while action logic is running.
33+
//
34+
// TODO:Actions: More documentation about when you should use this / when you shouldn't
35+
SendProgress func(event InvokeProgressEvent)
36+
3137
// TODO:Actions: Add linked resources once lifecycle/linked actions are implemented
3238
}
39+
40+
// InvokeProgressEvent is the event returned to Terraform while an action is being invoked.
41+
type InvokeProgressEvent struct {
42+
// Message is the string that will be presented to the practitioner either via the console
43+
// or an external system like HCP Terraform.
44+
Message string
45+
}

internal/fwserver/server_invokeaction.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,21 @@ type InvokeActionRequest struct {
2323

2424
// InvokeActionEventsStream is the framework server stream for the InvokeAction RPC.
2525
type InvokeActionResponse struct {
26-
Diagnostics diag.Diagnostics
26+
// ProgressEvents is a channel provided by the consuming proto{5/6}server implementation
27+
// that allows the provider developers to return progress events while the action is being invoked.
28+
ProgressEvents chan InvokeProgressEvent
29+
Diagnostics diag.Diagnostics
30+
}
31+
32+
type InvokeProgressEvent struct {
33+
Message string
34+
}
35+
36+
// SendProgress is injected into the action.InvokeResponse for use by the provider developer
37+
func (r *InvokeActionResponse) SendProgress(event action.InvokeProgressEvent) {
38+
r.ProgressEvents <- InvokeProgressEvent{
39+
Message: event.Message,
40+
}
2741
}
2842

2943
// InvokeAction implements the framework server InvokeAction RPC.
@@ -61,7 +75,9 @@ func (s *Server) InvokeAction(ctx context.Context, req *InvokeActionRequest, res
6175
invokeReq := action.InvokeRequest{
6276
Config: *req.Config,
6377
}
64-
invokeResp := action.InvokeResponse{}
78+
invokeResp := action.InvokeResponse{
79+
SendProgress: resp.SendProgress,
80+
}
6581

6682
logging.FrameworkTrace(ctx, "Calling provider defined Action Invoke")
6783
req.Action.Invoke(ctx, invokeReq, &invokeResp)

internal/proto5server/server_invokeaction.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,21 +60,34 @@ func (s *Server) InvokeAction(ctx context.Context, proto5Req *tfprotov5.InvokeAc
6060
return invokeActionErrorDiagnostics(ctx, fwResp.Diagnostics)
6161
}
6262

63-
// TODO:Actions: Create messaging call back for progress updates
64-
65-
s.FrameworkServer.InvokeAction(ctx, fwReq, fwResp)
66-
67-
// TODO:Actions: This is a stub implementation, so we aren't currently exposing any streaming mechanism to the developer.
68-
// That will eventually need to change to send progress events back to Terraform.
69-
//
70-
// This logic will likely need to be moved over to the "toproto" package as well.
7163
protoStream := &tfprotov5.InvokeActionServerStream{
7264
Events: func(push func(tfprotov5.InvokeActionEvent) bool) {
73-
push(tfprotov5.InvokeActionEvent{
74-
Type: tfprotov5.CompletedInvokeActionEventType{
75-
Diagnostics: toproto5.Diagnostics(ctx, fwResp.Diagnostics),
76-
},
77-
})
65+
// Create a channel for framework to receive progress events
66+
progressChan := make(chan fwserver.InvokeProgressEvent)
67+
fwResp.ProgressEvents = progressChan
68+
69+
// Create a channel to be triggered when the invoke action method has finished
70+
completedChan := make(chan any)
71+
go func() {
72+
s.FrameworkServer.InvokeAction(ctx, fwReq, fwResp)
73+
close(completedChan)
74+
}()
75+
76+
for {
77+
select {
78+
// Actions can only push one completed event and it's automatically handled by the framework
79+
// by closing the completed channel above.
80+
case <-completedChan:
81+
push(toproto5.CompletedInvokeActionEventType(ctx, fwResp))
82+
return
83+
84+
// Actions can push multiple progress events
85+
case progressEvent := <-fwResp.ProgressEvents:
86+
if !push(toproto5.ProgressInvokeActionEventType(ctx, progressEvent)) {
87+
return
88+
}
89+
}
90+
}
7891
},
7992
}
8093

internal/proto5server/server_invokeaction_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package proto5server
55

66
import (
77
"context"
8+
"fmt"
89
"slices"
910
"testing"
1011

@@ -120,6 +121,57 @@ func TestServerInvokeAction(t *testing.T) {
120121
},
121122
},
122123
},
124+
"response-progress-events": {
125+
server: &Server{
126+
FrameworkServer: fwserver.Server{
127+
Provider: &testprovider.Provider{
128+
ActionsMethod: func(_ context.Context) []func() action.Action {
129+
return []func() action.Action{
130+
func() action.Action {
131+
return &testprovider.Action{
132+
SchemaMethod: func(_ context.Context, _ action.SchemaRequest, resp *action.SchemaResponse) {
133+
resp.Schema = testUnlinkedSchema
134+
},
135+
MetadataMethod: func(_ context.Context, _ action.MetadataRequest, resp *action.MetadataResponse) {
136+
resp.TypeName = "test_action"
137+
},
138+
InvokeMethod: func(ctx context.Context, req action.InvokeRequest, resp *action.InvokeResponse) {
139+
resp.SendProgress(action.InvokeProgressEvent{Message: "progress event 1"})
140+
resp.SendProgress(action.InvokeProgressEvent{Message: "progress event 2"})
141+
resp.SendProgress(action.InvokeProgressEvent{Message: "progress event 3"})
142+
},
143+
}
144+
},
145+
}
146+
},
147+
},
148+
},
149+
},
150+
request: &tfprotov5.InvokeActionRequest{
151+
Config: testConfigDynamicValue,
152+
ActionType: "test_action",
153+
},
154+
expectedEvents: []tfprotov5.InvokeActionEvent{
155+
{
156+
Type: tfprotov5.ProgressInvokeActionEventType{
157+
Message: "progress event 1",
158+
},
159+
},
160+
{
161+
Type: tfprotov5.ProgressInvokeActionEventType{
162+
Message: "progress event 2",
163+
},
164+
},
165+
{
166+
Type: tfprotov5.ProgressInvokeActionEventType{
167+
Message: "progress event 3",
168+
},
169+
},
170+
{
171+
Type: tfprotov5.CompletedInvokeActionEventType{},
172+
},
173+
},
174+
},
123175
"response-diagnostics": {
124176
server: &Server{
125177
FrameworkServer: fwserver.Server{
@@ -168,6 +220,83 @@ func TestServerInvokeAction(t *testing.T) {
168220
},
169221
},
170222
},
223+
"response-diagnostics-with-progress-events": {
224+
server: &Server{
225+
FrameworkServer: fwserver.Server{
226+
Provider: &testprovider.Provider{
227+
ActionsMethod: func(_ context.Context) []func() action.Action {
228+
return []func() action.Action{
229+
func() action.Action {
230+
return &testprovider.Action{
231+
SchemaMethod: func(_ context.Context, _ action.SchemaRequest, resp *action.SchemaResponse) {
232+
resp.Schema = testUnlinkedSchema
233+
},
234+
MetadataMethod: func(_ context.Context, _ action.MetadataRequest, resp *action.MetadataResponse) {
235+
resp.TypeName = "test_action"
236+
},
237+
InvokeMethod: func(ctx context.Context, req action.InvokeRequest, resp *action.InvokeResponse) {
238+
for i := 0; i < 5; i++ {
239+
resp.SendProgress(action.InvokeProgressEvent{Message: fmt.Sprintf("progress event %d", i+1)})
240+
}
241+
242+
resp.Diagnostics.AddWarning("warning summary", "warning detail")
243+
resp.Diagnostics.AddError("error summary", "error detail")
244+
},
245+
}
246+
},
247+
}
248+
},
249+
},
250+
},
251+
},
252+
request: &tfprotov5.InvokeActionRequest{
253+
Config: testConfigDynamicValue,
254+
ActionType: "test_action",
255+
},
256+
expectedEvents: []tfprotov5.InvokeActionEvent{
257+
{
258+
Type: tfprotov5.ProgressInvokeActionEventType{
259+
Message: "progress event 1",
260+
},
261+
},
262+
{
263+
Type: tfprotov5.ProgressInvokeActionEventType{
264+
Message: "progress event 2",
265+
},
266+
},
267+
{
268+
Type: tfprotov5.ProgressInvokeActionEventType{
269+
Message: "progress event 3",
270+
},
271+
},
272+
{
273+
Type: tfprotov5.ProgressInvokeActionEventType{
274+
Message: "progress event 4",
275+
},
276+
},
277+
{
278+
Type: tfprotov5.ProgressInvokeActionEventType{
279+
Message: "progress event 5",
280+
},
281+
},
282+
{
283+
Type: tfprotov5.CompletedInvokeActionEventType{
284+
Diagnostics: []*tfprotov5.Diagnostic{
285+
{
286+
Severity: tfprotov5.DiagnosticSeverityWarning,
287+
Summary: "warning summary",
288+
Detail: "warning detail",
289+
},
290+
{
291+
Severity: tfprotov5.DiagnosticSeverityError,
292+
Summary: "error summary",
293+
Detail: "error detail",
294+
},
295+
},
296+
},
297+
},
298+
},
299+
},
171300
}
172301

173302
for name, testCase := range testCases {

internal/proto6server/server_invokeaction.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,21 +60,34 @@ func (s *Server) InvokeAction(ctx context.Context, proto6Req *tfprotov6.InvokeAc
6060
return invokeActionErrorDiagnostics(ctx, fwResp.Diagnostics)
6161
}
6262

63-
// TODO:Actions: Create messaging call back for progress updates
64-
65-
s.FrameworkServer.InvokeAction(ctx, fwReq, fwResp)
66-
67-
// TODO:Actions: This is a stub implementation, so we aren't currently exposing any streaming mechanism to the developer.
68-
// That will eventually need to change to send progress events back to Terraform.
69-
//
70-
// This logic will likely need to be moved over to the "toproto" package as well.
7163
protoStream := &tfprotov6.InvokeActionServerStream{
7264
Events: func(push func(tfprotov6.InvokeActionEvent) bool) {
73-
push(tfprotov6.InvokeActionEvent{
74-
Type: tfprotov6.CompletedInvokeActionEventType{
75-
Diagnostics: toproto6.Diagnostics(ctx, fwResp.Diagnostics),
76-
},
77-
})
65+
// Create a channel for framework to receive progress events
66+
progressChan := make(chan fwserver.InvokeProgressEvent)
67+
fwResp.ProgressEvents = progressChan
68+
69+
// Create a channel to be triggered when the invoke action method has finished
70+
completedChan := make(chan any)
71+
go func() {
72+
s.FrameworkServer.InvokeAction(ctx, fwReq, fwResp)
73+
close(completedChan)
74+
}()
75+
76+
for {
77+
select {
78+
// Actions can only push one completed event and it's automatically handled by the framework
79+
// by closing the completed channel above.
80+
case <-completedChan:
81+
push(toproto6.CompletedInvokeActionEventType(ctx, fwResp))
82+
return
83+
84+
// Actions can push multiple progress events
85+
case progressEvent := <-fwResp.ProgressEvents:
86+
if !push(toproto6.ProgressInvokeActionEventType(ctx, progressEvent)) {
87+
return
88+
}
89+
}
90+
}
7891
},
7992
}
8093

0 commit comments

Comments
 (0)