@@ -28,6 +28,12 @@ type grpcServer struct {
28
28
config config.Config
29
29
}
30
30
31
+ var (
32
+ ErrNonAddressable = errors .New ("cannot inject container as it is not addressable or is nil" )
33
+ ErrInvalidPort = errors .New ("invalid port number" )
34
+ ErrFailedCreateServer = errors .New ("failed to create gRPC server" )
35
+ )
36
+
31
37
// AddGRPCServerOptions allows users to add custom gRPC server options such as TLS configuration,
32
38
// timeouts, interceptors, and other server-specific settings in a single call.
33
39
//
@@ -44,14 +50,15 @@ type grpcServer struct {
44
50
// to the server's configuration. It allows fine-tuning of the gRPC server's behavior during its initialization.
45
51
func (a * App ) AddGRPCServerOptions (grpcOpts ... grpc.ServerOption ) {
46
52
if len (grpcOpts ) == 0 {
47
- a .container .Logger .Debug ("no gRPC server options provided" )
48
- return
49
- }
53
+ a .container .Logger .Debug ("no gRPC server options provided" )
54
+ return
55
+ }
56
+
50
57
a .container .Logger .Debugf ("adding %d gRPC server options" , len (grpcOpts ))
51
58
a .grpcServer .options = append (a .grpcServer .options , grpcOpts ... )
52
59
}
53
60
54
- // AddGRPCUnaryInterceptors allows users to add custom gRPC interceptors.
61
+ // AddGRPCUnaryInterceptors allows users to add custom gRPC interceptors.
55
62
// Example:
56
63
//
57
64
// func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
@@ -62,12 +69,12 @@ func (a *App) AddGRPCServerOptions(grpcOpts ...grpc.ServerOption) {
62
69
// app.AddGRPCUnaryInterceptors(loggingInterceptor)
63
70
func (a * App ) AddGRPCUnaryInterceptors (interceptors ... grpc.UnaryServerInterceptor ) {
64
71
if len (interceptors ) == 0 {
65
- a .container .Logger .Debug ("no unary interceptors provided" )
66
- return
67
- }
68
-
72
+ a .container .Logger .Debug ("no unary interceptors provided" )
73
+ return
74
+ }
75
+
69
76
a .container .Logger .Debugf ("adding %d valid unary interceptors" , len (interceptors ))
70
- a .grpcServer .interceptors = append (a .grpcServer .interceptors , interceptors ... )
77
+ a .grpcServer .interceptors = append (a .grpcServer .interceptors , interceptors ... )
71
78
}
72
79
73
80
func (a * App ) AddGRPCServerStreamInterceptors (interceptors ... grpc.StreamServerInterceptor ) {
@@ -81,12 +88,12 @@ func (a *App) AddGRPCServerStreamInterceptors(interceptors ...grpc.StreamServerI
81
88
}
82
89
83
90
func newGRPCServer (c * container.Container , port int , cfg config.Config ) (* grpcServer , error ) {
84
- if port <= 0 || port > 65535 {
85
- return nil , fmt .Errorf ("invalid port number : %d" , port )
86
- }
91
+ if port <= 0 || port > 65535 {
92
+ return nil , fmt .Errorf ("%w : %d" , ErrInvalidPort , port )
93
+ }
87
94
88
95
c .Logger .Infof ("creating new gRPC server on port %d" , port )
89
-
96
+
90
97
registerGRPCMetrics (c )
91
98
92
99
middleware := make ([]grpc.UnaryServerInterceptor , 0 )
@@ -98,9 +105,9 @@ func newGRPCServer(c *container.Container, port int, cfg config.Config) (*grpcSe
98
105
streamMiddleware = append (streamMiddleware ,
99
106
grpc_recovery .StreamServerInterceptor (),
100
107
gofr_grpc .StreamObservabilityInterceptor (c .Logger , c .Metrics ()))
101
-
108
+
102
109
c .Logger .Debugf ("gRPC server created successfully on port %d" , port )
103
-
110
+
104
111
return & grpcServer {
105
112
port : port ,
106
113
interceptors : middleware ,
@@ -109,62 +116,64 @@ func newGRPCServer(c *container.Container, port int, cfg config.Config) (*grpcSe
109
116
}, nil
110
117
}
111
118
112
- // registerGRPCMetrics registers essential gRPC metrics
119
+ // registerGRPCMetrics registers essential gRPC metrics.
113
120
func registerGRPCMetrics (c * container.Container ) {
114
121
c .Metrics ().NewGauge ("grpc_server_status" , "gRPC server status (1=running, 0=stopped)" )
115
122
c .Metrics ().NewCounter ("grpc_server_errors_total" , "Total gRPC server errors" )
116
123
c .Metrics ().NewCounter ("grpc_services_registered_total" , "Total gRPC services registered" )
117
124
}
118
125
119
- func (g * grpcServer ) createServer () error {
126
+ func (g * grpcServer ) createServer () error {
120
127
interceptorOption := grpc .UnaryInterceptor (grpc_middleware .ChainUnaryServer (g .interceptors ... ))
121
128
streamOpt := grpc .StreamInterceptor (grpc_middleware .ChainStreamServer (g .streamInterceptors ... ))
122
129
g .options = append (g .options , interceptorOption , streamOpt )
123
130
124
131
g .server = grpc .NewServer (g .options ... )
125
132
if g .server == nil {
126
- return errors . New ( "failed to create gRPC server" )
127
- }
133
+ return ErrFailedCreateServer
134
+ }
128
135
129
136
enabled := strings .ToLower (g .config .GetOrDefault ("GRPC_ENABLE_REFLECTION" , "false" ))
130
- if enabled == "true" {
131
- reflection .Register (g .server )
132
- }
137
+ if enabled == defaultTelemetry {
138
+ reflection .Register (g .server )
139
+ }
133
140
134
141
return nil
135
142
}
136
143
137
144
func (g * grpcServer ) Run (c * container.Container ) {
138
-
139
145
if g .server == nil {
140
- if err := g .createServer (); err != nil {
141
- c .Logger .Fatalf ("failed to create gRPC server: %v" , err )
146
+ if err := g .createServer (); err != nil {
147
+ c .Logger .Fatalf ("failed to create gRPC server: %v" , err )
142
148
c .Metrics ().IncrementCounter (context .Background (), "grpc_server_errors_total" )
143
- return
144
- }
145
- }
149
+
150
+ return
151
+ }
152
+ }
146
153
147
154
addr := ":" + strconv .Itoa (g .port )
148
155
149
156
c .Logger .Infof ("starting gRPC server at %s" , addr )
150
157
151
- listener , err := net .Listen ("tcp" , addr )
158
+ listener , err := ( & net.ListenConfig {}). Listen (context . Background (), "tcp" , addr )
152
159
if err != nil {
153
160
c .Logger .Errorf ("error in starting gRPC server at %s: %s" , addr , err )
154
161
c .Metrics ().IncrementCounter (context .Background (), "grpc_server_errors_total" )
155
162
c .Metrics ().SetGauge ("grpc_server_status" , 0 )
163
+
156
164
return
157
165
}
158
-
166
+
159
167
c .Metrics ().SetGauge ("grpc_server_status" , 1 )
160
168
161
169
if err := g .server .Serve (listener ); err != nil {
162
170
c .Logger .Errorf ("error in starting gRPC server at %s: %s" , addr , err )
163
171
c .Metrics ().IncrementCounter (context .Background (), "grpc_server_errors_total" )
164
172
c .Metrics ().SetGauge ("grpc_server_status" , 0 )
173
+
165
174
return
166
175
}
167
-
176
+
168
177
c .Logger .Infof ("gRPC server stopped on %s" , addr )
169
178
c .Metrics ().SetGauge ("grpc_server_status" , 0 )
170
179
}
@@ -183,27 +192,22 @@ func (g *grpcServer) Shutdown(ctx context.Context) error {
183
192
})
184
193
}
185
194
186
- var (
187
- errNonAddressable = errors .New ("cannot inject container as it is not addressable or is nil" )
188
- )
189
-
190
195
// RegisterService adds a gRPC service to the GoFr application.
191
196
func (a * App ) RegisterService (desc * grpc.ServiceDesc , impl any ) {
192
-
193
197
if ! a .grpcRegistered && ! isPortAvailable (a .grpcServer .port ) {
194
198
a .container .Logger .Fatalf ("gRPC port %d is blocked or unreachable" , a .grpcServer .port )
195
199
}
196
200
197
- if ! a .grpcRegistered {
198
- if err := a .grpcServer .createServer (); err != nil {
199
- a .container .Logger .Errorf ("failed to create gRPC server for service %s: %v" , desc .ServiceName , err )
200
- return
201
- }
202
- }
201
+ if ! a .grpcRegistered {
202
+ if err := a .grpcServer .createServer (); err != nil {
203
+ a .container .Logger .Errorf ("failed to create gRPC server for service %s: %v" , desc .ServiceName , err )
204
+ return
205
+ }
206
+ }
203
207
204
208
a .container .Logger .Infof ("registering gRPC Server: %s" , desc .ServiceName )
205
209
a .grpcServer .server .RegisterService (desc , impl )
206
-
210
+
207
211
a .container .Metrics ().IncrementCounter (context .Background (), "grpc_services_registered_total" )
208
212
209
213
err := injectContainer (impl , a .container )
@@ -238,8 +242,8 @@ func injectContainer(impl any, c *container.Container) error {
238
242
239
243
if f .Type == reflect .TypeOf (c ) {
240
244
if ! v .CanSet () {
241
- c .Logger .Error (errNonAddressable )
242
- return errNonAddressable
245
+ c .Logger .Error (ErrNonAddressable )
246
+ return ErrNonAddressable
243
247
}
244
248
245
249
v .Set (reflect .ValueOf (c ))
@@ -250,8 +254,8 @@ func injectContainer(impl any, c *container.Container) error {
250
254
251
255
if f .Type == reflect .TypeOf (* c ) {
252
256
if ! v .CanSet () {
253
- c .Logger .Error (errNonAddressable )
254
- return errNonAddressable
257
+ c .Logger .Error (ErrNonAddressable )
258
+ return ErrNonAddressable
255
259
}
256
260
257
261
v .Set (reflect .ValueOf (* c ))
0 commit comments