Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ cloud.google.com/go/compute v1.34.0 h1:+k/kmViu4TEi97NGaxAATYtpYBviOWJySPZ+ekA95
cloud.google.com/go/compute v1.34.0/go.mod h1:zWZwtLwZQyonEvIQBuIa0WvraMYK69J5eDCOw9VZU4g=
cloud.google.com/go/compute v1.37.0 h1:XxtZlXYkZXub3LNaLu90TTemcFqIU1yZ4E4q9VlR39A=
cloud.google.com/go/compute v1.37.0/go.mod h1:AsK4VqrSyXBo4SMbRtfAO1VfaMjUEjEwv1UB/AwVp5Q=
cloud.google.com/go/compute v1.38.0 h1:MilCLYQW2m7Dku8hRIIKo4r0oKastlD74sSu16riYKs=
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
cloud.google.com/go/compute/metadata v0.5.2/go.mod h1:C66sj2AluDcIqakBq/M8lw8/ybHgOZqin2obFxa/E5k=
cloud.google.com/go/compute/metadata v0.6.0/go.mod h1:FjyFAW1MW0C203CEOMDTu3Dk1FlqW3Rga40jzHL4hfg=
Expand Down Expand Up @@ -1144,13 +1145,15 @@ go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0/go.mod h1:umTcuxiv1n/s/S6/c2AT/g2CQ7u5C59sHDNmfSwgz7Q=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0/go.mod h1:UHB22Z8QsdRDrnAtX4PntOl36ajSxcdUMt1sF7Y6E7Q=
go.opentelemetry.io/otel v1.22.0/go.mod h1:eoV4iAi3Ea8LkAEI9+GFT44O6T/D0GWAVFyZVCC6pMI=
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8=
go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE=
go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.29.0 h1:WDdP9acbMYjbKIyJUhTvtzj601sVJOqgWdUxSdR/Ysc=
Expand All @@ -1163,6 +1166,7 @@ go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzau
go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs=
go.opentelemetry.io/otel/sdk v1.22.0/go.mod h1:iu7luyVGYovrRpe2fmj3CVKouQNdTOkxtLzPvPz1DOc=
go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok=
go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU=
Expand All @@ -1175,6 +1179,7 @@ go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06F
go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
Expand Down Expand Up @@ -1260,7 +1265,6 @@ golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand All @@ -1286,6 +1290,7 @@ golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -1335,7 +1340,6 @@ golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down Expand Up @@ -1489,6 +1493,7 @@ google.golang.org/genproto/googleapis/bytestream v0.0.0-20250512202823-5a2f75b73
google.golang.org/genproto/googleapis/bytestream v0.0.0-20250512202823-5a2f75b736a9/go.mod h1:h6yxum/C2qRb4txaZRLDHK8RyS0H/o2oEDeKY4onY/Y=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20250603155806-513f23925822 h1:zWFRixYR5QlotL+Uv3YfsPRENIrQFXiGs+iwqel6fOQ=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20250603155806-513f23925822/go.mod h1:h6yxum/C2qRb4txaZRLDHK8RyS0H/o2oEDeKY4onY/Y=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20250728155136-f173205681a0/go.mod h1:h6yxum/C2qRb4txaZRLDHK8RyS0H/o2oEDeKY4onY/Y=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
Expand Down
2 changes: 1 addition & 1 deletion pkg/gofr/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func New() *App {
port = defaultGRPCPort
}

app.grpcServer = newGRPCServer(app.container, port, app.Config)
app.grpcServer, _ = newGRPCServer(app.container, port, app.Config)

app.subscriptionManager = newSubscriptionManager(app.container)

Expand Down
116 changes: 95 additions & 21 deletions pkg/gofr/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gofr
import (
"context"
"errors"
"fmt"
"net"
"reflect"
"strconv"
Expand All @@ -27,6 +28,12 @@ type grpcServer struct {
config config.Config
}

var (
ErrNonAddressable = errors.New("cannot inject container as it is not addressable or is nil")
ErrInvalidPort = errors.New("invalid port number")
ErrFailedCreateServer = errors.New("failed to create gRPC server")
)

// AddGRPCServerOptions allows users to add custom gRPC server options such as TLS configuration,
// timeouts, interceptors, and other server-specific settings in a single call.
//
Expand All @@ -42,6 +49,12 @@ type grpcServer struct {
// This function accepts a variadic list of gRPC server options (grpc.ServerOption) and appends them
// to the server's configuration. It allows fine-tuning of the gRPC server's behavior during its initialization.
func (a *App) AddGRPCServerOptions(grpcOpts ...grpc.ServerOption) {
if len(grpcOpts) == 0 {
a.container.Logger.Debug("no gRPC server options provided")
return
}

a.container.Logger.Debugf("adding %d gRPC server options", len(grpcOpts))
a.grpcServer.options = append(a.grpcServer.options, grpcOpts...)
}

Expand All @@ -55,14 +68,32 @@ func (a *App) AddGRPCServerOptions(grpcOpts ...grpc.ServerOption) {
// }
// app.AddGRPCUnaryInterceptors(loggingInterceptor)
func (a *App) AddGRPCUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) {
if len(interceptors) == 0 {
a.container.Logger.Debug("no unary interceptors provided")
return
}

a.container.Logger.Debugf("adding %d valid unary interceptors", len(interceptors))
a.grpcServer.interceptors = append(a.grpcServer.interceptors, interceptors...)
}

func (a *App) AddGRPCServerStreamInterceptors(interceptors ...grpc.StreamServerInterceptor) {
if len(interceptors) == 0 {
a.container.Logger.Debug("no stream interceptors provided")
return
}

a.container.Logger.Debugf("adding %d stream interceptors", len(interceptors))
a.grpcServer.streamInterceptors = append(a.grpcServer.streamInterceptors, interceptors...)
}

func newGRPCServer(c *container.Container, port int, cfg config.Config) *grpcServer {
func newGRPCServer(c *container.Container, port int, cfg config.Config) (*grpcServer, error) {
if port <= 0 || port > 65535 {
return nil, fmt.Errorf("%w: %d", ErrInvalidPort, port)
}

registerGRPCMetrics(c)

middleware := make([]grpc.UnaryServerInterceptor, 0)
middleware = append(middleware,
grpc_recovery.UnaryServerInterceptor(),
Expand All @@ -78,41 +109,78 @@ func newGRPCServer(c *container.Container, port int, cfg config.Config) *grpcSer
interceptors: middleware,
streamInterceptors: streamMiddleware,
config: cfg,
}
}, nil
}

func (g *grpcServer) createServer() {
// registerGRPCMetrics registers essential gRPC metrics.
func registerGRPCMetrics(c *container.Container) {
c.Metrics().NewGauge("grpc_server_status", "gRPC server status (1=running, 0=stopped)")
c.Metrics().NewCounter("grpc_server_errors_total", "Total gRPC server errors")
c.Metrics().NewCounter("grpc_services_registered_total", "Total gRPC services registered")
}

func (g *grpcServer) createServer() error {
interceptorOption := grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(g.interceptors...))
streamOpt := grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(g.streamInterceptors...))
g.options = append(g.options, interceptorOption, streamOpt)

g.server = grpc.NewServer(g.options...)
if g.server == nil {
return ErrFailedCreateServer
}

enabled := strings.ToLower(g.config.GetOrDefault("GRPC_ENABLE_REFLECTION", "false"))
if enabled != defaultReflection {
if enabled == defaultTelemetry {
reflection.Register(g.server)
}

return nil
}

func (g *grpcServer) Run(c *container.Container) {
if g.server == nil {
g.createServer()
if err := g.createServer(); err != nil {
c.Logger.Fatalf("failed to create gRPC server: %v", err)
c.Metrics().IncrementCounter(context.Background(), "grpc_server_errors_total")

return
}
}

if !isPortAvailable(g.port) {
c.Logger.Fatalf("gRPC port %d is blocked or unreachable", g.port)
c.Metrics().IncrementCounter(context.Background(), "grpc_server_errors_total")
c.Metrics().SetGauge("grpc_server_status", 0)

return
}

addr := ":" + strconv.Itoa(g.port)

c.Logger.Infof("starting gRPC server at %s", addr)

listener, err := net.Listen("tcp", addr)
listener, err := (&net.ListenConfig{}).Listen(context.Background(), "tcp", addr)
if err != nil {
c.Logger.Errorf("error in starting gRPC server at %s: %s", addr, err)
c.Metrics().IncrementCounter(context.Background(), "grpc_server_errors_total")
c.Metrics().SetGauge("grpc_server_status", 0)

return
}

c.Metrics().SetGauge("grpc_server_status", 1)
c.Logger.Infof("gRPC server started successfully on %s", addr)

if err := g.server.Serve(listener); err != nil {
c.Logger.Errorf("error in starting gRPC server at %s: %s", addr, err)
c.Metrics().IncrementCounter(context.Background(), "grpc_server_errors_total")
c.Metrics().SetGauge("grpc_server_status", 0)

return
}

c.Logger.Infof("gRPC server stopped on %s", addr)
c.Metrics().SetGauge("grpc_server_status", 0)
}

func (g *grpcServer) Shutdown(ctx context.Context) error {
Expand All @@ -129,29 +197,27 @@ func (g *grpcServer) Shutdown(ctx context.Context) error {
})
}

var (
errNonAddressable = errors.New("cannot inject container as it is not addressable or is fail")
)

// RegisterService adds a gRPC service to the GoFr application.
func (a *App) RegisterService(desc *grpc.ServiceDesc, impl any) {
if !a.grpcRegistered && !isPortAvailable(a.grpcServer.port) {
a.container.Logger.Fatalf("gRPC port %d is blocked or unreachable", a.grpcServer.port)
}

if !a.grpcRegistered {
a.grpcServer.createServer()
if err := a.grpcServer.createServer(); err != nil {
a.container.Logger.Errorf("failed to create gRPC server for service %s: %v", desc.ServiceName, err)
return
}
}

a.container.Logger.Infof("registering gRPC Server: %s", desc.ServiceName)
a.container.Logger.Infof("registering gRPC Service: %s", desc.ServiceName)
a.grpcServer.server.RegisterService(desc, impl)

a.container.Metrics().IncrementCounter(context.Background(), "grpc_services_registered_total")

err := injectContainer(impl, a.container)
if err != nil {
return
a.container.Logger.Fatalf("failed to inject container into gRPC service %s: %v", desc.ServiceName, err)
}

a.grpcRegistered = true
a.container.Logger.Infof("successfully registered gRPC service: %s", desc.ServiceName)
}

func injectContainer(impl any, c *container.Container) error {
Expand All @@ -177,8 +243,8 @@ func injectContainer(impl any, c *container.Container) error {

if f.Type == reflect.TypeOf(c) {
if !v.CanSet() {
c.Logger.Error(errNonAddressable)
return errNonAddressable
c.Logger.Error(ErrNonAddressable)
return ErrNonAddressable
}

v.Set(reflect.ValueOf(c))
Expand All @@ -189,8 +255,8 @@ func injectContainer(impl any, c *container.Container) error {

if f.Type == reflect.TypeOf(*c) {
if !v.CanSet() {
c.Logger.Error(errNonAddressable)
return errNonAddressable
c.Logger.Error(ErrNonAddressable)
return ErrNonAddressable
}

v.Set(reflect.ValueOf(*c))
Expand All @@ -202,3 +268,11 @@ func injectContainer(impl any, c *container.Container) error {

return nil
}

func (g *grpcServer) addServerOptions(opts ...grpc.ServerOption) {
g.options = append(g.options, opts...)
}

func (g *grpcServer) addUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) {
g.interceptors = append(g.interceptors, interceptors...)
}
Loading