Skip to content

Commit 3fd4695

Browse files
author
Mario Macias
authored
NETOBSERV-333: allow overriding default GRPC options (#31)
1 parent 32b6878 commit 3fd4695

File tree

2 files changed

+58
-3
lines changed

2 files changed

+58
-3
lines changed

pkg/grpc/grpc_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/require"
13+
"google.golang.org/grpc"
1314
"google.golang.org/protobuf/types/known/timestamppb"
1415
)
1516

@@ -84,6 +85,40 @@ func TestGRPCCommunication(t *testing.T) {
8485
}
8586
}
8687

88+
func TestConstructorOptions(t *testing.T) {
89+
port, err := test.FreeTCPPort()
90+
require.NoError(t, err)
91+
intercepted := make(chan struct{})
92+
// Override the default GRPC collector to verify that StartCollector is applying the
93+
// passed options
94+
_, err = StartCollector(port, make(chan *pbflow.Records),
95+
WithGRPCServerOptions(grpc.UnaryInterceptor(func(
96+
ctx context.Context,
97+
req interface{},
98+
info *grpc.UnaryServerInfo,
99+
handler grpc.UnaryHandler,
100+
) (resp interface{}, err error) {
101+
close(intercepted)
102+
return handler(ctx, req)
103+
})))
104+
require.NoError(t, err)
105+
cc, err := ConnectClient(fmt.Sprintf("127.0.0.1:%d", port))
106+
require.NoError(t, err)
107+
client := cc.Client()
108+
109+
go func() {
110+
_, err = client.Send(context.Background(),
111+
&pbflow.Records{Entries: []*pbflow.Record{{EthProtocol: 123, Bytes: 456}}})
112+
require.NoError(t, err)
113+
}()
114+
115+
select {
116+
case <-intercepted:
117+
case <-time.After(timeout):
118+
require.Fail(t, "timeout waiting for unary interceptor to work")
119+
}
120+
}
121+
87122
func BenchmarkGRPCCommunication(b *testing.B) {
88123
port, err := test.FreeTCPPort()
89124
require.NoError(b, err)

pkg/grpc/server.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,35 @@ type CollectorServer struct {
1616
grpcServer *grpc.Server
1717
}
1818

19+
type collectorOptions struct {
20+
grpcServerOptions []grpc.ServerOption
21+
}
22+
23+
// CollectorOption allows overriding the default configuration of the CollectorServer instance.
24+
// Use them in the StartCollector function.
25+
type CollectorOption func(options *collectorOptions)
26+
27+
func WithGRPCServerOptions(options ...grpc.ServerOption) CollectorOption {
28+
return func(copt *collectorOptions) {
29+
copt.grpcServerOptions = options
30+
}
31+
}
32+
1933
// StartCollector listens in background for gRPC+Protobuf flows in the given port, and forwards each
2034
// set of *pbflow.Records by the provided channel.
21-
func StartCollector(port int, recordForwarder chan<- *pbflow.Records) (*CollectorServer, error) {
35+
func StartCollector(
36+
port int, recordForwarder chan<- *pbflow.Records, options ...CollectorOption,
37+
) (*CollectorServer, error) {
38+
copts := collectorOptions{}
39+
for _, opt := range options {
40+
opt(&copts)
41+
}
42+
2243
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
2344
if err != nil {
2445
return nil, err
2546
}
26-
// TODO: set server option arguments
27-
grpcServer := grpc.NewServer()
47+
grpcServer := grpc.NewServer(copts.grpcServerOptions...)
2848
pbflow.RegisterCollectorServer(grpcServer, &collectorAPI{
2949
recordForwarder: recordForwarder,
3050
})

0 commit comments

Comments
 (0)