Skip to content

Commit 060a4d8

Browse files
authored
add histo metric for IP allocation latency per unique pod (#1027)
* add histo metric for IP allocation latency per unique pod Signed-off-by: Evan Baker <[email protected]> * fix typo in ipam metrics Signed-off-by: Evan Baker <[email protected]> * delint Signed-off-by: Evan Baker <[email protected]> * noop on non-existent key Signed-off-by: Evan Baker <[email protected]> * create bounded, mapped heap to record pod-first-seen times during ip allocation Signed-off-by: Evan Baker <[email protected]>
1 parent c970d07 commit 060a4d8

File tree

8 files changed

+312
-49
lines changed

8 files changed

+312
-49
lines changed

cns/ipampoolmonitor/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ var (
5656
)
5757
ipamRequestedIPConfigCount = prometheus.NewGauge(
5858
prometheus.GaugeOpts{
59-
Name: "ipam_reuested_ips",
59+
Name: "ipam_requested_ips",
6060
Help: "Requested IP count.",
6161
},
6262
)

cns/restserver/internalapi.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,8 +241,8 @@ func (service *HTTPRestService) ReconcileNCState(
241241
ipconfigRequest := cns.IPConfigRequest{
242242
DesiredIPAddress: secIpConfig.IPAddress,
243243
OrchestratorContext: jsonContext,
244-
PodInterfaceID: podInfo.InterfaceID(),
245244
InfraContainerID: podInfo.InfraContainerID(),
245+
PodInterfaceID: podInfo.InterfaceID(),
246246
}
247247

248248
if _, err := requestIPConfigHelper(service, ipconfigRequest); err != nil {

cns/restserver/ipam.go

Lines changed: 61 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -12,76 +12,96 @@ import (
1212
"github.com/Azure/azure-container-networking/cns/filter"
1313
"github.com/Azure/azure-container-networking/cns/logger"
1414
"github.com/Azure/azure-container-networking/cns/types"
15+
"github.com/pkg/errors"
1516
)
1617

1718
// used to request an IPConfig from the CNS state
1819
func (service *HTTPRestService) requestIPConfigHandler(w http.ResponseWriter, r *http.Request) {
19-
var (
20-
err error
21-
ipconfigRequest cns.IPConfigRequest
22-
podIpInfo cns.PodIpInfo
23-
returnCode types.ResponseCode
24-
returnMessage string
25-
)
26-
27-
err = service.Listener.Decode(w, r, &ipconfigRequest)
20+
var ipconfigRequest cns.IPConfigRequest
21+
err := service.Listener.Decode(w, r, &ipconfigRequest)
2822
operationName := "requestIPConfigHandler"
2923
logger.Request(service.Name+operationName, ipconfigRequest, err)
3024
if err != nil {
3125
return
3226
}
3327

3428
// retrieve ipconfig from nc
35-
_, returnCode, returnMessage = service.validateIPConfigRequest(ipconfigRequest)
36-
if returnCode == types.Success {
37-
if podIpInfo, err = requestIPConfigHelper(service, ipconfigRequest); err != nil {
38-
returnCode = types.FailedToAllocateIPConfig
39-
returnMessage = fmt.Sprintf("AllocateIPConfig failed: %v, IP config request is %s", err, ipconfigRequest)
29+
podInfo, returnCode, returnMessage := service.validateIPConfigRequest(ipconfigRequest)
30+
if returnCode != types.Success {
31+
reserveResp := &cns.IPConfigResponse{
32+
Response: cns.Response{
33+
ReturnCode: returnCode,
34+
Message: returnMessage,
35+
},
4036
}
37+
err = service.Listener.Encode(w, &reserveResp)
38+
logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, reserveResp.Response.ReturnCode, err)
39+
return
4140
}
4241

43-
resp := cns.Response{
44-
ReturnCode: returnCode,
45-
Message: returnMessage,
42+
// record a pod requesting an IP
43+
service.podsPendingIPAllocation.Push(podInfo.Key())
44+
45+
podIPInfo, err := requestIPConfigHelper(service, ipconfigRequest)
46+
if err != nil {
47+
reserveResp := &cns.IPConfigResponse{
48+
Response: cns.Response{
49+
ReturnCode: types.FailedToAllocateIPConfig,
50+
Message: fmt.Sprintf("AllocateIPConfig failed: %v, IP config request is %s", err, ipconfigRequest),
51+
},
52+
PodIpInfo: podIPInfo,
53+
}
54+
err = service.Listener.Encode(w, &reserveResp)
55+
logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, reserveResp.Response.ReturnCode, err)
56+
return
4657
}
4758

59+
// record a pod allocated an IP
60+
defer func() {
61+
// observe allocation wait time
62+
if since := service.podsPendingIPAllocation.Pop(podInfo.Key()); since > 0 {
63+
ipAllocationLatency.Observe(float64(since))
64+
}
65+
}()
4866
reserveResp := &cns.IPConfigResponse{
49-
Response: resp,
67+
Response: cns.Response{
68+
ReturnCode: types.Success,
69+
},
70+
PodIpInfo: podIPInfo,
5071
}
51-
reserveResp.PodIpInfo = podIpInfo
5272

5373
err = service.Listener.Encode(w, &reserveResp)
54-
logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, resp.ReturnCode, err)
74+
logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, reserveResp.Response.ReturnCode, err)
5575
}
5676

5777
func (service *HTTPRestService) releaseIPConfigHandler(w http.ResponseWriter, r *http.Request) {
58-
req := cns.IPConfigRequest{}
59-
resp := cns.Response{}
60-
var err error
61-
62-
defer func() {
63-
err = service.Listener.Encode(w, &resp)
64-
logger.ResponseEx(service.Name, req, resp, resp.ReturnCode, err)
65-
}()
66-
67-
err = service.Listener.Decode(w, r, &req)
78+
var req cns.IPConfigRequest
79+
err := service.Listener.Decode(w, r, &req)
6880
logger.Request(service.Name+"releaseIPConfigHandler", req, err)
6981
if err != nil {
70-
resp.ReturnCode = types.UnexpectedError
71-
resp.Message = err.Error()
82+
resp := cns.Response{
83+
ReturnCode: types.UnexpectedError,
84+
Message: err.Error(),
85+
}
7286
logger.Errorf("releaseIPConfigHandler decode failed becase %v, release IP config info %s", resp.Message, req)
87+
err = service.Listener.Encode(w, &resp)
88+
logger.ResponseEx(service.Name, req, resp, resp.ReturnCode, err)
7389
return
7490
}
7591

76-
var podInfo cns.PodInfo
77-
podInfo, resp.ReturnCode, resp.Message = service.validateIPConfigRequest(req)
92+
podInfo, returnCode, message := service.validateIPConfigRequest(req)
7893

7994
if err = service.releaseIPConfig(podInfo); err != nil {
80-
resp.ReturnCode = types.UnexpectedError
81-
resp.Message = err.Error()
82-
logger.Errorf("releaseIPConfigHandler releaseIPConfig failed because %v, release IP config info %s", resp.Message, req)
83-
return
95+
returnCode = types.UnexpectedError
96+
message = err.Error()
97+
logger.Errorf("releaseIPConfigHandler releaseIPConfig failed because %v, release IP config info %s", message, req)
8498
}
99+
resp := cns.Response{
100+
ReturnCode: returnCode,
101+
Message: message,
102+
}
103+
err = service.Listener.Encode(w, &resp)
104+
logger.ResponseEx(service.Name, req, resp, resp.ReturnCode, err)
85105
}
86106

87107
// MarkIPAsPendingRelease will set the IPs which are in PendingProgramming or Available to PendingRelease state
@@ -418,20 +438,15 @@ func (service *HTTPRestService) AllocateAnyAvailableIPConfig(podInfo cns.PodInfo
418438

419439
// If IPConfig is already allocated for pod, it returns that else it returns one of the available ipconfigs.
420440
func requestIPConfigHelper(service *HTTPRestService, req cns.IPConfigRequest) (cns.PodIpInfo, error) {
421-
var (
422-
podIpInfo cns.PodIpInfo
423-
isExist bool
424-
)
425-
426441
// check if ipconfig already allocated for this pod and return if exists or error
427442
// if error, ipstate is nil, if exists, ipstate is not nil and error is nil
428443
podInfo, err := cns.NewPodInfoFromIPConfigRequest(req)
429444
if err != nil {
430-
return podIpInfo, err
445+
return cns.PodIpInfo{}, errors.Wrapf(err, "failed to parse IPConfigRequest %v", req)
431446
}
432447

433-
if podIpInfo, isExist, err = service.GetExistingIPConfig(podInfo); err != nil || isExist {
434-
return podIpInfo, err
448+
if podIPInfo, isExist, err := service.GetExistingIPConfig(podInfo); err != nil || isExist {
449+
return podIPInfo, err
435450
}
436451

437452
// return desired IPConfig

cns/restserver/metrics.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ var httpRequestLatency = prometheus.NewHistogramVec(
1212
prometheus.HistogramOpts{
1313
Name: "http_request_latency_seconds",
1414
Help: "Request latency in seconds by endpoint, verb, and response code.",
15-
//nolint:gomnd
15+
//nolint:gomnd // default bucket consts
1616
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), // 1 ms to ~16 seconds
1717
},
1818
// TODO(rbtr):
@@ -22,9 +22,19 @@ var httpRequestLatency = prometheus.NewHistogramVec(
2222
[]string{"url", "verb"},
2323
)
2424

25+
var ipAllocationLatency = prometheus.NewHistogram(
26+
prometheus.HistogramOpts{
27+
Name: "ip_allocation_latency_seconds",
28+
Help: "IP allocation latency in seconds",
29+
//nolint:gomnd // default bucket consts
30+
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), // 1 ms to ~16 seconds
31+
},
32+
)
33+
2534
func init() {
2635
metrics.Registry.MustRegister(
2736
httpRequestLatency,
37+
ipAllocationLatency,
2838
)
2939
}
3040

cns/restserver/restserver.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/Azure/azure-container-networking/cns/nmagentclient"
1818
"github.com/Azure/azure-container-networking/cns/routes"
1919
"github.com/Azure/azure-container-networking/cns/types"
20+
"github.com/Azure/azure-container-networking/cns/types/bounded"
2021
acn "github.com/Azure/azure-container-networking/common"
2122
"github.com/Azure/azure-container-networking/store"
2223
)
@@ -48,6 +49,7 @@ type HTTPRestService struct {
4849
routingTable *routes.RoutingTable
4950
store store.KeyValueStore
5051
state *httpRestServiceState
52+
podsPendingIPAllocation *bounded.TimedSet
5153
sync.RWMutex
5254
dncPartitionKey string
5355
}
@@ -137,6 +139,7 @@ func NewHTTPRestService(config *common.ServiceConfig, imdsClientInterface imdscl
137139
PodIPConfigState: podIPConfigState,
138140
routingTable: routingTable,
139141
state: serviceState,
142+
podsPendingIPAllocation: bounded.NewTimedSet(250), // nolint:gomnd // maxpods
140143
}, nil
141144
}
142145

cns/types/bounded/mappedheap.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package bounded
2+
3+
import (
4+
"container/heap"
5+
)
6+
7+
// Item describes a type accepted by the mapped heap implementation.
8+
type Item interface {
9+
// Key is used for map index operations.
10+
Key() string
11+
// Less is used for heap sorting operations.
12+
Less(Item) bool
13+
// SetIndex is called by heap implementations to set the Item heap index.
14+
SetIndex(int)
15+
// Index returns the index of this Item.
16+
Index() int
17+
}
18+
19+
var _ heap.Interface = (*MappedHeap)(nil)
20+
21+
// MappedHeap is a combination of map and heap structures which allows for
22+
// efficient sorting, uniqueness guarantees, and constant time lookups.
23+
// Implements heap.Interface.
24+
type MappedHeap struct {
25+
m map[string]Item
26+
items []Item
27+
}
28+
29+
func NewMappedHeap() *MappedHeap {
30+
return &MappedHeap{
31+
m: make(map[string]Item),
32+
}
33+
}
34+
35+
func (mh MappedHeap) Contains(key string) (int, bool) {
36+
item, ok := mh.m[key]
37+
if ok {
38+
return item.Index(), true
39+
}
40+
return -1, false
41+
}
42+
43+
func (mh MappedHeap) Len() int {
44+
return len(mh.items)
45+
}
46+
47+
func (mh MappedHeap) Less(i, j int) bool {
48+
return mh.items[i].Less(mh.items[j])
49+
}
50+
51+
func (mh *MappedHeap) Swap(i, j int) {
52+
mh.items[i], mh.items[j] = mh.items[j], mh.items[i]
53+
mh.items[i].SetIndex(i)
54+
mh.items[j].SetIndex(j)
55+
}
56+
57+
func (mh *MappedHeap) Push(x interface{}) {
58+
n := len(mh.items)
59+
item := x.(Item)
60+
item.SetIndex(n)
61+
mh.items = append(mh.items, item)
62+
mh.m[item.Key()] = item
63+
}
64+
65+
func (mh *MappedHeap) Pop() interface{} {
66+
old := mh.items
67+
n := len(old)
68+
item := old[n-1]
69+
old[n-1] = nil
70+
item.SetIndex(-1)
71+
mh.items = old[0 : n-1]
72+
delete(mh.m, item.Key())
73+
return item
74+
}

cns/types/bounded/timedset.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package bounded
2+
3+
import (
4+
"container/heap"
5+
"sync"
6+
"time"
7+
)
8+
9+
var _ Item = (*TimedItem)(nil)
10+
11+
// TimedItem implements Item for a string: time.Time tuple.
12+
type TimedItem struct {
13+
Name string
14+
Time time.Time
15+
index int
16+
}
17+
18+
func (t *TimedItem) Key() string {
19+
return t.Name
20+
}
21+
22+
func (t *TimedItem) Less(o Item) bool {
23+
other := o.(*TimedItem)
24+
return t.Time.Before(other.Time)
25+
}
26+
27+
func (t *TimedItem) Index() int {
28+
return t.index
29+
}
30+
31+
func (t *TimedItem) SetIndex(i int) {
32+
t.index = i
33+
}
34+
35+
type TimedSet struct {
36+
sync.Mutex
37+
capacity int
38+
items *MappedHeap
39+
}
40+
41+
func NewTimedSet(c int) *TimedSet {
42+
return &TimedSet{
43+
capacity: c,
44+
items: NewMappedHeap(),
45+
}
46+
}
47+
48+
// Push registers the passed key and saves the timestamp it is first registered.
49+
// If the key is already registered, does not overwrite the saved timestamp.
50+
func (ts *TimedSet) Push(key string) {
51+
ts.Lock()
52+
defer ts.Unlock()
53+
if _, ok := ts.items.Contains(key); ok {
54+
return
55+
}
56+
if ts.items.Len() >= ts.capacity {
57+
_ = heap.Pop(ts.items)
58+
}
59+
item := &TimedItem{Name: key}
60+
item.Time = time.Now()
61+
heap.Push(ts.items, item)
62+
}
63+
64+
// Pop returns the elapsed duration since the passed key was first registered,
65+
// or -1 if it is not found.
66+
func (ts *TimedSet) Pop(key string) time.Duration {
67+
ts.Lock()
68+
defer ts.Unlock()
69+
idx, ok := ts.items.Contains(key)
70+
if !ok {
71+
return -1
72+
}
73+
item := heap.Remove(ts.items, idx)
74+
return time.Since(item.(*TimedItem).Time)
75+
}

0 commit comments

Comments
 (0)