@@ -19,7 +19,7 @@ import (
1919 "k8s.io/client-go/informers"
2020 "k8s.io/client-go/kubernetes"
2121 listersv1 "k8s.io/client-go/listers/core/v1"
22- kubeCache "k8s.io/client-go/tools/cache"
22+ k8scache "k8s.io/client-go/tools/cache"
2323 "k8s.io/client-go/tools/leaderelection"
2424
2525 "github.com/jacobbrewer1/goredis"
@@ -28,8 +28,8 @@ import (
2828 "github.com/jacobbrewer1/vaulty/vsql"
2929 "github.com/jacobbrewer1/web/cache"
3030 "github.com/jacobbrewer1/web/logging"
31+ pkgsync "github.com/jacobbrewer1/web/sync"
3132 "github.com/jacobbrewer1/web/version"
32- "github.com/jacobbrewer1/workerpool"
3333)
3434
3535const (
@@ -112,19 +112,19 @@ type (
112112 kubernetesInformerFactory informers.SharedInformerFactory
113113
114114 // podInformer is an informer for Kubernetes Pod objects.
115- podInformer kubeCache .SharedIndexInformer
115+ podInformer k8scache .SharedIndexInformer
116116
117117 // podLister is a lister for Kubernetes Pod objects.
118118 podLister listersv1.PodLister
119119
120120 // secretInformer is an informer for Kubernetes Secret objects.
121- secretInformer kubeCache .SharedIndexInformer
121+ secretInformer k8scache .SharedIndexInformer
122122
123123 // secretLister is a lister for Kubernetes Secret objects.
124124 secretLister listersv1.SecretLister
125125
126126 // configMapInformer is an informer for Kubernetes ConfigMap objects.
127- configMapInformer kubeCache .SharedIndexInformer
127+ configMapInformer k8scache .SharedIndexInformer
128128
129129 // configMapLister is a lister for Kubernetes ConfigMap objects.
130130 configMapLister listersv1.ConfigMapLister
@@ -138,8 +138,8 @@ type (
138138 // redisPool is the redis pool for the application.
139139 redisPool goredis.Pool
140140
141- // workerPool is the worker pool that can execute tasks concurrently .
142- workerPool workerpool. Pool
141+ // workerPools is a map of worker pools for the application, keyed by pool name .
142+ workerPools sync. Map
143143
144144 // indefiniteAsyncTasks is the list of indefinite async tasks for the application.
145145 indefiniteAsyncTasks sync.Map
@@ -387,10 +387,13 @@ func (a *App) Shutdown() {
387387 a .serviceEndpointHashBucket .Shutdown ()
388388 }
389389
390- // Stop the worker pool if it exists.
391- if a .workerPool != nil {
392- a .workerPool .Stop ()
393- }
390+ a .workerPools .Range (func (k any , v any ) bool {
391+ if wp , ok := v .(pkgsync.WorkerPool ); ok {
392+ a .l .Info ("stopping worker pool" , "name" , k )
393+ wp .Close ()
394+ }
395+ return true
396+ })
394397
395398 // Close the NATS client if it exists.
396399 if a .natsClient != nil {
@@ -467,10 +470,6 @@ func (a *App) LeaderChange() <-chan struct{} {
467470
468471// StartServer starts a new server with the given name and http.Server.
469472func (a * App ) StartServer (name string , srv * http.Server ) error {
470- if _ , found := a .servers .Load (name ); found {
471- return fmt .Errorf ("server %s already exists" , name )
472- }
473-
474473 // If the server handler is gorilla mux, check the not found handler and method not allowed handler
475474 if muxRouter , ok := srv .Handler .(* mux.Router ); ok {
476475 if muxRouter .NotFoundHandler == nil {
@@ -483,7 +482,9 @@ func (a *App) StartServer(name string, srv *http.Server) error {
483482 }
484483 }
485484
486- a .servers .Store (name , srv )
485+ if _ , loaded := a .servers .LoadOrStore (name , srv ); loaded {
486+ return fmt .Errorf ("server %s already exists" , name )
487+ }
487488 a .startServer (name , srv )
488489 return nil
489490}
@@ -516,13 +517,21 @@ func (a *App) RedisPool() goredis.Pool {
516517 return a .redisPool
517518}
518519
519- // WorkerPool returns the worker pool for the application.
520- func (a * App ) WorkerPool () workerpool.Pool {
521- if a .workerPool == nil {
522- a .l .Error ("worker pool has not been registered" )
523- panic ("worker pool has not been registered" )
520+ // WorkerPool returns the application worker pool, if one exists.
521+ func (a * App ) WorkerPool (name string ) pkgsync.WorkerPool {
522+ v , ok := a .workerPools .Load (name )
523+ if ! ok {
524+ a .l .Error ("worker pool has not been registered" , "name" , name )
525+ panic (fmt .Sprintf ("worker pool '%s' has not been registered" , name ))
524526 }
525- return a .workerPool
527+
528+ wp , ok := v .(pkgsync.WorkerPool )
529+ if ! ok {
530+ a .l .Error ("worker pool is not of type pkgsync.WorkerPool" , "name" , name )
531+ panic (fmt .Sprintf ("worker pool '%s' is not of type pkgsync.WorkerPool" , name ))
532+ }
533+
534+ return wp
526535}
527536
528537// NatsClient returns the NATS client for the application.
@@ -609,7 +618,7 @@ func (a *App) PodLister() listersv1.PodLister {
609618}
610619
611620// PodInformer returns the pod informer for the application.
612- func (a * App ) PodInformer () kubeCache .SharedIndexInformer {
621+ func (a * App ) PodInformer () k8scache .SharedIndexInformer {
613622 if a .podInformer == nil {
614623 a .l .Error ("pod informer has not been registered" )
615624 panic ("pod informer has not been registered" )
@@ -636,7 +645,7 @@ func (a *App) SecretLister() listersv1.SecretLister {
636645}
637646
638647// SecretInformer returns the secret informer for the application.
639- func (a * App ) SecretInformer () kubeCache .SharedIndexInformer {
648+ func (a * App ) SecretInformer () k8scache .SharedIndexInformer {
640649 if a .secretInformer == nil {
641650 a .l .Error ("secret informer has not been registered" )
642651 panic ("secret informer has not been registered" )
@@ -654,7 +663,7 @@ func (a *App) ConfigMapLister() listersv1.ConfigMapLister {
654663}
655664
656665// ConfigMapInformer returns the config map informer for the application.
657- func (a * App ) ConfigMapInformer () kubeCache .SharedIndexInformer {
666+ func (a * App ) ConfigMapInformer () k8scache .SharedIndexInformer {
658667 if a .configMapInformer == nil {
659668 a .l .Error ("config map informer has not been registered" )
660669 panic ("config map informer has not been registered" )
0 commit comments