Skip to content

Commit 9c65b79

Browse files
committed
refactor peerproxy_handler and add unit test
1 parent 3a0f6c1 commit 9c65b79

File tree

2 files changed

+148
-76
lines changed

2 files changed

+148
-76
lines changed

staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go

Lines changed: 55 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,6 @@ type peerProxyHandler struct {
8282
finishedSync atomic.Bool
8383
}
8484

85-
type serviceableByResponse struct {
86-
locallyServiceable bool
87-
errorFetchingAddressFromLease bool
88-
peerEndpoints []string
89-
}
90-
9185
// responder implements rest.Responder for assisting a connector in writing objects or errors.
9286
type responder struct {
9387
w http.ResponseWriter
@@ -149,84 +143,97 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
149143
gvr.Group = "core"
150144
}
151145

152-
// find servers that are capable of serving this request
153-
serviceableByResp, err := h.findServiceableByServers(gvr)
146+
apiservers, err := h.findServiceableByServers(gvr)
154147
if err != nil {
155-
// this means that resource is an aggregated API or a CR since it wasn't found in SV informer cache, pass as it is
156-
handler.ServeHTTP(w, r)
157-
return
158-
}
159-
// found the gvr locally, pass request to the next handler in local apiserver
160-
if serviceableByResp.locallyServiceable {
148+
// resource wasn't found in SV informer cache which means that resource is an aggregated API
149+
// or a CR. This situation is ok to be handled by local handler.
161150
handler.ServeHTTP(w, r)
162151
return
163152
}
164153

165-
gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
166-
if serviceableByResp.errorFetchingAddressFromLease {
167-
klog.ErrorS(err, "error fetching ip and port of remote server while proxying")
154+
locallyServiceable, peerEndpoints, err := h.resolveServingLocation(apiservers)
155+
if err != nil {
156+
gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
157+
klog.ErrorS(err, "error finding serviceable-by apiservers for the requested resource", "gvr", gvr)
168158
responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r)
169159
return
170160
}
171161

172-
// no apiservers were found that could serve the request, pass request to
173-
// next handler, that should eventually serve 404
174-
162+
// pass request to the next handler if found the gvr locally.
175163
// TODO: maintain locally serviceable GVRs somewhere so that we dont have to
176164
// consult the storageversion-informed map for those
177-
if len(serviceableByResp.peerEndpoints) == 0 {
165+
if locallyServiceable {
166+
handler.ServeHTTP(w, r)
167+
return
168+
}
169+
170+
if len(peerEndpoints) == 0 {
178171
klog.Errorf("gvr %v is not served by anything in this cluster", gvr)
179172
handler.ServeHTTP(w, r)
180173
return
181174
}
182175

183176
// otherwise, randomly select an apiserver and proxy request to it
184-
rand := rand.Intn(len(serviceableByResp.peerEndpoints))
185-
destServerHostPort := serviceableByResp.peerEndpoints[rand]
177+
rand := rand.Intn(len(peerEndpoints))
178+
destServerHostPort := peerEndpoints[rand]
186179
h.proxyRequestToDestinationAPIServer(r, w, destServerHostPort)
187-
188180
})
189181
}
190182

191-
func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource) (serviceableByResponse, error) {
192-
183+
func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource) (*sync.Map, error) {
193184
apiserversi, ok := h.svMap.Load(gvr)
194-
195-
// no value found for the requested gvr in svMap
196185
if !ok || apiserversi == nil {
197-
return serviceableByResponse{}, fmt.Errorf("no StorageVersions found for the GVR: %v", gvr)
186+
return nil, fmt.Errorf("no storageVersions found for the GVR: %v", gvr)
198187
}
199-
apiservers := apiserversi.(*sync.Map)
200-
response := serviceableByResponse{}
188+
189+
apiservers, _ := apiserversi.(*sync.Map)
190+
return apiservers, nil
191+
}
192+
193+
func (h *peerProxyHandler) resolveServingLocation(apiservers *sync.Map) (bool, []string, error) {
201194
var peerServerEndpoints []string
195+
var locallyServiceable bool
196+
var respErr error
197+
202198
apiservers.Range(func(key, value interface{}) bool {
203199
apiserverKey := key.(string)
204200
if apiserverKey == h.serverId {
205-
response.locallyServiceable = true
201+
locallyServiceable = true
206202
// stop iteration
207203
return false
208204
}
209205

210-
hostPort, err := h.reconciler.GetEndpoint(apiserverKey)
206+
hostPort, err := h.hostportInfo(apiserverKey)
211207
if err != nil {
212-
response.errorFetchingAddressFromLease = true
213-
klog.ErrorS(err, "failed to get peer ip from storage lease for server", "serverID", apiserverKey)
208+
respErr = err
214209
// continue with iteration
215210
return true
216211
}
217-
// check ip format
218-
_, _, err = net.SplitHostPort(hostPort)
219-
if err != nil {
220-
response.errorFetchingAddressFromLease = true
221-
klog.ErrorS(err, "invalid address found for server", "serverID", apiserverKey)
222-
return true
223-
}
212+
224213
peerServerEndpoints = append(peerServerEndpoints, hostPort)
225214
return true
226215
})
227216

228-
response.peerEndpoints = peerServerEndpoints
229-
return response, nil
217+
// reset err if there was atleast one valid peer server found.
218+
if len(peerServerEndpoints) > 0 {
219+
respErr = nil
220+
}
221+
222+
return locallyServiceable, peerServerEndpoints, respErr
223+
}
224+
225+
func (h *peerProxyHandler) hostportInfo(apiserverKey string) (string, error) {
226+
hostport, err := h.reconciler.GetEndpoint(apiserverKey)
227+
if err != nil {
228+
return "", err
229+
}
230+
// check ip format
231+
_, _, err = net.SplitHostPort(hostport)
232+
if err != nil {
233+
return "", err
234+
}
235+
236+
return hostport, nil
230237
}
231238

232239
func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, rw http.ResponseWriter, host string) {
@@ -248,13 +255,11 @@ func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request,
248255
defer cancelFn()
249256

250257
proxyRoundTripper := transport.NewAuthProxyRoundTripper(user.GetName(), user.GetUID(), user.GetGroups(), user.GetExtra(), h.proxyTransport)
251-
252258
delegate := &epmetrics.ResponseWriterDelegator{ResponseWriter: rw}
253259
w := responsewriter.WrapForHTTP1Or2(delegate)
254260

255261
handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, false, &responder{w: w, ctx: req.Context()})
256262
handler.ServeHTTP(w, newReq)
257-
// Increment the count of proxied requests
258263
metrics.IncPeerProxiedRequest(req.Context(), strconv.Itoa(delegate.Status()))
259264
}
260265

@@ -280,11 +285,13 @@ func (h *peerProxyHandler) updateSV(oldObj interface{}, newObj interface{}) {
280285
klog.Error("Invalid StorageVersion provided to updateSV()")
281286
return
282287
}
288+
283289
newSV, ok := newObj.(*v1alpha1.StorageVersion)
284290
if !ok {
285291
klog.Error("Invalid StorageVersion provided to updateSV()")
286292
return
287293
}
294+
288295
h.updateSVMap(oldSV, newSV)
289296
}
290297

@@ -295,17 +302,17 @@ func (h *peerProxyHandler) deleteSV(obj interface{}) {
295302
klog.Error("Invalid StorageVersion provided to deleteSV()")
296303
return
297304
}
305+
298306
h.updateSVMap(sv, nil)
299307
}
300308

301309
// Delete old storageversion, add new storagversion
302310
func (h *peerProxyHandler) updateSVMap(oldSV *v1alpha1.StorageVersion, newSV *v1alpha1.StorageVersion) {
303311
if oldSV != nil {
304-
// delete old SV entries
305312
h.deleteSVFromMap(oldSV)
306313
}
314+
307315
if newSV != nil {
308-
// add new SV entries
309316
h.addSVToMap(newSV)
310317
}
311318
}

0 commit comments

Comments
 (0)