Skip to content

Commit 0cdd893

Browse files
committed
feat(orchestrator): remove staticMachineCapacity and integrate local telemetry tracking for internal users
1 parent f399cfb commit 0cdd893

File tree

2 files changed

+131
-84
lines changed

2 files changed

+131
-84
lines changed

pkg/orchestrator/gke/gke_job_orchestrator.go

Lines changed: 36 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"hpc-toolkit/pkg/orchestrator"
2525
"hpc-toolkit/pkg/scheduling"
2626
"hpc-toolkit/pkg/shell"
27+
"hpc-toolkit/pkg/telemetry"
2728
"io"
2829
"net/http"
2930
"os"
@@ -181,8 +182,21 @@ func NewGKEOrchestrator() (*GKEOrchestrator, error) {
181182
func (g *GKEOrchestrator) SubmitJob(job orchestrator.JobDefinition) error {
182183
logging.Info("Starting gcluster job submit workflow...")
183184

185+
startTime := time.Now()
186+
var success bool
187+
defer func() {
188+
latencySecs := time.Since(startTime).Seconds()
189+
profile := map[string]string{
190+
"accelerator_type": job.AcceleratorType,
191+
"nodes": fmt.Sprintf("%d", job.NumSlices),
192+
}
193+
194+
telemetry.RecordLocalMetrics(job.WorkloadName, latencySecs, success, profile)
195+
}()
196+
184197
var err error
185198
job, err = g.initializeJobSubmission(job)
199+
186200
if err != nil {
187201
return err
188202
}
@@ -231,9 +245,11 @@ func (g *GKEOrchestrator) SubmitJob(job orchestrator.JobDefinition) error {
231245
}
232246

233247
logging.Info("gcluster job submit workflow completed.")
248+
success = true
234249
return nil
235250
}
236251

252+
237253
func (g *GKEOrchestrator) generatePathwaysManifest(job orchestrator.JobDefinition, fullImageName string) (string, error) {
238254
// Set default values for Pathways-specific fields if not provided
239255
if job.Pathways.ProxyServerImage == "" {
@@ -1254,77 +1270,8 @@ type MachineTypeCap struct {
12541270
GuestCpus int `json:"guestCpus"` // Parse vCPUs for CPU-only machines
12551271
}
12561272

1257-
var staticMachineCapacity = map[string]int{
1258-
// GPUs - RTX-Pro 6000 (g4 series)
1259-
"g4-standard-48": 1,
1260-
"g4-standard-96": 2,
1261-
"g4-standard-192": 4,
1262-
"g4-standard-384": 8,
1263-
1264-
// GPUs - Tesla A100 (a2 series)
1265-
"a2-highgpu-1g": 1,
1266-
"a2-highgpu-2g": 2,
1267-
"a2-highgpu-4g": 4,
1268-
"a2-highgpu-8g": 8,
1269-
"a2-megagpu-16g": 16,
1270-
1271-
// GPUs - A100 80GB
1272-
"a2-ultragpu-1g": 1,
1273-
"a2-ultragpu-2g": 2,
1274-
"a2-ultragpu-4g": 4,
1275-
"a2-ultragpu-8g": 8,
1276-
1277-
// GPUs - H100 80GB
1278-
"a3-highgpu-1g": 1,
1279-
"a3-highgpu-2g": 2,
1280-
"a3-highgpu-4g": 4,
1281-
"a3-highgpu-8g": 8,
1282-
1283-
// GPUs - H100 Mega & H200
1284-
"a3-megagpu-8g": 8,
1285-
"a3-ultragpu-8g": 8,
1286-
1287-
// GPUs - B200 and GB200
1288-
"a4-highgpu-8g-lowmem": 8,
1289-
"a4-highgpu-8g": 8,
1290-
"a4x-highgpu-4g": 4,
1291-
"a4x-highgpu-4g-nolssd": 4,
1292-
1293-
// GPUs - L4 (g2 series)
1294-
"g2-standard-4": 1,
1295-
"g2-standard-8": 1,
1296-
"g2-standard-12": 1,
1297-
"g2-standard-24": 2,
1298-
"g2-standard-48": 4,
1299-
"g2-standard-96": 8,
1300-
1301-
// TPUs
1302-
"ct4p-hightpu-4t": 4,
1303-
"ct5p-hightpu-1t": 1,
1304-
"ct5p-hightpu-2t": 2,
1305-
"ct5p-hightpu-4t": 4,
1306-
"ct6e-standard-1t": 1,
1307-
"ct6e-standard-4t": 4,
1308-
"ct6e-standard-8t": 8,
1309-
}
1310-
13111273
var acceleratorShorthandMap = map[string]string{
1312-
"nvidia-l4": "g2-standard-12",
1313-
"nvidia-tesla-a100": "a2-highgpu-1g",
1314-
"nvidia-a100-80gb": "a3-highgpu-8g",
1315-
"nvidia-h100-80gb": "a3-highgpu-8g",
1316-
"nvidia-h100-mega-80gb": "a3-megagpu-8g",
1317-
"nvidia-h200-141gb": "a3-ultragpu-8g",
1318-
"nvidia-b200": "a4-highgpu-8g",
1319-
"nvidia-gb200": "a4x-highgpu-4g",
1320-
"tpu-v7": "tpu7-standard-1t",
1321-
"tpu-v7x": "tpu7x-standard-4t",
1322-
"tpu-v6e-slice": "ct6e-standard-4t",
1323-
"tpu-v5p-slice": "ct5p-hightpu-4t",
1324-
"tpu-v5-lite-podslice": "ct5lp-hightpu-4t",
1325-
"tpu-v4-podslice": "ct4p-hightpu-4t",
1326-
1327-
// xpk compatibility mappings (GPUs)
1274+
// GPU mappings
13281275
"l4-1": "g2-standard-12",
13291276
"l4-2": "g2-standard-24",
13301277
"l4-4": "g2-standard-48",
@@ -1351,22 +1298,23 @@ var acceleratorShorthandMap = map[string]string{
13511298
"b200-8": "a4-highgpu-8g",
13521299
"gb200-4": "a4x-highgpu-4g",
13531300

1354-
// xpk compatibility mappings (TPUs - Single VM configurations)
1355-
"v4-8": "ct4p-hightpu-4t",
1356-
"v5p-2": "ct5p-hightpu-1t",
1357-
"v5p-4": "ct5p-hightpu-2t",
1358-
"v5p-8": "ct5p-hightpu-4t",
1359-
"v6e-1": "ct6e-standard-1t",
1360-
"v6e-4": "ct6e-standard-4t",
1361-
"v6e-8": "ct6e-standard-8t",
1301+
// TPU mappings
1302+
"v4-8": "ct4p-hightpu-4t",
1303+
"v5p-1": "ct5p-hightpu-1t",
1304+
"v5p-2": "ct5p-hightpu-2t",
1305+
"v5p-4": "ct5p-hightpu-4t",
1306+
"v5e-1": "ct5lp-hightpu-1t",
1307+
"v5e-4": "ct5lp-hightpu-4t",
1308+
"v5e-8": "ct5lp-hightpu-8t",
1309+
"v6e-1": "ct6e-standard-1t",
1310+
"v6e-4": "ct6e-standard-4t",
1311+
"v6e-8": "ct6e-standard-8t",
1312+
"tpu-v7": "tpu7-standard-1t",
1313+
"tpu-v7x": "tpu7x-standard-4t",
1314+
13621315
}
13631316

13641317
func (g *GKEOrchestrator) FetchMachineCapacity(machineType, zone string) (int, error) {
1365-
if cap, exists := staticMachineCapacity[machineType]; exists {
1366-
logging.Info("Using hardcoded capacity for machine type %s: %d", machineType, cap)
1367-
return cap, nil
1368-
}
1369-
13701318
if zone == "" {
13711319
return 0, fmt.Errorf("zone is required for machine capacity lookup")
13721320
}
@@ -1504,11 +1452,15 @@ func (g *GKEOrchestrator) calculateGCPMachineResourceLimits(opts ManifestOptions
15041452
if strings.Contains(strings.ToLower(mapped), "tpu") {
15051453
return "", "", "", fmt.Sprintf("%d", count), nil
15061454
}
1455+
return "", "", "", "", fmt.Errorf("machine type %s resolved to %d capacity but could not be classified as GPU or TPU (mapped label: %s)", machineName, count, mapped)
15071456
}
1508-
return "", "", "", "", nil
1457+
return "", "", "", "", fmt.Errorf("failed to determine capacity for machine type %s", machineName)
15091458
}
15101459

15111460

1461+
1462+
1463+
15121464
func (g *GKEOrchestrator) calculateCPUMachineResourceLimits(opts ManifestOptions, profile JobProfile) (string, error) {
15131465
count := profile.CapacityCount
15141466
logging.Info("Using cached capacity for CPU machine %s during limits calculation: %d", opts.AcceleratorType, count)

pkg/telemetry/telemetry.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package telemetry
16+
17+
import (
18+
"encoding/json"
19+
"hpc-toolkit/pkg/logging"
20+
"hpc-toolkit/pkg/shell"
21+
"os"
22+
"path/filepath"
23+
"strings"
24+
"time"
25+
)
26+
27+
// MetricEntry is a single telemetry record kept for internal Google usage.
// Every field is serialized to JSON under the key named in its struct tag.
type MetricEntry struct {
	// Timestamp is the time attached to the record.
	Timestamp time.Time `json:"timestamp"`
	// WorkloadName identifies the workload the record refers to.
	WorkloadName string `json:"workload_name"`
	// LatencySeconds is the measured latency, expressed in seconds.
	LatencySeconds float64 `json:"latency_seconds"`
	// SubmissionSuccess reports whether the submission succeeded.
	SubmissionSuccess bool `json:"submission_success"`
	// StaticResourceProfile carries string key/value attributes describing
	// the resources associated with the workload.
	StaticResourceProfile map[string]string `json:"static_resource_profile"`
}
35+
36+
// isInternalUser checks if the active gcloud account is a @google.com domain.
37+
func isInternalUser() bool {
38+
res := shell.ExecuteCommand("gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)")
39+
if res.ExitCode != 0 {
40+
return false
41+
}
42+
account := strings.TrimSpace(res.Stdout)
43+
return strings.HasSuffix(account, "@google.com")
44+
}
45+
46+
// RecordLocalMetrics appends a telemetry metric entry to ~/.gcluster-job/telemetry_metrics.jsonl only for internal Googlers.
47+
func RecordLocalMetrics(workloadName string, latency float64, success bool, profile map[string]string) {
48+
if os.Getenv("GCLUSTER_SKIP_TELEMETRY") == "true" {
49+
logging.Info("Skipping telemetry metrics due to GCLUSTER_SKIP_TELEMETRY environment variable.")
50+
return
51+
}
52+
53+
if !isInternalUser() {
54+
return // Skip quietly if not an internal Googler
55+
}
56+
57+
entry := MetricEntry{
58+
Timestamp: time.Now(),
59+
WorkloadName: workloadName,
60+
LatencySeconds: latency,
61+
SubmissionSuccess: success,
62+
StaticResourceProfile: profile,
63+
}
64+
65+
homeDir, err := os.UserHomeDir()
66+
if err != nil {
67+
logging.Error("Failed to resolve user home for internal telemetry: %v", err)
68+
return
69+
}
70+
71+
metricsDir := filepath.Join(homeDir, ".gcluster-job")
72+
if err := os.MkdirAll(metricsDir, 0755); err != nil {
73+
logging.Error("Failed to create metrics storage directory %s: %v", metricsDir, err)
74+
return
75+
}
76+
77+
metricsFile := filepath.Join(metricsDir, "telemetry_metrics.jsonl")
78+
79+
data, err := json.Marshal(entry)
80+
if err != nil {
81+
logging.Error("Failed to marshal telemetry metrics to JSON: %v", err)
82+
return
83+
}
84+
85+
f, err := os.OpenFile(metricsFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
86+
if err != nil {
87+
logging.Error("Failed to open telemetry metrics file %s: %v", metricsFile, err)
88+
return
89+
}
90+
defer f.Close()
91+
92+
if _, err := f.Write(append(data, '\n')); err != nil {
93+
logging.Error("Failed to write telemetry metrics to %s: %v", metricsFile, err)
94+
}
95+
}

0 commit comments

Comments
 (0)