Skip to content
This repository was archived by the owner on Nov 7, 2019. It is now read-only.

Commit 66efa33

Browse files
author
Tanton
committed
Fixed operator signal interruption using context WithCancel
Signed-off-by: Tanton <[email protected]>
1 parent c797f3b commit 66efa33

File tree

6 files changed

+76
-65
lines changed

6 files changed

+76
-65
lines changed

cmd/aws-service-operator/server.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package main
22

33
import (
4+
"context"
5+
"os"
6+
"os/signal"
7+
"syscall"
8+
49
"github.com/awslabs/aws-service-operator/pkg/logger"
510
"github.com/awslabs/aws-service-operator/pkg/server"
611
"github.com/sirupsen/logrus"
712
"github.com/spf13/cobra"
8-
"os"
9-
"os/signal"
10-
"syscall"
1113
)
1214

1315
var serverCmd = &cobra.Command{
@@ -26,21 +28,15 @@ var serverCmd = &cobra.Command{
2628
}
2729
config.Logger = logger
2830

31+
ctx, cancel := context.WithCancel(context.Background())
2932
signalChan := make(chan os.Signal, 1)
30-
stopChan := make(chan struct{})
3133
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
3234

33-
server.New(config).Run(stopChan)
34-
35-
for {
36-
select {
37-
case <-signalChan:
38-
logger.Info("shutdown signal received, exiting...")
39-
close(stopChan)
40-
return
41-
}
42-
}
35+
go server.New(config).Run(ctx)
4336

37+
<-signalChan
38+
logger.Info("shutdown signal received, exiting...")
39+
cancel()
4440
},
4541
}
4642

code-generation/pkg/codegen/assets/base.go.templ

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package base
22

33
import (
4+
"context"
45
"github.com/awslabs/aws-service-operator/pkg/config"
56
{{- range $index, $element := .Items}}
67
"github.com/awslabs/aws-service-operator/pkg/operators/{{$element.Spec.Resource.Name}}"
@@ -19,16 +20,13 @@ func New(
1920
}
2021
}
2122

22-
func (b *base) Watch(namespace string, stopCh chan struct{}) (err error) {
23+
func (b *base) Watch(ctx context.Context, namespace string) {
2324
{{- range $index, $element := .Items}}
2425
if b.config.Resources["{{$element.Spec.Resource.Name}}"] {
2526
{{$element.Spec.Resource.Name}}operator := {{$element.Spec.Resource.Name}}.NewOperator(b.config)
26-
err = {{$element.Spec.Resource.Name}}operator.StartWatch(namespace, stopCh)
27-
if err != nil {
28-
return err
29-
}
27+
go {{$element.Spec.Resource.Name}}operator.StartWatch(ctx, namespace)
3028
}
3129
{{- end}}
3230

33-
return nil
31+
<-ctx.Done()
3432
}

code-generation/pkg/codegen/assets/operator.go.templ

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package {{.Spec.Resource.Name}}
77

88
import (
9+
"context"
910
{{- if .Spec.IsCustomized}}
1011
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1112
"github.com/awslabs/aws-service-operator/pkg/helpers"
@@ -43,7 +44,7 @@ func NewOperator(config *config.Config) *Operator {
4344
}
4445

4546
// StartWatch watches for instances of Object Store custom resources and acts on them
46-
func (c *Operator) StartWatch(namespace string, stopCh chan struct{}) error {
47+
func (c *Operator) StartWatch(ctx context.Context, namespace string) {
4748
resourceHandlers := cache.ResourceEventHandlerFuncs{
4849
AddFunc: c.onAdd,
4950
UpdateFunc: c.onUpdate,
@@ -52,13 +53,11 @@ func (c *Operator) StartWatch(namespace string, stopCh chan struct{}) error {
5253
{{- if .Spec.Queue}}
5354
queuectrl := queue.New(c.config, c.config.AWSClientset, 1)
5455
c.topicARN, _, _, _ = queuectrl.Register("{{.Spec.Resource.Name}}", &awsV1alpha1.{{.Spec.Kind}}{})
55-
go queuectrl.StartWatch(queue.HandlerFunc(QueueUpdater), stopCh)
56+
go queuectrl.StartWatch(queue.HandlerFunc(QueueUpdater), ctx.Done())
5657
{{- end}}
5758

5859
oper := operator.New("{{.Spec.Resource.Plural}}", namespace, resourceHandlers, c.config.AWSClientset.RESTClient())
59-
go oper.Watch(&awsV1alpha1.{{.Spec.Kind}}{}, stopCh)
60-
61-
return nil
60+
oper.Watch(&awsV1alpha1.{{.Spec.Kind}}{}, ctx.Done())
6261
}
6362

6463
{{- if .Spec.Queue}}

code-generation/pkg/codegen/templates.go

Lines changed: 20 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/server/server.go

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
package server
22

33
import (
4+
"context"
45
"fmt"
56
"net/http"
67

78
awsscheme "github.com/awslabs/aws-service-operator/pkg/client/clientset/versioned/scheme"
89
"github.com/awslabs/aws-service-operator/pkg/config"
910
opBase "github.com/awslabs/aws-service-operator/pkg/operators/base"
11+
"github.com/prometheus/client_golang/prometheus/promhttp"
1012
"github.com/sirupsen/logrus"
1113
corev1 "k8s.io/api/core/v1"
1214
"k8s.io/client-go/kubernetes/scheme"
1315
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
1416
"k8s.io/client-go/tools/record"
15-
16-
"github.com/prometheus/client_golang/prometheus/promhttp"
1717
)
1818

1919
const controllerName = "aws-service-operator"
@@ -25,16 +25,27 @@ func New(config *config.Config) *Server {
2525
}
2626
}
2727

28-
func (c *Server) exposeMetrics(errChan chan error) {
29-
http.Handle("/metrics", promhttp.Handler())
30-
err := http.ListenAndServe(":9090", nil)
31-
if err != nil {
32-
errChan <- fmt.Errorf("unable to expose metrics: %v", err)
28+
func (c *Server) exposeMetrics(errChan chan error, ctx context.Context) {
29+
c.Handle("/metrics", promhttp.Handler())
30+
server := http.Server{
31+
Addr: ":9090",
32+
Handler: c,
3333
}
34-
}
34+
defer server.Shutdown(ctx)
3535

36-
func (c *Server) watchOperatorResources(errChan chan error, stopChan chan struct{}) {
36+
go func() {
37+
err := server.ListenAndServe()
38+
if err != nil {
39+
errChan <- fmt.Errorf("unable to expose metrics: %v", err)
40+
}
41+
}()
3742

43+
c.Config.Logger.Info("metrics server started")
44+
<-ctx.Done()
45+
c.Config.Logger.Info("metrics server stopped")
46+
}
47+
48+
func (c *Server) watchOperatorResources(errChan chan error, ctx context.Context) {
3849
logger := c.Config.Logger
3950

4051
logger.Info("getting kubernetes context")
@@ -48,27 +59,31 @@ func (c *Server) watchOperatorResources(errChan chan error, stopChan chan struct
4859
// start watching the aws operator resources
4960
logger.WithFields(logrus.Fields{"resources": c.Config.Resources}).Info("Watching")
5061
operators := opBase.New(c.Config) // TODO: remove context and Clientset
51-
err := operators.Watch(corev1.NamespaceAll, stopChan)
52-
if err != nil {
53-
errChan <- fmt.Errorf("unable to watch resources: %v", err)
54-
}
62+
63+
go operators.Watch(ctx, corev1.NamespaceAll)
64+
<-ctx.Done()
65+
c.Config.Logger.Info("operators stopped")
5566
}
5667

5768
// Run starts the server to listen to Kubernetes
58-
func (c *Server) Run(stopChan chan struct{}) {
69+
func (c *Server) Run(ctx context.Context) {
5970
config := c.Config
6071
logger := config.Logger
6172
errChan := make(chan error, 1)
6273

6374
logger.Info("starting metrics server")
64-
go c.exposeMetrics(errChan)
75+
go c.exposeMetrics(errChan, ctx)
6576

6677
logger.Info("starting resource watcher")
67-
go c.watchOperatorResources(errChan, stopChan)
78+
go c.watchOperatorResources(errChan, ctx)
6879

69-
err := <-errChan
70-
if err != nil {
71-
logger.Fatal(err)
80+
for {
81+
select {
82+
case err := <-errChan:
83+
c.Config.Logger.WithError(err).Fatal(err)
84+
case <-ctx.Done():
85+
c.Config.Logger.Info("stop signal received. waiting for operators to stop")
86+
return
87+
}
7288
}
73-
7489
}

pkg/server/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package server
22

33
import (
4+
"net/http"
5+
46
"github.com/awslabs/aws-service-operator/pkg/config"
57
)
68

79
// Server defines the bas construct for the operator
810
type Server struct {
11+
http.ServeMux
912
Config *config.Config
1013
}

0 commit comments

Comments
 (0)