Skip to content

Commit e1f1c6e

Browse files
authored
Merge pull request #6 from gdziwoki/master
Remove all sinks except stdout, update packages
2 parents 1bf3688 + 9190ea2 commit e1f1c6e

File tree

24 files changed

+382
-2540
lines changed

24 files changed

+382
-2540
lines changed

.github/workflows/build.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
- name: Set up Go
2121
uses: actions/setup-go@v4
2222
with:
23-
go-version: '1.23'
23+
go-version: '1.25'
2424

2525
- name: Test
2626
run: make

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
FROM --platform=$BUILDPLATFORM golang:1.23-alpine3.21@sha256:f8113c4b13e2a8b3a168dceaee88ac27743cc84e959f43b9dbd2291e9c3f57a0 AS builder
15+
FROM --platform=$BUILDPLATFORM golang:1.25.2-alpine3.22@sha256:06cdd34bd531b810650e47762c01e025eb9b1c7eadd191553b91c9f2d549fae8 AS builder
1616

1717
RUN apk add --update --no-cache ca-certificates make git curl
1818

@@ -35,7 +35,7 @@ COPY Makefile /app/Makefile
3535

3636
RUN CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH make build
3737

38-
FROM gcr.io/distroless/static-debian12@sha256:3f2b64ef97bd285e36132c684e6b2ae8f2723293d09aae046196cca64251acac
38+
FROM gcr.io/distroless/static:latest@sha256:87bce11be0af225e4ca761c40babb06d6d559f5767fbf7dc3c47f0f1a466b92c
3939

4040
COPY --from=builder /app/eventrouter /app/eventrouter
4141

config.json

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1 @@
1-
{
2-
"kubeconfig": "/var/run/kubernetes/admin.kubeconfig",
3-
"sink": "glog",
4-
"kafkaBrokers": "kafka:9092",
5-
"kafkaTopic": "topic",
6-
"kafkaSaslUser": "user",
7-
"kafkaSaslPwd": "password"
8-
"httpSinkUrl": "http://localhost:8080",
9-
"httpSinkBufferSize": 1500,
10-
"httpSinkDiscardMessages": true,
11-
"rocksetAPIKey": "",
12-
"rocksetCollectionName": "",
13-
"rocksetWorkspaceName": "",
14-
"s3SinkAccessKeyID": "",
15-
"s3SinkSecretAccessKey": "",
16-
"s3SinkRegion": "ap-south-1",
17-
"s3SinkBucket": "",
18-
"s3SinkBucketDir": "",
19-
"s3SinkBufferSize": 1500,
20-
"s3SinkDiscardMessages": true,
21-
"s3SinkOutputFormat": "flatjson",
22-
"s3SinkUploadInterval": 120
23-
}
1+
{"sink": "stdout"}

eventrouter.go

Lines changed: 103 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -137,28 +137,80 @@ func (er *EventRouter) Run(stopCh <-chan struct{}) {
137137
<-stopCh
138138
}
139139

140+
// shouldProcessEvent checks if an event should be processed based on resource version
141+
func (er *EventRouter) shouldProcessEvent(resourceVersion string) bool {
142+
if resourceVersion == "" {
143+
return false
144+
}
145+
146+
if er.lastSeenResourceVersion == "" {
147+
return true
148+
}
149+
150+
return cast.ToInt(er.lastSeenResourceVersion) < cast.ToInt(resourceVersion)
151+
}
152+
140153
// addEvent is called when an event is created, or during the initial list
141154
func (er *EventRouter) addEvent(obj interface{}) {
142-
e := obj.(*v1.Event)
143-
if cast.ToInt(er.lastSeenResourceVersion) < cast.ToInt(e.ResourceVersion) {
155+
e, ok := obj.(*v1.Event)
156+
if !ok {
157+
glog.Errorf("Expected *v1.Event, got %T", obj)
158+
return
159+
}
160+
161+
if e == nil {
162+
glog.Error("Received nil event")
163+
return
164+
}
165+
166+
if er.shouldProcessEvent(e.ResourceVersion) {
144167
prometheusEvent(e)
145-
er.eSink.UpdateEvents(e, nil)
146-
er.lastResourceVersionPosition(e.ResourceVersion)
168+
if er.eSink != nil {
169+
er.eSink.UpdateEvents(e, nil)
170+
} else {
171+
glog.Error("Event sink is nil, cannot process event")
172+
return
173+
}
174+
if er.lastResourceVersionPosition != nil {
175+
er.lastResourceVersionPosition(e.ResourceVersion)
176+
}
147177
} else {
148-
glog.V(5).Infof("Event had already been processed:\n%v", e)
178+
glog.V(5).Infof("Event had already been processed: %s (resource version: %s)", e.Name, e.ResourceVersion)
149179
}
150180
}
151181

152182
// updateEvent is called any time there is an update to an existing event
153183
func (er *EventRouter) updateEvent(objOld interface{}, objNew interface{}) {
154-
eOld := objOld.(*v1.Event)
155-
eNew := objNew.(*v1.Event)
156-
if cast.ToInt(er.lastSeenResourceVersion) < cast.ToInt(eNew.ResourceVersion) {
184+
eOld, okOld := objOld.(*v1.Event)
185+
if !okOld {
186+
glog.Errorf("Expected *v1.Event for old object, got %T", objOld)
187+
return
188+
}
189+
190+
eNew, okNew := objNew.(*v1.Event)
191+
if !okNew {
192+
glog.Errorf("Expected *v1.Event for new object, got %T", objNew)
193+
return
194+
}
195+
196+
if eNew == nil {
197+
glog.Error("Received nil new event")
198+
return
199+
}
200+
201+
if er.shouldProcessEvent(eNew.ResourceVersion) {
157202
prometheusEvent(eNew)
158-
er.eSink.UpdateEvents(eNew, eOld)
159-
er.lastResourceVersionPosition(eNew.ResourceVersion)
203+
if er.eSink != nil {
204+
er.eSink.UpdateEvents(eNew, eOld)
205+
} else {
206+
glog.Error("Event sink is nil, cannot process event update")
207+
return
208+
}
209+
if er.lastResourceVersionPosition != nil {
210+
er.lastResourceVersionPosition(eNew.ResourceVersion)
211+
}
160212
} else {
161-
glog.V(5).Infof("Event had already been processed:\n%v", eNew)
213+
glog.V(5).Infof("Event had already been processed: %s (resource version: %s)", eNew.Name, eNew.ResourceVersion)
162214
}
163215
}
164216

@@ -167,56 +219,68 @@ func prometheusEvent(event *v1.Event) {
167219
if !viper.GetBool("enable-prometheus") {
168220
return
169221
}
222+
223+
if event == nil {
224+
glog.Error("Cannot record metrics for nil event")
225+
return
226+
}
227+
228+
// Safely get label values with defaults
229+
safeString := func(s string) string {
230+
if s == "" {
231+
return "unknown"
232+
}
233+
return s
234+
}
235+
236+
kind := safeString(event.InvolvedObject.Kind)
237+
name := safeString(event.InvolvedObject.Name)
238+
namespace := safeString(event.InvolvedObject.Namespace)
239+
reason := safeString(event.Reason)
240+
sourceHost := safeString(event.Source.Host)
241+
170242
var counter prometheus.Counter
171243
var err error
172244

173245
switch event.Type {
174246
case "Normal":
175247
counter, err = kubernetesNormalEventCounterVec.GetMetricWithLabelValues(
176-
event.InvolvedObject.Kind,
177-
event.InvolvedObject.Name,
178-
event.InvolvedObject.Namespace,
179-
event.Reason,
180-
event.Source.Host,
181-
)
248+
kind, name, namespace, reason, sourceHost)
182249
case "Warning":
183250
counter, err = kubernetesWarningEventCounterVec.GetMetricWithLabelValues(
184-
event.InvolvedObject.Kind,
185-
event.InvolvedObject.Name,
186-
event.InvolvedObject.Namespace,
187-
event.Reason,
188-
event.Source.Host,
189-
)
251+
kind, name, namespace, reason, sourceHost)
190252
case "Info":
191253
counter, err = kubernetesInfoEventCounterVec.GetMetricWithLabelValues(
192-
event.InvolvedObject.Kind,
193-
event.InvolvedObject.Name,
194-
event.InvolvedObject.Namespace,
195-
event.Reason,
196-
event.Source.Host,
197-
)
254+
kind, name, namespace, reason, sourceHost)
198255
default:
256+
glog.V(4).Infof("Unknown event type: %s", event.Type)
199257
counter, err = kubernetesUnknownEventCounterVec.GetMetricWithLabelValues(
200-
event.InvolvedObject.Kind,
201-
event.InvolvedObject.Name,
202-
event.InvolvedObject.Namespace,
203-
event.Reason,
204-
event.Source.Host,
205-
)
258+
kind, name, namespace, reason, sourceHost)
206259
}
207260

208261
if err != nil {
209-
// Not sure this is the right place to log this error?
210-
glog.Warning(err)
262+
glog.Errorf("Failed to get Prometheus counter for event %s/%s: %v", namespace, name, err)
211263
} else {
212264
counter.Add(1)
265+
glog.V(6).Infof("Recorded Prometheus metric for event %s/%s (type: %s)", namespace, name, event.Type)
213266
}
214267
}
215268

216269
// deleteEvent should only occur when the system garbage collects events via TTL expiration
217270
func (er *EventRouter) deleteEvent(obj interface{}) {
218-
e := obj.(*v1.Event)
271+
e, ok := obj.(*v1.Event)
272+
if !ok {
273+
glog.Errorf("Expected *v1.Event in deleteEvent, got %T", obj)
274+
return
275+
}
276+
277+
if e == nil {
278+
glog.Error("Received nil event in deleteEvent")
279+
return
280+
}
281+
219282
// NOTE: This should *only* happen on TTL expiration there
220283
// is no reason to push this to a sink
221-
glog.V(5).Infof("Event Deleted from the system:\n%v", e)
284+
glog.V(5).Infof("Event deleted from the system: %s/%s (reason: %s, resource version: %s)",
285+
e.Namespace, e.Name, e.Reason, e.ResourceVersion)
222286
}

0 commit comments

Comments
 (0)