diff --git a/go.work.sum b/go.work.sum index ec87d4cd5..f0d40030b 100644 --- a/go.work.sum +++ b/go.work.sum @@ -195,6 +195,7 @@ cloud.google.com/go/compute v1.34.0 h1:+k/kmViu4TEi97NGaxAATYtpYBviOWJySPZ+ekA95 cloud.google.com/go/compute v1.34.0/go.mod h1:zWZwtLwZQyonEvIQBuIa0WvraMYK69J5eDCOw9VZU4g= cloud.google.com/go/compute v1.37.0 h1:XxtZlXYkZXub3LNaLu90TTemcFqIU1yZ4E4q9VlR39A= cloud.google.com/go/compute v1.37.0/go.mod h1:AsK4VqrSyXBo4SMbRtfAO1VfaMjUEjEwv1UB/AwVp5Q= +cloud.google.com/go/compute v1.38.0 h1:MilCLYQW2m7Dku8hRIIKo4r0oKastlD74sSu16riYKs= cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= cloud.google.com/go/compute/metadata v0.5.2/go.mod h1:C66sj2AluDcIqakBq/M8lw8/ybHgOZqin2obFxa/E5k= cloud.google.com/go/compute/metadata v0.6.0/go.mod h1:FjyFAW1MW0C203CEOMDTu3Dk1FlqW3Rga40jzHL4hfg= @@ -1144,6 +1145,7 @@ go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0. go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0/go.mod h1:umTcuxiv1n/s/S6/c2AT/g2CQ7u5C59sHDNmfSwgz7Q= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0/go.mod h1:UHB22Z8QsdRDrnAtX4PntOl36ajSxcdUMt1sF7Y6E7Q= go.opentelemetry.io/otel v1.22.0/go.mod h1:eoV4iAi3Ea8LkAEI9+GFT44O6T/D0GWAVFyZVCC6pMI= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= @@ -1151,6 +1153,7 @@ go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6c go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.29.0 h1:WDdP9acbMYjbKIyJUhTvtzj601sVJOqgWdUxSdR/Ysc= @@ -1163,6 +1166,7 @@ go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzau go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M= go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= +go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs= go.opentelemetry.io/otel/sdk v1.22.0/go.mod h1:iu7luyVGYovrRpe2fmj3CVKouQNdTOkxtLzPvPz1DOc= go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok= go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= @@ -1175,6 +1179,7 @@ go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06F go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= +go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= @@ -1260,7 +1265,6 @@ golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= -golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1286,6 +1290,7 @@ golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1335,7 +1340,6 @@ golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= -golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1489,6 +1493,7 @@ google.golang.org/genproto/googleapis/bytestream v0.0.0-20250512202823-5a2f75b73 google.golang.org/genproto/googleapis/bytestream v0.0.0-20250512202823-5a2f75b736a9/go.mod h1:h6yxum/C2qRb4txaZRLDHK8RyS0H/o2oEDeKY4onY/Y= google.golang.org/genproto/googleapis/bytestream v0.0.0-20250603155806-513f23925822 h1:zWFRixYR5QlotL+Uv3YfsPRENIrQFXiGs+iwqel6fOQ= google.golang.org/genproto/googleapis/bytestream v0.0.0-20250603155806-513f23925822/go.mod h1:h6yxum/C2qRb4txaZRLDHK8RyS0H/o2oEDeKY4onY/Y= +google.golang.org/genproto/googleapis/bytestream v0.0.0-20250728155136-f173205681a0/go.mod h1:h6yxum/C2qRb4txaZRLDHK8RyS0H/o2oEDeKY4onY/Y= google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= diff --git a/pkg/gofr/factory.go b/pkg/gofr/factory.go index a46322364..6ad0cb24b 100644 --- a/pkg/gofr/factory.go +++ b/pkg/gofr/factory.go @@ -57,7 +57,7 @@ func New() *App { port = defaultGRPCPort } - app.grpcServer = newGRPCServer(app.container, port, app.Config) + app.grpcServer, _ = newGRPCServer(app.container, port, app.Config) app.subscriptionManager = newSubscriptionManager(app.container) diff --git a/pkg/gofr/grpc.go b/pkg/gofr/grpc.go index ec0fa12c9..08c5ee605 100644 --- a/pkg/gofr/grpc.go +++ b/pkg/gofr/grpc.go @@ -3,6 +3,7 @@ package gofr import ( "context" "errors" + "fmt" "net" "reflect" "strconv" @@ -27,6 +28,12 @@ type grpcServer struct { config config.Config } +var ( + ErrNonAddressable = errors.New("cannot inject container as it is not addressable or is nil") + ErrInvalidPort = errors.New("invalid port number") + ErrFailedCreateServer = errors.New("failed to create gRPC server") +) + // AddGRPCServerOptions allows users to add custom gRPC server options such as TLS configuration, // timeouts, interceptors, and other server-specific settings in a single call. // @@ -42,6 +49,12 @@ type grpcServer struct { // This function accepts a variadic list of gRPC server options (grpc.ServerOption) and appends them // to the server's configuration. It allows fine-tuning of the gRPC server's behavior during its initialization. func (a *App) AddGRPCServerOptions(grpcOpts ...grpc.ServerOption) { + if len(grpcOpts) == 0 { + a.container.Logger.Debug("no gRPC server options provided") + return + } + + a.container.Logger.Debugf("adding %d gRPC server options", len(grpcOpts)) a.grpcServer.options = append(a.grpcServer.options, grpcOpts...) } @@ -55,14 +68,32 @@ func (a *App) AddGRPCServerOptions(grpcOpts ...grpc.ServerOption) { // } // app.AddGRPCUnaryInterceptors(loggingInterceptor) func (a *App) AddGRPCUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) { + if len(interceptors) == 0 { + a.container.Logger.Debug("no unary interceptors provided") + return + } + + a.container.Logger.Debugf("adding %d valid unary interceptors", len(interceptors)) a.grpcServer.interceptors = append(a.grpcServer.interceptors, interceptors...) } func (a *App) AddGRPCServerStreamInterceptors(interceptors ...grpc.StreamServerInterceptor) { + if len(interceptors) == 0 { + a.container.Logger.Debug("no stream interceptors provided") + return + } + + a.container.Logger.Debugf("adding %d stream interceptors", len(interceptors)) a.grpcServer.streamInterceptors = append(a.grpcServer.streamInterceptors, interceptors...) } -func newGRPCServer(c *container.Container, port int, cfg config.Config) *grpcServer { +func newGRPCServer(c *container.Container, port int, cfg config.Config) (*grpcServer, error) { + if port <= 0 || port > 65535 { + return nil, fmt.Errorf("%w: %d", ErrInvalidPort, port) + } + + registerGRPCMetrics(c) + middleware := make([]grpc.UnaryServerInterceptor, 0) middleware = append(middleware, grpc_recovery.UnaryServerInterceptor(), @@ -78,41 +109,78 @@ func newGRPCServer(c *container.Container, port int, cfg config.Config) *grpcSer interceptors: middleware, streamInterceptors: streamMiddleware, config: cfg, - } + }, nil } -func (g *grpcServer) createServer() { +// registerGRPCMetrics registers essential gRPC metrics. +func registerGRPCMetrics(c *container.Container) { + c.Metrics().NewGauge("grpc_server_status", "gRPC server status (1=running, 0=stopped)") + c.Metrics().NewCounter("grpc_server_errors_total", "Total gRPC server errors") + c.Metrics().NewCounter("grpc_services_registered_total", "Total gRPC services registered") +} + +func (g *grpcServer) createServer() error { interceptorOption := grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(g.interceptors...)) streamOpt := grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(g.streamInterceptors...)) g.options = append(g.options, interceptorOption, streamOpt) g.server = grpc.NewServer(g.options...) + if g.server == nil { + return ErrFailedCreateServer + } enabled := strings.ToLower(g.config.GetOrDefault("GRPC_ENABLE_REFLECTION", "false")) - if enabled != defaultReflection { + if enabled == defaultTelemetry { reflection.Register(g.server) } + + return nil } func (g *grpcServer) Run(c *container.Container) { if g.server == nil { - g.createServer() + if err := g.createServer(); err != nil { + c.Logger.Fatalf("failed to create gRPC server: %v", err) + c.Metrics().IncrementCounter(context.Background(), "grpc_server_errors_total") + + return + } + } + + if !isPortAvailable(g.port) { + c.Logger.Fatalf("gRPC port %d is blocked or unreachable", g.port) + c.Metrics().IncrementCounter(context.Background(), "grpc_server_errors_total") + c.Metrics().SetGauge("grpc_server_status", 0) + + return } addr := ":" + strconv.Itoa(g.port) c.Logger.Infof("starting gRPC server at %s", addr) - listener, err := net.Listen("tcp", addr) + listener, err := (&net.ListenConfig{}).Listen(context.Background(), "tcp", addr) if err != nil { c.Logger.Errorf("error in starting gRPC server at %s: %s", addr, err) + c.Metrics().IncrementCounter(context.Background(), "grpc_server_errors_total") + c.Metrics().SetGauge("grpc_server_status", 0) + return } + c.Metrics().SetGauge("grpc_server_status", 1) + c.Logger.Infof("gRPC server started successfully on %s", addr) + if err := g.server.Serve(listener); err != nil { c.Logger.Errorf("error in starting gRPC server at %s: %s", addr, err) + c.Metrics().IncrementCounter(context.Background(), "grpc_server_errors_total") + c.Metrics().SetGauge("grpc_server_status", 0) + return } + + c.Logger.Infof("gRPC server stopped on %s", addr) + c.Metrics().SetGauge("grpc_server_status", 0) } func (g *grpcServer) Shutdown(ctx context.Context) error { @@ -129,29 +197,27 @@ func (g *grpcServer) Shutdown(ctx context.Context) error { }) } -var ( - errNonAddressable = errors.New("cannot inject container as it is not addressable or is fail") -) - // RegisterService adds a gRPC service to the GoFr application. func (a *App) RegisterService(desc *grpc.ServiceDesc, impl any) { - if !a.grpcRegistered && !isPortAvailable(a.grpcServer.port) { - a.container.Logger.Fatalf("gRPC port %d is blocked or unreachable", a.grpcServer.port) - } - if !a.grpcRegistered { - a.grpcServer.createServer() + if err := a.grpcServer.createServer(); err != nil { + a.container.Logger.Errorf("failed to create gRPC server for service %s: %v", desc.ServiceName, err) + return + } } - a.container.Logger.Infof("registering gRPC Server: %s", desc.ServiceName) + a.container.Logger.Infof("registering gRPC Service: %s", desc.ServiceName) a.grpcServer.server.RegisterService(desc, impl) + a.container.Metrics().IncrementCounter(context.Background(), "grpc_services_registered_total") + err := injectContainer(impl, a.container) if err != nil { - return + a.container.Logger.Fatalf("failed to inject container into gRPC service %s: %v", desc.ServiceName, err) } a.grpcRegistered = true + a.container.Logger.Infof("successfully registered gRPC service: %s", desc.ServiceName) } func injectContainer(impl any, c *container.Container) error { @@ -177,8 +243,8 @@ func injectContainer(impl any, c *container.Container) error { if f.Type == reflect.TypeOf(c) { if !v.CanSet() { - c.Logger.Error(errNonAddressable) - return errNonAddressable + c.Logger.Error(ErrNonAddressable) + return ErrNonAddressable } v.Set(reflect.ValueOf(c)) @@ -189,8 +255,8 @@ func injectContainer(impl any, c *container.Container) error { if f.Type == reflect.TypeOf(*c) { if !v.CanSet() { - c.Logger.Error(errNonAddressable) - return errNonAddressable + c.Logger.Error(ErrNonAddressable) + return ErrNonAddressable } v.Set(reflect.ValueOf(*c)) @@ -202,3 +268,11 @@ func injectContainer(impl any, c *container.Container) error { return nil } + +func (g *grpcServer) addServerOptions(opts ...grpc.ServerOption) { + g.options = append(g.options, opts...) +} + +func (g *grpcServer) addUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) { + g.interceptors = append(g.interceptors, interceptors...) +} diff --git a/pkg/gofr/grpc/log.go b/pkg/gofr/grpc/log.go index c61727be6..ed634df90 100644 --- a/pkg/gofr/grpc/log.go +++ b/pkg/gofr/grpc/log.go @@ -24,12 +24,16 @@ const ( nanosecondsPerMillisecond = 1e6 debugMethod = "/grpc.health.v1.Health/SetServingStatus" healthCheck = "/grpc.health.v1.Health/Check" + clientStreamSuffix = " [CLIENT-STREAM]" + serverStreamSuffix = " [SERVER-STREAM]" + bidirectionalSuffix = " [BI-DIRECTION_STREAM]" ) type Logger interface { Info(args ...any) Errorf(string, ...any) Debug(...any) + Fatalf(string, ...any) } type Metrics interface { @@ -42,6 +46,7 @@ type gRPCLog struct { ResponseTime int64 `json:"responseTime"` Method string `json:"method"` StatusCode int32 `json:"statusCode"` + StreamType string `json:"streamType,omitempty"` } //nolint:revive // We intend to keep it non-exported for ease in future changes without any breaking change. @@ -49,13 +54,18 @@ func NewgRPCLogger() gRPCLog { return gRPCLog{} } -func (l gRPCLog) PrettyPrint(writer io.Writer) { +func (l *gRPCLog) PrettyPrint(writer io.Writer) { + streamInfo := "" + if l.StreamType != "" { + streamInfo = fmt.Sprintf(" [%s]", l.StreamType) + } + fmt.Fprintf(writer, "\u001B[38;5;8m%s \u001B[38;5;%dm%-*d"+ - "\u001B[0m %*d\u001B[38;5;8mµs\u001B[0m %s %s\n", + "\u001B[0m %*d\u001B[38;5;8mµs\u001B[0m %s%s %s\n", l.ID, colorForGRPCCode(l.StatusCode), statusCodeWidth, l.StatusCode, responseTimeWidth, l.ResponseTime, - "GRPC", l.Method) + "GRPC", streamInfo, l.Method) } func colorForGRPCCode(s int32) int { @@ -99,22 +109,34 @@ func StreamObservabilityInterceptor(logger Logger, metrics Metrics) grpc.StreamS // Process the stream err := handler(srv, wrappedStream) - grpcMethodName := info.FullMethod - if info.IsClientStream && info.IsServerStream { - grpcMethodName += " [BI-DIRECTION_STREAM]" - } else if info.IsClientStream { - grpcMethodName += " [CLIENT-STREAM]" - } else if info.IsServerStream { - grpcMethodName += " [SERVER-STREAM]" - } + streamType, grpcMethodName := getStreamTypeAndMethod(info) // Log and record metrics - logRPC(ctx, logger, metrics, start, err, grpcMethodName, "app_gRPC-Stream_stats") + logStreamRPC(ctx, logger, metrics, start, err, grpcMethodName, streamType, "app_gRPC-Stream_stats") return err } } +// getStreamTypeAndMethod determines the stream type and formats the method name. +func getStreamTypeAndMethod(info *grpc.StreamServerInfo) (streamType, methodName string) { + methodName = info.FullMethod + + switch { + case info.IsClientStream && info.IsServerStream: + streamType = "BIDIRECTIONAL" + methodName += bidirectionalSuffix + case info.IsClientStream: + streamType = "CLIENT_STREAM" + methodName += clientStreamSuffix + case info.IsServerStream: + streamType = "SERVER_STREAM" + methodName += serverStreamSuffix + } + + return streamType, methodName +} + // wrappedServerStream propagates context with tracing for streaming RPCs. type wrappedServerStream struct { grpc.ServerStream @@ -183,13 +205,36 @@ func (gRPCLog) DocumentRPCLog(ctx context.Context, logger Logger, metrics Metric logRPC(ctx, logger, metrics, start, err, method, name) } +func logStreamRPC(ctx context.Context, logger Logger, metrics Metrics, start time.Time, err error, method, streamType, metricName string) { + duration := time.Since(start) + + logEntry := gRPCLog{ + ID: getTraceID(ctx), + StartTime: start.Format("2006-01-02T15:04:05.999999999-07:00"), + ResponseTime: duration.Microseconds(), + Method: method, + StreamType: streamType, + } + + if err != nil { + statusErr, _ := status.FromError(err) + //nolint:gosec //errorcode is garenteed to be in safe range + logEntry.StatusCode = int32(statusErr.Code()) + } else { + logEntry.StatusCode = int32(codes.OK) + } + + logGRPCEntry(logger, &logEntry, method) + recordGRPCMetrics(ctx, metrics, metricName, duration, method, streamType) +} + func logRPC(ctx context.Context, logger Logger, metrics Metrics, start time.Time, err error, method, name string) { duration := time.Since(start) logEntry := gRPCLog{ ID: trace.SpanFromContext(ctx).SpanContext().TraceID().String(), StartTime: start.Format("2006-01-02T15:04:05.999999999-07:00"), - ResponseTime: time.Since(start).Microseconds(), + ResponseTime: duration.Microseconds(), Method: method, } @@ -201,24 +246,54 @@ func logRPC(ctx context.Context, logger Logger, metrics Metrics, start time.Time logEntry.StatusCode = int32(codes.OK) } - if logger != nil { - switch { - case method == debugMethod, - strings.Contains(method, "/Send"), - strings.Contains(method, "/Recv"), - strings.Contains(method, "/SendAndClose"): - logger.Debug(logEntry) - default: - logger.Info(logEntry) - } + logGRPCEntry(logger, &logEntry, method) + recordGRPCMetrics(ctx, metrics, name, duration, method, "") +} + +// Helper function to extract trace ID from context. +func getTraceID(ctx context.Context) string { + if ctx == nil { + return "" + } + + span := trace.SpanFromContext(ctx) + if span == nil { + return "" } - if metrics != nil { - metrics.RecordHistogram(ctx, name, - float64(duration.Milliseconds())+float64(duration.Nanoseconds()%nanosecondsPerMillisecond)/nanosecondsPerMillisecond, - "method", - method) + return span.SpanContext().TraceID().String() +} + +// logGRPCEntry handles the actual logging with improved logic. +func logGRPCEntry(logger Logger, logEntry *gRPCLog, method string) { + if logger == nil { + return } + + switch { + case method == debugMethod, + strings.Contains(method, "/Send"), + strings.Contains(method, "/Recv"), + strings.Contains(method, "/SendAndClose"): + logger.Debug(logEntry) + default: + logger.Info(logEntry) + } +} + +func recordGRPCMetrics(ctx context.Context, metrics Metrics, name string, duration time.Duration, method, streamType string) { + if metrics == nil { + return + } + + durationMs := float64(duration.Milliseconds()) + float64(duration.Nanoseconds()%nanosecondsPerMillisecond)/nanosecondsPerMillisecond + + labels := []string{"method", method} + if streamType != "" { + labels = append(labels, "stream_type", streamType) + } + + metrics.RecordHistogram(ctx, name, durationMs, labels...) } // Helper function to safely extract a value from metadata. diff --git a/pkg/gofr/grpc/log_test.go b/pkg/gofr/grpc/log_test.go index 8b1021f5e..64cca0b70 100644 --- a/pkg/gofr/grpc/log_test.go +++ b/pkg/gofr/grpc/log_test.go @@ -1,12 +1,16 @@ package grpc import ( + "bytes" + "context" "net/http" "os" "testing" "time" "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" "gofr.dev/pkg/gofr/testutil" ) @@ -16,6 +20,11 @@ func TestMain(m *testing.M) { m.Run() } +func TestNewgRPCLogger(t *testing.T) { + logger := NewgRPCLogger() + assert.Equal(t, gRPCLog{}, logger) +} + func TestRPCLog_String(t *testing.T) { l := gRPCLog{ ID: "123", @@ -29,6 +38,21 @@ func TestRPCLog_String(t *testing.T) { assert.Equal(t, expLog, l.String()) } +func TestRPCLog_StringWithStreamType(t *testing.T) { + l := gRPCLog{ + ID: "123", + StartTime: "2020-01-01T12:12:12", + Method: "/test.Service/Method", + StatusCode: 0, + StreamType: "CLIENT_STREAM", + } + + expLog := `{"id":"123","startTime":"2020-01-01T12:12:12","responseTime":0,` + + `"method":"/test.Service/Method","statusCode":0,"streamType":"CLIENT_STREAM"}` + + assert.Equal(t, expLog, l.String()) +} + func Test_colorForGRPCCode(t *testing.T) { testCases := []struct { desc string @@ -71,3 +95,95 @@ func TestRPCLog_PrettyPrint(t *testing.T) { // Check if ID is coming assert.Contains(t, log, `1`) } + +func TestRPCLog_PrettyPrintWithStreamType(t *testing.T) { + var buf bytes.Buffer + + l := gRPCLog{ + ID: "1", + StartTime: "2023-01-01T12:00:00Z", + ResponseTime: 100, + Method: "/test.Service/Method", + StatusCode: 0, + StreamType: "SERVER_STREAM", + } + + l.PrettyPrint(&buf) + + output := buf.String() + assert.Contains(t, output, "[SERVER_STREAM]") + assert.Contains(t, output, "/test.Service/Method") +} + +func TestGetStreamTypeAndMethod(t *testing.T) { + testCases := []struct { + desc string + info *grpc.StreamServerInfo + expectedType string + expectedMethod string + }{ + { + desc: "bidirectional stream", + info: &grpc.StreamServerInfo{ + FullMethod: "/test.Service/Method", + IsClientStream: true, + IsServerStream: true, + }, + expectedType: "BIDIRECTIONAL", + expectedMethod: "/test.Service/Method [BI-DIRECTION_STREAM]", + }, + { + desc: "client stream", + info: &grpc.StreamServerInfo{ + FullMethod: "/test.Service/Method", + IsClientStream: true, + IsServerStream: false, + }, + expectedType: "CLIENT_STREAM", + expectedMethod: "/test.Service/Method [CLIENT-STREAM]", + }, + { + desc: "server stream", + info: &grpc.StreamServerInfo{ + FullMethod: "/test.Service/Method", + IsClientStream: false, + IsServerStream: true, + }, + expectedType: "SERVER_STREAM", + expectedMethod: "/test.Service/Method [SERVER-STREAM]", + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + streamType, method := getStreamTypeAndMethod(tc.info) + assert.Equal(t, tc.expectedType, streamType) + assert.Equal(t, tc.expectedMethod, method) + }) + } +} + +func TestGetMetadataValue(t *testing.T) { + md := metadata.Pairs("key1", "value1", "key2", "value2") + + assert.Equal(t, "value1", getMetadataValue(md, "key1")) + assert.Equal(t, "value2", getMetadataValue(md, "key2")) + assert.Empty(t, getMetadataValue(md, "nonexistent")) +} + +func TestGetTraceID(t *testing.T) { + assert.Equal(t, "00000000000000000000000000000000", getTraceID(t.Context())) + assert.Equal(t, "00000000000000000000000000000000", getTraceID(context.TODO())) +} + +func TestWrappedServerStream_Context(t *testing.T) { + type contextKey string + + originalCtx := t.Context() + newCtx := context.WithValue(originalCtx, contextKey("key"), "value") + wrapped := &wrappedServerStream{ + ctx: newCtx, + } + assert.Equal(t, newCtx, wrapped.Context()) + assert.Equal(t, "value", wrapped.Context().Value(contextKey("key"))) +} diff --git a/pkg/gofr/grpc_test.go b/pkg/gofr/grpc_test.go index 29b6867a6..3810973dc 100644 --- a/pkg/gofr/grpc_test.go +++ b/pkg/gofr/grpc_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -19,15 +20,106 @@ import ( "gofr.dev/pkg/gofr/testutil" ) -func TestNewGRPCServer(t *testing.T) { - c := container.Container{ - Logger: logging.NewLogger(logging.DEBUG), +func (g *grpcServer) registerService(t *testing.T, desc *grpc.ServiceDesc, impl any) { + t.Helper() + + if g.server == nil { + if err := g.createServer(); err != nil { + t.Fatalf("failed to create gRPC server: %v", err) + } } + + g.server.RegisterService(desc, impl) +} + +// Helper function to set up GRPC metrics expectations. +func setupGRPCMetricExpectations(mockMetrics *container.MockMetrics) { + mockMetrics.EXPECT().NewGauge("grpc_server_status", "gRPC server status (1=running, 0=stopped)").Times(1) + mockMetrics.EXPECT().NewCounter("grpc_server_errors_total", "Total gRPC server errors").Times(1) + mockMetrics.EXPECT().NewCounter("grpc_services_registered_total", "Total gRPC services registered").Times(1) +} + +func TestNewGRPCServer(t *testing.T) { + c, mocks := container.NewMockContainer(t) + setupGRPCMetricExpectations(mocks.Metrics) + cfg := testutil.NewServerConfigs(t) - g := newGRPCServer(&c, 9999, cfg) + g, err := newGRPCServer(c, 9999, cfg) + require.NoError(t, err) assert.NotNil(t, g, "TEST Failed.\n") } +func TestGRPCServer_AddServerOptions(t *testing.T) { + c, mocks := container.NewMockContainer(t) + setupGRPCMetricExpectations(mocks.Metrics) + + cfg := testutil.NewServerConfigs(t) + g, err := newGRPCServer(c, 9999, cfg) + require.NoError(t, err) + + option1 := grpc.ConnectionTimeout(5 * time.Second) + option2 := grpc.MaxRecvMsgSize(1024 * 1024) + + g.addServerOptions(option1, option2) + + assert.Len(t, g.options, 2) +} + +func TestGRPCServer_AddUnaryInterceptors(t *testing.T) { + c, mocks := container.NewMockContainer(t) + setupGRPCMetricExpectations(mocks.Metrics) + + cfg := testutil.NewServerConfigs(t) + g, err := newGRPCServer(c, 9999, cfg) + require.NoError(t, err) + + interceptor1 := func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + return handler(ctx, req) + } + + interceptor2 := func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + return handler(ctx, req) + } + + g.addUnaryInterceptors(interceptor1, interceptor2) + + assert.Len(t, g.interceptors, 4) +} + +func TestGRPCServer_CreateServer(t *testing.T) { + c, mocks := container.NewMockContainer(t) + setupGRPCMetricExpectations(mocks.Metrics) + + cfg := testutil.NewServerConfigs(t) + g, err := newGRPCServer(c, 9999, cfg) + require.NoError(t, err) + + err = g.createServer() + require.NoError(t, err) + assert.NotNil(t, g.server) +} + +func TestGRPCServer_RegisterService(t *testing.T) { + c, mocks := container.NewMockContainer(t) + setupGRPCMetricExpectations(mocks.Metrics) + + cfg := testutil.NewServerConfigs(t) + g, err := newGRPCServer(c, 9999, cfg) + require.NoError(t, err) + + err = g.createServer() + require.NoError(t, err) + + healthServer := health.NewServer() + desc := &grpc_health_v1.Health_ServiceDesc + + g.registerService(t, desc, healthServer) + + services := g.server.GetServiceInfo() + _, ok := services["grpc.health.v1.Health"] + assert.True(t, ok, "health service should be registered") +} + func TestGRPC_ServerRun(t *testing.T) { testCases := []struct { desc string @@ -40,13 +132,16 @@ func TestGRPC_ServerRun(t *testing.T) { for i, tc := range testCases { f := func() { - c := &container.Container{ - Logger: logging.NewLogger(logging.INFO), - } + c, mocks := container.NewMockContainer(t) + setupGRPCMetricExpectations(mocks.Metrics) + + // Add expectations for error scenarios + mocks.Metrics.EXPECT().IncrementCounter(gomock.Any(), "grpc_server_errors_total").AnyTimes() + mocks.Metrics.EXPECT().SetGauge("grpc_server_status", gomock.Any()).AnyTimes() // If testing "server.Serve() error", occupy the port first if tc.port == 10000 { - listener, err := net.Listen("tcp", fmt.Sprintf(":%d", tc.port)) + listener, err := (&net.ListenConfig{}).Listen(context.Background(), "tcp", fmt.Sprintf(":%d", tc.port)) if err != nil { t.Fatalf("Failed to occupy port %d: %v", tc.port, err) } @@ -78,7 +173,8 @@ func TestGRPC_ServerShutdown(t *testing.T) { Logger: logging.NewLogger(logging.DEBUG), } cfg := testutil.NewServerConfigs(t) - g := newGRPCServer(&c, 9999, cfg) + g, err := newGRPCServer(&c, 9999, cfg) + require.NoError(t, err) go g.Run(&c) @@ -89,7 +185,7 @@ func TestGRPC_ServerShutdown(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) defer cancel() - err := g.Shutdown(ctx) + err = g.Shutdown(ctx) require.NoError(t, err, "TestGRPC_ServerShutdown Failed.\n") } @@ -98,7 +194,8 @@ func TestGRPC_ServerShutdown_ContextCanceled(t *testing.T) { Logger: logging.NewLogger(logging.DEBUG), } cfg := testutil.NewServerConfigs(t) - g := newGRPCServer(&c, 9999, cfg) + g, err := newGRPCServer(&c, 9999, cfg) + require.NoError(t, err) go g.Run(&c) @@ -116,7 +213,7 @@ func TestGRPC_ServerShutdown_ContextCanceled(t *testing.T) { // Cancel the context immediately cancel() - err := <-errChan + err = <-errChan require.ErrorContains(t, err, "context canceled", "Expected error due to context cancellation") } @@ -130,7 +227,7 @@ func Test_injectContainer_Fails(t *testing.T) { srv1 := &fail{} err := injectContainer(srv1, c) - require.ErrorIs(t, err, errNonAddressable) + require.ErrorIs(t, err, ErrNonAddressable) require.Nil(t, srv1.c1) // Case: server is passed as unadressable(non-pointer) @@ -187,12 +284,13 @@ func TestGRPC_Shutdown_BeforeStart(t *testing.T) { c := &container.Container{Logger: logger} cfg := testutil.NewServerConfigs(t) - g := newGRPCServer(c, 9999, cfg) + g, err := newGRPCServer(c, 9999, cfg) + require.NoError(t, err) ctx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) defer cancel() - err := g.Shutdown(ctx) + err = g.Shutdown(ctx) assert.NoError(t, err, "Expected shutdown to succeed even if server was not started") } @@ -227,7 +325,8 @@ func TestGRPC_ServerRun_WithInterceptorAndOptions(t *testing.T) { // Register Health service healthServer := health.NewServer() - app.grpcServer.createServer() + err := app.grpcServer.createServer() + require.NoError(t, err) grpc_health_v1.RegisterHealthServer(app.grpcServer.server, healthServer) @@ -262,11 +361,17 @@ func TestApp_WithReflection(t *testing.T) { c := &container.Container{ Logger: logging.NewLogger(logging.DEBUG), } + + var err error + app := New() app.container = c cfg := testutil.NewServerConfigs(t) - app.grpcServer = newGRPCServer(c, 9999, cfg) - app.grpcServer.createServer() + app.grpcServer, err = newGRPCServer(c, 9999, cfg) + require.NoError(t, err) + + err = app.grpcServer.createServer() + require.NoError(t, err) services := app.grpcServer.server.GetServiceInfo() _, ok := services["grpc.reflection.v1alpha.ServerReflection"]