Skip to content

Commit 66d4247

Browse files
authored
NETOBSERV-889: fix too many colons in address error (#98)
Signed-off-by: msherif1234 <[email protected]>
1 parent 6a074bd commit 66d4247

File tree

8 files changed

+144
-31
lines changed

8 files changed

+144
-31
lines changed

pkg/agent/agent.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,7 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
192192
return nil, fmt.Errorf("missing target host or port: %s:%d",
193193
cfg.TargetHost, cfg.TargetPort)
194194
}
195-
target := fmt.Sprintf("%s:%d", cfg.TargetHost, cfg.TargetPort)
196-
grpcExporter, err := exporter.StartGRPCProto(target, cfg.GRPCMessageMaxFlows)
195+
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows)
197196
if err != nil {
198197
return nil, err
199198
}
@@ -243,9 +242,7 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
243242
return nil, fmt.Errorf("missing target host or port: %s:%d",
244243
cfg.TargetHost, cfg.TargetPort)
245244
}
246-
target := fmt.Sprintf("%s:%d", cfg.TargetHost, cfg.TargetPort)
247-
248-
ipfix, err := exporter.StartIPFIXExporter(target, "udp")
245+
ipfix, err := exporter.StartIPFIXExporter(cfg.TargetHost, cfg.TargetPort, "udp")
249246
if err != nil {
250247
return nil, err
251248
}
@@ -255,9 +252,7 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
255252
return nil, fmt.Errorf("missing target host or port: %s:%d",
256253
cfg.TargetHost, cfg.TargetPort)
257254
}
258-
target := fmt.Sprintf("%s:%d", cfg.TargetHost, cfg.TargetPort)
259-
260-
ipfix, err := exporter.StartIPFIXExporter(target, "tcp")
255+
ipfix, err := exporter.StartIPFIXExporter(cfg.TargetHost, cfg.TargetPort, "tcp")
261256
if err != nil {
262257
return nil, err
263258
}

pkg/agent/ip.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ func fromExternal(ipType string) (net.IP, error) {
8383
// This will just establish an external dialer where we can pickup the external
8484
// host address
8585
addrStr := "8.8.8.8:80"
86-
if ipType == IPTypeIPV6 {
86+
// When IPType is "any" and we have interface with IPv6 address only then use ipv6 dns address
87+
ip, _ := fromLocal(IPTypeIPV4)
88+
if ipType == IPTypeIPV6 || (ipType == IPTypeAny && ip == nil) {
8789
addrStr = "[2001:4860:4860::8888]:80"
8890
}
8991
conn, err := dial("udp", addrStr)

pkg/exporter/grpc_proto.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
77
"github.com/netobserv/netobserv-ebpf-agent/pkg/grpc"
8+
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
89
"github.com/sirupsen/logrus"
910
)
1011

@@ -14,20 +15,22 @@ var glog = logrus.WithField("component", "exporter/GRPCProto")
1415
// by its input channel, converts them to *pbflow.Records instances, and submits
1516
// them to the collector.
1617
type GRPCProto struct {
17-
hostPort string
18+
hostIP string
19+
hostPort int
1820
clientConn *grpc.ClientConnection
1921
// maxFlowsPerMessage limits the maximum number of flows per GRPC message.
2022
// If a message contains more flows than this number, the GRPC message will be split into
2123
// multiple messages.
2224
maxFlowsPerMessage int
2325
}
2426

25-
func StartGRPCProto(hostPort string, maxFlowsPerMessage int) (*GRPCProto, error) {
26-
clientConn, err := grpc.ConnectClient(hostPort)
27+
func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int) (*GRPCProto, error) {
28+
clientConn, err := grpc.ConnectClient(hostIP, hostPort)
2729
if err != nil {
2830
return nil, err
2931
}
3032
return &GRPCProto{
33+
hostIP: hostIP,
3134
hostPort: hostPort,
3235
clientConn: clientConn,
3336
maxFlowsPerMessage: maxFlowsPerMessage,
@@ -37,7 +40,8 @@ func StartGRPCProto(hostPort string, maxFlowsPerMessage int) (*GRPCProto, error)
3740
// ExportFlows accepts slices of *flow.Record by its input channel, converts them
3841
// to *pbflow.Records instances, and submits them to the collector.
3942
func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) {
40-
log := glog.WithField("collector", g.hostPort)
43+
socket := utils.GetSocket(g.hostIP, g.hostPort)
44+
log := glog.WithField("collector", socket)
4145
for inputRecords := range input {
4246
for _, pbRecords := range flowsToPB(inputRecords, g.maxFlowsPerMessage) {
4347
log.Debugf("sending %d records", len(pbRecords.Entries))

pkg/exporter/grpc_proto_test.go

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package exporter
22

33
import (
4-
"fmt"
54
"net"
65
"testing"
76
"time"
@@ -19,7 +18,7 @@ import (
1918

2019
const timeout = 2 * time.Second
2120

22-
func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
21+
func TestIPv4GRPCProto_ExportFlows_AgentIP(t *testing.T) {
2322
// start remote ingestor
2423
port, err := test.FreeTCPPort()
2524
require.NoError(t, err)
@@ -29,7 +28,7 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
2928
defer coll.Close()
3029

3130
// Start GRPCProto exporter stage
32-
exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port), 1000)
31+
exporter, err := StartGRPCProto("127.0.0.1", port, 1000)
3332
require.NoError(t, err)
3433

3534
// Send some flows to the input of the exporter stage
@@ -61,6 +60,48 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
6160
}
6261
}
6362

63+
func TestIPv6GRPCProto_ExportFlows_AgentIP(t *testing.T) {
64+
// start remote ingestor
65+
port, err := test.FreeTCPPort()
66+
require.NoError(t, err)
67+
serverOut := make(chan *pbflow.Records)
68+
coll, err := grpc.StartCollector(port, serverOut)
69+
require.NoError(t, err)
70+
defer coll.Close()
71+
72+
// Start GRPCProto exporter stage
73+
exporter, err := StartGRPCProto("::1", port, 1000)
74+
require.NoError(t, err)
75+
76+
// Send some flows to the input of the exporter stage
77+
flows := make(chan []*flow.Record, 10)
78+
flows <- []*flow.Record{
79+
{AgentIP: net.ParseIP("10.11.12.13")},
80+
}
81+
flows <- []*flow.Record{
82+
{RawRecord: flow.RawRecord{Id: ebpf.BpfFlowId{EthProtocol: flow.IPv6Type}},
83+
AgentIP: net.ParseIP("9999::2222")},
84+
}
85+
go exporter.ExportFlows(flows)
86+
87+
rs := test2.ReceiveTimeout(t, serverOut, timeout)
88+
assert.Len(t, rs.Entries, 1)
89+
r := rs.Entries[0]
90+
assert.EqualValues(t, 0x0a0b0c0d, r.GetAgentIp().GetIpv4())
91+
92+
rs = test2.ReceiveTimeout(t, serverOut, timeout)
93+
assert.Len(t, rs.Entries, 1)
94+
r = rs.Entries[0]
95+
assert.EqualValues(t, net.ParseIP("9999::2222"), r.GetAgentIp().GetIpv6())
96+
97+
select {
98+
case rs = <-serverOut:
99+
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
100+
default:
101+
//ok!
102+
}
103+
}
104+
64105
func TestGRPCProto_SplitLargeMessages(t *testing.T) {
65106
// start remote ingestor
66107
port, err := test.FreeTCPPort()
@@ -72,7 +113,7 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {
72113

73114
const msgMaxLen = 10000
74115
// Start GRPCProto exporter stage
75-
exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port), msgMaxLen)
116+
exporter, err := StartGRPCProto("127.0.0.1", port, msgMaxLen)
76117
require.NoError(t, err)
77118

78119
// Send a message much longer than the limit length

pkg/exporter/ipfix.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"net"
55

66
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
7+
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
78
"github.com/sirupsen/logrus"
89
"github.com/vmware/go-ipfix/pkg/entities"
910
ipfixExporter "github.com/vmware/go-ipfix/pkg/exporter"
@@ -16,7 +17,8 @@ var ilog = logrus.WithField("component", "exporter/IPFIXProto")
1617
// compatible with OVN-K.
1718

1819
type IPFIX struct {
19-
hostPort string
20+
hostIP string
21+
hostPort int
2022
exporter *ipfixExporter.ExportingProcess
2123
templateIDv4 uint16
2224
templateIDv6 uint16
@@ -199,23 +201,24 @@ func SendTemplateRecordv6(log *logrus.Entry, exporter *ipfixExporter.ExportingPr
199201
}
200202

201203
// Sends out Template record to the IPFIX collector
202-
func StartIPFIXExporter(hostPort string, transportProto string) (*IPFIX, error) {
203-
log := ilog.WithField("collector", hostPort)
204+
func StartIPFIXExporter(hostIP string, hostPort int, transportProto string) (*IPFIX, error) {
205+
socket := utils.GetSocket(hostIP, hostPort)
206+
log := ilog.WithField("collector", socket)
204207

205208
registry.LoadRegistry()
206209
// Create exporter using local server info
207210
input := ipfixExporter.ExporterInput{
208-
CollectorAddress: hostPort,
211+
CollectorAddress: socket,
209212
CollectorProtocol: transportProto,
210213
ObservationDomainID: 1,
211214
TempRefTimeout: 1,
212215
}
213216
exporter, err := ipfixExporter.InitExportingProcess(input)
214217
if err != nil {
215-
log.Fatalf("Got error when connecting to local server %s: %v", hostPort, err)
218+
log.Fatalf("Got error when connecting to local server %s: %v", socket, err)
216219
return nil, err
217220
}
218-
log.Infof("Created exporter connecting to local server with address: %s", hostPort)
221+
log.Infof("Created exporter connecting to local server with address: %s", socket)
219222

220223
templateIDv4, entitiesV4, err := SendTemplateRecordv4(log, exporter)
221224
if err != nil {
@@ -232,6 +235,7 @@ func StartIPFIXExporter(hostPort string, transportProto string) (*IPFIX, error)
232235
log.Infof("entities v6 %+v", entitiesV6)
233236

234237
return &IPFIX{
238+
hostIP: hostIP,
235239
hostPort: hostPort,
236240
exporter: exporter,
237241
templateIDv4: templateIDv4,
@@ -331,7 +335,8 @@ func (ipf *IPFIX) sendDataRecord(log *logrus.Entry, record *flow.Record, v6 bool
331335
// ExportFlows accepts slices of *flow.Record by its input channel, converts them
332336
// to IPFIX Records, and submits them to the collector.
333337
func (ipf *IPFIX) ExportFlows(input <-chan []*flow.Record) {
334-
log := ilog.WithField("collector", ipf.hostPort)
338+
socket := utils.GetSocket(ipf.hostIP, ipf.hostPort)
339+
log := ilog.WithField("collector", socket)
335340
for inputRecords := range input {
336341
for _, record := range inputRecords {
337342
if record.Id.EthProtocol == flow.IPv6Type {

pkg/grpc/client.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package grpc
33

44
import (
55
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
6+
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
67
"google.golang.org/grpc"
78
"google.golang.org/grpc/credentials/insecure"
89
)
@@ -13,9 +14,10 @@ type ClientConnection struct {
1314
conn *grpc.ClientConn
1415
}
1516

16-
func ConnectClient(address string) (*ClientConnection, error) {
17+
func ConnectClient(hostIP string, hostPort int) (*ClientConnection, error) {
1718
// TODO: allow configuring some options (keepalive, backoff...)
18-
conn, err := grpc.Dial(address,
19+
socket := utils.GetSocket(hostIP, hostPort)
20+
conn, err := grpc.Dial(socket,
1921
grpc.WithTransportCredentials(insecure.NewCredentials()))
2022
if err != nil {
2123
return nil, err

pkg/grpc/grpc_test.go

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package grpc
22

33
import (
44
"context"
5-
"fmt"
65
"testing"
76
"time"
87

@@ -22,7 +21,7 @@ func TestGRPCCommunication(t *testing.T) {
2221
serverOut := make(chan *pbflow.Records)
2322
_, err = StartCollector(port, serverOut)
2423
require.NoError(t, err)
25-
cc, err := ConnectClient(fmt.Sprintf("127.0.0.1:%d", port))
24+
cc, err := ConnectClient("127.0.0.1", port)
2625
require.NoError(t, err)
2726
client := cc.Client()
2827

@@ -110,7 +109,7 @@ func TestConstructorOptions(t *testing.T) {
110109
return handler(ctx, req)
111110
})))
112111
require.NoError(t, err)
113-
cc, err := ConnectClient(fmt.Sprintf("127.0.0.1:%d", port))
112+
cc, err := ConnectClient("127.0.0.1", port)
114113
require.NoError(t, err)
115114
client := cc.Client()
116115

@@ -127,14 +126,14 @@ func TestConstructorOptions(t *testing.T) {
127126
}
128127
}
129128

130-
func BenchmarkGRPCCommunication(b *testing.B) {
129+
func BenchmarkIPv4GRPCCommunication(b *testing.B) {
131130
port, err := test.FreeTCPPort()
132131
require.NoError(b, err)
133132
serverOut := make(chan *pbflow.Records, 1000)
134133
collector, err := StartCollector(port, serverOut)
135134
require.NoError(b, err)
136135
defer collector.Close()
137-
cc, err := ConnectClient(fmt.Sprintf("127.0.0.1:%d", port))
136+
cc, err := ConnectClient("127.0.0.1", port)
138137
require.NoError(b, err)
139138
defer cc.Close()
140139
client := cc.Client()
@@ -175,3 +174,52 @@ func BenchmarkGRPCCommunication(b *testing.B) {
175174
<-serverOut
176175
}
177176
}
177+
178+
func BenchmarkIPv6GRPCCommunication(b *testing.B) {
179+
port, err := test.FreeTCPPort()
180+
require.NoError(b, err)
181+
serverOut := make(chan *pbflow.Records, 1000)
182+
collector, err := StartCollector(port, serverOut)
183+
require.NoError(b, err)
184+
defer collector.Close()
185+
cc, err := ConnectClient("::1", port)
186+
require.NoError(b, err)
187+
defer cc.Close()
188+
client := cc.Client()
189+
190+
f := &pbflow.Record{
191+
EthProtocol: 2048,
192+
Bytes: 456,
193+
Flags: 1,
194+
Direction: pbflow.Direction_EGRESS,
195+
TimeFlowStart: timestamppb.Now(),
196+
TimeFlowEnd: timestamppb.Now(),
197+
Network: &pbflow.Network{
198+
SrcAddr: &pbflow.IP{
199+
IpFamily: &pbflow.IP_Ipv6{Ipv6: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}},
200+
},
201+
DstAddr: &pbflow.IP{
202+
IpFamily: &pbflow.IP_Ipv6{Ipv6: []byte{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 24, 24, 25, 26}},
203+
},
204+
},
205+
DataLink: &pbflow.DataLink{
206+
DstMac: 0x112233445566,
207+
SrcMac: 0x665544332211,
208+
},
209+
Transport: &pbflow.Transport{
210+
Protocol: 1,
211+
SrcPort: 23000,
212+
DstPort: 443,
213+
},
214+
}
215+
records := &pbflow.Records{}
216+
for i := 0; i < 100; i++ {
217+
records.Entries = append(records.Entries, f)
218+
}
219+
for i := 0; i < b.N; i++ {
220+
if _, err := client.Send(context.Background(), records); err != nil {
221+
require.Fail(b, "error", err)
222+
}
223+
<-serverOut
224+
}
225+
}

pkg/utils/utils.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package utils
2+
3+
import (
4+
"fmt"
5+
"net"
6+
)
7+
8+
// GetSocket returns socket string in the correct format based on address family
9+
func GetSocket(hostIP string, hostPort int) string {
10+
socket := fmt.Sprintf("%s:%d", hostIP, hostPort)
11+
ipAddr := net.ParseIP(hostIP)
12+
if ipAddr != nil && ipAddr.To4() == nil {
13+
socket = fmt.Sprintf("[%s]:%d", hostIP, hostPort)
14+
}
15+
return socket
16+
}

0 commit comments

Comments
 (0)