Skip to content

Commit e67c266

Browse files
authored
Merge pull request kubernetes#65782 from yastij/eventv2-eventf
Implementing logic for v1beta1.Event API
2 parents 4f28fa8 + 464a994 commit e67c266

File tree

9 files changed

+725
-28
lines changed

9 files changed

+725
-28
lines changed

staging/publishing/import-restrictions.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
- "./vendor/k8s.io/client-go/tools/leaderelection/resourcelock"
8282
- "./vendor/k8s.io/client-go/tools/portforward"
8383
- "./vendor/k8s.io/client-go/tools/record"
84+
- "./vendor/k8s.io/client-go/tools/events"
8485
- "./vendor/k8s.io/client-go/tools/reference"
8586
- "./vendor/k8s.io/client-go/tools/remotecommand"
8687
allowedImports:

staging/src/k8s.io/client-go/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ filegroup(
6969
"//staging/src/k8s.io/client-go/tools/auth:all-srcs",
7070
"//staging/src/k8s.io/client-go/tools/cache:all-srcs",
7171
"//staging/src/k8s.io/client-go/tools/clientcmd:all-srcs",
72+
"//staging/src/k8s.io/client-go/tools/events:all-srcs",
7273
"//staging/src/k8s.io/client-go/tools/leaderelection:all-srcs",
7374
"//staging/src/k8s.io/client-go/tools/metrics:all-srcs",
7475
"//staging/src/k8s.io/client-go/tools/pager:all-srcs",

staging/src/k8s.io/client-go/go.sum

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "go_default_library",
5+
srcs = [
6+
"event_broadcaster.go",
7+
"event_recorder.go",
8+
"interfaces.go",
9+
],
10+
importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/events",
11+
importpath = "k8s.io/client-go/tools/events",
12+
visibility = ["//visibility:public"],
13+
deps = [
14+
"//staging/src/k8s.io/api/core/v1:go_default_library",
15+
"//staging/src/k8s.io/api/events/v1beta1:go_default_library",
16+
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
17+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
18+
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
19+
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
20+
"//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
21+
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
22+
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
23+
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
24+
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
25+
"//staging/src/k8s.io/client-go/rest:go_default_library",
26+
"//staging/src/k8s.io/client-go/tools/record/util:go_default_library",
27+
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
28+
"//vendor/k8s.io/klog:go_default_library",
29+
],
30+
)
31+
32+
go_test(
33+
name = "go_default_test",
34+
srcs = ["eventseries_test.go"],
35+
embed = [":go_default_library"],
36+
deps = [
37+
"//staging/src/k8s.io/api/core/v1:go_default_library",
38+
"//staging/src/k8s.io/api/events/v1beta1:go_default_library",
39+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
40+
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
41+
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
42+
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
43+
],
44+
)
45+
46+
filegroup(
47+
name = "package-srcs",
48+
srcs = glob(["**"]),
49+
tags = ["automanaged"],
50+
visibility = ["//visibility:private"],
51+
)
52+
53+
filegroup(
54+
name = "all-srcs",
55+
srcs = [":package-srcs"],
56+
tags = ["automanaged"],
57+
visibility = ["//visibility:public"],
58+
)
Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package events
18+
19+
import (
20+
"os"
21+
"sync"
22+
"time"
23+
24+
corev1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/api/errors"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/util/clock"
29+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30+
"k8s.io/apimachinery/pkg/watch"
31+
restclient "k8s.io/client-go/rest"
32+
33+
"k8s.io/api/events/v1beta1"
34+
"k8s.io/apimachinery/pkg/util/json"
35+
"k8s.io/apimachinery/pkg/util/strategicpatch"
36+
"k8s.io/apimachinery/pkg/util/wait"
37+
"k8s.io/client-go/tools/record/util"
38+
"k8s.io/klog"
39+
)
40+
41+
const (
42+
maxTriesPerEvent = 12
43+
finishTime = 6 * time.Minute
44+
refreshTime = 30 * time.Minute
45+
maxQueuedEvents = 1000
46+
)
47+
48+
var defaultSleepDuration = 10 * time.Second
49+
50+
// TODO: validate impact of copying and investigate hashing
51+
type eventKey struct {
52+
action string
53+
reason string
54+
reportingController string
55+
reportingInstance string
56+
regarding corev1.ObjectReference
57+
related corev1.ObjectReference
58+
}
59+
60+
type eventBroadcasterImpl struct {
61+
*watch.Broadcaster
62+
mu sync.RWMutex
63+
eventCache map[eventKey]*v1beta1.Event
64+
sleepDuration time.Duration
65+
sink EventSink
66+
}
67+
68+
// NewBroadcaster Creates a new event broadcaster.
69+
func NewBroadcaster(sink EventSink) EventBroadcaster {
70+
return newBroadcaster(sink, defaultSleepDuration)
71+
}
72+
73+
// NewBroadcasterForTest Creates a new event broadcaster for test purposes.
74+
func newBroadcaster(sink EventSink, sleepDuration time.Duration) EventBroadcaster {
75+
return &eventBroadcasterImpl{
76+
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
77+
eventCache: map[eventKey]*v1beta1.Event{},
78+
sleepDuration: sleepDuration,
79+
sink: sink,
80+
}
81+
}
82+
83+
// TODO: add test for refreshExistingEventSeries
84+
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
85+
// TODO: Investigate whether lock contention won't be a problem
86+
e.mu.RLock()
87+
defer e.mu.RUnlock()
88+
for isomorphicKey, event := range e.eventCache {
89+
if event.Series != nil {
90+
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
91+
e.eventCache[isomorphicKey] = recordedEvent
92+
}
93+
}
94+
}
95+
}
96+
97+
// TODO: add test for finishSeries
98+
func (e *eventBroadcasterImpl) finishSeries() {
99+
// TODO: Investigate whether lock contention won't be a problem
100+
e.mu.Lock()
101+
defer e.mu.Unlock()
102+
for isomorphicKey, event := range e.eventCache {
103+
eventSerie := event.Series
104+
if eventSerie != nil {
105+
if eventSerie.LastObservedTime.Time.Add(finishTime).Before(time.Now()) {
106+
if _, retry := recordEvent(e.sink, event); !retry {
107+
delete(e.eventCache, isomorphicKey)
108+
}
109+
}
110+
}
111+
}
112+
}
113+
114+
// NewRecorder returns an EventRecorder that records events with the given event source.
115+
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
116+
hostname, _ := os.Hostname()
117+
reportingInstance := reportingController + "-" + hostname
118+
return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
119+
}
120+
121+
func (e *eventBroadcasterImpl) recordToSink(event *v1beta1.Event, clock clock.Clock) {
122+
// Make a copy before modification, because there could be multiple listeners.
123+
eventCopy := event.DeepCopy()
124+
go func() {
125+
evToRecord := func() *v1beta1.Event {
126+
e.mu.Lock()
127+
defer e.mu.Unlock()
128+
eventKey := getKey(eventCopy)
129+
isomorphicEvent, isIsomorphic := e.eventCache[eventKey]
130+
if isIsomorphic {
131+
if isomorphicEvent.Series != nil {
132+
isomorphicEvent.Series.Count++
133+
isomorphicEvent.EventTime = metav1.MicroTime{Time: clock.Now()}
134+
return nil
135+
}
136+
isomorphicEvent.Series = &v1beta1.EventSeries{
137+
Count: 1,
138+
LastObservedTime: metav1.MicroTime{Time: clock.Now()},
139+
}
140+
return isomorphicEvent
141+
}
142+
e.eventCache[eventKey] = eventCopy
143+
return eventCopy
144+
}()
145+
if evToRecord != nil {
146+
recordedEvent := e.attemptRecording(evToRecord)
147+
if recordedEvent != nil {
148+
recordedEventKey := getKey(recordedEvent)
149+
e.mu.Lock()
150+
defer e.mu.Unlock()
151+
e.eventCache[recordedEventKey] = recordedEvent
152+
}
153+
}
154+
}()
155+
}
156+
157+
func (e *eventBroadcasterImpl) attemptRecording(event *v1beta1.Event) *v1beta1.Event {
158+
tries := 0
159+
for {
160+
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
161+
return recordedEvent
162+
}
163+
tries++
164+
if tries >= maxTriesPerEvent {
165+
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
166+
return nil
167+
}
168+
// Randomize sleep so that various clients won't all be
169+
// synced up if the master goes down.
170+
time.Sleep(wait.Jitter(e.sleepDuration, 0.25))
171+
}
172+
}
173+
174+
func recordEvent(sink EventSink, event *v1beta1.Event) (*v1beta1.Event, bool) {
175+
var newEvent *v1beta1.Event
176+
var err error
177+
isEventSerie := event.Series != nil
178+
if isEventSerie {
179+
patch, err := createPatchBytesForSeries(event)
180+
if err != nil {
181+
klog.Errorf("Unable to calculate diff, no merge is possible: %v", err)
182+
return nil, false
183+
}
184+
newEvent, err = sink.Patch(event, patch)
185+
}
186+
// Update can fail because the event may have been removed and it no longer exists.
187+
if !isEventSerie || (isEventSerie && util.IsKeyNotFoundError(err)) {
188+
// Making sure that ResourceVersion is empty on creation
189+
event.ResourceVersion = ""
190+
newEvent, err = sink.Create(event)
191+
}
192+
if err == nil {
193+
return newEvent, false
194+
}
195+
// If we can't contact the server, then hold everything while we keep trying.
196+
// Otherwise, something about the event is malformed and we should abandon it.
197+
switch err.(type) {
198+
case *restclient.RequestConstructionError:
199+
// We will construct the request the same next time, so don't keep trying.
200+
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
201+
return nil, false
202+
case *errors.StatusError:
203+
if errors.IsAlreadyExists(err) {
204+
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
205+
} else {
206+
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
207+
}
208+
return nil, false
209+
case *errors.UnexpectedObjectError:
210+
// We don't expect this; it implies the server's response didn't match a
211+
// known pattern. Go ahead and retry.
212+
default:
213+
// This case includes actual http transport errors. Go ahead and retry.
214+
}
215+
klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
216+
return nil, true
217+
}
218+
219+
func createPatchBytesForSeries(event *v1beta1.Event) ([]byte, error) {
220+
oldEvent := event.DeepCopy()
221+
oldEvent.Series = nil
222+
oldData, err := json.Marshal(oldEvent)
223+
if err != nil {
224+
return nil, err
225+
}
226+
newData, err := json.Marshal(event)
227+
if err != nil {
228+
return nil, err
229+
}
230+
return strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1beta1.Event{})
231+
}
232+
233+
func getKey(event *v1beta1.Event) eventKey {
234+
key := eventKey{
235+
action: event.Action,
236+
reason: event.Reason,
237+
reportingController: event.ReportingController,
238+
reportingInstance: event.ReportingInstance,
239+
regarding: event.Regarding,
240+
}
241+
if event.Related != nil {
242+
key.related = *event.Related
243+
}
244+
return key
245+
}
246+
247+
// startEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
248+
// The return value is used to stop recording
249+
func (e *eventBroadcasterImpl) startEventWatcher(eventHandler func(event runtime.Object)) func() {
250+
watcher := e.Watch()
251+
go func() {
252+
defer utilruntime.HandleCrash()
253+
for {
254+
watchEvent, ok := <-watcher.ResultChan()
255+
if !ok {
256+
return
257+
}
258+
eventHandler(watchEvent.Object)
259+
}
260+
}()
261+
return watcher.Stop
262+
}
263+
264+
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
265+
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
266+
go wait.Until(func() {
267+
e.refreshExistingEventSeries()
268+
}, refreshTime, stopCh)
269+
go wait.Until(func() {
270+
e.finishSeries()
271+
}, finishTime, stopCh)
272+
eventHandler := func(obj runtime.Object) {
273+
event, ok := obj.(*v1beta1.Event)
274+
if !ok {
275+
klog.Errorf("unexpected type, expected v1beta1.Event")
276+
return
277+
}
278+
e.recordToSink(event, clock.RealClock{})
279+
}
280+
stopWatcher := e.startEventWatcher(eventHandler)
281+
go func() {
282+
<-stopCh
283+
stopWatcher()
284+
}()
285+
}

0 commit comments

Comments
 (0)