Skip to content

Commit 58d6be4

Browse files
ntny and arpechenin
authored
feat(backend): add gRPC metrics to api-server (RPS/latency), optimize execution spec reporting (#12010)
* - add standard grpc metrics to api-server - add report gap histogram - optimize create or update tasks query Signed-off-by: ntny <[email protected]> Signed-off-by: arpechenin <[email protected]> * - move to the latest version of grpc-prometheus Signed-off-by: arpechenin <[email protected]> * - register metrics Signed-off-by: arpechenin <[email protected]> * - bump metrics lib Signed-off-by: arpechenin <[email protected]> * - fix metrics injection Signed-off-by: arpechenin <[email protected]> * merge Signed-off-by: arpechenin <[email protected]> --------- Signed-off-by: ntny <[email protected]> Signed-off-by: arpechenin <[email protected]> Co-authored-by: arpechenin <[email protected]>
1 parent b26bf2f commit 58d6be4

File tree

8 files changed

+74
-12
lines changed

8 files changed

+74
-12
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
## Metrics
2+
3+
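Standard gRPC server metrics are exposed via the go-grpc-prometheus library (https://github.com/grpc-ecosystem/go-grpc-prometheus); its successor is documented at https://github.com/grpc-ecosystem/go-grpc-middleware/tree/main/providers/prometheus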
4+
5+
### Sample
6+
7+
gRPC requests per minute (RPM) by grpc_service, grpc_method, and grpc_code for the ml-pipeline server:
8+
```promql
9+
sum by (grpc_service, grpc_method, grpc_code) (
10+
rate(grpc_server_handled_total{app="ml-pipeline", kubernetes_namespace="kubeflow"}[1m])
11+
) * 60
12+
```
13+
14+
95th percentile gRPC latency by grpc_service and grpc_method for the ml-pipeline server:
15+
```promql
16+
histogram_quantile(
17+
0.95,
18+
sum by (grpc_service, grpc_method, le) (
19+
rate(grpc_server_handling_seconds_bucket{
20+
app="ml-pipeline",
21+
kubernetes_namespace="kubeflow"
22+
}[1m])
23+
)
24+
)
25+
```
26+
27+
95th percentile of the gap, in seconds, between creating an execution spec (Argo or other backend) for a recurring run and reporting it via the persistence agent:
28+
```promql
29+
histogram_quantile(0.95, sum(rate(resource_manager_recurring_run_report_gap_bucket[1h])) by (le))
30+
```
31+
32+

backend/src/apiserver/main.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"strings"
2929
"sync"
3030

31+
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
3132
"google.golang.org/grpc/credentials"
3233

3334
"github.com/fsnotify/fsnotify"
@@ -247,18 +248,32 @@ func grpcCustomMatcher(key string) (string, bool) {
247248

248249
func startRPCServer(resourceManager *resource.ResourceManager, tlsCfg *tls.Config) {
249250
var s *grpc.Server
251+
252+
grpc_prometheus.EnableHandlingTimeHistogram(
253+
grpc_prometheus.WithHistogramBuckets([]float64{
254+
0.005, 0.01, 0.03, 0.1, 0.3, 1, 3, 10, 15, 30, 60, 120, 300, // 5 ms -> 5 min
255+
}),
256+
)
257+
250258
if tlsCfg != nil {
251259
glog.Info("Starting RPC server (TLS enabled)")
252260
tlsCredentials := credentials.NewTLS(tlsCfg)
253261
s = grpc.NewServer(
254262
grpc.Creds(tlsCredentials),
255-
grpc.UnaryInterceptor(apiServerInterceptor),
263+
grpc.ChainUnaryInterceptor(
264+
grpc_prometheus.UnaryServerInterceptor,
265+
apiServerInterceptor,
266+
),
256267
grpc.MaxRecvMsgSize(math.MaxInt32),
257268
)
258269
} else {
259270
glog.Info("Starting RPC server")
260-
s = grpc.NewServer(grpc.UnaryInterceptor(apiServerInterceptor), grpc.MaxRecvMsgSize(math.MaxInt32))
271+
s = grpc.NewServer(grpc.ChainUnaryInterceptor(
272+
grpc_prometheus.UnaryServerInterceptor,
273+
apiServerInterceptor,
274+
), grpc.MaxRecvMsgSize(math.MaxInt32))
261275
}
276+
262277
listener, err := net.Listen("tcp", *rpcPortFlag)
263278
if err != nil {
264279
glog.Fatalf("Failed to start RPC server: %v", err)

backend/src/apiserver/resource/resource_manager.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"net"
2323
"reflect"
2424
"strconv"
25+
"time"
2526

2627
apiv2beta1 "github.com/kubeflow/pipelines/backend/api/v2beta1/go_client"
2728
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
@@ -79,6 +80,13 @@ var (
7980
Help: "The current number of failed workflow runs",
8081
}, extraLabels)
8182

83+
// Gap in seconds between creating an execution spec (Argo or other backend) for a recurring run and reporting it via the persistence agent.
84+
recurringPipelineRunReportGap = promauto.NewHistogram(prometheus.HistogramOpts{
85+
Name: "resource_manager_recurring_run_report_gap",
86+
Help: "Recurring Run Report Delay",
87+
Buckets: prometheus.ExponentialBuckets(0.5, 2, 10), // 0.5s -> 4min
88+
})
89+
8290
// Map API enum values to Kubernetes DeletionPropagation values
8391
propagationPolicyMap = map[apiv2beta1.DeletePropagationPolicy]v1.DeletionPropagation{
8492
apiv2beta1.DeletePropagationPolicy_FOREGROUND: v1.DeletePropagationForeground,
@@ -1286,8 +1294,8 @@ func (r *ResourceManager) DeleteJob(ctx context.Context, jobID string, propagati
12861294

12871295
// Creates new tasks or updates existing ones.
12881296
// This is not a part of internal API exposed to persistence agent only.
1289-
func (r *ResourceManager) CreateOrUpdateTasks(t []*model.Task) ([]*model.Task, error) {
1290-
tasks, err := r.taskStore.CreateOrUpdateTasks(t)
1297+
func (r *ResourceManager) CreateOrUpdateTasks(t []*model.Task, runID string) ([]*model.Task, error) {
1298+
tasks, err := r.taskStore.CreateOrUpdateTasks(t, runID)
12911299
if err != nil {
12921300
return nil, util.Wrap(err, "Failed to create or update tasks")
12931301
}
@@ -1436,6 +1444,10 @@ func (r *ResourceManager) ReportWorkflowResource(ctx context.Context, execSpec u
14361444
},
14371445
}
14381446
run, err = r.runStore.CreateRun(run)
1447+
if r.options.CollectMetrics && !execStatus.StartedAtTime().Time.IsZero() {
1448+
reportGap := time.Since(execStatus.StartedAtTime().Time).Seconds()
1449+
recurringPipelineRunReportGap.Observe(reportGap)
1450+
}
14391451
if err != nil {
14401452
return nil, util.Wrapf(err, "Failed to report a workflow due to error creating run %s", runId)
14411453
} else {

backend/src/apiserver/server/report_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (s *BaseReportServer) reportTasksFromExecution(execSpec util.ExecutionSpec,
5757
if err != nil {
5858
return nil, util.Wrap(err, "Failed to report tasks of an execution")
5959
}
60-
return s.resourceManager.CreateOrUpdateTasks(tasks)
60+
return s.resourceManager.CreateOrUpdateTasks(tasks, runId)
6161
}
6262

6363
// Reports a workflow.

backend/src/apiserver/storage/task_store.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ type TaskStoreInterface interface {
6161
ListTasks(filterContext *model.FilterContext, opts *list.Options) ([]*model.Task, int, string, error)
6262

6363
// Creates new tasks or updates the existing ones.
64-
CreateOrUpdateTasks(tasks []*model.Task) ([]*model.Task, error)
64+
CreateOrUpdateTasks(tasks []*model.Task, runID string) ([]*model.Task, error)
6565
}
6666

6767
type TaskStore struct {
@@ -324,15 +324,15 @@ func (s *TaskStore) GetTask(id string) (*model.Task, error) {
324324
}
325325

326326
// Updates missing fields with existing data entries.
327-
func (s *TaskStore) patchWithExistingTasks(tasks []*model.Task) error {
327+
func (s *TaskStore) patchWithExistingTasks(tasks []*model.Task, runID string) error {
328328
var podNames []string
329329
for _, task := range tasks {
330330
podNames = append(podNames, task.PodName)
331331
}
332332
sql, args, err := sq.
333333
Select(taskColumns...).
334334
From("tasks").
335-
Where(sq.Eq{"PodName": podNames}).
335+
Where(sq.Eq{"PodName": podNames, "RunUUID": runID}).
336336
ToSql()
337337
if err != nil {
338338
return util.NewInternalServerError(err, "Failed to create query to check existing tasks")
@@ -359,7 +359,7 @@ func (s *TaskStore) patchWithExistingTasks(tasks []*model.Task) error {
359359
}
360360

361361
// Creates new entries or updates existing ones.
362-
func (s *TaskStore) CreateOrUpdateTasks(tasks []*model.Task) ([]*model.Task, error) {
362+
func (s *TaskStore) CreateOrUpdateTasks(tasks []*model.Task, runID string) ([]*model.Task, error) {
363363
buildQuery := func(ts []*model.Task) (string, []interface{}, error) {
364364
sqlInsert := sq.Insert("tasks").Columns(taskColumnsWithPayload...)
365365
for _, t := range ts {
@@ -405,7 +405,7 @@ func (s *TaskStore) CreateOrUpdateTasks(tasks []*model.Task) ([]*model.Task, err
405405

406406
// Check for existing tasks and fill empty field with existing data.
407407
// Assumes that PodName column is a unique key.
408-
if err := s.patchWithExistingTasks(tasks); err != nil {
408+
if err := s.patchWithExistingTasks(tasks, runID); err != nil {
409409
return nil, util.NewInternalServerError(err, "Failed to check for existing tasks")
410410
}
411411
for _, task := range tasks {

backend/src/apiserver/storage/task_store_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ func TestTaskStore_patchWithExistingTasks(t *testing.T) {
504504
}
505505
for _, tt := range tests {
506506
t.Run(tt.name, func(t *testing.T) {
507-
err := taskStore.patchWithExistingTasks(tt.tasks)
507+
err := taskStore.patchWithExistingTasks(tt.tasks, defaultFakeRunIdTwo)
508508
if tt.wantErr {
509509
assert.NotNil(t, err)
510510
assert.Contains(t, err.Error(), tt.errMsg)
@@ -611,7 +611,7 @@ func TestTaskStore_UpdateOrCreateTasks(t *testing.T) {
611611
}
612612
for _, tt := range tests {
613613
t.Run(tt.name, func(t *testing.T) {
614-
got, err := taskStore.CreateOrUpdateTasks(tt.tasks)
614+
got, err := taskStore.CreateOrUpdateTasks(tt.tasks, defaultFakeRunIdTwo)
615615
assert.Nil(t, err)
616616
assert.Equal(t, tt.want, got)
617617
})

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ require (
7474
github.com/aws/aws-sdk-go-v2/credentials v1.17.67
7575
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3
7676
github.com/aws/smithy-go v1.22.3
77+
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
7778
gorm.io/driver/mysql v1.6.0
7879
gorm.io/driver/postgres v1.6.0
7980
gorm.io/driver/sqlite v1.6.0

go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)