Skip to content

Commit 3e511da

Browse files
author
Paddy
authored
Merge pull request #25 from hashicorp/paddy_stop
Cancel in-flight contexts when Stop is called.
2 parents 594469b + 3f8632a commit 3e511da

File tree

1 file changed

+66
-13
lines changed

1 file changed

+66
-13
lines changed

tfprotov5/server/server.go

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package tf5server
22

33
import (
44
"context"
5+
"sync"
56

67
"github.com/hashicorp/go-hclog"
78
"github.com/hashicorp/go-plugin"
@@ -101,17 +102,44 @@ func Serve(name string, serverFactory func() tfprotov5.ProviderServer, opts ...S
101102
type server struct {
102103
downstream tfprotov5.ProviderServer
103104
tfplugin5.UnimplementedProviderServer
105+
106+
stopMu sync.Mutex
107+
stopCh chan struct{}
108+
}
109+
110+
func mergeStop(ctx context.Context, cancel context.CancelFunc, stopCh chan struct{}) {
111+
select {
112+
case <-ctx.Done():
113+
return
114+
case <-stopCh:
115+
cancel()
116+
}
117+
}
118+
119+
// stoppableContext returns a context that wraps `ctx` but will be canceled
120+
// when the server's stopCh is closed.
121+
//
122+
// This is used to cancel all in-flight contexts when the Stop method of the
123+
// server is called.
124+
func (s *server) stoppableContext(ctx context.Context) context.Context {
125+
s.stopMu.Lock()
126+
defer s.stopMu.Unlock()
127+
128+
stoppable, cancel := context.WithCancel(ctx)
129+
go mergeStop(stoppable, cancel, s.stopCh)
130+
return stoppable
104131
}
105132

106133
// New converts a tfprotov5.ProviderServer into a server capable of handling
107134
// Terraform protocol requests and issuing responses using the gRPC types.
108135
func New(serve tfprotov5.ProviderServer) tfplugin5.ProviderServer {
109-
return server{
136+
return &server{
110137
downstream: serve,
111138
}
112139
}
113140

114-
func (s server) GetSchema(ctx context.Context, req *tfplugin5.GetProviderSchema_Request) (*tfplugin5.GetProviderSchema_Response, error) {
141+
func (s *server) GetSchema(ctx context.Context, req *tfplugin5.GetProviderSchema_Request) (*tfplugin5.GetProviderSchema_Response, error) {
142+
ctx = s.stoppableContext(ctx)
115143
r, err := fromproto.GetProviderSchemaRequest(req)
116144
if err != nil {
117145
return nil, err
@@ -127,7 +155,8 @@ func (s server) GetSchema(ctx context.Context, req *tfplugin5.GetProviderSchema_
127155
return ret, nil
128156
}
129157

130-
func (s server) PrepareProviderConfig(ctx context.Context, req *tfplugin5.PrepareProviderConfig_Request) (*tfplugin5.PrepareProviderConfig_Response, error) {
158+
func (s *server) PrepareProviderConfig(ctx context.Context, req *tfplugin5.PrepareProviderConfig_Request) (*tfplugin5.PrepareProviderConfig_Response, error) {
159+
ctx = s.stoppableContext(ctx)
131160
r, err := fromproto.PrepareProviderConfigRequest(req)
132161
if err != nil {
133162
return nil, err
@@ -143,7 +172,8 @@ func (s server) PrepareProviderConfig(ctx context.Context, req *tfplugin5.Prepar
143172
return ret, nil
144173
}
145174

146-
func (s server) Configure(ctx context.Context, req *tfplugin5.Configure_Request) (*tfplugin5.Configure_Response, error) {
175+
func (s *server) Configure(ctx context.Context, req *tfplugin5.Configure_Request) (*tfplugin5.Configure_Response, error) {
176+
ctx = s.stoppableContext(ctx)
147177
r, err := fromproto.ConfigureProviderRequest(req)
148178
if err != nil {
149179
return nil, err
@@ -159,7 +189,21 @@ func (s server) Configure(ctx context.Context, req *tfplugin5.Configure_Request)
159189
return ret, nil
160190
}
161191

162-
func (s server) Stop(ctx context.Context, req *tfplugin5.Stop_Request) (*tfplugin5.Stop_Response, error) {
192+
// stop closes the stopCh associated with the server and replaces it with a new
193+
// one.
194+
//
195+
// This causes all in-flight requests for the server to have their contexts
196+
// canceled.
197+
func (s *server) stop() {
198+
s.stopMu.Lock()
199+
defer s.stopMu.Unlock()
200+
201+
close(s.stopCh)
202+
s.stopCh = make(chan struct{})
203+
}
204+
205+
func (s *server) Stop(ctx context.Context, req *tfplugin5.Stop_Request) (*tfplugin5.Stop_Response, error) {
206+
ctx = s.stoppableContext(ctx)
163207
r, err := fromproto.StopProviderRequest(req)
164208
if err != nil {
165209
return nil, err
@@ -168,14 +212,16 @@ func (s server) Stop(ctx context.Context, req *tfplugin5.Stop_Request) (*tfplugi
168212
if err != nil {
169213
return nil, err
170214
}
215+
s.stop()
171216
ret, err := toproto.Stop_Response(resp)
172217
if err != nil {
173218
return nil, err
174219
}
175220
return ret, nil
176221
}
177222

178-
func (s server) ValidateDataSourceConfig(ctx context.Context, req *tfplugin5.ValidateDataSourceConfig_Request) (*tfplugin5.ValidateDataSourceConfig_Response, error) {
223+
func (s *server) ValidateDataSourceConfig(ctx context.Context, req *tfplugin5.ValidateDataSourceConfig_Request) (*tfplugin5.ValidateDataSourceConfig_Response, error) {
224+
ctx = s.stoppableContext(ctx)
179225
r, err := fromproto.ValidateDataSourceConfigRequest(req)
180226
if err != nil {
181227
return nil, err
@@ -191,7 +237,8 @@ func (s server) ValidateDataSourceConfig(ctx context.Context, req *tfplugin5.Val
191237
return ret, nil
192238
}
193239

194-
func (s server) ReadDataSource(ctx context.Context, req *tfplugin5.ReadDataSource_Request) (*tfplugin5.ReadDataSource_Response, error) {
240+
func (s *server) ReadDataSource(ctx context.Context, req *tfplugin5.ReadDataSource_Request) (*tfplugin5.ReadDataSource_Response, error) {
241+
ctx = s.stoppableContext(ctx)
195242
r, err := fromproto.ReadDataSourceRequest(req)
196243
if err != nil {
197244
return nil, err
@@ -207,7 +254,8 @@ func (s server) ReadDataSource(ctx context.Context, req *tfplugin5.ReadDataSourc
207254
return ret, nil
208255
}
209256

210-
func (s server) ValidateResourceTypeConfig(ctx context.Context, req *tfplugin5.ValidateResourceTypeConfig_Request) (*tfplugin5.ValidateResourceTypeConfig_Response, error) {
257+
func (s *server) ValidateResourceTypeConfig(ctx context.Context, req *tfplugin5.ValidateResourceTypeConfig_Request) (*tfplugin5.ValidateResourceTypeConfig_Response, error) {
258+
ctx = s.stoppableContext(ctx)
211259
r, err := fromproto.ValidateResourceTypeConfigRequest(req)
212260
if err != nil {
213261
return nil, err
@@ -223,7 +271,8 @@ func (s server) ValidateResourceTypeConfig(ctx context.Context, req *tfplugin5.V
223271
return ret, nil
224272
}
225273

226-
func (s server) UpgradeResourceState(ctx context.Context, req *tfplugin5.UpgradeResourceState_Request) (*tfplugin5.UpgradeResourceState_Response, error) {
274+
func (s *server) UpgradeResourceState(ctx context.Context, req *tfplugin5.UpgradeResourceState_Request) (*tfplugin5.UpgradeResourceState_Response, error) {
275+
ctx = s.stoppableContext(ctx)
227276
r, err := fromproto.UpgradeResourceStateRequest(req)
228277
if err != nil {
229278
return nil, err
@@ -239,7 +288,8 @@ func (s server) UpgradeResourceState(ctx context.Context, req *tfplugin5.Upgrade
239288
return ret, nil
240289
}
241290

242-
func (s server) ReadResource(ctx context.Context, req *tfplugin5.ReadResource_Request) (*tfplugin5.ReadResource_Response, error) {
291+
func (s *server) ReadResource(ctx context.Context, req *tfplugin5.ReadResource_Request) (*tfplugin5.ReadResource_Response, error) {
292+
ctx = s.stoppableContext(ctx)
243293
r, err := fromproto.ReadResourceRequest(req)
244294
if err != nil {
245295
return nil, err
@@ -255,7 +305,8 @@ func (s server) ReadResource(ctx context.Context, req *tfplugin5.ReadResource_Re
255305
return ret, nil
256306
}
257307

258-
func (s server) PlanResourceChange(ctx context.Context, req *tfplugin5.PlanResourceChange_Request) (*tfplugin5.PlanResourceChange_Response, error) {
308+
func (s *server) PlanResourceChange(ctx context.Context, req *tfplugin5.PlanResourceChange_Request) (*tfplugin5.PlanResourceChange_Response, error) {
309+
ctx = s.stoppableContext(ctx)
259310
r, err := fromproto.PlanResourceChangeRequest(req)
260311
if err != nil {
261312
return nil, err
@@ -271,7 +322,8 @@ func (s server) PlanResourceChange(ctx context.Context, req *tfplugin5.PlanResou
271322
return ret, nil
272323
}
273324

274-
func (s server) ApplyResourceChange(ctx context.Context, req *tfplugin5.ApplyResourceChange_Request) (*tfplugin5.ApplyResourceChange_Response, error) {
325+
func (s *server) ApplyResourceChange(ctx context.Context, req *tfplugin5.ApplyResourceChange_Request) (*tfplugin5.ApplyResourceChange_Response, error) {
326+
ctx = s.stoppableContext(ctx)
275327
r, err := fromproto.ApplyResourceChangeRequest(req)
276328
if err != nil {
277329
return nil, err
@@ -287,7 +339,8 @@ func (s server) ApplyResourceChange(ctx context.Context, req *tfplugin5.ApplyRes
287339
return ret, nil
288340
}
289341

290-
func (s server) ImportResourceState(ctx context.Context, req *tfplugin5.ImportResourceState_Request) (*tfplugin5.ImportResourceState_Response, error) {
342+
func (s *server) ImportResourceState(ctx context.Context, req *tfplugin5.ImportResourceState_Request) (*tfplugin5.ImportResourceState_Response, error) {
343+
ctx = s.stoppableContext(ctx)
291344
r, err := fromproto.ImportResourceStateRequest(req)
292345
if err != nil {
293346
return nil, err

0 commit comments

Comments
 (0)