@@ -59,7 +59,14 @@ import (
5959 "github.com/projectcapsule/capsule-proxy/internal/webserver/middleware"
6060)
6161
62- func NewKubeFilter (opts options.ListenerOpts , srv options.ServerOptions , gates featuregate.FeatureGate , rbReflector * controllers.RoleBindingReflector , clientOverride client.Reader , client client.Client ) (Filter , error ) {
62+ func NewKubeFilter (
63+ opts options.ListenerOpts ,
64+ srv options.ServerOptions ,
65+ gates featuregate.FeatureGate ,
66+ rbReflector * controllers.RoleBindingReflector ,
67+ clientOverride client.Reader ,
68+ mgr ctrl.Manager ,
69+ ) (Filter , error ) {
6370 reverseProxy := httputil .NewSingleHostReverseProxy (opts .KubernetesControlPlaneURL ())
6471 reverseProxy .FlushInterval = time .Millisecond * 100
6572
@@ -71,10 +78,11 @@ func NewKubeFilter(opts options.ListenerOpts, srv options.ServerOptions, gates f
7178 reverseProxy .Transport = reverseProxyTransport
7279
7380 return & kubeFilter {
81+ mgr : mgr ,
7482 gates : gates ,
7583 reader : clientOverride ,
76- writer : client ,
77- managerReader : client ,
84+ writer : mgr . GetClient () ,
85+ managerReader : mgr . GetClient () ,
7886 allowedPaths : sets .New ("/api" , "/apis" , "/version" ),
7987 authTypes : opts .AuthTypes (),
8088 ignoredUserGroups : sets .New (opts .IgnoredGroupNames ()... ),
@@ -93,6 +101,7 @@ func NewKubeFilter(opts options.ListenerOpts, srv options.ServerOptions, gates f
93101}
94102
95103type kubeFilter struct {
104+ mgr ctrl.Manager
96105 allowedPaths sets.Set [string ]
97106 authTypes []req.AuthType
98107 ignoredUserGroups sets.Set [string ]
@@ -113,11 +122,86 @@ type kubeFilter struct {
113122 writer client.Writer
114123}
115124
125+ //nolint:funlen
126+ func (n * kubeFilter ) Start (ctx context.Context ) error {
127+ r := mux .NewRouter ()
128+ r .Use (handlers .RecoveryHandler ())
129+
130+ r .Path ("/_healthz" ).Subrouter ().HandleFunc ("" , func (writer http.ResponseWriter , _ * http.Request ) {
131+ writer .WriteHeader (http .StatusOK )
132+ _ , _ = writer .Write ([]byte ("ok" ))
133+ })
134+
135+ root := r .PathPrefix ("" ).Subrouter ()
136+ n .registerModules (ctx , root )
137+ root .Use (
138+ n .reverseProxyMiddleware ,
139+ middleware .CheckPaths (n .log , n .allowedPaths , n .impersonateHandler ),
140+ middleware .CheckJWTMiddleware (n .writer ),
141+ )
142+ root .PathPrefix ("/" ).HandlerFunc (func (writer http.ResponseWriter , request * http.Request ) {
143+ n .impersonateHandler (writer , request )
144+ })
145+
146+ var srv * http.Server
147+
148+ go func () {
149+ var err error
150+
151+ addr := fmt .Sprintf ("0.0.0.0:%d" , n .serverOptions .ListeningPort ())
152+
153+ if n .serverOptions .IsListeningTLS () {
154+ tlsConfig := & tls.Config {
155+ MinVersion : tls .VersionTLS12 ,
156+ ClientCAs : n .serverOptions .GetCertificateAuthorityPool (),
157+ }
158+
159+ for _ , authType := range n .authTypes {
160+ if authType == req .TLSCertificate {
161+ tlsConfig .ClientAuth = tls .VerifyClientCertIfGiven
162+
163+ break
164+ }
165+ }
166+
167+ srv = & http.Server {
168+ Handler : r ,
169+ Addr : addr ,
170+ TLSConfig : tlsConfig ,
171+ ReadHeaderTimeout : 5 * time .Second ,
172+ }
173+ err = srv .ListenAndServeTLS (n .serverOptions .TLSCertificatePath (), n .serverOptions .TLSCertificateKeyPath ())
174+ } else {
175+ srv = & http.Server {
176+ Handler : r ,
177+ Addr : addr ,
178+ ReadHeaderTimeout : 5 * time .Second ,
179+ }
180+ err = srv .ListenAndServe ()
181+ }
182+
183+ if err != nil {
184+ panic (err )
185+ }
186+ }()
187+
188+ <- ctx .Done ()
189+
190+ return srv .Shutdown (ctx )
191+ }
192+
116193func (n * kubeFilter ) LivenessProbe (* http.Request ) error {
117194 return nil
118195}
119196
120197func (n * kubeFilter ) ReadinessProbe (req * http.Request ) (err error ) {
198+ select {
199+ case <- n .mgr .Elected ():
200+ // OK, we're the leader..
201+ default :
202+ return nil
203+ }
204+
121205 scheme := "http"
122206 clt := & http.Client {}
123207
@@ -158,6 +242,17 @@ func (n *kubeFilter) ReadinessProbe(req *http.Request) (err error) {
158242 return nil
159243}
160244
245+ func (n * kubeFilter ) BearerToken () string {
246+ if time .Now ().After (n .bearerTokenExpirationTime ) {
247+ n .log .V (5 ).Info ("Token expired. Reading new token from file" , "token" , n .bearerToken , "token file" , n .bearerTokenFile )
248+ token , _ := os .ReadFile (n .bearerTokenFile )
249+ n .bearerToken = string (token )
250+ n .bearerTokenExpirationTime = bearerExpirationTime (string (token ))
251+ }
252+
253+ return n .bearerToken
254+ }
255+
161256func (n * kubeFilter ) reverseProxyMiddleware (next http.Handler ) http.Handler {
162257 return http .HandlerFunc (func (writer http.ResponseWriter , request * http.Request ) {
163258 next .ServeHTTP (writer , request )
@@ -346,74 +441,6 @@ func (n *kubeFilter) registerModules(ctx context.Context, root *mux.Router) {
346441 }
347442}
348443
349- //nolint:funlen
350- func (n * kubeFilter ) Start (ctx context.Context ) error {
351- r := mux .NewRouter ()
352- r .Use (handlers .RecoveryHandler ())
353-
354- r .Path ("/_healthz" ).Subrouter ().HandleFunc ("" , func (writer http.ResponseWriter , _ * http.Request ) {
355- writer .WriteHeader (http .StatusOK )
356- _ , _ = writer .Write ([]byte ("ok" ))
357- })
358-
359- root := r .PathPrefix ("" ).Subrouter ()
360- n .registerModules (ctx , root )
361- root .Use (
362- n .reverseProxyMiddleware ,
363- middleware .CheckPaths (n .log , n .allowedPaths , n .impersonateHandler ),
364- middleware .CheckJWTMiddleware (n .writer ),
365- )
366- root .PathPrefix ("/" ).HandlerFunc (func (writer http.ResponseWriter , request * http.Request ) {
367- n .impersonateHandler (writer , request )
368- })
369-
370- var srv * http.Server
371-
372- go func () {
373- var err error
374-
375- addr := fmt .Sprintf ("0.0.0.0:%d" , n .serverOptions .ListeningPort ())
376-
377- if n .serverOptions .IsListeningTLS () {
378- tlsConfig := & tls.Config {
379- MinVersion : tls .VersionTLS12 ,
380- ClientCAs : n .serverOptions .GetCertificateAuthorityPool (),
381- }
382-
383- for _ , authType := range n .authTypes {
384- if authType == req .TLSCertificate {
385- tlsConfig .ClientAuth = tls .VerifyClientCertIfGiven
386-
387- break
388- }
389- }
390-
391- srv = & http.Server {
392- Handler : r ,
393- Addr : addr ,
394- TLSConfig : tlsConfig ,
395- ReadHeaderTimeout : 5 * time .Second ,
396- }
397- err = srv .ListenAndServeTLS (n .serverOptions .TLSCertificatePath (), n .serverOptions .TLSCertificateKeyPath ())
398- } else {
399- srv = & http.Server {
400- Handler : r ,
401- Addr : addr ,
402- ReadHeaderTimeout : 5 * time .Second ,
403- }
404- err = srv .ListenAndServe ()
405- }
406-
407- if err != nil {
408- panic (err )
409- }
410- }()
411-
412- <- ctx .Done ()
413-
414- return srv .Shutdown (ctx )
415- }
416-
417444func (n * kubeFilter ) getTenantsForOwner (ctx context.Context , username string , groups []string ) (proxyTenants []* tenant.ProxyTenant , err error ) {
418445 if strings .HasPrefix (username , serviceaccount .ServiceAccountUsernamePrefix ) {
419446 proxyTenants , err = n .getProxyTenantsForOwnerKind (ctx , capsulev1beta2 .ServiceAccountOwner , username )
@@ -547,17 +574,6 @@ func (n *kubeFilter) removingHopByHopHeaders(request *http.Request) {
547574 request .Header .Del (connectionHeaderName )
548575}
549576
550- func (n * kubeFilter ) BearerToken () string {
551- if time .Now ().After (n .bearerTokenExpirationTime ) {
552- n .log .V (5 ).Info ("Token expired. Reading new token from file" , "token" , n .bearerToken , "token file" , n .bearerTokenFile )
553- token , _ := os .ReadFile (n .bearerTokenFile )
554- n .bearerToken = string (token )
555- n .bearerTokenExpirationTime = bearerExpirationTime (string (token ))
556- }
557-
558- return n .bearerToken
559- }
560-
561577func bearerExpirationTime (tokenString string ) time.Time {
562578 token , _ , _ := new (jwt.Parser ).ParseUnverified (tokenString , jwt.MapClaims {})
563579 claims , _ := token .Claims .(jwt.MapClaims )
0 commit comments