Skip to content

Commit 170bbb2

Browse files
committed
improved grpc implementation + documentation update
1 parent 346004b commit 170bbb2

File tree

6 files changed

+448
-47
lines changed

6 files changed

+448
-47
lines changed

docs/advanced-guide/grpc/page.md

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,31 @@ func authInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, h
194194
}
195195
```
196196

197+
## Adding Custom Stream interceptors
198+
199+
For streaming RPCs (client-stream, server-stream, or bidirectional), GoFr allows you to add stream interceptors using AddGRPCServerStreamInterceptors. These are useful for handling logic that needs to span the entire lifetime of a stream.
200+
```go
201+
func main() {
202+
app := gofr.New()
203+
204+
app.AddGRPCServerStreamInterceptors(streamAuthInterceptor)
205+
206+
// ... register your service
207+
app.Run()
208+
}
209+
210+
func streamAuthInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
211+
// Example: Validate metadata for the entire stream
212+
md, ok := metadata.FromIncomingContext(ss.Context())
213+
if !ok || !isValidToken(md["auth-token"]) {
214+
return status.Errorf(codes.Unauthenticated, "invalid stream token")
215+
}
216+
217+
// If valid, continue processing the stream
218+
return handler(srv, ss)
219+
}
220+
```
221+
197222
For more details on adding additional interceptors and server options, refer to the [official gRPC Go package](https://pkg.go.dev/google.golang.org/grpc#ServerOption).
198223

199224
## Generating gRPC Client using `gofr wrap grpc client`
@@ -229,7 +254,48 @@ func {serviceMethod}(ctx *gofr.Context) (*{serviceResponse}, error) {
229254
return res, nil
230255
}
231256
```
257+
## Error Handling and Validation
258+
GoFr's gRPC implementation includes built-in error handling and validation:
259+
260+
**Port Validation**: Automatically validates that gRPC ports are within valid range (1-65535)
261+
**Port Availability**: Checks if the specified port is available before starting the server
262+
**Server Creation**: Validates server creation and provides detailed error messages
263+
**Container Injection**: Validates container injection into gRPC services with detailed logging
264+
265+
Port Configuration
266+
```bash
267+
// Set custom gRPC port in .env file
268+
GRPC_PORT=9001
269+
270+
// Or use default port 9000 if not specified
271+
```
272+
## gRPC Reflection
273+
GoFr supports gRPC reflection for easier debugging and testing. Enable it using the configuration:
274+
```bash
275+
# In your .env file
276+
GRPC_ENABLE_REFLECTION=true
277+
```
278+
When enabled, you can use tools like grpcurl to inspect and test your gRPC services:
279+
280+
```bash
281+
# List available services
282+
grpcurl -plaintext localhost:9000 list
283+
284+
# Describe a service
285+
grpcurl -plaintext localhost:9000 describe YourService
286+
287+
# Make a test call
288+
grpcurl -plaintext -d '{"name": "test"}' localhost:9000 YourService/YourMethod
289+
```
290+
291+
## Built-in Metrics
292+
GoFr automatically registers the following gRPC metrics:
293+
294+
+ **grpc_server_status**: Gauge indicating server status (1=running, 0=stopped)
295+
+ **grpc_server_errors_total**: Counter for total gRPC server errors
296+
+ **grpc_services_registered_total**: Counter for total registered gRPC services
232297
298+
These metrics are automatically available in your metrics endpoint and can be used for monitoring and alerting.
233299
234300
## Customizing gRPC Client with DialOptions
235301

pkg/gofr/factory.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,10 @@ func New() *App {
5757
port = defaultGRPCPort
5858
}
5959

60-
app.grpcServer = newGRPCServer(app.container, port, app.Config)
60+
app.grpcServer, err = newGRPCServer(app.container, port, app.Config)
61+
if err != nil {
62+
app.container.Logger.Fatalf("failed to create gRPC server: %v", err)
63+
}
6164

6265
app.subscriptionManager = newSubscriptionManager(app.container)
6366

@@ -78,7 +81,7 @@ func NewCMD() *App {
7881
app.readConfig(true)
7982
app.container = container.NewContainer(nil)
8083
app.container.Logger = logging.NewFileLogger(app.Config.Get("CMD_LOGS_FILE"))
81-
84+
8285
app.cmd = &cmd{
8386
out: terminal.New(),
8487
}

pkg/gofr/grpc.go

Lines changed: 77 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package gofr
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"net"
78
"reflect"
89
"strconv"
@@ -42,10 +43,15 @@ type grpcServer struct {
4243
// This function accepts a variadic list of gRPC server options (grpc.ServerOption) and appends them
4344
// to the server's configuration. It allows fine-tuning of the gRPC server's behavior during its initialization.
4445
func (a *App) AddGRPCServerOptions(grpcOpts ...grpc.ServerOption) {
46+
if len(grpcOpts) == 0 {
47+
a.container.Logger.Debug("no gRPC server options provided")
48+
return
49+
}
50+
a.container.Logger.Debugf("adding %d gRPC server options", len(grpcOpts))
4551
a.grpcServer.options = append(a.grpcServer.options, grpcOpts...)
4652
}
4753

48-
// AddGRPCUnaryInterceptors allows users to add custom gRPC interceptors.
54+
// AddGRPCUnaryInterceptors allows users to add custom gRPC interceptors.
4955
// Example:
5056
//
5157
// func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
@@ -55,14 +61,34 @@ func (a *App) AddGRPCServerOptions(grpcOpts ...grpc.ServerOption) {
5561
// }
5662
// app.AddGRPCUnaryInterceptors(loggingInterceptor)
5763
func (a *App) AddGRPCUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) {
58-
a.grpcServer.interceptors = append(a.grpcServer.interceptors, interceptors...)
64+
if len(interceptors) == 0 {
65+
a.container.Logger.Debug("no unary interceptors provided")
66+
return
67+
}
68+
69+
a.container.Logger.Debugf("adding %d valid unary interceptors", len(interceptors))
70+
a.grpcServer.interceptors = append(a.grpcServer.interceptors, interceptors...)
5971
}
6072

6173
func (a *App) AddGRPCServerStreamInterceptors(interceptors ...grpc.StreamServerInterceptor) {
74+
if len(interceptors) == 0 {
75+
a.container.Logger.Debug("no stream interceptors provided")
76+
return
77+
}
78+
79+
a.container.Logger.Debugf("adding %d stream interceptors", len(interceptors))
6280
a.grpcServer.streamInterceptors = append(a.grpcServer.streamInterceptors, interceptors...)
6381
}
6482

65-
func newGRPCServer(c *container.Container, port int, cfg config.Config) *grpcServer {
83+
func newGRPCServer(c *container.Container, port int, cfg config.Config) (*grpcServer, error) {
84+
if port <= 0 || port > 65535 {
85+
return nil, fmt.Errorf("invalid port number: %d", port)
86+
}
87+
88+
c.Logger.Infof("creating new gRPC server on port %d", port)
89+
90+
registerGRPCMetrics(c)
91+
6692
middleware := make([]grpc.UnaryServerInterceptor, 0)
6793
middleware = append(middleware,
6894
grpc_recovery.UnaryServerInterceptor(),
@@ -72,32 +98,51 @@ func newGRPCServer(c *container.Container, port int, cfg config.Config) *grpcSer
7298
streamMiddleware = append(streamMiddleware,
7399
grpc_recovery.StreamServerInterceptor(),
74100
gofr_grpc.StreamObservabilityInterceptor(c.Logger, c.Metrics()))
75-
101+
102+
c.Logger.Debugf("gRPC server created successfully on port %d", port)
103+
76104
return &grpcServer{
77105
port: port,
78106
interceptors: middleware,
79107
streamInterceptors: streamMiddleware,
80108
config: cfg,
81-
}
109+
}, nil
110+
}
111+
112+
// registerGRPCMetrics registers essential gRPC metrics
113+
func registerGRPCMetrics(c *container.Container) {
114+
c.Metrics().NewGauge("grpc_server_status", "gRPC server status (1=running, 0=stopped)")
115+
c.Metrics().NewCounter("grpc_server_errors_total", "Total gRPC server errors")
116+
c.Metrics().NewCounter("grpc_services_registered_total", "Total gRPC services registered")
82117
}
83118

84-
func (g *grpcServer) createServer() {
119+
func (g *grpcServer) createServer() error{
85120
interceptorOption := grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(g.interceptors...))
86121
streamOpt := grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(g.streamInterceptors...))
87122
g.options = append(g.options, interceptorOption, streamOpt)
88123

89124
g.server = grpc.NewServer(g.options...)
125+
if g.server == nil {
126+
return errors.New("failed to create gRPC server")
127+
}
90128

91129
enabled := strings.ToLower(g.config.GetOrDefault("GRPC_ENABLE_REFLECTION", "false"))
92-
if enabled != defaultReflection {
93-
reflection.Register(g.server)
94-
}
130+
if enabled == "true" {
131+
reflection.Register(g.server)
132+
}
133+
134+
return nil
95135
}
96136

97137
func (g *grpcServer) Run(c *container.Container) {
138+
98139
if g.server == nil {
99-
g.createServer()
100-
}
140+
if err := g.createServer(); err != nil {
141+
c.Logger.Fatalf("failed to create gRPC server: %v", err)
142+
c.Metrics().NewCounter("grpc_server_errors_total", "").Inc()
143+
return
144+
}
145+
}
101146

102147
addr := ":" + strconv.Itoa(g.port)
103148

@@ -106,13 +151,22 @@ func (g *grpcServer) Run(c *container.Container) {
106151
listener, err := net.Listen("tcp", addr)
107152
if err != nil {
108153
c.Logger.Errorf("error in starting gRPC server at %s: %s", addr, err)
154+
c.Metrics().NewCounter("grpc_server_errors_total", "").Inc()
155+
c.Metrics().NewGauge("grpc_server_status", "").Set(0)
109156
return
110157
}
158+
159+
c.Metrics().NewGauge("grpc_server_status", "").Set(1)
111160

112161
if err := g.server.Serve(listener); err != nil {
113162
c.Logger.Errorf("error in starting gRPC server at %s: %s", addr, err)
163+
c.Metrics().NewCounter("grpc_server_errors_total", "").Inc()
164+
c.Metrics().NewGauge("grpc_server_status", "").Set(0)
114165
return
115166
}
167+
168+
c.Logger.Infof("gRPC server stopped on %s", addr)
169+
c.Metrics().NewGauge("grpc_server_status", "").Set(0)
116170
}
117171

118172
func (g *grpcServer) Shutdown(ctx context.Context) error {
@@ -130,28 +184,35 @@ func (g *grpcServer) Shutdown(ctx context.Context) error {
130184
}
131185

132186
var (
133-
errNonAddressable = errors.New("cannot inject container as it is not addressable or is fail")
187+
errNonAddressable = errors.New("cannot inject container as it is not addressable or is nil")
134188
)
135189

136190
// RegisterService adds a gRPC service to the GoFr application.
137191
func (a *App) RegisterService(desc *grpc.ServiceDesc, impl any) {
192+
138193
if !a.grpcRegistered && !isPortAvailable(a.grpcServer.port) {
139194
a.container.Logger.Fatalf("gRPC port %d is blocked or unreachable", a.grpcServer.port)
140195
}
141196

142-
if !a.grpcRegistered {
143-
a.grpcServer.createServer()
144-
}
197+
if !a.grpcRegistered {
198+
if err := a.grpcServer.createServer(); err != nil {
199+
a.container.Logger.Errorf("failed to create gRPC server for service %s: %v", desc.ServiceName, err)
200+
return
201+
}
202+
}
145203

146204
a.container.Logger.Infof("registering gRPC Server: %s", desc.ServiceName)
147205
a.grpcServer.server.RegisterService(desc, impl)
148206

207+
a.container.Metrics().NewCounter("grpc_services_registered_total", "").Inc()
208+
149209
err := injectContainer(impl, a.container)
150210
if err != nil {
151-
return
211+
a.container.Logger.Fatalf("failed to inject container into gRPC service %s: %v", desc.ServiceName, err)
152212
}
153213

154214
a.grpcRegistered = true
215+
a.container.Logger.Debugf("successfully registered gRPC service: %s", desc.ServiceName)
155216
}
156217

157218
func injectContainer(impl any, c *container.Container) error {

0 commit comments

Comments
 (0)