Skip to content

Commit c78c260

Browse files
committed
Merge branch 'main' of github.com:openmfp/kubernetes-graphql-gateway into feat/relations
2 parents 0bf4da3 + bd85d78 commit c78c260

38 files changed

+702
-929
lines changed

.testcoverage.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ exclude:
66
- common/config/config.go
77
- mocks
88
- common/apis/*
9+
- export_test.go
10+
- export_test_integration.go
911
# remove it later:
1012
- listener/reconciler/clusteraccess/subroutines.go
11-
- listener/reconciler/singlecluster
13+

cmd/gateway.go

Lines changed: 131 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -8,130 +8,176 @@ import (
88
"time"
99

1010
openmfpcontext "github.com/openmfp/golang-commons/context"
11+
"github.com/openmfp/golang-commons/logger"
1112
"github.com/openmfp/golang-commons/sentry"
1213
"github.com/openmfp/golang-commons/traces"
1314
"github.com/prometheus/client_golang/prometheus/promhttp"
1415
"github.com/spf13/cobra"
1516
ctrl "sigs.k8s.io/controller-runtime"
1617

17-
"github.com/openmfp/golang-commons/logger"
18-
1918
"github.com/openmfp/kubernetes-graphql-gateway/gateway/manager"
2019
)
2120

2221
var gatewayCmd = &cobra.Command{
2322
Use: "gateway",
2423
Short: "Run the GQL Gateway",
2524
Example: "go run main.go gateway",
26-
RunE: func(_ *cobra.Command, _ []string) error {
27-
log, err := setupLogger(defaultCfg.Log.Level)
28-
if err != nil {
29-
return fmt.Errorf("failed to setup logger: %w", err)
30-
}
31-
32-
log.Info().Str("LogLevel", log.GetLevel().String()).Msg("Starting server...")
25+
Run: func(_ *cobra.Command, _ []string) {
26+
log.Info().Str("LogLevel", log.GetLevel().String()).Msg("Starting the Gateway...")
3327

3428
ctx, _, shutdown := openmfpcontext.StartContext(log, appCfg, 1*time.Second)
3529
defer shutdown()
3630

37-
if defaultCfg.Sentry.Dsn != "" {
38-
err := sentry.Start(ctx,
39-
defaultCfg.Sentry.Dsn, defaultCfg.Environment, defaultCfg.Region,
40-
defaultCfg.Image.Name, defaultCfg.Image.Tag,
41-
)
42-
if err != nil {
43-
log.Fatal().Err(err).Msg("Sentry init failed")
44-
}
45-
46-
defer openmfpcontext.Recover(log)
31+
if err := initializeSentry(ctx, log); err != nil {
32+
log.Fatal().Err(err).Msg("Failed to initialize Sentry")
4733
}
4834

4935
ctrl.SetLogger(log.Logr())
5036

5137
gatewayInstance, err := manager.NewGateway(ctx, log, appCfg)
5238
if err != nil {
53-
log.Error().Err(err).Msg("Error creating gateway")
54-
return fmt.Errorf("failed to create gateway: %w", err)
39+
log.Fatal().Err(err).Msg("Failed to create gateway")
5540
}
5641

57-
// Initialize tracing provider
58-
var providerShutdown func(ctx context.Context) error
59-
if defaultCfg.Tracing.Enabled {
60-
providerShutdown, err = traces.InitProvider(ctx, defaultCfg.Tracing.Collector)
61-
if err != nil {
62-
log.Fatal().Err(err).Msg("unable to start gRPC-Sidecar TracerProvider")
63-
}
64-
} else {
65-
providerShutdown, err = traces.InitLocalProvider(ctx, defaultCfg.Tracing.Collector, false)
66-
if err != nil {
67-
log.Fatal().Err(err).Msg("unable to start local TracerProvider")
68-
}
42+
tracingShutdown, err := initializeTracing(ctx, log)
43+
if err != nil {
44+
log.Fatal().Err(err).Msg("Failed to initialize tracing")
6945
}
70-
7146
defer func() {
72-
if err := providerShutdown(ctx); err != nil {
73-
log.Fatal().Err(err).Msg("failed to shutdown TracerProvider")
47+
if err := tracingShutdown(ctx); err != nil {
48+
log.Error().Err(err).Msg("failed to shutdown TracerProvider")
7449
}
7550
}()
7651

77-
defer func() {
78-
if err := providerShutdown(ctx); err != nil {
79-
log.Fatal().Err(err).Msg("failed to shutdown TracerProvider")
80-
}
81-
}()
52+
if err := runServers(ctx, log, gatewayInstance); err != nil {
53+
log.Fatal().Err(err).Msg("Failed to run servers")
54+
}
55+
},
56+
}
57+
58+
func initializeSentry(ctx context.Context, log *logger.Logger) error {
59+
if defaultCfg.Sentry.Dsn == "" {
60+
return nil
61+
}
62+
63+
err := sentry.Start(ctx,
64+
defaultCfg.Sentry.Dsn, defaultCfg.Environment, defaultCfg.Region,
65+
defaultCfg.Image.Name, defaultCfg.Image.Tag,
66+
)
67+
if err != nil {
68+
log.Fatal().Err(err).Msg("Sentry init failed")
69+
}
70+
71+
defer openmfpcontext.Recover(log)
72+
return nil
73+
}
8274

83-
// Set up HTTP handler
84-
http.Handle("/", gatewayInstance)
85-
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
86-
w.WriteHeader(http.StatusOK)
87-
})
88-
http.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
89-
w.WriteHeader(http.StatusOK)
90-
})
91-
92-
// Replace the /metrics endpoint handler
93-
http.Handle("/metrics", promhttp.Handler())
94-
95-
// Start HTTP server with context
96-
server := &http.Server{
97-
Addr: fmt.Sprintf(":%s", appCfg.Gateway.Port),
98-
Handler: nil,
75+
func initializeTracing(ctx context.Context, log *logger.Logger) (func(ctx context.Context) error, error) {
76+
if defaultCfg.Tracing.Enabled {
77+
shutdown, err := traces.InitProvider(ctx, defaultCfg.Tracing.Collector)
78+
if err != nil {
79+
log.Fatal().Err(err).Msg("unable to start gRPC-Sidecar TracerProvider")
9980
}
81+
return shutdown, nil
82+
}
83+
84+
shutdown, err := traces.InitLocalProvider(ctx, defaultCfg.Tracing.Collector, false)
85+
if err != nil {
86+
log.Fatal().Err(err).Msg("unable to start local TracerProvider")
87+
}
88+
return shutdown, nil
89+
}
10090

101-
// Start the HTTP server in a goroutine so that we can listen for shutdown signals
102-
go func() {
103-
err := server.ListenAndServe()
104-
if err != nil && !errors.Is(err, http.ErrServerClosed) {
105-
log.Error().Err(err).Msg("Error starting HTTP server")
106-
}
107-
}()
91+
func createServers(gatewayInstance http.Handler) (*http.Server, *http.Server, *http.Server) {
92+
// Main server for GraphQL
93+
mainMux := http.NewServeMux()
94+
mainMux.Handle("/", gatewayInstance)
95+
mainServer := &http.Server{
96+
Addr: fmt.Sprintf(":%s", appCfg.Gateway.Port),
97+
Handler: mainMux,
98+
}
99+
100+
// Metrics server
101+
metricsMux := http.NewServeMux()
102+
metricsMux.Handle("/metrics", promhttp.Handler())
103+
metricsServer := &http.Server{
104+
Addr: defaultCfg.Metrics.BindAddress,
105+
Handler: metricsMux,
106+
}
107+
108+
// Health server
109+
healthMux := http.NewServeMux()
110+
healthMux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
111+
w.WriteHeader(http.StatusOK)
112+
})
113+
healthMux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
114+
w.WriteHeader(http.StatusOK)
115+
})
116+
healthServer := &http.Server{
117+
Addr: defaultCfg.HealthProbeBindAddress,
118+
Handler: healthMux,
119+
}
120+
121+
return mainServer, metricsServer, healthServer
122+
}
123+
124+
func shutdownServers(ctx context.Context, log *logger.Logger, mainServer, metricsServer, healthServer *http.Server) {
125+
log.Info().Msg("Shutting down HTTP servers...")
126+
127+
if err := mainServer.Shutdown(ctx); err != nil {
128+
log.Error().Err(err).Msg("Main HTTP server shutdown failed")
129+
}
130+
131+
if err := metricsServer.Shutdown(ctx); err != nil {
132+
log.Error().Err(err).Msg("Metrics HTTP server shutdown failed")
133+
}
134+
135+
if err := healthServer.Shutdown(ctx); err != nil {
136+
log.Error().Err(err).Msg("Health HTTP server shutdown failed")
137+
}
138+
}
108139

109-
// Wait for shutdown signal via the context
110-
<-ctx.Done()
140+
func runServers(ctx context.Context, log *logger.Logger, gatewayInstance http.Handler) error {
141+
mainServer, metricsServer, healthServer := createServers(gatewayInstance)
111142

112-
shutdownCtx, cancel := context.WithTimeout(context.Background(), defaultCfg.ShutdownTimeout) // ctx is closed, we need a new one
113-
defer cancel()
114-
log.Info().Msg("Shutting down HTTP server...")
115-
if err := server.Shutdown(shutdownCtx); err != nil {
116-
log.Fatal().Err(err).Msg("HTTP server shutdown failed")
143+
// Start main server (GraphQL)
144+
go func() {
145+
log.Info().Str("addr", mainServer.Addr).Msg("Starting main HTTP server")
146+
if err := mainServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
147+
log.Error().Err(err).Msg("Error starting main HTTP server")
117148
}
149+
}()
118150

119-
if err := gatewayInstance.Close(); err != nil {
120-
log.Error().Err(err).Msg("Error closing gateway services")
151+
// Start metrics server
152+
go func() {
153+
log.Info().Str("addr", metricsServer.Addr).Msg("Starting metrics HTTP server")
154+
if err := metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
155+
log.Error().Err(err).Msg("Error starting metrics HTTP server")
121156
}
157+
}()
122158

123-
// Call the shutdown cleanup
124-
shutdown()
159+
// Start health server
160+
go func() {
161+
log.Info().Str("addr", healthServer.Addr).Msg("Starting health HTTP server")
162+
if err := healthServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
163+
log.Error().Err(err).Msg("Error starting health HTTP server")
164+
}
165+
}()
125166

126-
log.Info().Msg("Server shut down successfully")
127-
return nil
128-
},
129-
}
167+
// Wait for shutdown signal
168+
<-ctx.Done()
169+
170+
shutdownCtx, cancel := context.WithTimeout(context.Background(), defaultCfg.ShutdownTimeout)
171+
defer cancel()
172+
173+
shutdownServers(shutdownCtx, log, mainServer, metricsServer, healthServer)
174+
175+
if closer, ok := gatewayInstance.(interface{ Close() error }); ok {
176+
if err := closer.Close(); err != nil {
177+
log.Error().Err(err).Msg("Error closing gateway services")
178+
}
179+
}
130180

131-
// setupLogger initializes the logger with the given log level
132-
func setupLogger(logLevel string) (*logger.Logger, error) {
133-
loggerCfg := logger.DefaultConfig()
134-
loggerCfg.Name = "crdGateway"
135-
loggerCfg.Level = logLevel
136-
return logger.New(loggerCfg)
181+
log.Info().Msg("Server shut down successfully")
182+
return nil
137183
}

cmd/listener.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package cmd
33
import (
44
"context"
55
"crypto/tls"
6-
"os"
76

87
kcpapis "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
98
kcpcore "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
@@ -21,6 +20,8 @@ import (
2120
"sigs.k8s.io/controller-runtime/pkg/webhook"
2221

2322
gatewayv1alpha1 "github.com/openmfp/kubernetes-graphql-gateway/common/apis/v1alpha1"
23+
"github.com/openmfp/kubernetes-graphql-gateway/listener/pkg/apischema"
24+
"github.com/openmfp/kubernetes-graphql-gateway/listener/pkg/workspacefile"
2425
"github.com/openmfp/kubernetes-graphql-gateway/listener/reconciler"
2526
"github.com/openmfp/kubernetes-graphql-gateway/listener/reconciler/clusteraccess"
2627
"github.com/openmfp/kubernetes-graphql-gateway/listener/reconciler/kcp"
@@ -74,6 +75,8 @@ var listenCmd = &cobra.Command{
7475
}
7576
},
7677
Run: func(cmd *cobra.Command, args []string) {
78+
log.Info().Str("LogLevel", log.GetLevel().String()).Msg("Starting the Listener...")
79+
7780
ctx := ctrl.SetupSignalHandler()
7881
restCfg := ctrl.GetConfigOrDie()
7982

@@ -90,8 +93,7 @@ var listenCmd = &cobra.Command{
9093
Scheme: scheme,
9194
})
9295
if err != nil {
93-
log.Error().Err(err).Msg("failed to create client from config")
94-
os.Exit(1)
96+
log.Fatal().Err(err).Msg("failed to create client from config")
9597
}
9698

9799
reconcilerOpts := reconciler.ReconcilerOpts{
@@ -107,32 +109,34 @@ var listenCmd = &cobra.Command{
107109
if appCfg.EnableKcp {
108110
kcpReconciler, err := kcp.NewKCPReconciler(appCfg, reconcilerOpts, log)
109111
if err != nil {
110-
log.Error().Err(err).Msg("unable to create KCP reconciler")
111-
os.Exit(1)
112+
log.Fatal().Err(err).Msg("unable to create KCP reconciler")
112113
}
113114

114115
// Start virtual workspace watching if path is configured
115116
if appCfg.Listener.VirtualWorkspacesConfigPath != "" {
116117
go func() {
117118
if err := kcpReconciler.StartVirtualWorkspaceWatching(ctx, appCfg.Listener.VirtualWorkspacesConfigPath); err != nil {
118-
log.Error().Err(err).Msg("failed to start virtual workspace watching")
119-
os.Exit(1)
119+
log.Fatal().Err(err).Msg("failed to start virtual workspace watching")
120120
}
121121
}()
122122
}
123123

124124
reconcilerInstance = kcpReconciler
125125
} else {
126-
reconcilerInstance, err = clusteraccess.CreateMultiClusterReconciler(appCfg, reconcilerOpts, log)
126+
ioHandler, err := workspacefile.NewIOHandler(appCfg.OpenApiDefinitionsPath)
127+
if err != nil {
128+
log.Fatal().Err(err).Msg("unable to create IO handler")
129+
}
130+
131+
reconcilerInstance, err = clusteraccess.NewClusterAccessReconciler(ctx, appCfg, reconcilerOpts, ioHandler, apischema.NewResolver(log), log)
127132
if err != nil {
128-
log.Error().Err(err).Msg("unable to create cluster access reconciler")
129-
os.Exit(1)
133+
log.Fatal().Err(err).Msg("unable to create cluster access reconciler")
130134
}
131135
}
132136

133137
// Setup reconciler with its own manager and start everything
134138
if err := startManagerWithReconciler(ctx, reconcilerInstance); err != nil {
135-
os.Exit(1)
139+
log.Fatal().Err(err).Msg("failed to start manager with reconciler")
136140
}
137141
},
138142
}

cmd/root.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,14 @@ func initConfig() {
8181
v.SetDefault("gateway-url-graphql-suffix", "graphql")
8282
}
8383

84+
// setupLogger initializes the logger with the given log level
85+
func setupLogger(logLevel string) (*logger.Logger, error) {
86+
loggerCfg := logger.DefaultConfig()
87+
loggerCfg.Name = "crdGateway"
88+
loggerCfg.Level = logLevel
89+
return logger.New(loggerCfg)
90+
}
91+
8492
func Execute() {
8593
cobra.CheckErr(rootCmd.Execute())
8694
}

0 commit comments

Comments
 (0)