Skip to content

Commit b8c10e2

Browse files
Merge pull request #16 from gocrane/feature/vnode-pods-interface
[Feature] support /pods interface for http server
2 parents 9037b71 + f034283 commit b8c10e2

File tree

7 files changed

+807
-79
lines changed

7 files changed

+807
-79
lines changed

cmd/kubeocean-proxier/main.go

Lines changed: 97 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444

4545
cloudv1beta1 "github.com/gocrane/kubeocean/api/v1beta1"
4646
"github.com/gocrane/kubeocean/pkg/proxier"
47+
"github.com/gocrane/kubeocean/pkg/proxier/podcache"
4748
"github.com/gocrane/kubeocean/pkg/version"
4849
)
4950

@@ -90,8 +91,15 @@ func main() {
9091
os.Exit(1)
9192
}
9293

94+
// Setup managers
95+
physicalManager, virtualManager, err := setupManager(physicalConfig, config.ClusterBindingName)
96+
if err != nil {
97+
setupLog.Error(err, "failed to setup managers")
98+
os.Exit(1)
99+
}
100+
93101
// Setup pod monitoring
94-
err = setupPodMonitoring(ctx, physicalConfig)
102+
podController, err := setupPodMonitoring(physicalManager)
95103
if err != nil {
96104
setupLog.Error(err, "failed to setup pod monitoring")
97105
os.Exit(1)
@@ -105,21 +113,21 @@ func main() {
105113
}
106114

107115
// Setup proxier services
108-
kubeletProxy, httpServer, vnodeProxierAgent, err := setupProxierServices(ctx, config, tlsConfig, virtualClient, physicalClient, physicalConfig, clusterBinding, tokenManager)
116+
kubeletProxy, httpServer, vnodeProxierAgent, err := setupProxierServices(ctx, config, tlsConfig, virtualClient, physicalClient, physicalConfig, clusterBinding, tokenManager, virtualManager)
109117
if err != nil {
110118
setupLog.Error(err, "failed to setup proxier services")
111119
os.Exit(1)
112120
}
113121

114122
// Setup node monitoring
115-
err = setupNodeMonitoring(ctx, config, vnodeProxierAgent)
123+
nodeController, err := setupNodeMonitoring(virtualManager, config, vnodeProxierAgent)
116124
if err != nil {
117125
setupLog.Error(err, "failed to setup node monitoring")
118126
os.Exit(1)
119127
}
120128

121129
// Start all services
122-
err = startServices(ctx, config, kubeletProxy, httpServer, vnodeProxierAgent)
130+
err = startServices(ctx, config, physicalManager, virtualManager, podController, nodeController, kubeletProxy, httpServer, vnodeProxierAgent)
123131
if err != nil {
124132
setupLog.Error(err, "failed to start services")
125133
os.Exit(1)
@@ -250,13 +258,8 @@ func setupAuthentication(ctx context.Context, physicalConfig *rest.Config) (*pro
250258
return tokenManager, nil
251259
}
252260

253-
// setupPodMonitoring sets up pod controller and monitoring
254-
func setupPodMonitoring(ctx context.Context, physicalConfig *rest.Config) error {
255-
proxier.InitGlobalPodMapper()
256-
setupLog.Info("Global pod mapper initialized")
257-
258-
setupLog.Info("Setting up POD Controller for physical cluster")
259-
261+
// setupManager creates both physical and virtual managers
262+
func setupManager(physicalConfig *rest.Config, clusterBindingName string) (ctrl.Manager, ctrl.Manager, error) {
260263
// Create physical cluster manager
261264
physicalManager, err := ctrl.NewManager(physicalConfig, ctrl.Options{
262265
Scheme: scheme,
@@ -275,9 +278,42 @@ func setupPodMonitoring(ctx context.Context, physicalConfig *rest.Config) error
275278
},
276279
})
277280
if err != nil {
278-
return fmt.Errorf("unable to create physical manager for pod controller: %w", err)
281+
return nil, nil, fmt.Errorf("unable to create physical manager: %w", err)
282+
}
283+
284+
// Create virtual cluster manager
285+
virtualConfig := ctrl.GetConfigOrDie()
286+
virtualManager, err := ctrl.NewManager(virtualConfig, ctrl.Options{
287+
Scheme: scheme,
288+
Metrics: server.Options{
289+
BindAddress: "0",
290+
},
291+
HealthProbeBindAddress: "0",
292+
Cache: cache.Options{
293+
ByObject: map[client.Object]cache.ByObject{
294+
&corev1.Node{}: {
295+
Label: labels.SelectorFromSet(map[string]string{
296+
cloudv1beta1.LabelClusterBinding: clusterBindingName,
297+
cloudv1beta1.LabelManagedBy: cloudv1beta1.LabelManagedByValue,
298+
}),
299+
},
300+
},
301+
},
302+
})
303+
if err != nil {
304+
return nil, nil, fmt.Errorf("unable to create virtual manager: %w", err)
279305
}
280306

307+
return physicalManager, virtualManager, nil
308+
}
309+
310+
// setupPodMonitoring sets up pod controller and monitoring
311+
func setupPodMonitoring(physicalManager ctrl.Manager) (*proxier.PodController, error) {
312+
proxier.InitGlobalPodMapper()
313+
setupLog.Info("Global pod mapper initialized")
314+
315+
setupLog.Info("Setting up POD Controller for physical cluster")
316+
281317
// Create and setup POD controller
282318
podController := proxier.NewPodController(
283319
physicalManager.GetClient(),
@@ -286,26 +322,11 @@ func setupPodMonitoring(ctx context.Context, physicalConfig *rest.Config) error
286322
)
287323

288324
if err := podController.SetupWithManager(physicalManager); err != nil {
289-
return fmt.Errorf("unable to setup pod controller: %w", err)
325+
return nil, fmt.Errorf("unable to setup pod controller: %w", err)
290326
}
291327

292-
// Start physical manager
293-
go func() {
294-
setupLog.Info("Starting physical cluster manager for pod controller")
295-
if err := physicalManager.Start(ctx); err != nil {
296-
setupLog.Error(err, "problem running physical manager")
297-
}
298-
}()
299-
300-
// Initialize existing pods
301-
time.Sleep(2 * time.Second)
302-
setupLog.Info("Initializing existing pods in physical cluster")
303-
if err := podController.InitializeExistingPods(ctx); err != nil {
304-
return fmt.Errorf("failed to initialize existing pods: %w", err)
305-
}
306-
setupLog.Info("Pod controller initialization completed")
307-
308-
return nil
328+
setupLog.Info("Pod controller setup completed")
329+
return podController, nil
309330
}
310331

311332
// createPhysicalClusterClients creates clients for the physical cluster
@@ -530,7 +551,7 @@ func setupTLSConfiguration(ctx context.Context, config *ProxierConfig, clusterBi
530551
}
531552

532553
// setupProxierServices sets up kubelet proxy, HTTP server, and optionally VNode proxier agent
533-
func setupProxierServices(ctx context.Context, config *ProxierConfig, tlsConfig *TLSConfiguration, virtualClient client.Client, physicalClient kubernetes.Interface, physicalConfig *rest.Config, clusterBinding *cloudv1beta1.ClusterBinding, tokenManager *proxier.TokenManager) (proxier.KubeletProxy, proxier.HTTPServer, *proxier.VNodeProxierAgent, error) {
554+
func setupProxierServices(ctx context.Context, config *ProxierConfig, tlsConfig *TLSConfiguration, virtualClient client.Client, physicalClient kubernetes.Interface, physicalConfig *rest.Config, clusterBinding *cloudv1beta1.ClusterBinding, tokenManager *proxier.TokenManager, virtualManager ctrl.Manager) (proxier.KubeletProxy, proxier.HTTPServer, *proxier.VNodeProxierAgent, error) {
534555
virtualClientset, err := kubernetes.NewForConfig(ctrl.GetConfigOrDie())
535556
if err != nil {
536557
return nil, nil, nil, fmt.Errorf("unable to create virtual cluster clientset: %w", err)
@@ -570,6 +591,13 @@ func setupProxierServices(ctx context.Context, config *ProxierConfig, tlsConfig
570591
ctrl.Log.WithName("http-server"),
571592
)
572593

594+
// Create pod cache manager using virtualManager
595+
podCacheManager := podcache.NewPodCacheManager(virtualManager)
596+
if err := podCacheManager.Setup(ctx); err != nil {
597+
return nil, nil, nil, fmt.Errorf("failed to setup pod cache manager: %w", err)
598+
}
599+
setupLog.Info("Pod cache manager setup completed")
600+
573601
// Create VNode proxier agent if metrics enabled
574602
var vnodeProxierAgent *proxier.VNodeProxierAgent
575603
if config.MetricsEnabled {
@@ -594,6 +622,7 @@ func setupProxierServices(ctx context.Context, config *ProxierConfig, tlsConfig
594622
virtualClientset,
595623
clusterBinding.Spec.ClusterID,
596624
ctrl.Log.WithName("vnode-proxier-agent"),
625+
podCacheManager,
597626
)
598627

599628
if err := vnodeProxierAgent.Start(ctx); err != nil {
@@ -608,31 +637,9 @@ func setupProxierServices(ctx context.Context, config *ProxierConfig, tlsConfig
608637
}
609638

610639
// setupNodeMonitoring sets up node controller and monitoring
611-
func setupNodeMonitoring(ctx context.Context, config *ProxierConfig, vnodeProxierAgent *proxier.VNodeProxierAgent) error {
640+
func setupNodeMonitoring(virtualManager ctrl.Manager, config *ProxierConfig, vnodeProxierAgent *proxier.VNodeProxierAgent) (*proxier.NodeController, error) {
612641
setupLog.Info("Setting up Node Controller for proxier", "clusterBinding", config.ClusterBindingName)
613642

614-
virtualConfig := ctrl.GetConfigOrDie()
615-
virtualManager, err := ctrl.NewManager(virtualConfig, ctrl.Options{
616-
Scheme: scheme,
617-
Metrics: server.Options{
618-
BindAddress: "0",
619-
},
620-
HealthProbeBindAddress: "0",
621-
Cache: cache.Options{
622-
ByObject: map[client.Object]cache.ByObject{
623-
&corev1.Node{}: {
624-
Label: labels.SelectorFromSet(map[string]string{
625-
cloudv1beta1.LabelClusterBinding: config.ClusterBindingName,
626-
cloudv1beta1.LabelManagedBy: cloudv1beta1.LabelManagedByValue,
627-
}),
628-
},
629-
},
630-
},
631-
})
632-
if err != nil {
633-
return fmt.Errorf("unable to create virtual manager for node controller: %w", err)
634-
}
635-
636643
nodeController := &proxier.NodeController{
637644
Client: virtualManager.GetClient(),
638645
Scheme: virtualManager.GetScheme(),
@@ -642,50 +649,78 @@ func setupNodeMonitoring(ctx context.Context, config *ProxierConfig, vnodeProxie
642649
}
643650

644651
if err := nodeController.SetupWithManager(virtualManager); err != nil {
645-
return fmt.Errorf("unable to setup node controller: %w", err)
652+
return nil, fmt.Errorf("unable to setup node controller: %w", err)
646653
}
647654

648655
if vnodeProxierAgent != nil {
649656
nodeController.SetMetricsCollector(vnodeProxierAgent)
650657
setupLog.Info("Connected NodeController with VNodeProxierAgent")
651658
}
652659

660+
setupLog.Info("Node Controller setup completed")
661+
return nodeController, nil
662+
}
663+
664+
// startServices starts all the services
665+
func startServices(ctx context.Context, config *ProxierConfig, physicalManager, virtualManager ctrl.Manager, podController *proxier.PodController, nodeController *proxier.NodeController, kubeletProxy proxier.KubeletProxy, httpServer proxier.HTTPServer, vnodeProxierAgent *proxier.VNodeProxierAgent) error {
666+
// Start physical manager
667+
go func() {
668+
setupLog.Info("Starting physical cluster manager for pod controller")
669+
if err := physicalManager.Start(ctx); err != nil {
670+
setupLog.Error(err, "problem running physical manager")
671+
}
672+
}()
673+
674+
// Start virtual manager
653675
go func() {
676+
setupLog.Info("Starting virtual cluster manager for node controller")
654677
if err := virtualManager.Start(ctx); err != nil {
655678
setupLog.Error(err, "problem running virtual manager")
656679
}
657680
}()
658681

682+
// Wait for cache sync
683+
if !physicalManager.GetCache().WaitForCacheSync(ctx) {
684+
return fmt.Errorf("failed to wait for physical manager cache sync")
685+
}
659686
if !virtualManager.GetCache().WaitForCacheSync(ctx) {
660-
return fmt.Errorf("failed to wait for cache sync")
687+
return fmt.Errorf("failed to wait for virtual manager cache sync")
688+
}
689+
690+
// Initialize existing pods
691+
setupLog.Info("Initializing existing pods in physical cluster")
692+
if err := podController.InitializeExistingPods(ctx); err != nil {
693+
return fmt.Errorf("failed to initialize existing pods: %w", err)
661694
}
695+
setupLog.Info("Pod controller initialization completed")
662696

697+
// Sync existing nodes
663698
if err := nodeController.SyncExistingNodes(ctx); err != nil {
664699
return fmt.Errorf("failed to sync existing nodes: %w", err)
665700
}
701+
setupLog.Info("Node controller synchronization completed")
666702

703+
// Initialize VNodeProxierAgent with existing nodes
667704
if vnodeProxierAgent != nil {
668705
existingNodes := nodeController.GetCurrentNodes()
669706
if len(existingNodes) > 0 {
670707
setupLog.Info("Initializing VNodeProxierAgent with existing nodes", "nodeCount", len(existingNodes))
671708
vnodeProxierAgent.InitializeWithNodes(existingNodes)
672709
}
673710
}
711+
setupLog.Info("VNodeProxierAgent initialization completed")
674712

675-
setupLog.Info("Node Controller setup completed successfully")
676-
return nil
677-
}
678-
679-
// startServices starts all the services
680-
func startServices(ctx context.Context, config *ProxierConfig, kubeletProxy proxier.KubeletProxy, httpServer proxier.HTTPServer, vnodeProxierAgent *proxier.VNodeProxierAgent) error {
713+
// Start kubelet proxy
681714
if err := kubeletProxy.Start(ctx); err != nil {
682715
return fmt.Errorf("failed to start Kubelet proxy: %w", err)
683716
}
684717

718+
// Start HTTP server
685719
if err := httpServer.Start(ctx); err != nil {
686720
return fmt.Errorf("failed to start HTTP server: %w", err)
687721
}
688722

723+
// Start VNode HTTP server
689724
if vnodeProxierAgent != nil {
690725
if err := proxier.StartVNodeHTTPServer(vnodeProxierAgent, ctrl.Log.WithName("vnode-http-server"), config.VnodeBasePort); err != nil {
691726
return fmt.Errorf("failed to start VNode HTTP server: %w", err)

0 commit comments

Comments
 (0)