Skip to content

Commit c6e207d

Browse files
author
Mario Macias
authored
Allow selecting flows' direction (#58)
* Allow selecting flows' direction * Fix typo * make linter happy
1 parent 24af5a5 commit c6e207d

File tree

5 files changed

+60
-5
lines changed

5 files changed

+60
-5
lines changed

docs/config.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ The following environment variables are available to configure the NetObserv eBF
2929
deduplicator. After a flow hasn't been received for that expiry time, the deduplicator forgets it.
3030
That means that a flow from a connection that has been inactive during that period could be
3131
forwarded again from a different interface.
32+
* `DIRECTION` (default: `both`). Allows selecting which flows to trace according to its direction.
33+
Accepted values are `ingress`, `egress` or `both`.
3234
* `LOG_LEVEL` (default: `info`). From more to less verbose: `trace`, `debug`, `info`, `warn`,
3335
`error`, `fatal`, `panic`.
3436
* `KAFKA_BROKERS` (required if `EXPORT` is `kafka`). Comma-separated list of tha addresses of the

e2e/kafka/manifests/12-kafka-topic.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ metadata:
55
labels:
66
strimzi.io/cluster: "kafka-cluster"
77
spec:
8-
partitions: 1
8+
partitions: 12
99
replicas: 1

pkg/agent/agent.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,11 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
130130
return iface
131131
}
132132

133+
ingress, egress := flowDirections(cfg)
134+
133135
tracer, err := ebpf.NewFlowTracer(
134136
cfg.Sampling, cfg.CacheMaxFlows, cfg.BuffersLength, cfg.CacheActiveTimeout,
137+
ingress, egress,
135138
interfaceNamer,
136139
)
137140
if err != nil {
@@ -147,6 +150,20 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
147150
}, nil
148151
}
149152

153+
func flowDirections(cfg *Config) (ingress, egress bool) {
154+
switch cfg.Direction {
155+
case DirectionIngress:
156+
return true, false
157+
case DirectionEgress:
158+
return false, true
159+
case DirectionBoth:
160+
return true, true
161+
default:
162+
alog.Warnf("unknown DIRECTION %q. Tracing both ingress and egress traffic", cfg.Direction)
163+
return true, true
164+
}
165+
}
166+
150167
// Run a Flows agent. The function will keep running in the same thread
151168
// until the passed context is canceled
152169
func (f *Flows) Run(ctx context.Context) error {

pkg/agent/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ const (
99
ListenWatch = "watch"
1010
DeduperNone = "none"
1111
DeduperFirstCome = "firstCome"
12+
DirectionIngress = "ingress"
13+
DirectionEgress = "egress"
14+
DirectionBoth = "both"
1215
)
1316

1417
type Config struct {
@@ -49,6 +52,9 @@ type Config struct {
4952
// again from a different interface.
5053
// If the value is not set, it will default to 2 * CacheActiveTimeout
5154
DeduperFCExpiry time.Duration `env:"DEDUPER_FC_EXPIRY"`
55+
// Direction allows selecting which flows to trace according to its direction. Accepted values
56+
// are "ingress", "egress" or "both" (default).
57+
Direction string `env:"DIRECTION" envDefault:"both"`
5258
// Logger level. From more to less verbose: trace, debug, info, warn, error, fatal, panic.
5359
LogLevel string `env:"LOG_LEVEL" envDefault:"info"`
5460
// Sampling holds the rate at which packets should be sampled and sent to the target collector.

pkg/ebpf/tracer.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,15 @@ type FlowTracer struct {
4747
flowsEvictor *sync.Cond
4848
lastEvictionNs uint64
4949
cacheMaxSize int
50+
enableIngress bool
51+
enableEgress bool
5052
}
5153

5254
// TODO: decouple flowtracer logic from eBPF maps access so we can inject mocks for testing
5355
func NewFlowTracer(
5456
sampling, cacheMaxSize, buffersLength int,
5557
evictionTimeout time.Duration,
58+
ingress, egress bool,
5659
namer flow.InterfaceNamer,
5760
) (*FlowTracer, error) {
5861
if err := rlimit.RemoveMemlock(); err != nil {
@@ -96,6 +99,8 @@ func NewFlowTracer(
9699
flowsEvictor: sync.NewCond(&sync.Mutex{}),
97100
lastEvictionNs: uint64(monotime.Now()),
98101
cacheMaxSize: cacheMaxSize,
102+
enableIngress: ingress,
103+
enableEgress: egress,
99104
}, nil
100105
}
101106

@@ -129,6 +134,23 @@ func (m *FlowTracer) Register(iface ifaces.Interface) error {
129134
}
130135
m.qdiscs[iface] = qdisc
131136

137+
if err := m.registerEgress(iface, ipvlan); err != nil {
138+
return err
139+
}
140+
141+
if err := m.registerIngress(iface, ipvlan); err != nil {
142+
return err
143+
}
144+
145+
return nil
146+
}
147+
148+
func (m *FlowTracer) registerEgress(iface ifaces.Interface, ipvlan netlink.Link) error {
149+
ilog := log.WithField("iface", iface)
150+
if !m.enableEgress {
151+
ilog.Debug("ignoring egress traffic, according to user configuration")
152+
return nil
153+
}
132154
// Fetch events on egress
133155
egressAttrs := netlink.FilterAttrs{
134156
LinkIndex: ipvlan.Attrs().Index,
@@ -146,15 +168,23 @@ func (m *FlowTracer) Register(iface ifaces.Interface) error {
146168
if err := netlink.FilterDel(egressFilter); err == nil {
147169
ilog.Warn("egress filter already existed. Deleted it")
148170
}
149-
if err = netlink.FilterAdd(egressFilter); err != nil {
171+
if err := netlink.FilterAdd(egressFilter); err != nil {
150172
if errors.Is(err, fs.ErrExist) {
151173
ilog.WithError(err).Warn("egress filter already exists. Ignoring")
152174
} else {
153175
return fmt.Errorf("failed to create egress filter: %w", err)
154176
}
155177
}
156178
m.egressFilters[iface] = egressFilter
179+
return nil
180+
}
157181

182+
func (m *FlowTracer) registerIngress(iface ifaces.Interface, ipvlan netlink.Link) error {
183+
ilog := log.WithField("iface", iface)
184+
if !m.enableIngress {
185+
ilog.Debug("ignoring ingress traffic, according to user configuration")
186+
return nil
187+
}
158188
// Fetch events on ingress
159189
ingressAttrs := netlink.FilterAttrs{
160190
LinkIndex: ipvlan.Attrs().Index,
@@ -172,7 +202,7 @@ func (m *FlowTracer) Register(iface ifaces.Interface) error {
172202
if err := netlink.FilterDel(ingressFilter); err == nil {
173203
ilog.Warn("ingress filter already existed. Deleted it")
174204
}
175-
if err = netlink.FilterAdd(ingressFilter); err != nil {
205+
if err := netlink.FilterAdd(ingressFilter); err != nil {
176206
if errors.Is(err, fs.ErrExist) {
177207
ilog.WithError(err).Warn("ingress filter already exists. Ignoring")
178208
} else {
@@ -376,9 +406,9 @@ func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlow
376406
// For synchronization purposes, we get/delete a whole snapshot of the flows map.
377407
// This way we avoid missing packets that could be updated on the
378408
// ebpf side while we process/aggregate them here
379-
// Changing this method invaction by BatchLookupAndDelete could improve performance
409+
// Changing this method invocation by BatchLookupAndDelete could improve performance
380410
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
381-
// Supported Lookup/Delete oprations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
411+
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
382412
func (m *FlowTracer) lookupAndDeleteFlowsMap() map[flow.RecordKey][]flow.RecordMetrics {
383413
flowMap := m.objects.AggregatedFlows
384414

0 commit comments

Comments
 (0)