diff --git a/.gitignore b/.gitignore index 36849e2e0..7eeb28f1b 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,5 @@ _testmain.go *.prof eventrouter + +.idea diff --git a/eventrouter.go b/eventrouter.go index 4c77cac2e..61c3411bf 100644 --- a/eventrouter.go +++ b/eventrouter.go @@ -22,6 +22,7 @@ import ( "github.com/golang/glog" "github.com/heptiolabs/eventrouter/sinks" "github.com/prometheus/client_golang/prometheus" + "github.com/spf13/cast" "github.com/spf13/viper" v1 "k8s.io/api/core/v1" @@ -90,10 +91,14 @@ type EventRouter struct { // event sink // TODO: Determine if we want to support multiple sinks. eSink sinks.EventSinkInterface + + lastSeenResourceVersion string + lastResourceVersionPosition func(string) } // NewEventRouter will create a new event router using the input params -func NewEventRouter(kubeClient kubernetes.Interface, eventsInformer coreinformers.EventInformer) *EventRouter { +func NewEventRouter(kubeClient kubernetes.Interface, eventsInformer coreinformers.EventInformer, + lastSeenResourceVersion string, lastResourceVersionPosition func(rv string)) *EventRouter { if viper.GetBool("enable-prometheus") { prometheus.MustRegister(kubernetesWarningEventCounterVec) prometheus.MustRegister(kubernetesNormalEventCounterVec) @@ -102,8 +107,10 @@ func NewEventRouter(kubeClient kubernetes.Interface, eventsInformer coreinformer } er := &EventRouter{ - kubeClient: kubeClient, - eSink: sinks.ManufactureSink(), + kubeClient: kubeClient, + eSink: sinks.ManufactureSink(), + lastSeenResourceVersion: lastSeenResourceVersion, + lastResourceVersionPosition: lastResourceVersionPosition, } eventsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: er.addEvent, @@ -133,16 +140,26 @@ func (er *EventRouter) Run(stopCh <-chan struct{}) { // addEvent is called when an event is created, or during the initial list func (er *EventRouter) addEvent(obj interface{}) { e := obj.(*v1.Event) - prometheusEvent(e) - er.eSink.UpdateEvents(e, nil) + if cast.ToInt(er.lastSeenResourceVersion) < cast.ToInt(e.ResourceVersion) { + prometheusEvent(e) + er.eSink.UpdateEvents(e, nil) + er.lastResourceVersionPosition(e.ResourceVersion) + } else { + glog.V(5).Infof("Event had already been processed:\n%v", e) + } } // updateEvent is called any time there is an update to an existing event func (er *EventRouter) updateEvent(objOld interface{}, objNew interface{}) { eOld := objOld.(*v1.Event) eNew := objNew.(*v1.Event) - prometheusEvent(eNew) - er.eSink.UpdateEvents(eNew, eOld) + if cast.ToInt(er.lastSeenResourceVersion) < cast.ToInt(eNew.ResourceVersion) { + prometheusEvent(eNew) + er.eSink.UpdateEvents(eNew, eOld) + er.lastResourceVersionPosition(eNew.ResourceVersion) + } else { + glog.V(5).Infof("Event had already been processed:\n%v", eNew) + } } // prometheusEvent is called when an event is added or updated diff --git a/go.mod b/go.mod index ea798b14a..2157e1fe5 100644 --- a/go.mod +++ b/go.mod @@ -17,10 +17,12 @@ require ( github.com/prometheus/client_golang v1.1.0 github.com/rockset/rockset-go-client v0.6.0 github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0 + github.com/spf13/cast v1.3.0 github.com/spf13/viper v1.4.0 gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect k8s.io/api v0.0.0-20190814101207-0772a1bdf941 k8s.io/apimachinery v0.0.0-20190814100815-533d101be9a6 k8s.io/client-go v12.0.0+incompatible + k8s.io/klog v0.4.0 k8s.io/utils v0.0.0-20190809000727-6c36bc71fc4a // indirect ) diff --git a/main.go b/main.go index cb891e30a..43c6ff492 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "io/ioutil" "net/http" "os" "os/signal" @@ -27,6 +28,7 @@ import ( "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spf13/cast" "github.com/spf13/viper" "k8s.io/client-go/informers" @@ -49,7 +51,7 @@ func sigHandler() <-chan struct{} { syscall.SIGSEGV, // FullDerp syscall.SIGABRT, // Abnormal termination syscall.SIGILL, // illegal instruction - syscall.SIGFPE) // floating point - this is why we can't have nice things + syscall.SIGFPE) // floating point - this is why we can't have nice things sig := <-c glog.Warningf("Signal (%v) Detected, Shutting Down", sig) close(stop) @@ -107,11 +109,41 @@ func main() { var wg sync.WaitGroup clientset := loadConfig() + + var lastResourceVersionPosition string + var mostRecentResourceVersion *string + + resourceVersionPositionPath := viper.GetString("lastResourceVersionPositionPath") + resourceVersionPositionFunc := func(resourceVersion string) { + if resourceVersionPositionPath != "" { + if cast.ToInt(resourceVersion) > cast.ToInt(mostRecentResourceVersion) { + err := ioutil.WriteFile(resourceVersionPositionPath, []byte(resourceVersion), 0644) + if err != nil { + glog.Errorf("failed to write lastResourceVersionPosition") + } else { + mostRecentResourceVersion = &resourceVersion + } + } + } + } + + if resourceVersionPositionPath != "" { + _, err := os.Stat(resourceVersionPositionPath) + if !os.IsNotExist(err) { + resourceVersionBytes, err := ioutil.ReadFile(resourceVersionPositionPath) + if err != nil { + glog.Errorf("failed to read resource version bookmark from %s", resourceVersionPositionPath) + } else { + lastResourceVersionPosition = string(resourceVersionBytes) + } + } + } + sharedInformers := informers.NewSharedInformerFactory(clientset, viper.GetDuration("resync-interval")) eventsInformer := sharedInformers.Core().V1().Events() // TODO: Support locking for HA https://github.com/kubernetes/kubernetes/pull/42666 - eventRouter := NewEventRouter(clientset, eventsInformer) + eventRouter := NewEventRouter(clientset, eventsInformer, lastResourceVersionPosition, resourceVersionPositionFunc) stop := sigHandler() // Startup the http listener for Prometheus Metrics endpoint.