Skip to content
This repository was archived by the owner on Nov 7, 2019. It is now read-only.

Commit 2dc90fd

Browse files
Adding Kubernetes Recorder to log to the standard events
* Adds event recorder which will log status updates to the event stream * Will continue between rollbacks so you can see it failing closes #85 Signed-off-by: Christopher Hein <[email protected]>
1 parent f6c9e0a commit 2dc90fd

File tree

9 files changed

+94
-59
lines changed

9 files changed

+94
-59
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ aws-codegen:
2828

2929
.PHONY: k8s-codegen
3030
k8s-codegen:
31-
./codegen.sh
31+
./hack/update-codegen.sh
3232

3333
.PHONY: codegen
3434
codegen: aws-codegen k8s-codegen

code-generation/pkg/codegen/assets/controller.go.templ

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ import (
1414

1515
"github.com/awslabs/aws-service-operator/pkg/config"
1616
{{- if .Spec.Queue}}
17-
"github.com/awslabs/aws-service-operator/pkg/queue"
17+
"github.com/awslabs/aws-service-operator/pkg/queue"
18+
corev1 "k8s.io/api/core/v1"
19+
"github.com/iancoleman/strcase"
20+
"strings"
1821
{{- end}}
1922
opkit "github.com/christopherhein/operator-kit"
2023
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
@@ -50,7 +53,7 @@ type Controller struct {
5053
config *config.Config
5154
context *opkit.Context
5255
awsclientset awsclient.ServiceoperatorV1alpha1Interface
53-
topicARN string
56+
topicARN string
5457
}
5558

5659
// NewController create controller for watching object store custom resources created
@@ -106,27 +109,35 @@ func QueueUpdater(config *config.Config, msg *queue.MessageBody) error {
106109
}
107110

108111
if name != "" && namespace != "" {
112+
annotations := map[string]string{
113+
"StackID": msg.ParsedMessage["StackId"],
114+
"StackName": msg.ParsedMessage["StackName"],
115+
"ResourceType": msg.ParsedMessage["ResourceType"],
116+
}
109117
if msg.ParsedMessage["ResourceStatus"] == "ROLLBACK_COMPLETE" {
110-
err := deleteStack(config, name, namespace, msg.ParsedMessage["StackId"])
118+
obj, err := deleteStack(config, name, namespace, msg.ParsedMessage["StackId"])
111119
if err != nil {
112120
return err
113121
}
122+
config.Recorder.AnnotatedEventf(obj, annotations, corev1.EventTypeWarning, strcase.ToCamel(strings.ToLower(msg.ParsedMessage["ResourceStatus"])), msg.ParsedMessage["ResourceStatusReason"])
114123
} else if msg.ParsedMessage["ResourceStatus"] == "DELETE_COMPLETE" {
115-
err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"])
124+
obj, err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"])
116125
if err != nil {
117126
return err
118127
}
119-
128+
config.Recorder.AnnotatedEventf(obj, annotations, corev1.EventTypeWarning, strcase.ToCamel(strings.ToLower(msg.ParsedMessage["ResourceStatus"])), msg.ParsedMessage["ResourceStatusReason"])
120129
err = incrementRollbackCount(config, name, namespace)
121130
if err != nil {
122131
return err
123132
}
124133
} else {
125-
err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"])
134+
obj, err := updateStatus(config, name, namespace, msg.ParsedMessage["StackId"], msg.ParsedMessage["ResourceStatus"], msg.ParsedMessage["ResourceStatusReason"])
126135
if err != nil {
127136
return err
128137
}
138+
config.Recorder.AnnotatedEventf(obj, annotations, corev1.EventTypeNormal, strcase.ToCamel(strings.ToLower(msg.ParsedMessage["ResourceStatus"])), msg.ParsedMessage["ResourceStatusReason"])
129139
}
140+
130141
}
131142

132143
return nil
@@ -147,7 +158,7 @@ func (c *Controller) onAdd(obj interface{}) {
147158
c.config.Logger.Infof("added {{.Spec.Resource.Name}} '%s' with stackID '%s'", s.Name, string(*output.StackId))
148159
c.config.Logger.Infof("view at https://console.aws.amazon.com/cloudformation/home?#/stack/detail?stackId=%s", string(*output.StackId))
149160

150-
err = updateStatus(c.config, s.Name, s.Namespace, string(*output.StackId), "CREATE_IN_PROGRESS", "")
161+
_, err = updateStatus(c.config, s.Name, s.Namespace, string(*output.StackId), "CREATE_IN_PROGRESS", "")
151162
if err != nil {
152163
c.config.Logger.WithError(err).Error("error updating status")
153164
}
@@ -176,7 +187,7 @@ func (c *Controller) onUpdate(oldObj, newObj interface{}) {
176187
c.config.Logger.Infof("updated {{.Spec.Resource.Name}} '%s' with params '%s'", no.Name, string(*output.StackId))
177188
c.config.Logger.Infof("view at https://console.aws.amazon.com/cloudformation/home?#/stack/detail?stackId=%s", string(*output.StackId))
178189

179-
err = updateStatus(c.config, oo.Name, oo.Namespace, string(*output.StackId), "UPDATE_IN_PROGRESS", "")
190+
_, err = updateStatus(c.config, oo.Name, oo.Namespace, string(*output.StackId), "UPDATE_IN_PROGRESS", "")
180191
if err != nil {
181192
c.config.Logger.WithError(err).Error("error updating status")
182193
}
@@ -225,13 +236,13 @@ func incrementRollbackCount(config *config.Config, name string, namespace string
225236
return nil
226237
}
227238

228-
func updateStatus(config *config.Config, name string, namespace string, stackID string, status string, reason string) error {
239+
func updateStatus(config *config.Config, name string, namespace string, stackID string, status string, reason string) (*awsV1alpha1.{{.Spec.Kind}}, error) {
229240
logger := config.Logger
230241
clientSet, _ := awsclient.NewForConfig(config.RESTConfig)
231242
resource, err := clientSet.{{.Spec.PluralName}}(namespace).Get(name, metav1.GetOptions{})
232243
if err != nil {
233244
logger.WithError(err).Error("error getting {{.Spec.Resource.Plural}}")
234-
return err
245+
return nil, err
235246
}
236247

237248
resourceCopy := resource.DeepCopy()
@@ -259,7 +270,7 @@ func updateStatus(config *config.Config, name string, namespace string, stackID
259270
_, err = clientSet.{{.Spec.PluralName}}(namespace).Update(resourceCopy)
260271
if err != nil {
261272
logger.WithError(err).Error("error updating resource")
262-
return err
273+
return nil, err
263274
}
264275

265276
if helpers.IsStackComplete(status, false) {
@@ -268,26 +279,26 @@ func updateStatus(config *config.Config, name string, namespace string, stackID
268279
logger.WithError(err).Info("error syncing resources")
269280
}
270281
}
271-
return nil
282+
return resourceCopy, nil
272283
}
273284

274-
func deleteStack(config *config.Config, name string, namespace string, stackID string) error {
285+
func deleteStack(config *config.Config, name string, namespace string, stackID string) (*awsV1alpha1.{{.Spec.Kind}}, error) {
275286
logger := config.Logger
276287
clientSet, _ := awsclient.NewForConfig(config.RESTConfig)
277288
resource, err := clientSet.{{.Spec.PluralName}}(namespace).Get(name, metav1.GetOptions{})
278289
if err != nil {
279290
logger.WithError(err).Error("error getting {{.Spec.Resource.Plural}}")
280-
return err
291+
return nil, err
281292
}
282293

283294
cft := New(config, resource, "")
284295
err = cft.DeleteStack()
285296
if err != nil {
286-
return err
297+
return nil, err
287298
}
288299

289300
err = cft.WaitUntilStackDeleted()
290-
return err
301+
return resource, err
291302
}
292303

293304
func syncAdditionalResources(config *config.Config, s *awsV1alpha1.{{.Spec.Kind}}) (err error) {

code-generation/pkg/codegen/templates.go

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

codegen.sh

Lines changed: 0 additions & 21 deletions
This file was deleted.

hack/boilerplate.go.txt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
Licensed under the Apache License, Version 2.0 (the "License");
3+
you may not use this file except in compliance with the License.
4+
You may obtain a copy of the License at
5+
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
8+
Unless required by applicable law or agreed to in writing, software
9+
distributed under the License is distributed on an "AS IS" BASIS,
10+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
See the License for the specific language governing permissions and
12+
limitations under the License.
13+
*/

hack/update-codegen.sh

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
set -o errexit
2+
set -o nounset
3+
set -o pipefail
4+
5+
SCRIPT_ROOT=$(dirname ${BASH_SOURCE})/..
6+
CODEGEN_PKG=${CODEGEN_PKG:-$(cd ${SCRIPT_ROOT}; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)}
7+
8+
# generate the code with:
9+
# --output-base because this script should also be able to run inside the vendor dir of
10+
# k8s.io/kubernetes. The output-base is needed for the generators to output into the vendor dir
11+
# instead of the $GOPATH directly. For normal projects this can be dropped.
12+
${CODEGEN_PKG}/generate-groups.sh all \
13+
github.com/awslabs/aws-service-operator/pkg/client \
14+
github.com/awslabs/aws-service-operator/pkg/apis \
15+
"service-operator.aws:v1alpha1" \
16+
--output-base "$(dirname ${BASH_SOURCE})/../../.." \
17+
--go-header-file ${SCRIPT_ROOT}/hack/boilerplate.go.txt

pkg/config/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
opkit "github.com/christopherhein/operator-kit"
77
"github.com/sirupsen/logrus"
88
"k8s.io/client-go/rest"
9+
"k8s.io/client-go/tools/record"
910
)
1011

1112
// Config defines the configuration for the operator
@@ -22,6 +23,7 @@ type Config struct {
2223
ClusterName string
2324
Bucket string
2425
AccountID string
26+
Recorder record.EventRecorder
2527
}
2628

2729
// LoggingConfig defines the attributes for the logger

pkg/queue/queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ func process(q *Queue, svc *sqs.SQS, h Handler, stopCh <-chan struct{}) error {
230230
if err != nil {
231231
logger.WithError(err).Error("error processing message")
232232
}
233-
logger.Infof("stackID %v updated status to %v", mb.ParsedMessage["StackId"], mb.ParsedMessage["ResourceStatus"])
233+
logger.Debugf("stackID %v updated status to %v", mb.ParsedMessage["StackId"], mb.ParsedMessage["ResourceStatus"])
234234
_, err = svc.DeleteMessage(&sqs.DeleteMessageInput{
235235
QueueUrl: aws.String(q.queueURL),
236236
ReceiptHandle: message.ReceiptHandle,

pkg/server/server.go

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,19 @@ import (
1717
"github.com/awslabs/aws-service-operator/pkg/operator/snstopic"
1818
"github.com/awslabs/aws-service-operator/pkg/operator/sqsqueue"
1919
opkit "github.com/christopherhein/operator-kit"
20-
"k8s.io/api/core/v1"
20+
corev1 "k8s.io/api/core/v1"
2121
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
2222
"k8s.io/client-go/kubernetes"
2323
"k8s.io/client-go/rest"
2424
"k8s.io/client-go/tools/clientcmd"
25+
"k8s.io/client-go/tools/record"
26+
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
27+
"k8s.io/client-go/kubernetes/scheme"
28+
awsscheme "github.com/awslabs/aws-service-operator/pkg/client/clientset/versioned/scheme"
2529
)
2630

31+
const controllerName = "aws-service-operator"
32+
2733
// New creates a new server from a config
2834
func New(config *config.Config) *Server {
2935
return &Server{
@@ -36,7 +42,7 @@ func (c *Server) Run(stopChan chan struct{}) {
3642
config := c.Config
3743
logger := config.Logger
3844
logger.Info("Getting kubernetes context")
39-
context, restConfig, awsClientset, err := createContext(config.Kubeconfig)
45+
context, restConfig, kubeclientset, awsClientset, err := createContext(config.Kubeconfig)
4046
if err != nil {
4147
logger.Fatalf("failed to create context. %+v\n", err)
4248
}
@@ -84,28 +90,35 @@ func (c *Server) Run(stopChan chan struct{}) {
8490
}
8591
config.AWSSession = sess
8692

93+
awsscheme.AddToScheme(scheme.Scheme)
94+
eventBroadcaster := record.NewBroadcaster()
95+
eventBroadcaster.StartLogging(logger.Infof)
96+
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
97+
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName})
98+
config.Recorder = recorder
99+
87100
// start watching the aws operator resources
88101
logger.Info("Watching the resources")
89102
cftcontroller := cloudformationtemplate.NewController(config, context, awsClientset)
90-
cftcontroller.StartWatch(v1.NamespaceAll, stopChan)
103+
cftcontroller.StartWatch(corev1.NamespaceAll, stopChan)
91104

92105
s3controller := s3bucket.NewController(config, context, awsClientset)
93-
s3controller.StartWatch(v1.NamespaceAll, stopChan)
106+
s3controller.StartWatch(corev1.NamespaceAll, stopChan)
94107

95108
ddbcontroller := dynamodb.NewController(config, context, awsClientset)
96-
ddbcontroller.StartWatch(v1.NamespaceAll, stopChan)
109+
ddbcontroller.StartWatch(corev1.NamespaceAll, stopChan)
97110

98111
sqscontroller := sqsqueue.NewController(config, context, awsClientset)
99-
sqscontroller.StartWatch(v1.NamespaceAll, stopChan)
112+
sqscontroller.StartWatch(corev1.NamespaceAll, stopChan)
100113

101114
ecrcontroller := ecrrepository.NewController(config, context, awsClientset)
102-
ecrcontroller.StartWatch(v1.NamespaceAll, stopChan)
115+
ecrcontroller.StartWatch(corev1.NamespaceAll, stopChan)
103116

104117
snscontroller := snstopic.NewController(config, context, awsClientset)
105-
snscontroller.StartWatch(v1.NamespaceAll, stopChan)
118+
snscontroller.StartWatch(corev1.NamespaceAll, stopChan)
106119

107120
snssubcontroller := snssubscription.NewController(config, context, awsClientset)
108-
snssubcontroller.StartWatch(v1.NamespaceAll, stopChan)
121+
snssubcontroller.StartWatch(corev1.NamespaceAll, stopChan)
109122
}
110123

111124
func getClientConfig(kubeconfig string) (*rest.Config, error) {
@@ -115,25 +128,25 @@ func getClientConfig(kubeconfig string) (*rest.Config, error) {
115128
return rest.InClusterConfig()
116129
}
117130

118-
func createContext(kubeconfig string) (*opkit.Context, *rest.Config, awsclient.ServiceoperatorV1alpha1Interface, error) {
131+
func createContext(kubeconfig string) (*opkit.Context, *rest.Config, kubernetes.Interface, awsclient.ServiceoperatorV1alpha1Interface, error) {
119132
config, err := getClientConfig(kubeconfig)
120133
if err != nil {
121-
return nil, nil, nil, fmt.Errorf("failed to get k8s config. %+v", err)
134+
return nil, nil, nil, nil, fmt.Errorf("failed to get k8s config. %+v", err)
122135
}
123136

124137
clientset, err := kubernetes.NewForConfig(config)
125138
if err != nil {
126-
return nil, nil, nil, fmt.Errorf("failed to get k8s client. %+v", err)
139+
return nil, nil, nil, nil, fmt.Errorf("failed to get k8s client. %+v", err)
127140
}
128141

129142
apiExtClientset, err := apiextensionsclient.NewForConfig(config)
130143
if err != nil {
131-
return nil, nil, nil, fmt.Errorf("failed to create k8s API extension clientset. %+v", err)
144+
return nil, nil, nil, nil, fmt.Errorf("failed to create k8s API extension clientset. %+v", err)
132145
}
133146

134147
awsclientset, err := awsclient.NewForConfig(config)
135148
if err != nil {
136-
return nil, nil, nil, fmt.Errorf("failed to create object store clientset. %+v", err)
149+
return nil, nil, nil, nil, fmt.Errorf("failed to create object store clientset. %+v", err)
137150
}
138151

139152
context := &opkit.Context{
@@ -142,5 +155,5 @@ func createContext(kubeconfig string) (*opkit.Context, *rest.Config, awsclient.S
142155
Interval: 500 * time.Millisecond,
143156
Timeout: 60 * time.Second,
144157
}
145-
return context, config, awsclientset, nil
158+
return context, config, clientset, awsclientset, nil
146159
}

0 commit comments

Comments
 (0)