Skip to content

Commit 2109042

Browse files
committed
Agent: report intermediate resource utilization metrics
1 parent 5e52c1e commit 2109042

File tree

9 files changed

+2303
-1810
lines changed

9 files changed

+2303
-1810
lines changed

api/cirrus_ci_service.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ service CirrusCIService {
288288
}
289289
rpc ReportAgentLogs (ReportAgentLogsRequest) returns (google.protobuf.Empty) {
290290
}
291+
rpc ReportAgentResourceUtilization (ReportAgentResourceUtilizationRequest) returns (google.protobuf.Empty);
291292
rpc ReportAgentFinished (ReportAgentFinishedRequest) returns (ReportAgentFinishedResponse) {
292293
}
293294
rpc ReportTerminalAttached (ReportTerminalAttachedRequest) returns (ReportTerminalAttachedResponse) {
@@ -572,6 +573,11 @@ message ReportAgentLogsRequest {
572573
string logs = 2;
573574
}
574575

576+
message ReportAgentResourceUtilizationRequest {
577+
TaskIdentification task_identification = 1 [deprecated = true];
578+
ResourceUtilization resource_utilization = 2;
579+
}
580+
575581
message CacheRetrievalAttempt {
576582
message Hit {
577583
uint64 size_bytes = 1;

internal/agent/executor/metrics/metrics.go

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ import (
66
"context"
77
"errors"
88
"fmt"
9+
"log"
10+
"runtime"
11+
"time"
12+
13+
"github.com/cirruslabs/cirrus-cli/internal/agent/client"
914
"github.com/cirruslabs/cirrus-cli/internal/agent/executor/metrics/source"
1015
"github.com/cirruslabs/cirrus-cli/internal/agent/executor/metrics/source/cgroup/cpu"
1116
"github.com/cirruslabs/cirrus-cli/internal/agent/executor/metrics/source/cgroup/memory"
@@ -16,9 +21,6 @@ import (
1621
gopsutilcpu "github.com/shirou/gopsutil/v3/cpu"
1722
gopsutilmem "github.com/shirou/gopsutil/v3/mem"
1823
"github.com/sirupsen/logrus"
19-
"log"
20-
"runtime"
21-
"time"
2224
)
2325

2426
var (
@@ -28,11 +30,13 @@ var (
2830
)
2931

3032
type Result struct {
31-
errors map[string]error
32-
ResourceUtilization *api.ResourceUtilization
33+
errors map[string]error
34+
lastReportedCPUChartOffset int
35+
lastReportedMemoryChartOffset int
36+
ResourceUtilization *api.ResourceUtilization
3337
}
3438

35-
func (result Result) Errors() []error {
39+
func (result *Result) Errors() []error {
3640
var deduplicatedErrors []error
3741

3842
for _, err := range result.errors {
@@ -42,6 +46,35 @@ func (result Result) Errors() []error {
4246
return deduplicatedErrors
4347
}
4448

49+
func (result *Result) reportIntermediateResults(ctx context.Context) error {
50+
boundedCtx, boundedCtxCancel := context.WithTimeout(ctx, time.Second)
51+
defer boundedCtxCancel()
52+
53+
cpuChartToReport := result.ResourceUtilization.CpuChart[result.lastReportedCPUChartOffset:]
54+
memoryChartToReport := result.ResourceUtilization.MemoryChart[result.lastReportedMemoryChartOffset:]
55+
56+
if client.CirrusClient == nil {
57+
return fmt.Errorf("skipping intermediate results reporting because gRPC client was not initialized")
58+
}
59+
60+
if _, err := client.CirrusClient.ReportAgentResourceUtilization(boundedCtx, &api.ReportAgentResourceUtilizationRequest{
61+
TaskIdentification: client.CirrusTaskIdentification,
62+
ResourceUtilization: &api.ResourceUtilization{
63+
CpuChart: cpuChartToReport,
64+
MemoryChart: memoryChartToReport,
65+
CpuTotal: result.ResourceUtilization.CpuTotal,
66+
MemoryTotal: result.ResourceUtilization.MemoryTotal,
67+
},
68+
}); err != nil {
69+
return err
70+
} else {
71+
result.lastReportedCPUChartOffset += len(cpuChartToReport)
72+
result.lastReportedMemoryChartOffset += len(memoryChartToReport)
73+
}
74+
75+
return nil
76+
}
77+
4578
func Run(ctx context.Context, logger logrus.FieldLogger) chan *Result {
4679
resultChan := make(chan *Result, 1)
4780

@@ -150,6 +183,11 @@ func Run(ctx context.Context, logger logrus.FieldLogger) chan *Result {
150183
})
151184
}
152185

186+
if err := result.reportIntermediateResults(ctx); err != nil {
187+
err := fmt.Errorf("failed to report intermediate resource utilization: %w", err)
188+
result.errors[err.Error()] = err
189+
}
190+
153191
// Make sure we wait the whole pollInterval
154192
timeLeftToWait := pollInterval - time.Since(cycleStartTime)
155193
select {

internal/agent/executor/metrics/metrics_test.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,25 @@ package metrics_test
55
import (
66
"context"
77
"fmt"
8+
"testing"
9+
"time"
10+
11+
"github.com/cirruslabs/cirrus-cli/internal/agent/client"
812
"github.com/cirruslabs/cirrus-cli/internal/agent/executor/metrics"
13+
"github.com/cirruslabs/cirrus-cli/internal/agent/http_cache/ghacache/cirruscimock"
14+
"github.com/cirruslabs/cirrus-cli/internal/testutil"
915
"github.com/shirou/gopsutil/v3/cpu"
1016
"github.com/shirou/gopsutil/v3/mem"
1117
"github.com/stretchr/testify/assert"
1218
"github.com/stretchr/testify/require"
13-
"testing"
14-
"time"
1519
)
1620

1721
func TestMetrics(t *testing.T) {
22+
// Needed for intermediate resource utilization reporting
23+
testutil.NeedsContainerization(t)
24+
clientConn, cirrusCIMock := cirruscimock.ClientConnWithMock(t)
25+
client.InitClient(clientConn, "test", "test")
26+
1827
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second+500*time.Millisecond)
1928
defer cancel()
2029

@@ -28,6 +37,10 @@ func TestMetrics(t *testing.T) {
2837
require.Empty(t, result.Errors(), "we should never get errors here, but got %d", len(result.Errors()))
2938
require.Len(t, result.ResourceUtilization.CpuChart, 4)
3039
require.Len(t, result.ResourceUtilization.MemoryChart, 4)
40+
41+
// Ensure that at least one intermediate resource utilization was reported
42+
resourceUtilizationEntries := cirrusCIMock.InspectIntermediateResourceUtilizations()
43+
require.NotEmpty(t, resourceUtilizationEntries)
3144
}
3245

3346
func TestTotals(t *testing.T) {

internal/agent/http_cache/ghacache/cirruscimock/cirruscimock.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ import (
44
"bytes"
55
"context"
66
"errors"
7+
"io"
8+
"net"
9+
"net/http"
10+
"testing"
11+
"time"
12+
713
"github.com/aws/aws-sdk-go/aws"
814
"github.com/aws/aws-sdk-go/aws/awserr"
915
"github.com/aws/aws-sdk-go/service/s3"
@@ -15,16 +21,12 @@ import (
1521
"google.golang.org/grpc/credentials/insecure"
1622
"google.golang.org/grpc/status"
1723
"google.golang.org/protobuf/types/known/emptypb"
18-
"io"
19-
"net"
20-
"net/http"
21-
"testing"
22-
"time"
2324
)
2425

2526
type cirrusCIMock struct {
26-
s3Client *s3.S3
27-
s3Bucket *string
27+
s3Client *s3.S3
28+
s3Bucket *string
29+
intermediateResourceUtilizations []*api.ResourceUtilization
2830

2931
api.UnimplementedCirrusCIServiceServer
3032
}
@@ -44,24 +46,32 @@ func newCirrusCIMock(t *testing.T, s3Client *s3.S3) *cirrusCIMock {
4446
}
4547

4648
func ClientConn(t *testing.T) *grpc.ClientConn {
49+
clientConn, _ := ClientConnWithMock(t)
50+
51+
return clientConn
52+
}
53+
54+
func ClientConnWithMock(t *testing.T) (*grpc.ClientConn, *cirrusCIMock) {
4755
t.Helper()
4856

4957
s3Client := S3Client(t)
5058

5159
lis, err := net.Listen("tcp", "127.0.0.1:0")
5260
require.NoError(t, err)
5361

62+
cirrusCIMock := newCirrusCIMock(t, s3Client)
63+
5464
go func() {
5565
server := grpc.NewServer()
56-
api.RegisterCirrusCIServiceServer(server, newCirrusCIMock(t, s3Client))
66+
api.RegisterCirrusCIServiceServer(server, cirrusCIMock)
5767
require.NoError(t, server.Serve(lis))
5868
}()
5969

6070
clientConn, err := grpc.NewClient(lis.Addr().String(),
6171
grpc.WithTransportCredentials(insecure.NewCredentials()))
6272
require.NoError(t, err)
6373

64-
return clientConn
74+
return clientConn, cirrusCIMock
6575
}
6676

6777
func (mock *cirrusCIMock) UploadCache(stream api.CirrusCIService_UploadCacheServer) error {
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package cirruscimock
2+
3+
import (
4+
"context"
5+
6+
"github.com/cirruslabs/cirrus-cli/pkg/api"
7+
"google.golang.org/protobuf/types/known/emptypb"
8+
)
9+
10+
func (mock *cirrusCIMock) ReportAgentResourceUtilization(
11+
ctx context.Context,
12+
request *api.ReportAgentResourceUtilizationRequest,
13+
) (*emptypb.Empty, error) {
14+
mock.intermediateResourceUtilizations = append(mock.intermediateResourceUtilizations, request.GetResourceUtilization())
15+
16+
return &emptypb.Empty{}, nil
17+
}
18+
19+
func (mock *cirrusCIMock) InspectIntermediateResourceUtilizations() []*api.ResourceUtilization {
20+
return mock.intermediateResourceUtilizations
21+
}

internal/executor/instance/persistentworker/remoteagent/remoteagent.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@ import (
55
"context"
66
"errors"
77
"fmt"
8+
"io"
9+
"net"
10+
"os"
11+
"strings"
12+
"time"
13+
814
"github.com/avast/retry-go/v4"
915
"github.com/cirruslabs/chacha/pkg/localnetworkhelper"
1016
"github.com/cirruslabs/cirrus-cli/internal/executor/endpoint"
1117
"github.com/cirruslabs/cirrus-cli/internal/executor/instance/runconfig"
1218
"github.com/cirruslabs/cirrus-cli/internal/logger"
1319
"go.opentelemetry.io/otel"
1420
"golang.org/x/crypto/ssh"
15-
"io"
16-
"net"
17-
"os"
18-
"strings"
19-
"time"
2021
)
2122

2223
var (

0 commit comments

Comments
 (0)