Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/server/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type ProxyRunOptions struct {
LeaseLabel string
// Needs kubernetes client
NeedsKubernetesClient bool
// Graceful shutdown timeout duration
GracefulShutdownTimeout time.Duration
}

func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
Expand Down Expand Up @@ -155,6 +157,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.")
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace where lease objects are managed by the controller.")
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
flags.DurationVar(&o.GracefulShutdownTimeout, "graceful-shutdown-timeout", o.GracefulShutdownTimeout, "Timeout duration for graceful shutdown of the server. The server will wait for active connections to close before forcefully terminating.")
flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")

Expand Down Expand Up @@ -198,6 +201,7 @@ func (o *ProxyRunOptions) Print() {
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites)
klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize)
klog.V(1).Infof("GracefulShutdownTimeout set to %v.\n", o.GracefulShutdownTimeout)
}

func (o *ProxyRunOptions) Validate() error {
Expand Down Expand Up @@ -382,6 +386,7 @@ func NewProxyRunOptions() *ProxyRunOptions {
EnableLeaseController: false,
LeaseNamespace: "kube-system",
LeaseLabel: "k8s-app=konnectivity-server",
GracefulShutdownTimeout: 15 * time.Second,
}
return &o
}
Expand Down
116 changes: 97 additions & 19 deletions cmd/server/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type Proxy struct {
server *server.ProxyServer
}

type StopFunc func()
type StopFunc func(context.Context) error

func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
o.Print()
Expand Down Expand Up @@ -145,16 +145,12 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return fmt.Errorf("failed to run the frontend server: %v", err)
}
if frontendStop != nil {
defer frontendStop()
}

klog.V(1).Infoln("Starting agent server for tunnel connections.")
err = p.runAgentServer(o, p.server)
if err != nil {
return fmt.Errorf("failed to run the agent server: %v", err)
}
defer p.agentServer.Stop()

labels, err := util.ParseLabels(o.LeaseLabel)
if err != nil {
Expand Down Expand Up @@ -182,17 +178,97 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return fmt.Errorf("failed to run the admin server: %v", err)
}
defer p.adminServer.Close()

klog.V(1).Infoln("Starting health server for healthchecks.")
err = p.runHealthServer(o, p.server)
if err != nil {
return fmt.Errorf("failed to run the health server: %v", err)
}
defer p.healthServer.Close()

<-stopCh
klog.V(1).Infoln("Shutting down server.")
klog.V(1).Infoln("Received shutdown signal, initiating graceful shutdown.")

// Start graceful shutdown with timeout
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), o.GracefulShutdownTimeout)
defer shutdownCancel()

// Create a WaitGroup to track shutdown of all components
var wg sync.WaitGroup

// Shutdown frontend server gracefully (if available)
if frontendStop != nil {
wg.Add(1)
go func() {
defer wg.Done()
klog.V(1).Infoln("Gracefully stopping frontend server...")
if err := frontendStop(shutdownCtx); err != nil {
klog.ErrorS(err, "failed to shut down frontend server")
} else {
klog.V(1).Infoln("frontend server stopped.")
}
}()
}

// Shutdown agent server gracefully
wg.Add(1)
go func() {
defer wg.Done()
klog.V(1).Infoln("Gracefully stopping agent server...")
p.agentServer.GracefulStop()
klog.V(1).Infoln("agent server stopped.")
}()

// Shutdown admin server gracefully
wg.Add(1)
go func() {
defer wg.Done()
klog.V(1).Infoln("Gracefully stopping admin server...")
if err := p.adminServer.Shutdown(shutdownCtx); err != nil {
klog.ErrorS(err, "failed to shut down admin server")

} else {
klog.V(1).Infoln("admin server stopped.")
}
}()

// Shutdown health server gracefully
wg.Add(1)
go func() {
defer wg.Done()
klog.V(1).Infoln("Gracefully stopping health server...")
if err := p.healthServer.Shutdown(shutdownCtx); err != nil {
klog.ErrorS(err, "failed to shut down health server")
} else {
klog.V(1).Infoln("health server stopped.")
}
}()

// Wait for all servers to shutdown or timeout
shutdownComplete := make(chan struct{})
go func() {
wg.Wait()
close(shutdownComplete)
}()

select {
case <-shutdownComplete:
klog.V(1).Infoln("Graceful shutdown completed successfully.")
case <-shutdownCtx.Done():
klog.Warningf("Graceful shutdown timed out after %v, forcing termination.", o.GracefulShutdownTimeout)
// Force stop all servers that might still be running
if p.agentServer != nil {
p.agentServer.Stop()
}
if p.adminServer != nil {
p.adminServer.Close()
}
if p.healthServer != nil {
p.healthServer.Close()
}
// frontend server's force-stop is handled by its StopFunc
}

klog.V(1).Infoln("Server shutdown complete.")

return nil
}
Expand Down Expand Up @@ -260,7 +336,10 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt
"udsFile", o.UdsName,
)
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })
stop = grpcServer.GracefulStop
stop = func(_ context.Context) error {
grpcServer.GracefulStop()
return nil
}
} else {
// http-connect
server := &http.Server{
Expand All @@ -269,9 +348,8 @@ func (p *Proxy) runUDSFrontendServer(ctx context.Context, o *options.ProxyRunOpt
Server: s,
},
}
stop = func() {
err := server.Shutdown(ctx)
klog.ErrorS(err, "error shutting down server")
stop = func(shutdownCtx context.Context) error {
return server.Shutdown(shutdownCtx)
}
labels := runpprof.Labels(
"core", "udsHttpFrontend",
Expand Down Expand Up @@ -329,7 +407,7 @@ func (p *Proxy) getTLSConfig(caFile, certFile, keyFile string, cipherSuites []st
return tlsConfig, nil
}

func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOptions, s *server.ProxyServer) (StopFunc, error) {
func (p *Proxy) runMTLSFrontendServer(_ context.Context, o *options.ProxyRunOptions, s *server.ProxyServer) (StopFunc, error) {
var stop StopFunc

var tlsConfig *tls.Config
Expand All @@ -356,7 +434,10 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp
"port", strconv.Itoa(o.ServerPort),
)
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })
stop = grpcServer.GracefulStop
stop = func(_ context.Context) error {
grpcServer.GracefulStop()
return nil
}
} else {
// http-connect
server := &http.Server{
Expand All @@ -368,11 +449,8 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp
},
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
}
stop = func() {
err := server.Shutdown(ctx)
if err != nil {
klog.ErrorS(err, "failed to shutdown server")
}
stop = func(shutdownCtx context.Context) error {
return server.Shutdown(shutdownCtx)
}
labels := runpprof.Labels(
"core", "mtlsHttpFrontend",
Expand Down