Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/cirrus_ci_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ service CirrusCIService {
}
rpc ReportAgentLogs (ReportAgentLogsRequest) returns (google.protobuf.Empty) {
}
rpc ReportAgentResourceUtilization (ReportAgentResourceUtilizationRequest) returns (google.protobuf.Empty);
rpc ReportAgentFinished (ReportAgentFinishedRequest) returns (ReportAgentFinishedResponse) {
}
rpc ReportTerminalAttached (ReportTerminalAttachedRequest) returns (ReportTerminalAttachedResponse) {
Expand Down Expand Up @@ -572,6 +573,11 @@ message ReportAgentLogsRequest {
string logs = 2;
}

message ReportAgentResourceUtilizationRequest {
TaskIdentification task_identification = 1 [deprecated = true];
ResourceUtilization resource_utilization = 2;
}

message CacheRetrievalAttempt {
message Hit {
uint64 size_bytes = 1;
Expand Down
50 changes: 44 additions & 6 deletions internal/agent/executor/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"context"
"errors"
"fmt"
"log"
"runtime"
"time"

"github.com/cirruslabs/cirrus-cli/internal/agent/client"
"github.com/cirruslabs/cirrus-cli/internal/agent/executor/metrics/source"
"github.com/cirruslabs/cirrus-cli/internal/agent/executor/metrics/source/cgroup/cpu"
"github.com/cirruslabs/cirrus-cli/internal/agent/executor/metrics/source/cgroup/memory"
Expand All @@ -16,9 +21,6 @@ import (
gopsutilcpu "github.com/shirou/gopsutil/v3/cpu"
gopsutilmem "github.com/shirou/gopsutil/v3/mem"
"github.com/sirupsen/logrus"
"log"
"runtime"
"time"
)

var (
Expand All @@ -28,11 +30,13 @@ var (
)

type Result struct {
errors map[string]error
ResourceUtilization *api.ResourceUtilization
errors map[string]error
lastReportedCPUChartOffset int
lastReportedMemoryChartOffset int
ResourceUtilization *api.ResourceUtilization
}

func (result Result) Errors() []error {
func (result *Result) Errors() []error {
var deduplicatedErrors []error

for _, err := range result.errors {
Expand All @@ -42,6 +46,35 @@ func (result Result) Errors() []error {
return deduplicatedErrors
}

func (result *Result) reportIntermediateResults(ctx context.Context) error {
boundedCtx, boundedCtxCancel := context.WithTimeout(ctx, time.Second)
defer boundedCtxCancel()

cpuChartToReport := result.ResourceUtilization.CpuChart[result.lastReportedCPUChartOffset:]
memoryChartToReport := result.ResourceUtilization.MemoryChart[result.lastReportedMemoryChartOffset:]

if client.CirrusClient == nil {
return fmt.Errorf("skipping intermediate results reporting because gRPC client was not initialized")
}

if _, err := client.CirrusClient.ReportAgentResourceUtilization(boundedCtx, &api.ReportAgentResourceUtilizationRequest{
TaskIdentification: client.CirrusTaskIdentification,
ResourceUtilization: &api.ResourceUtilization{
CpuChart: cpuChartToReport,
MemoryChart: memoryChartToReport,
CpuTotal: result.ResourceUtilization.CpuTotal,
MemoryTotal: result.ResourceUtilization.MemoryTotal,
},
}); err != nil {
return err
} else {
result.lastReportedCPUChartOffset += len(cpuChartToReport)
result.lastReportedMemoryChartOffset += len(memoryChartToReport)
}

return nil
}

func Run(ctx context.Context, logger logrus.FieldLogger) chan *Result {
resultChan := make(chan *Result, 1)

Expand Down Expand Up @@ -150,6 +183,11 @@ func Run(ctx context.Context, logger logrus.FieldLogger) chan *Result {
})
}

if err := result.reportIntermediateResults(ctx); err != nil {
err := fmt.Errorf("failed to report intermediate resource utilization: %w", err)
result.errors[err.Error()] = err
}

// Make sure we wait the whole pollInterval
timeLeftToWait := pollInterval - time.Since(cycleStartTime)
select {
Expand Down
17 changes: 15 additions & 2 deletions internal/agent/executor/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,25 @@ package metrics_test
import (
"context"
"fmt"
"testing"
"time"

"github.com/cirruslabs/cirrus-cli/internal/agent/client"
"github.com/cirruslabs/cirrus-cli/internal/agent/executor/metrics"
"github.com/cirruslabs/cirrus-cli/internal/agent/http_cache/ghacache/cirruscimock"
"github.com/cirruslabs/cirrus-cli/internal/testutil"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/mem"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)

func TestMetrics(t *testing.T) {
// Needed for intermediate resource utilization reporting
testutil.NeedsContainerization(t)
clientConn, cirrusCIMock := cirruscimock.ClientConnWithMock(t)
client.InitClient(clientConn, "test", "test")

ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second+500*time.Millisecond)
defer cancel()

Expand All @@ -28,6 +37,10 @@ func TestMetrics(t *testing.T) {
require.Empty(t, result.Errors(), "we should never get errors here, but got %d", len(result.Errors()))
require.Len(t, result.ResourceUtilization.CpuChart, 4)
require.Len(t, result.ResourceUtilization.MemoryChart, 4)

// Ensure that at least one intermediate resource utilization was reported
resourceUtilizationEntries := cirrusCIMock.InspectIntermediateResourceUtilizations()
require.NotEmpty(t, resourceUtilizationEntries)
}

func TestTotals(t *testing.T) {
Expand Down
28 changes: 19 additions & 9 deletions internal/agent/http_cache/ghacache/cirruscimock/cirruscimock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ import (
"bytes"
"context"
"errors"
"io"
"net"
"net/http"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
Expand All @@ -15,16 +21,12 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"io"
"net"
"net/http"
"testing"
"time"
)

type cirrusCIMock struct {
s3Client *s3.S3
s3Bucket *string
s3Client *s3.S3
s3Bucket *string
intermediateResourceUtilizations []*api.ResourceUtilization

api.UnimplementedCirrusCIServiceServer
}
Expand All @@ -44,24 +46,32 @@ func newCirrusCIMock(t *testing.T, s3Client *s3.S3) *cirrusCIMock {
}

func ClientConn(t *testing.T) *grpc.ClientConn {
clientConn, _ := ClientConnWithMock(t)

return clientConn
}

func ClientConnWithMock(t *testing.T) (*grpc.ClientConn, *cirrusCIMock) {
t.Helper()

s3Client := S3Client(t)

lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

cirrusCIMock := newCirrusCIMock(t, s3Client)

go func() {
server := grpc.NewServer()
api.RegisterCirrusCIServiceServer(server, newCirrusCIMock(t, s3Client))
api.RegisterCirrusCIServiceServer(server, cirrusCIMock)
require.NoError(t, server.Serve(lis))
}()

clientConn, err := grpc.NewClient(lis.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)

return clientConn
return clientConn, cirrusCIMock
}

func (mock *cirrusCIMock) UploadCache(stream api.CirrusCIService_UploadCacheServer) error {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cirruscimock

import (
"context"

"github.com/cirruslabs/cirrus-cli/pkg/api"
"google.golang.org/protobuf/types/known/emptypb"
)

func (mock *cirrusCIMock) ReportAgentResourceUtilization(
ctx context.Context,
request *api.ReportAgentResourceUtilizationRequest,
) (*emptypb.Empty, error) {
mock.intermediateResourceUtilizations = append(mock.intermediateResourceUtilizations, request.GetResourceUtilization())

return &emptypb.Empty{}, nil
}

func (mock *cirrusCIMock) InspectIntermediateResourceUtilizations() []*api.ResourceUtilization {
return mock.intermediateResourceUtilizations
}
Loading