Skip to content

Commit 8851b17

Browse files
author
Samu
authored
Collect cloud volume metrics from GCP (#630)
1 parent c3d442f commit 8851b17

File tree

8 files changed

+226
-26
lines changed

8 files changed

+226
-26
lines changed

cmd/controller/controllers/volume_state_controller.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,15 @@ func (c *VolumeStateController) fetchInitialStorageState(ctx context.Context, vo
6565
return nil
6666
}
6767

68-
instanceIDs := make([]string, len(nodes))
68+
instanceIDs := make([]string, 0, len(nodes))
6969
instanceIDToNodeName := make(map[string]string, len(nodes))
70-
for i, node := range nodes {
70+
for _, node := range nodes {
7171
instanceID := extractInstanceIDFromProviderID(node.Spec.ProviderID)
72-
instanceIDs[i] = instanceID
72+
if instanceID == "" {
73+
c.log.WithField("provider_id", node.Spec.ProviderID).Warn("could not extract instance id from provider id")
74+
continue
75+
}
76+
instanceIDs = append(instanceIDs, instanceID)
7377
instanceIDToNodeName[instanceID] = node.Name
7478
}
7579

@@ -114,11 +118,15 @@ func (c *VolumeStateController) runRefreshLoop(ctx context.Context, volumeIndex
114118
continue
115119
}
116120

117-
instanceIDs := make([]string, len(nodes))
121+
instanceIDs := make([]string, 0, len(nodes))
118122
instanceIDToNodeName := make(map[string]string, len(nodes))
119-
for i, node := range nodes {
123+
for _, node := range nodes {
120124
instanceID := extractInstanceIDFromProviderID(node.Spec.ProviderID)
121-
instanceIDs[i] = instanceID
125+
if instanceID == "" {
126+
c.log.WithField("provider_id", node.Spec.ProviderID).Warn("could not extract instance id from provider id")
127+
continue
128+
}
129+
instanceIDs = append(instanceIDs, instanceID)
122130
instanceIDToNodeName[instanceID] = node.Name
123131
}
124132

@@ -162,10 +170,10 @@ func extractInstanceIDFromProviderID(providerID string) string {
162170
}
163171

164172
// GCP format: gce://project-id/zone/instance-name
165-
if strings.HasPrefix(providerID, "gce://") {
166-
parts := strings.Split(providerID, "/")
167-
if len(parts) >= 4 {
168-
return parts[len(parts)-1]
173+
if instanceID, ok := strings.CutPrefix(providerID, "gce://"); ok {
174+
parts := strings.Split(instanceID, "/")
175+
if len(parts) == 3 {
176+
return instanceID
169177
}
170178
}
171179

cmd/controller/controllers/volume_state_controller_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,12 @@ func TestExtractInstanceIDFromProviderID(t *testing.T) {
8787
{
8888
name: "GCP format",
8989
providerID: "gce://my-project/us-central1-a/instance-name",
90-
want: "instance-name",
90+
want: "my-project/us-central1-a/instance-name",
91+
},
92+
{
93+
name: "GCP format with missing zone",
94+
providerID: "gce://my-project/instance-name",
95+
want: "",
9196
},
9297
{
9398
name: "Azure format",

pkg/cloudprovider/aws/provider.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@ type Provider struct {
1717
// AWS clients
1818
ec2Client *ec2.Client
1919

20-
// Cached storage state
21-
storageStateMu sync.RWMutex
22-
storageState *types.StorageState
23-
2420
// Cached network state
2521
networkStateMu sync.RWMutex
2622
networkState *types.NetworkState

pkg/cloudprovider/aws/storage_state.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ func (p *Provider) GetStorageState(ctx context.Context, instanceIds ...string) (
1515
p.log.Debug("refreshing storage state")
1616

1717
state := &types.StorageState{
18-
Domain: "amazonaws.com",
19-
Provider: types.TypeAWS,
20-
InstanceVolumes: make(map[string][]types.Volume),
18+
Domain: "amazonaws.com",
19+
Provider: types.TypeAWS,
2120
}
2221

2322
instanceVolumes, err := p.fetchInstanceVolumes(ctx, instanceIds...)
@@ -26,11 +25,7 @@ func (p *Provider) GetStorageState(ctx context.Context, instanceIds ...string) (
2625
}
2726
state.InstanceVolumes = instanceVolumes
2827

29-
p.storageStateMu.Lock()
30-
defer p.storageStateMu.Unlock()
31-
p.storageState = state
32-
33-
return p.storageState, nil
28+
return state, nil
3429
}
3530

3631
// fetchInstanceVolumes retrieves instance volumes from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_Volume.html

pkg/cloudprovider/gcp/provider.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package gcp
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"sync"
78

@@ -17,6 +18,7 @@ type Provider struct {
1718
// GCP clients
1819
networksClient *compute.NetworksClient
1920
subnetworksClient *compute.SubnetworksClient
21+
disksClient *compute.DisksClient
2022

2123
// Cached network state
2224
networkStateMu sync.RWMutex
@@ -43,11 +45,17 @@ func NewProvider(ctx context.Context, cfg types.ProviderConfig) (types.Provider,
4345
return nil, fmt.Errorf("creating subnetworks client: %w", err)
4446
}
4547

48+
disksClient, err := compute.NewDisksRESTClient(ctx, clientOptions...)
49+
if err != nil {
50+
return nil, fmt.Errorf("creating disks client: %w", err)
51+
}
52+
4653
p := &Provider{
4754
log: log,
4855
cfg: cfg,
4956
networksClient: networksClient,
5057
subnetworksClient: subnetworksClient,
58+
disksClient: disksClient,
5159
}
5260

5361
log.With("project", cfg.GCPProjectID).Info("gcp provider initialized")
@@ -71,10 +79,14 @@ func (p *Provider) Close() error {
7179
errs = append(errs, fmt.Errorf("closing subnetworks client: %w", err))
7280
}
7381
}
82+
if p.disksClient != nil {
83+
if err := p.disksClient.Close(); err != nil {
84+
errs = append(errs, fmt.Errorf("closing disks client: %w", err))
85+
}
86+
}
7487

7588
if len(errs) > 0 {
76-
return fmt.Errorf("errors closing GCP provider: %v", errs)
89+
return fmt.Errorf("errors closing GCP provider: %w", errors.Join(errs...))
7790
}
78-
7991
return nil
8092
}

pkg/cloudprovider/gcp/storage_state.go

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,140 @@ package gcp
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"math"
8+
"path"
9+
"strings"
10+
11+
"cloud.google.com/go/compute/apiv1/computepb"
12+
"github.com/samber/lo"
13+
"google.golang.org/api/iterator"
614

715
"github.com/castai/kvisor/pkg/cloudprovider/types"
816
)
917

1018
func (p *Provider) GetStorageState(ctx context.Context, instanceIds ...string) (*types.StorageState, error) {
11-
return nil, fmt.Errorf("GetStorageState not yet implemented for GCP")
19+
p.log.Debug("refreshing storage state")
20+
21+
state := &types.StorageState{
22+
Domain: "googleapis.com",
23+
Provider: types.TypeGCP,
24+
}
25+
26+
instanceVolumes, err := p.fetchInstanceVolumes(ctx, instanceIds...)
27+
if err != nil {
28+
return nil, fmt.Errorf("fetching volumes: %w", err)
29+
}
30+
state.InstanceVolumes = instanceVolumes
31+
32+
return state, nil
33+
}
34+
35+
// fetchInstanceVolumes retrieves instance volumes from https://docs.cloud.google.com/compute/docs/reference/rest/v1/disks/aggregatedList
36+
func (p *Provider) fetchInstanceVolumes(ctx context.Context, instanceIds ...string) (map[string][]types.Volume, error) {
37+
instanceVolumes := make(map[string][]types.Volume, len(instanceIds))
38+
39+
if len(instanceIds) == 0 {
40+
return instanceVolumes, nil
41+
}
42+
43+
instanceUrlsMap := make(map[string]string, len(instanceIds))
44+
for _, instanceId := range instanceIds {
45+
url := buildInstanceUrlFromId(instanceId)
46+
if url == "" {
47+
p.log.WithField("instance_id", instanceId).Warn("could not build instance url")
48+
continue
49+
}
50+
instanceUrlsMap[url] = instanceId
51+
}
52+
53+
filter := buildDisksUsedByInstanceFilter(lo.Keys(instanceUrlsMap))
54+
55+
req := &computepb.AggregatedListDisksRequest{
56+
Project: p.cfg.GCPProjectID,
57+
Filter: &filter,
58+
}
59+
60+
it := p.disksClient.AggregatedList(ctx, req)
61+
for result, err := range it.All() {
62+
if errors.Is(err, iterator.Done) {
63+
break
64+
}
65+
66+
if err != nil {
67+
return instanceVolumes, fmt.Errorf("listing disks: %w", err)
68+
}
69+
70+
for _, disk := range result.Value.Disks {
71+
if disk.GetName() == "" {
72+
p.log.Error("disk missing name, skipping")
73+
continue
74+
}
75+
76+
for _, instanceUrl := range disk.Users {
77+
instanceId, ok := instanceUrlsMap[instanceUrl]
78+
if !ok {
79+
continue
80+
}
81+
82+
volume := types.Volume{
83+
VolumeID: disk.GetName(),
84+
VolumeState: strings.ToLower(disk.GetStatus()),
85+
Encrypted: true, // GCP disks are encrypted by default
86+
}
87+
88+
if disk.GetType() != "" {
89+
volume.VolumeType = path.Base(disk.GetType())
90+
}
91+
92+
if disk.GetZone() != "" {
93+
volume.Zone = path.Base(disk.GetZone())
94+
}
95+
96+
if disk.GetSizeGb() > 0 {
97+
// Size is in GB, convert to bytes
98+
volume.SizeBytes = disk.GetSizeGb() * 1024 * 1024 * 1024
99+
}
100+
101+
if disk.GetProvisionedIops() > 0 {
102+
volume.IOPS = safeInt64ToInt32(disk.GetProvisionedIops())
103+
}
104+
105+
if disk.GetProvisionedThroughput() > 0 {
106+
// Throughput is in MB/s, convert to bytes/s
107+
volume.ThroughputBytes = safeInt64ToInt32(disk.GetProvisionedThroughput() * 1024 * 1024)
108+
}
109+
110+
instanceVolumes[instanceId] = append(instanceVolumes[instanceId], volume)
111+
}
112+
}
113+
}
114+
115+
return instanceVolumes, nil
116+
}
117+
118+
// buildInstanceUrlFromId converts an instance ID (project/zone/instance-name) to a full GCP instance URL
119+
func buildInstanceUrlFromId(instanceId string) string {
120+
parts := strings.Split(instanceId, "/")
121+
if len(parts) != 3 {
122+
return ""
123+
}
124+
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/zones/%s/instances/%s", parts[0], parts[1], parts[2])
125+
}
126+
127+
// buildDisksUsedByInstanceFilter builds a GCP API filter for disks attached to specific instances
128+
func buildDisksUsedByInstanceFilter(instanceUrls []string) string {
129+
conditions := make([]string, len(instanceUrls))
130+
for i, url := range instanceUrls {
131+
conditions[i] = fmt.Sprintf(`(users:%q)`, url)
132+
}
133+
return strings.Join(conditions, " OR ")
134+
}
135+
136+
func safeInt64ToInt32(val int64) int32 {
137+
if val > math.MaxInt32 {
138+
return math.MaxInt32
139+
}
140+
return int32(val) // nolint:gosec
12141
}

pkg/cloudprovider/gcp/test/env.example

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ GCP_PROJECT_ID=
44
# Required: Your network (VPC) name to test
55
NETWORK_NAME=
66

7+
# Required: GCP Instance ID to test volume listing
8+
# Example: my-gcp-project/us-east4-a/my-gcp-pool-3556b234,my-gcp-project/us-east4-c/my-gcp-pool-a7579587
9+
GCP_INSTANCE_IDS=
10+
711
# Optional: Path to service account key file
812
# If not set, will use GOOGLE_APPLICATION_CREDENTIALS or default credentials
913
GCP_CREDENTIALS_FILE=

pkg/cloudprovider/gcp/test/integration_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package integration_test
55
import (
66
"context"
77
"os"
8+
"strings"
89
"testing"
910

1011
"github.com/joho/godotenv"
@@ -84,3 +85,53 @@ func TestRefreshNetworkState(t *testing.T) {
8485
}
8586
// t.Logf(" Service Ranges: %+v", state.ServiceRanges)
8687
}
88+
89+
// TestGetStorageState calls GetStorageState and prints the results.
90+
func TestGetStorageState(t *testing.T) {
91+
cfg := getTestConfig(t)
92+
ctx := t.Context()
93+
94+
provider, err := gcp.NewProvider(ctx, cfg)
95+
if err != nil {
96+
t.Fatalf("NewProvider failed: %v", err)
97+
}
98+
99+
p := provider.(*gcp.Provider)
100+
101+
instanceIDsStr := os.Getenv("GCP_INSTANCE_IDS")
102+
if instanceIDsStr == "" {
103+
t.Fatal("GCP_INSTANCE_IDS not set")
104+
}
105+
106+
instanceIDs := strings.Split(instanceIDsStr, ",")
107+
for i := range instanceIDs {
108+
instanceIDs[i] = strings.TrimSpace(instanceIDs[i])
109+
}
110+
111+
state, err := p.GetStorageState(ctx, instanceIDs...)
112+
if err != nil {
113+
t.Fatalf("GetStorageState failed: %v", err)
114+
}
115+
116+
for _, instanceID := range instanceIDs {
117+
t.Logf("Testing instance: %s", instanceID)
118+
119+
volumes, ok := state.InstanceVolumes[instanceID]
120+
if !ok {
121+
t.Fatalf("No volumes found for instance %s", instanceID)
122+
}
123+
124+
t.Logf("Found %d volumes attached to instance %s:", len(volumes), instanceID)
125+
for _, v := range volumes {
126+
t.Logf(" Volume:")
127+
t.Logf(" VolumeID: %s", v.VolumeID)
128+
t.Logf(" VolumeType: %s", v.VolumeType)
129+
t.Logf(" VolumeState: %s", v.VolumeState)
130+
t.Logf(" SizeBytes: %d", v.SizeBytes)
131+
t.Logf(" Zone: %s", v.Zone)
132+
t.Logf(" Encrypted: %v", v.Encrypted)
133+
t.Logf(" IOPS: %d", v.IOPS)
134+
t.Logf(" ThroughputBytes: %d B/s", v.ThroughputBytes)
135+
}
136+
}
137+
}

0 commit comments

Comments
 (0)