Commit acbfb48

Merge pull request #28431 from neisw/trt-1042-data-collection
trt-1042: collect watch request counts
2 parents f346154 + 35b3c22 · commit acbfb48

File tree: 4 files changed, +263 -99 lines

pkg/dataloader/types.go

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
package dataloader

import (
	"encoding/json"
	"io/ioutil"
)

type DataType = string

const (
	DataTypeFloat64 DataType = "float64"
	DataTypeString  DataType = "string"
	DataTypeInteger DataType = "int64"
	// RFC3339 based value "2006-01-02T15:04:05Z07:00"
	DataTypeTimestamp DataType = "timestamp"
	DataTypeJSON      DataType = "json"

	// Files that end with this suffix will be automatically written to the specified table name via ci-data-loader.
	AutoDataLoaderSuffix = "autodl.json"
)

type DataFile struct {
	// Table name to be created / updated with the corresponding data.
	TableName string `json:"table_name"`
	// Schema identifying the data types associated with the row values.
	// JobRunName, PartitionTime, and Source are provided by default and do not need to be specified here.
	// Columns defined in this schema are optional, unless used as the PartitionColumn.
	// New columns will be added, but columns removed here will *not* be deleted
	// from the table, in order to preserve integrity across releases.
	// Because the columns are optional, their data can be omitted once it is no longer needed.
	// If breaking changes are required, it is best to define a new table name.
	Schema map[string]DataType `json:"schema"`
	// If an existing row key differs from the specified schema column name,
	// map the row key to the schema name here (rowKey -> newName).
	SchemaMapping map[string]string `json:"schema_mapping"`
	// The data to be uploaded.
	Rows []map[string]string `json:"rows"`

	// Optional.
	// Depending on the size of your data, the rows might have to be chunked
	// when writing. The default chunk size is 5k rows.
	// If the row data is large, this can be changed to make smaller chunks.
	ChunkSize int `json:"chunk_size"`

	// ExpirationDays and PartitionColumn are only used when first creating the table;
	// if the table already exists, changing these values will not update it.
	// The default expiration is 365 days.
	ExpirationDays int `json:"expiration_days"`
	// A partition column, PartitionTime, will automatically be added with the value
	// of the file creation timestamp. If your data already includes a timestamp,
	// it can be specified as the partition column instead,
	// and the default PartitionTime will be omitted.
	PartitionColumn string `json:"partition_column"`
}

func WriteDataFile(filename string, dataFile DataFile) error {
	jsonContent, err := json.MarshalIndent(dataFile, "", " ")
	if err != nil {
		return err
	}
	return ioutil.WriteFile(filename, jsonContent, 0644)
}
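
For a sense of how a producer would use this package, here is a minimal, self-contained sketch. The table name, column names, and row values below are hypothetical, not part of this commit:

package main

import (
	"log"

	"github.com/openshift/origin/pkg/dataloader"
)

func main() {
	// Hypothetical table, columns, and values, for illustration only.
	dataFile := dataloader.DataFile{
		TableName: "example_metrics",
		Schema: map[string]dataloader.DataType{
			"Name":  dataloader.DataTypeString,
			"Count": dataloader.DataTypeInteger,
		},
		// Row values are always strings; numeric columns are formatted as
		// strings and their types declared via the Schema.
		Rows: []map[string]string{
			{"Name": "foo", "Count": "1"},
			{"Name": "bar", "Count": "2"},
		},
	}
	// The autodl.json suffix is what ci-data-loader keys on.
	if err := dataloader.WriteDataFile("example-metrics-"+dataloader.AutoDataLoaderSuffix, dataFile); err != nil {
		log.Fatal(err)
	}
}

Since Rows holds only strings, numeric columns are presumably coerced on load by ci-data-loader according to the declared Schema.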

pkg/defaultmonitortests/types.go

Lines changed: 2 additions & 1 deletion
@@ -2,7 +2,7 @@ package defaultmonitortests

 import (
 	"fmt"
-
+	"github.com/openshift/origin/pkg/monitortests/testframework/watchrequestcountscollector"
 	"github.com/sirupsen/logrus"

 	"github.com/openshift/origin/pkg/monitortests/kubeapiserver/disruptionnewapiserver"
@@ -168,6 +168,7 @@ func newUniversalMonitorTests() monitortestframework.MonitorTestRegistry {
 	monitorTestRegistry.AddMonitorTestOrDie("clusteroperator-collector", "Test Framework", watchclusteroperators.NewOperatorWatcher())

 	monitorTestRegistry.AddMonitorTestOrDie("azure-metrics-collector", "Test Framework", azuremetrics.NewAzureMetricsCollector())
+	monitorTestRegistry.AddMonitorTestOrDie("watch-request-counts-collector", "Test Framework", watchrequestcountscollector.NewWatchRequestCountSerializer())

 	return monitorTestRegistry
 }
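
Each test registered this way is driven through the monitortestframework.MonitorTest interface. The interface definition itself is not part of this diff; the sketch below is inferred from the methods the new collector implements (the authoritative definition lives in pkg/monitortestframework):

// Inferred from watchRequestCountSerializer below, not copied from the source tree.
type MonitorTest interface {
	StartCollection(ctx context.Context, adminRESTConfig *rest.Config, recorder monitorapi.RecorderWriter) error
	CollectData(ctx context.Context, storageDir string, beginning, end time.Time) (monitorapi.Intervals, []*junitapi.JUnitTestCase, error)
	ConstructComputedIntervals(ctx context.Context, startingIntervals monitorapi.Intervals, recordedResources monitorapi.ResourcesMap, beginning, end time.Time) (monitorapi.Intervals, error)
	EvaluateTestsFromConstructedIntervals(ctx context.Context, finalIntervals monitorapi.Intervals) ([]*junitapi.JUnitTestCase, error)
	WriteContentToStorage(ctx context.Context, storageDir, timeSuffix string, finalIntervals monitorapi.Intervals, finalResourceState monitorapi.ResourcesMap) error
	Cleanup(ctx context.Context) error
}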
pkg/monitortests/testframework/watchrequestcountscollector/

Lines changed: 189 additions & 0 deletions

@@ -0,0 +1,189 @@
package watchrequestcountscollector

import (
	"context"
	"fmt"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"time"

	apiserverclientv1 "github.com/openshift/client-go/apiserver/clientset/versioned/typed/apiserver/v1"
	"github.com/openshift/origin/pkg/dataloader"
	"github.com/openshift/origin/pkg/monitor/monitorapi"
	"github.com/openshift/origin/pkg/monitortestframework"
	"github.com/openshift/origin/pkg/test/ginkgo/junitapi"
	exutil "github.com/openshift/origin/test/extended/util"
	"github.com/sirupsen/logrus"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/rest"
)

type watchRequestCountSerializer struct {
	monitorStartTime time.Time
	adminRESTConfig  *rest.Config
}

func NewWatchRequestCountSerializer() monitortestframework.MonitorTest {
	return &watchRequestCountSerializer{}
}

func (w *watchRequestCountSerializer) StartCollection(ctx context.Context, adminRESTConfig *rest.Config, recorder monitorapi.RecorderWriter) error {
	w.monitorStartTime = time.Now()
	w.adminRESTConfig = adminRESTConfig
	return nil
}

func (w *watchRequestCountSerializer) CollectData(ctx context.Context, storageDir string, beginning, end time.Time) (monitorapi.Intervals, []*junitapi.JUnitTestCase, error) {
	return nil, nil, nil
}

func (w *watchRequestCountSerializer) ConstructComputedIntervals(ctx context.Context, startingIntervals monitorapi.Intervals, recordedResources monitorapi.ResourcesMap, beginning, end time.Time) (constructedIntervals monitorapi.Intervals, err error) {
	return nil, nil
}

func (w *watchRequestCountSerializer) EvaluateTestsFromConstructedIntervals(ctx context.Context, finalIntervals monitorapi.Intervals) ([]*junitapi.JUnitTestCase, error) {
	return nil, nil
}

func (w *watchRequestCountSerializer) WriteContentToStorage(ctx context.Context, storageDir, timeSuffix string, finalIntervals monitorapi.Intervals, finalResourceState monitorapi.ResourcesMap) error {
	oc := exutil.NewCLIWithoutNamespace("api-requests")

	infra, err := oc.AdminConfigClient().ConfigV1().Infrastructures().Get(context.Background(), "cluster", metav1.GetOptions{})
	if err != nil {
		logrus.WithError(err).Warn("unable to get cluster infrastructure")
		return nil
	}

	watchRequestCounts, err := GetWatchRequestCounts(ctx, oc)
	if err != nil {
		logrus.WithError(err).Warn("unable to get watch request counts")
		return nil
	}

	// One row per operator: ControlPlaneTopology, PlatformType, Operator, WatchRequestCount.
	rows := make([]map[string]string, 0)
	for _, item := range watchRequestCounts {
		// Usernames look like system:serviceaccount:<namespace>:<name>; keep the service-account name.
		operator := strings.Split(item.Operator, ":")[3]
		rows = append(rows, map[string]string{
			"ControlPlaneTopology": string(infra.Status.ControlPlaneTopology),
			"PlatformType":         string(infra.Spec.PlatformSpec.Type),
			"Operator":             operator,
			"WatchRequestCount":    strconv.FormatInt(item.Count, 10),
		})
	}

	dataFile := dataloader.DataFile{
		TableName: "operator_watch_requests",
		Schema: map[string]dataloader.DataType{
			"ControlPlaneTopology": dataloader.DataTypeString,
			"PlatformType":         dataloader.DataTypeString,
			"Operator":             dataloader.DataTypeString,
			"WatchRequestCount":    dataloader.DataTypeInteger,
		},
		Rows: rows,
	}
	fileName := filepath.Join(storageDir, fmt.Sprintf("operator-watch-requests%s-%s", timeSuffix, dataloader.AutoDataLoaderSuffix))
	err = dataloader.WriteDataFile(fileName, dataFile)
	if err != nil {
		logrus.WithError(err).Warnf("unable to write data file: %s", fileName)
		return nil
	}

	return nil
}

func (w *watchRequestCountSerializer) Cleanup(ctx context.Context) error {
	return nil
}

type OperatorKey struct {
	NodeName string
	Operator string
	Hour     int
}

type RequestCount struct {
	NodeName string
	Operator string
	Count    int64
	Hour     int
}

func GetWatchRequestCounts(ctx context.Context, oc *exutil.CLI) ([]*RequestCount, error) {
	apirequestCountClient, err := apiserverclientv1.NewForConfig(oc.AdminConfig())
	if err != nil {
		logrus.WithError(err).Warn("unable to initialize apirequestCountClient")
		return nil, err
	}

	apiRequestCounts, err := apirequestCountClient.APIRequestCounts().List(ctx, metav1.ListOptions{})
	if err != nil {
		logrus.WithError(err).Warn("unable to list APIRequestCounts")
		return nil, err
	}

	watchRequestCounts := []*RequestCount{}
	watchRequestCountsMap := map[OperatorKey]*RequestCount{}

	for _, apiRequestCount := range apiRequestCounts.Items {
		if apiRequestCount.Status.RequestCount <= 0 {
			continue
		}
		for hourIdx, perResourceAPIRequestLog := range apiRequestCount.Status.Last24h {
			if perResourceAPIRequestLog.RequestCount > 0 {
				for _, perNodeCount := range perResourceAPIRequestLog.ByNode {
					if perNodeCount.RequestCount <= 0 {
						continue
					}
					for _, perUserCount := range perNodeCount.ByUser {
						if perUserCount.RequestCount <= 0 {
							continue
						}
						// Take only operators into account.
						if !strings.HasSuffix(perUserCount.UserName, "-operator") {
							continue
						}
						for _, verb := range perUserCount.ByVerb {
							if verb.Verb != "watch" || verb.RequestCount == 0 {
								continue
							}
							key := OperatorKey{
								NodeName: perNodeCount.NodeName,
								Operator: perUserCount.UserName,
								Hour:     hourIdx,
							}
							// Group requests by resource (the number of watchers in the code does not change
							// as much as the number of requests).
							if _, exists := watchRequestCountsMap[key]; exists {
								watchRequestCountsMap[key].Count += verb.RequestCount
							} else {
								watchRequestCountsMap[key] = &RequestCount{
									NodeName: perNodeCount.NodeName,
									Operator: perUserCount.UserName,
									Count:    verb.RequestCount,
									Hour:     hourIdx,
								}
							}
						}
					}
				}
			}
		}
	}

	// Take the maximum across all hours and all nodes.
	watchRequestCountsMapMax := map[OperatorKey]*RequestCount{}
	for _, requestCount := range watchRequestCountsMap {
		key := OperatorKey{
			Operator: requestCount.Operator,
		}
		if _, exists := watchRequestCountsMapMax[key]; exists {
			if watchRequestCountsMapMax[key].Count < requestCount.Count {
				watchRequestCountsMapMax[key].Count = requestCount.Count
				watchRequestCountsMapMax[key].NodeName = requestCount.NodeName
				watchRequestCountsMapMax[key].Hour = requestCount.Hour
			}
		} else {
			watchRequestCountsMapMax[key] = requestCount
		}
	}

	// Sort the request counts so it is easy to see the biggest offenders.
	for _, requestCount := range watchRequestCountsMapMax {
		watchRequestCounts = append(watchRequestCounts, requestCount)
	}

	sort.Slice(watchRequestCounts, func(i int, j int) bool {
		return watchRequestCounts[i].Count > watchRequestCounts[j].Count
	})

	return watchRequestCounts, nil
}
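
End to end, a run of this collector writes a file named operator-watch-requests<timeSuffix>-autodl.json under storageDir. Given the struct tags in pkg/dataloader/types.go, its contents would look roughly like the following; the row values here are illustrative, and the optional fields the collector does not set serialize as their zero values:

{
 "table_name": "operator_watch_requests",
 "schema": {
  "ControlPlaneTopology": "string",
  "Operator": "string",
  "PlatformType": "string",
  "WatchRequestCount": "int64"
 },
 "schema_mapping": null,
 "rows": [
  {
   "ControlPlaneTopology": "HighlyAvailable",
   "Operator": "etcd-operator",
   "PlatformType": "AWS",
   "WatchRequestCount": "1234"
  }
 ],
 "chunk_size": 0,
 "expiration_days": 0,
 "partition_column": ""
}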
