Skip to content

Commit 4a0bd67

Browse files
author
Mario Macias
authored
Update Go and go-pipes version (#80)
1 parent aa7838d commit 4a0bd67

File tree

12 files changed

+149
-446
lines changed

12 files changed

+149
-446
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
module github.com/netobserv/netobserv-ebpf-agent
22

3-
go 1.17
3+
go 1.18
44

55
require (
66
github.com/caarlos0/env/v6 v6.9.1
77
github.com/cilium/ebpf v0.8.1
88
github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424
99
github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f
10-
github.com/netobserv/gopipes v0.2.0
10+
github.com/netobserv/gopipes v0.3.0
1111
github.com/paulbellamy/ratecounter v0.2.0
1212
github.com/segmentio/kafka-go v0.4.35
1313
github.com/sirupsen/logrus v1.8.1

go.sum

Lines changed: 2 additions & 138 deletions
Large diffs are not rendered by default.

pkg/agent/agent.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ type Flows struct {
6464
mapTracer *flow.MapTracer
6565
rbTracer *flow.RingBufTracer
6666
accounter *flow.Accounter
67-
exporter flowExporter
67+
exporter node.TerminalFunc[[]*flow.Record]
6868

6969
// elements used to decorate flows with extra information
7070
interfaceNamer flow.InterfaceNamer
@@ -82,10 +82,6 @@ type ebpfFlowFetcher interface {
8282
ReadRingBuf() (ringbuf.Record, error)
8383
}
8484

85-
// flowExporter abstract the ExportFlows' method of exporter.GRPCProto to allow dependency injection
86-
// in tests
87-
type flowExporter func(in <-chan []*flow.Record)
88-
8985
// FlowsAgent instantiates a new agent, given a configuration.
9086
func FlowsAgent(cfg *Config) (*Flows, error) {
9187
alog.Info("initializing Flows agent")
@@ -138,7 +134,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
138134
func flowsAgent(cfg *Config,
139135
informer ifaces.Informer,
140136
fetcher ebpfFlowFetcher,
141-
exporter flowExporter,
137+
exporter node.TerminalFunc[[]*flow.Record],
142138
agentIP net.IP,
143139
) (*Flows, error) {
144140
// configure allow/deny interfaces filter
@@ -189,7 +185,7 @@ func flowDirections(cfg *Config) (ingress, egress bool) {
189185
}
190186
}
191187

192-
func buildFlowExporter(cfg *Config) (flowExporter, error) {
188+
func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
193189
switch cfg.Export {
194190
case "grpc":
195191
if cfg.TargetHost == "" || cfg.TargetPort == 0 {
@@ -342,7 +338,7 @@ func (f *Flows) interfacesManager(ctx context.Context) error {
342338

343339
// buildAndStartPipeline creates the ETL flow processing graph.
344340
// For a more visual view, check the docs/architecture.md document.
345-
func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, error) {
341+
func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*flow.Record], error) {
346342

347343
alog.Debug("registering interfaces' listener in background")
348344
err := f.interfacesManager(ctx)
@@ -351,8 +347,8 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, erro
351347
}
352348

353349
alog.Debug("connecting flows' processing graph")
354-
mapTracer := node.AsInit(f.mapTracer.TraceLoop(ctx))
355-
rbTracer := node.AsInit(f.rbTracer.TraceLoop(ctx))
350+
mapTracer := node.AsStart(f.mapTracer.TraceLoop(ctx))
351+
rbTracer := node.AsStart(f.rbTracer.TraceLoop(ctx))
356352

357353
accounter := node.AsMiddle(f.accounter.Account,
358354
node.ChannelBufferLen(f.cfg.BuffersLength))

pkg/flow/tracer_map.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (m *MapTracer) Flush() {
4141
m.evictionCond.Broadcast()
4242
}
4343

44-
func (m *MapTracer) TraceLoop(ctx context.Context) node.InitFunc {
44+
func (m *MapTracer) TraceLoop(ctx context.Context) node.StartFunc[[]*Record] {
4545
return func(out chan<- []*Record) {
4646
evictionTicker := time.NewTicker(m.evictionTimeout)
4747
go m.evictionSynchronization(ctx, out)

pkg/flow/tracer_ringbuf.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func NewRingBufTracer(
5151
}
5252
}
5353

54-
func (m *RingBufTracer) TraceLoop(ctx context.Context) node.InitFunc {
54+
func (m *RingBufTracer) TraceLoop(ctx context.Context) node.StartFunc[*RawRecord] {
5555
return func(out chan<- *RawRecord) {
5656
debugging := logrus.IsLevelEnabled(logrus.DebugLevel)
5757
for {

vendor/github.com/netobserv/gopipes/pkg/internal/refl/channel.go

Lines changed: 0 additions & 43 deletions
This file was deleted.

vendor/github.com/netobserv/gopipes/pkg/internal/refl/function.go

Lines changed: 0 additions & 79 deletions
This file was deleted.

vendor/github.com/netobserv/gopipes/pkg/internal/refl/helpers.go

Lines changed: 0 additions & 12 deletions
This file was deleted.

vendor/github.com/netobserv/gopipes/pkg/internal/connect/connectors.go renamed to vendor/github.com/netobserv/gopipes/pkg/node/internal/connect/connectors.go

Lines changed: 23 additions & 29 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)