Skip to content

Commit 34aa17d

Browse files
Allow users to pass service name for profiler for Java And Go SDK (#35903)
1 parent abdec1b commit 34aa17d

File tree

4 files changed

+237
-65
lines changed

4 files changed

+237
-65
lines changed

sdks/go/container/boot.go

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,22 +61,46 @@ const (
6161
workerPoolIdEnv = "BEAM_GO_WORKER_POOL_ID"
6262
)
6363

64-
func configureGoogleCloudProfilerEnvVars(ctx context.Context, logger *tools.Logger, metadata map[string]string) error {
65-
if metadata == nil {
66-
return errors.New("enable_google_cloud_profiler is set to true, but no metadata is received from provision server, profiling will not be enabled")
64+
func configureGoogleCloudProfilerEnvVars(ctx context.Context, logger *tools.Logger, metadata map[string]string, options string) error {
65+
const profilerKey = "enable_google_cloud_profiler="
66+
67+
var parsed map[string]interface{}
68+
if err := json.Unmarshal([]byte(options), &parsed); err != nil {
69+
panic(err)
6770
}
68-
jobName, nameExists := metadata["job_name"]
69-
if !nameExists {
70-
return errors.New("required job_name missing from metadata, profiling will not be enabled without it")
71+
72+
var profilerServiceName string
73+
74+
// Try from "beam:option:go_options:v1" -> "options" -> "dataflow_service_options"
75+
if goOpts, ok := parsed["beam:option:go_options:v1"].(map[string]interface{}); ok {
76+
if options, ok := goOpts["options"].(map[string]interface{}); ok {
77+
if profilerServiceNameRaw, ok := options["dataflow_service_options"].(string); ok {
78+
if strings.HasPrefix(profilerServiceNameRaw, profilerKey) {
79+
profilerServiceName = strings.TrimPrefix(profilerServiceNameRaw, profilerKey)
80+
}
81+
}
82+
}
7183
}
84+
85+
// Fallback to job_name from metadata
86+
if profilerServiceName == "" {
87+
if jobName, jobNameExists := metadata["job_name"]; jobNameExists {
88+
profilerServiceName = jobName
89+
} else {
90+
return errors.New("required job_name missing from metadata, profiling will not be enabled without it")
91+
}
92+
}
93+
7294
jobID, idExists := metadata["job_id"]
7395
if !idExists {
7496
return errors.New("required job_id missing from metadata, profiling will not be enabled without it")
7597
}
76-
os.Setenv(cloudProfilingJobName, jobName)
98+
99+
os.Setenv(cloudProfilingJobName, profilerServiceName)
77100
os.Setenv(cloudProfilingJobID, jobID)
78-
logger.Printf(ctx, "Cloud Profiling Job Name: %v, Job IDL %v", jobName, jobID)
101+
logger.Printf(ctx, "Cloud Profiling Job Name: %v, Job IDL %v", profilerServiceName, jobID)
79102
return nil
103+
80104
}
81105

82106
func main() {
@@ -184,7 +208,7 @@ func main() {
184208

185209
enableGoogleCloudProfiler := strings.Contains(options, enableGoogleCloudProfilerOption)
186210
if enableGoogleCloudProfiler {
187-
err := configureGoogleCloudProfilerEnvVars(ctx, logger, info.Metadata)
211+
err := configureGoogleCloudProfilerEnvVars(ctx, logger, info.Metadata, options)
188212
if err != nil {
189213
logger.Printf(ctx, "could not configure Google Cloud Profiler variables, got %v", err)
190214
}

sdks/go/container/boot_test.go

Lines changed: 90 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -205,57 +205,110 @@ func constructArtifactInformation(t *testing.T, roleUrn string, path string, sha
205205
}
206206
}
207207

208+
func clearEnvVars() {
209+
_ = os.Unsetenv(cloudProfilingJobName)
210+
_ = os.Unsetenv(cloudProfilingJobID)
211+
}
212+
208213
func TestConfigureGoogleCloudProfilerEnvVars(t *testing.T) {
209214
tests := []struct {
210-
name string
211-
inputMetadata map[string]string
212-
expectedName string
213-
expectedID string
214-
expectedError string
215+
name string
216+
options string
217+
metadata map[string]string
218+
expectedName string
219+
expectedID string
220+
expectingError bool
215221
}{
216222
{
217-
"nil metadata",
218-
nil,
219-
"",
220-
"",
221-
"enable_google_cloud_profiler is set to true, but no metadata is received from provision server, profiling will not be enabled",
223+
name: "Profiler name from options",
224+
options: `{
225+
"beam:option:go_options:v1": {
226+
"options": {
227+
"dataflow_service_options": "enable_google_cloud_profiler=custom_profiler"
228+
}
229+
}
230+
}`,
231+
metadata: map[string]string{
232+
"job_id": "job-123",
233+
},
234+
expectedName: "custom_profiler",
235+
expectedID: "job-123",
236+
expectingError: false,
222237
},
223238
{
224-
"missing name",
225-
map[string]string{"job_id": "12345"},
226-
"",
227-
"",
228-
"required job_name missing from metadata, profiling will not be enabled without it",
239+
name: "Fallback to job_name",
240+
options: `{
241+
"beam:option:go_options:v1": {
242+
"options": {
243+
"dataflow_service_options": "enable_google_cloud_profiler"
244+
}
245+
}
246+
}`,
247+
metadata: map[string]string{
248+
"job_name": "fallback_profiler",
249+
"job_id": "job-456",
250+
},
251+
expectedName: "fallback_profiler",
252+
expectedID: "job-456",
253+
expectingError: false,
229254
},
230255
{
231-
"missing id",
232-
map[string]string{"job_name": "my_job"},
233-
"",
234-
"",
235-
"required job_id missing from metadata, profiling will not be enabled without it",
256+
name: "Missing job_id",
257+
options: `{
258+
"beam:option:go_options:v1": {
259+
"options": {
260+
"dataflow_service_options": "enable_google_cloud_profiler=custom_profiler"
261+
}
262+
}
263+
}`,
264+
metadata: map[string]string{
265+
"job_name": "custom_profiler",
266+
},
267+
expectingError: true,
236268
},
237269
{
238-
"correct",
239-
map[string]string{"job_name": "my_job", "job_id": "42"},
240-
"my_job",
241-
"42",
242-
"",
243-
},
270+
name: "Missing profiler name and job_name",
271+
options: `{
272+
"beam:option:go_options:v1": {
273+
"options": {
274+
"dataflow_service_options": "enable_google_cloud_profiler"
275+
}
276+
}
277+
}`,
278+
metadata: map[string]string{
279+
"job_id": "job-789",
280+
},
281+
expectingError: true,
282+
},
244283
}
245-
for _, test := range tests {
246-
t.Run(test.name, func(t *testing.T) {
247-
t.Cleanup(os.Clearenv)
248-
err := configureGoogleCloudProfilerEnvVars(context.Background(), &tools.Logger{}, test.inputMetadata)
249-
if err != nil {
250-
if got, want := err.Error(), test.expectedError; got != want {
251-
t.Errorf("got error %v, want error %v", got, want)
284+
285+
for _, tt := range tests {
286+
t.Run(tt.name, func(t *testing.T) {
287+
clearEnvVars()
288+
ctx := context.Background()
289+
290+
err := configureGoogleCloudProfilerEnvVars(ctx, &tools.Logger{}, tt.metadata, tt.options)
291+
292+
if tt.expectingError {
293+
if err == nil {
294+
t.Errorf("Expected error but got nil")
295+
}
296+
return
297+
} else {
298+
if err != nil {
299+
t.Errorf("Did not expect error but got: %v", err)
300+
return
252301
}
253302
}
254-
if got, want := os.Getenv(cloudProfilingJobName), test.expectedName; got != want {
255-
t.Errorf("got job name %v, want %v", got, want)
303+
304+
gotName := os.Getenv(cloudProfilingJobName)
305+
gotID := os.Getenv(cloudProfilingJobID)
306+
307+
if gotName != tt.expectedName {
308+
t.Errorf("Expected profiler name '%s', got '%s'", tt.expectedName, gotName)
256309
}
257-
if got, want := os.Getenv(cloudProfilingJobID), test.expectedID; got != want {
258-
t.Errorf("got job id %v, want %v", got, want)
310+
if gotID != tt.expectedID {
311+
t.Errorf("Expected job ID '%s', got '%s'", tt.expectedID, gotID)
259312
}
260313
})
261314
}

sdks/java/container/boot.go

Lines changed: 69 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package main
2020
import (
2121
"context"
2222
"encoding/json"
23+
"errors"
2324
"flag"
2425
"fmt"
2526
"log"
@@ -196,25 +197,22 @@ func main() {
196197
enableGoogleCloudProfiler := strings.Contains(options, enableGoogleCloudProfilerOption)
197198
enableGoogleCloudHeapSampling := strings.Contains(options, enableGoogleCloudHeapSamplingOption)
198199
if enableGoogleCloudProfiler {
199-
if metadata := info.GetMetadata(); metadata != nil {
200-
if jobName, nameExists := metadata["job_name"]; nameExists {
201-
if jobId, idExists := metadata["job_id"]; idExists {
202-
if enableGoogleCloudHeapSampling {
203-
args = append(args, fmt.Sprintf(googleCloudProfilerAgentHeapArgs, jobName, jobId))
204-
} else {
205-
args = append(args, fmt.Sprintf(googleCloudProfilerAgentBaseArgs, jobName, jobId))
206-
}
207-
logger.Printf(ctx, "Turning on Cloud Profiling. Profile heap: %t", enableGoogleCloudHeapSampling)
208-
} else {
209-
logger.Printf(ctx, "Required job_id missing from metadata, profiling will not be enabled without it.")
210-
}
211-
} else {
212-
logger.Printf(ctx, "Required job_name missing from metadata, profiling will not be enabled without it.")
213-
}
214-
} else {
215-
logger.Printf(ctx, "enable_google_cloud_profiler is set to true, but no metadata is received from provision server, profiling will not be enabled.")
216-
}
217-
}
200+
metadata := info.GetMetadata()
201+
profilerServiceName := ExtractProfilerServiceName(options, metadata)
202+
203+
if profilerServiceName != "" {
204+
if jobId, idExists := metadata["job_id"]; idExists {
205+
if enableGoogleCloudHeapSampling {
206+
args = append(args, fmt.Sprintf(googleCloudProfilerAgentHeapArgs, profilerServiceName, jobId))
207+
} else {
208+
args = append(args, fmt.Sprintf(googleCloudProfilerAgentBaseArgs, profilerServiceName, jobId))
209+
}
210+
logger.Printf(ctx, "Turning on Cloud Profiling. Profile heap: %t, service: %s", enableGoogleCloudHeapSampling, profilerServiceName)
211+
} else {
212+
logger.Printf(ctx, "job_id is missing from metadata. Cannot enable profiling.")
213+
}
214+
}
215+
}
218216

219217
disableJammAgent := strings.Contains(options, disableJammAgentOption)
220218
if disableJammAgent {
@@ -426,3 +424,55 @@ func BuildOptions(ctx context.Context, logger *tools.Logger, metaOptions []*Meta
426424
}
427425
return options
428426
}
427+
// errMissingJobName reports that no profiler service name could be derived:
// the options carry no usable "enable_google_cloud_profiler=<name>" entry and
// the metadata has no job_name to fall back to.
var errMissingJobName = errors.New("required job_name missing from metadata, profiling will not be enabled without it")

// ExtractProfilerServiceName returns the service name to use for Cloud
// Profiling.
//
// A non-empty name given as "enable_google_cloud_profiler=<name>" in the
// "dataflowServiceOptions" display data entry of the JSON-encoded options is
// preferred; otherwise metadata["job_name"] is used. It returns "" when
// neither is available, so callers can test the result against "" to decide
// whether profiling can be enabled. Options that are not valid JSON, or that
// carry no display data, are treated as having no custom name rather than
// disabling the job_name fallback.
func ExtractProfilerServiceName(options string, metadata map[string]string) string {
	name, err := resolveProfilerServiceName(options, metadata)
	if err != nil {
		// The signature has no error result; the empty string is the
		// "profiling unavailable" signal. Never leak the error text as a name.
		return ""
	}
	return name
}

// resolveProfilerServiceName implements ExtractProfilerServiceName and
// returns errMissingJobName when no name can be determined.
func resolveProfilerServiceName(options string, metadata map[string]string) (string, error) {
	if name := profilerNameFromDisplayData(options); name != "" {
		return name, nil
	}

	// Fallback to job_name from metadata.
	jobName, ok := metadata["job_name"]
	if !ok {
		return "", errMissingJobName
	}
	return jobName, nil
}

// profilerNameFromDisplayData scans the "display_data" list of the
// JSON-encoded options for entries keyed "dataflowServiceOptions" and returns
// the name from an "enable_google_cloud_profiler=<name>" item in their
// bracketed, comma-separated value. It returns "" when nothing matches or the
// options cannot be parsed. If several entries match, the last one wins.
func profilerNameFromDisplayData(options string) string {
	const profilerKeyPrefix = "enable_google_cloud_profiler="

	var parsed map[string]interface{}
	if err := json.Unmarshal([]byte(options), &parsed); err != nil {
		return ""
	}
	displayData, ok := parsed["display_data"].([]interface{})
	if !ok {
		return ""
	}

	var name string
	for _, item := range displayData {
		entry, ok := item.(map[string]interface{})
		if !ok || entry["key"] != "dataflowServiceOptions" {
			continue
		}
		rawValue, ok := entry["value"].(string)
		if !ok {
			continue
		}
		// The value is rendered like "[opt1, opt2=value]"; strip the brackets
		// before splitting it into individual service options.
		for _, opt := range strings.Split(strings.Trim(rawValue, "[]"), ",") {
			opt = strings.TrimSpace(opt)
			if strings.HasPrefix(opt, profilerKeyPrefix) {
				name = strings.TrimPrefix(opt, profilerKeyPrefix)
				break
			}
		}
	}
	return name
}

sdks/java/container/boot_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,48 @@ func TestHeapSizeLimit(t *testing.T) {
9090
t.Errorf("HeapSizeLimit(200 GB). Actual (%d). want 168 GB", lim)
9191
}
9292
}
93+
94+
func TestExtractProfilerServiceName(t *testing.T) {
95+
tests := []struct {
96+
name string
97+
options string
98+
metadata map[string]string
99+
expected string
100+
}{
101+
{
102+
name: "Extracts custom profiler name from options",
103+
options: `{
104+
"display_data": [
105+
{
106+
"key": "dataflowServiceOptions",
107+
"value": "[enable_google_cloud_profiler=custom_profiler, enable_google_cloud_heap_sampling]"
108+
}
109+
]
110+
}`,
111+
metadata: map[string]string{"job_name": "fallback_profiler"},
112+
expected: "custom_profiler",
113+
},
114+
{
115+
name: "Fallback to job_name when profiler not specified",
116+
options: `{
117+
"display_data": [
118+
{
119+
"key": "dataflowServiceOptions",
120+
"value": "[enable_google_cloud_heap_sampling]"
121+
}
122+
]
123+
}`,
124+
metadata: map[string]string{"job_name": "fallback_profiler"},
125+
expected: "fallback_profiler",
126+
},
127+
}
128+
129+
for _, tt := range tests {
130+
t.Run(tt.name, func(t *testing.T) {
131+
result := ExtractProfilerServiceName(tt.options, tt.metadata)
132+
if result != tt.expected {
133+
t.Errorf("Expected '%s', got '%s'", tt.expected, result)
134+
}
135+
})
136+
}
137+
}

0 commit comments

Comments
 (0)