@@ -3,6 +3,7 @@ package gofr
3
3
import (
4
4
"context"
5
5
"errors"
6
+ "fmt"
6
7
"net"
7
8
"reflect"
8
9
"strconv"
@@ -27,6 +28,12 @@ type grpcServer struct {
27
28
config config.Config
28
29
}
29
30
31
+ var (
32
+ ErrNonAddressable = errors .New ("cannot inject container as it is not addressable or is nil" )
33
+ ErrInvalidPort = errors .New ("invalid port number" )
34
+ ErrFailedCreateServer = errors .New ("failed to create gRPC server" )
35
+ )
36
+
30
37
// AddGRPCServerOptions allows users to add custom gRPC server options such as TLS configuration,
31
38
// timeouts, interceptors, and other server-specific settings in a single call.
32
39
//
@@ -42,6 +49,12 @@ type grpcServer struct {
42
49
// This function accepts a variadic list of gRPC server options (grpc.ServerOption) and appends them
43
50
// to the server's configuration. It allows fine-tuning of the gRPC server's behavior during its initialization.
44
51
func (a * App ) AddGRPCServerOptions (grpcOpts ... grpc.ServerOption ) {
52
+ if len (grpcOpts ) == 0 {
53
+ a .container .Logger .Debug ("no gRPC server options provided" )
54
+ return
55
+ }
56
+
57
+ a .container .Logger .Debugf ("adding %d gRPC server options" , len (grpcOpts ))
45
58
a .grpcServer .options = append (a .grpcServer .options , grpcOpts ... )
46
59
}
47
60
@@ -55,14 +68,32 @@ func (a *App) AddGRPCServerOptions(grpcOpts ...grpc.ServerOption) {
55
68
// }
56
69
// app.AddGRPCUnaryInterceptors(loggingInterceptor)
57
70
func (a * App ) AddGRPCUnaryInterceptors (interceptors ... grpc.UnaryServerInterceptor ) {
71
+ if len (interceptors ) == 0 {
72
+ a .container .Logger .Debug ("no unary interceptors provided" )
73
+ return
74
+ }
75
+
76
+ a .container .Logger .Debugf ("adding %d valid unary interceptors" , len (interceptors ))
58
77
a .grpcServer .interceptors = append (a .grpcServer .interceptors , interceptors ... )
59
78
}
60
79
61
80
func (a * App ) AddGRPCServerStreamInterceptors (interceptors ... grpc.StreamServerInterceptor ) {
81
+ if len (interceptors ) == 0 {
82
+ a .container .Logger .Debug ("no stream interceptors provided" )
83
+ return
84
+ }
85
+
86
+ a .container .Logger .Debugf ("adding %d stream interceptors" , len (interceptors ))
62
87
a .grpcServer .streamInterceptors = append (a .grpcServer .streamInterceptors , interceptors ... )
63
88
}
64
89
65
- func newGRPCServer (c * container.Container , port int , cfg config.Config ) * grpcServer {
90
+ func newGRPCServer (c * container.Container , port int , cfg config.Config ) (* grpcServer , error ) {
91
+ if port <= 0 || port > 65535 {
92
+ return nil , fmt .Errorf ("%w: %d" , ErrInvalidPort , port )
93
+ }
94
+
95
+ registerGRPCMetrics (c )
96
+
66
97
middleware := make ([]grpc.UnaryServerInterceptor , 0 )
67
98
middleware = append (middleware ,
68
99
grpc_recovery .UnaryServerInterceptor (),
@@ -78,41 +109,78 @@ func newGRPCServer(c *container.Container, port int, cfg config.Config) *grpcSer
78
109
interceptors : middleware ,
79
110
streamInterceptors : streamMiddleware ,
80
111
config : cfg ,
81
- }
112
+ }, nil
82
113
}
83
114
84
- func (g * grpcServer ) createServer () {
115
+ // registerGRPCMetrics registers essential gRPC metrics.
116
+ func registerGRPCMetrics (c * container.Container ) {
117
+ c .Metrics ().NewGauge ("grpc_server_status" , "gRPC server status (1=running, 0=stopped)" )
118
+ c .Metrics ().NewCounter ("grpc_server_errors_total" , "Total gRPC server errors" )
119
+ c .Metrics ().NewCounter ("grpc_services_registered_total" , "Total gRPC services registered" )
120
+ }
121
+
122
+ func (g * grpcServer ) createServer () error {
85
123
interceptorOption := grpc .UnaryInterceptor (grpc_middleware .ChainUnaryServer (g .interceptors ... ))
86
124
streamOpt := grpc .StreamInterceptor (grpc_middleware .ChainStreamServer (g .streamInterceptors ... ))
87
125
g .options = append (g .options , interceptorOption , streamOpt )
88
126
89
127
g .server = grpc .NewServer (g .options ... )
128
+ if g .server == nil {
129
+ return ErrFailedCreateServer
130
+ }
90
131
91
132
enabled := strings .ToLower (g .config .GetOrDefault ("GRPC_ENABLE_REFLECTION" , "false" ))
92
- if enabled != defaultReflection {
133
+ if enabled == defaultTelemetry {
93
134
reflection .Register (g .server )
94
135
}
136
+
137
+ return nil
95
138
}
96
139
97
140
func (g * grpcServer ) Run (c * container.Container ) {
98
141
if g .server == nil {
99
- g .createServer ()
142
+ if err := g .createServer (); err != nil {
143
+ c .Logger .Fatalf ("failed to create gRPC server: %v" , err )
144
+ c .Metrics ().IncrementCounter (context .Background (), "grpc_server_errors_total" )
145
+
146
+ return
147
+ }
148
+ }
149
+
150
+ if ! isPortAvailable (g .port ) {
151
+ c .Logger .Fatalf ("gRPC port %d is blocked or unreachable" , g .port )
152
+ c .Metrics ().IncrementCounter (context .Background (), "grpc_server_errors_total" )
153
+ c .Metrics ().SetGauge ("grpc_server_status" , 0 )
154
+
155
+ return
100
156
}
101
157
102
158
addr := ":" + strconv .Itoa (g .port )
103
159
104
160
c .Logger .Infof ("starting gRPC server at %s" , addr )
105
161
106
- listener , err := net .Listen ("tcp" , addr )
162
+ listener , err := ( & net.ListenConfig {}). Listen (context . Background (), "tcp" , addr )
107
163
if err != nil {
108
164
c .Logger .Errorf ("error in starting gRPC server at %s: %s" , addr , err )
165
+ c .Metrics ().IncrementCounter (context .Background (), "grpc_server_errors_total" )
166
+ c .Metrics ().SetGauge ("grpc_server_status" , 0 )
167
+
109
168
return
110
169
}
111
170
171
+ c .Metrics ().SetGauge ("grpc_server_status" , 1 )
172
+ c .Logger .Infof ("gRPC server started successfully on %s" , addr )
173
+
112
174
if err := g .server .Serve (listener ); err != nil {
113
175
c .Logger .Errorf ("error in starting gRPC server at %s: %s" , addr , err )
176
+ c .Metrics ().IncrementCounter (context .Background (), "grpc_server_errors_total" )
177
+ c .Metrics ().SetGauge ("grpc_server_status" , 0 )
178
+
114
179
return
115
180
}
181
+
182
+ c .Logger .Infof ("gRPC server stopped on %s" , addr )
183
+ c .Metrics ().SetGauge ("grpc_server_status" , 0 )
116
184
}
117
185
118
186
func (g * grpcServer ) Shutdown (ctx context.Context ) error {
@@ -129,29 +197,27 @@ func (g *grpcServer) Shutdown(ctx context.Context) error {
129
197
})
130
198
}
131
199
132
- var (
133
- errNonAddressable = errors .New ("cannot inject container as it is not addressable or is fail" )
134
- )
135
-
136
200
// RegisterService adds a gRPC service to the GoFr application.
137
201
func (a * App ) RegisterService (desc * grpc.ServiceDesc , impl any ) {
138
- if ! a .grpcRegistered && ! isPortAvailable (a .grpcServer .port ) {
139
- a .container .Logger .Fatalf ("gRPC port %d is blocked or unreachable" , a .grpcServer .port )
140
- }
141
-
142
202
if ! a .grpcRegistered {
143
- a .grpcServer .createServer ()
203
+ if err := a .grpcServer .createServer (); err != nil {
204
+ a .container .Logger .Errorf ("failed to create gRPC server for service %s: %v" , desc .ServiceName , err )
205
+ return
206
+ }
144
207
}
145
208
146
- a .container .Logger .Infof ("registering gRPC Server : %s" , desc .ServiceName )
209
+ a .container .Logger .Infof ("registering gRPC Service : %s" , desc .ServiceName )
147
210
a .grpcServer .server .RegisterService (desc , impl )
148
211
212
+ a .container .Metrics ().IncrementCounter (context .Background (), "grpc_services_registered_total" )
213
+
149
214
err := injectContainer (impl , a .container )
150
215
if err != nil {
151
- return
216
+ a . container . Logger . Fatalf ( "failed to inject container into gRPC service %s: %v" , desc . ServiceName , err )
152
217
}
153
218
154
219
a .grpcRegistered = true
220
+ a .container .Logger .Infof ("successfully registered gRPC service: %s" , desc .ServiceName )
155
221
}
156
222
157
223
func injectContainer (impl any , c * container.Container ) error {
@@ -177,8 +243,8 @@ func injectContainer(impl any, c *container.Container) error {
177
243
178
244
if f .Type == reflect .TypeOf (c ) {
179
245
if ! v .CanSet () {
180
- c .Logger .Error (errNonAddressable )
181
- return errNonAddressable
246
+ c .Logger .Error (ErrNonAddressable )
247
+ return ErrNonAddressable
182
248
}
183
249
184
250
v .Set (reflect .ValueOf (c ))
@@ -189,8 +255,8 @@ func injectContainer(impl any, c *container.Container) error {
189
255
190
256
if f .Type == reflect .TypeOf (* c ) {
191
257
if ! v .CanSet () {
192
- c .Logger .Error (errNonAddressable )
193
- return errNonAddressable
258
+ c .Logger .Error (ErrNonAddressable )
259
+ return ErrNonAddressable
194
260
}
195
261
196
262
v .Set (reflect .ValueOf (* c ))
@@ -202,3 +268,11 @@ func injectContainer(impl any, c *container.Container) error {
202
268
203
269
return nil
204
270
}
271
+
272
+ func (g * grpcServer ) addServerOptions (opts ... grpc.ServerOption ) {
273
+ g .options = append (g .options , opts ... )
274
+ }
275
+
276
+ func (g * grpcServer ) addUnaryInterceptors (interceptors ... grpc.UnaryServerInterceptor ) {
277
+ g .interceptors = append (g .interceptors , interceptors ... )
278
+ }
0 commit comments