Skip to content

Commit 2b9827b

Browse files
authored
Refactor prism and go sdk logging and clean up messages (#36484)
* Move slog setup logic to sdk. * Add wrapper for slog so sdk can share the same logging framework with runner. * Beautify and clean up some logging messages. * Fix print tests.
1 parent 554a73b commit 2b9827b

File tree

9 files changed

+134
-55
lines changed

9 files changed

+134
-55
lines changed

sdks/go/cmd/prism/prism.go

Lines changed: 3 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,10 @@ import (
2222
"flag"
2323
"fmt"
2424
"log"
25-
"log/slog"
26-
"os"
27-
"strings"
28-
"time"
2925

26+
beamlog "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
3027
jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
3128
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
32-
"github.com/golang-cz/devslog"
3329
"google.golang.org/grpc"
3430
"google.golang.org/grpc/credentials/insecure"
3531
)
@@ -44,57 +40,17 @@ var (
4440

4541
// Logging flags
4642
var (
47-
logKind = flag.String("log_kind", "dev",
43+
logKindFlag = flag.String("log_kind", "dev",
4844
"Determines the format of prism's logging to std err: valid values are `dev', 'json', or 'text'. Default is `dev`.")
4945
logLevelFlag = flag.String("log_level", "info",
5046
"Sets the minimum log level of Prism. Valid options are 'debug', 'info','warn', and 'error'. Default is 'info'. Debug adds prism source lines.")
5147
)
5248

53-
var logLevel = new(slog.LevelVar)
54-
5549
func main() {
5650
flag.Parse()
5751
ctx, cancel := context.WithCancelCause(context.Background())
5852

59-
var logHandler slog.Handler
60-
loggerOutput := os.Stderr
61-
handlerOpts := &slog.HandlerOptions{
62-
Level: logLevel,
63-
}
64-
switch strings.ToLower(*logLevelFlag) {
65-
case "debug":
66-
logLevel.Set(slog.LevelDebug)
67-
handlerOpts.AddSource = true
68-
case "info":
69-
logLevel.Set(slog.LevelInfo)
70-
case "warn":
71-
logLevel.Set(slog.LevelWarn)
72-
case "error":
73-
logLevel.Set(slog.LevelError)
74-
default:
75-
log.Fatalf("Invalid value for log_level: %v, must be 'debug', 'info', 'warn', or 'error'", *logKind)
76-
}
77-
switch strings.ToLower(*logKind) {
78-
case "dev":
79-
logHandler =
80-
devslog.NewHandler(loggerOutput, &devslog.Options{
81-
TimeFormat: "[" + time.RFC3339Nano + "]",
82-
StringerFormatter: true,
83-
HandlerOptions: handlerOpts,
84-
StringIndentation: false,
85-
NewLineAfterLog: true,
86-
MaxErrorStackTrace: 3,
87-
})
88-
case "json":
89-
logHandler = slog.NewJSONHandler(loggerOutput, handlerOpts)
90-
case "text":
91-
logHandler = slog.NewTextHandler(loggerOutput, handlerOpts)
92-
default:
93-
log.Fatalf("Invalid value for log_kind: %v, must be 'dev', 'json', or 'text'", *logKind)
94-
}
95-
96-
slog.SetDefault(slog.New(logHandler))
97-
53+
beamlog.SetupLogging(*logLevelFlag, *logKindFlag)
9854
cli, err := makeJobClient(ctx,
9955
prism.Options{
10056
Port: *jobPort,

sdks/go/pkg/beam/core/runtime/harness/harness.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func MainWithOptions(ctx context.Context, loggingEndpoint, controlEndpoint strin
101101

102102
elmTimeout, err := parseTimeoutDurationFlag(ctx, beam.PipelineOptions.Get("element_processing_timeout"))
103103
if err != nil {
104-
log.Infof(ctx, "Failed to parse element_processing_timeout: %v, there will be no timeout for processing an element in a PTransform operation", err)
104+
log.Debugf(ctx, "Failed to parse element_processing_timeout: %v, there will be no timeout for processing an element in a PTransform operation", err)
105105
}
106106

107107
// Connect to FnAPI control server. Receive and execute work.

sdks/go/pkg/beam/forward.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/genx"
2525
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
2626
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
27+
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
2728
)
2829

2930
// IMPLEMENTATION NOTE: functions and types in this file are assumed to be
@@ -51,6 +52,10 @@ func RegisterType(t reflect.Type) {
5152
}
5253

5354
func init() {
55+
runtime.RegisterInit(func() {
56+
log.SetupLoggingWithDefault()
57+
})
58+
5459
runtime.RegisterInit(func() {
5560
if EnableSchemas {
5661
schema.Initialize()

sdks/go/pkg/beam/log/log.go

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,14 @@ package log
2121
import (
2222
"context"
2323
"fmt"
24+
"log"
25+
"log/slog"
2426
"os"
27+
"strings"
2528
"sync/atomic"
29+
"time"
30+
31+
"github.com/golang-cz/devslog"
2632
)
2733

2834
// Severity is the severity of the log message.
@@ -37,6 +43,11 @@ const (
3743
SevFatal
3844
)
3945

46+
var (
47+
LogLevel = "info" // The logging level for slog. Valid values are `debug`, `info`, `warn` or `error`. Default is `info`.
48+
LogKind = "text" // The logging format for slog. Valid values are `dev', 'json', or 'text'. Default is `text`.
49+
)
50+
4051
// Logger is a context-aware logging backend. The richer context allows for
4152
// more sophisticated logging setups. Must be concurrency safe.
4253
type Logger interface {
@@ -54,7 +65,7 @@ type concreteLogger struct {
5465
}
5566

5667
func init() {
57-
logger.Store(&concreteLogger{&Standard{}})
68+
logger.Store(&concreteLogger{&Structural{}})
5869
}
5970

6071
// SetLogger sets the global Logger. Intended to be called during initialization
@@ -190,3 +201,51 @@ func Exitln(ctx context.Context, v ...any) {
190201
Output(ctx, SevFatal, 1, fmt.Sprintln(v...))
191202
os.Exit(1)
192203
}
204+
205+
func SetupLoggingWithDefault() {
206+
var logLevel = new(slog.LevelVar)
207+
var logHandler slog.Handler
208+
loggerOutput := os.Stderr
209+
handlerOpts := &slog.HandlerOptions{
210+
Level: logLevel,
211+
}
212+
switch strings.ToLower(LogLevel) {
213+
case "debug":
214+
logLevel.Set(slog.LevelDebug)
215+
handlerOpts.AddSource = true
216+
case "info":
217+
logLevel.Set(slog.LevelInfo)
218+
case "warn":
219+
logLevel.Set(slog.LevelWarn)
220+
case "error":
221+
logLevel.Set(slog.LevelError)
222+
default:
223+
log.Fatalf("Invalid value for log_level: %v, must be 'debug', 'info', 'warn', or 'error'", LogLevel)
224+
}
225+
switch strings.ToLower(LogKind) {
226+
case "dev":
227+
logHandler =
228+
devslog.NewHandler(loggerOutput, &devslog.Options{
229+
TimeFormat: "[" + time.RFC3339Nano + "]",
230+
StringerFormatter: true,
231+
HandlerOptions: handlerOpts,
232+
StringIndentation: false,
233+
NewLineAfterLog: true,
234+
MaxErrorStackTrace: 3,
235+
})
236+
case "json":
237+
logHandler = slog.NewJSONHandler(loggerOutput, handlerOpts)
238+
case "text":
239+
logHandler = slog.NewTextHandler(loggerOutput, handlerOpts)
240+
default:
241+
log.Fatalf("Invalid value for log_kind: %v, must be 'dev', 'json', or 'text'", LogKind)
242+
}
243+
244+
slog.SetDefault(slog.New(logHandler))
245+
}
246+
247+
func SetupLogging(logLevel, logKind string) {
248+
LogLevel = logLevel
249+
LogKind = logKind
250+
SetupLoggingWithDefault()
251+
}

sdks/go/pkg/beam/log/structural.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
package log
17+
18+
import (
19+
"context"
20+
slogger "log/slog"
21+
)
22+
23+
// Structural is a wrapper over slog
24+
type Structural struct{}
25+
26+
var loggerMap = map[Severity]func(string, ...any){
27+
SevUnspecified: slogger.Info,
28+
SevDebug: slogger.Debug,
29+
SevInfo: slogger.Info,
30+
SevWarn: slogger.Warn,
31+
SevError: slogger.Error,
32+
SevFatal: slogger.Error,
33+
}
34+
35+
// Log logs the message to the structural Go logger. For Panic, it does not
36+
// perform the os.Exit(1) call, but defers to the log wrapper.
37+
func (s *Structural) Log(ctx context.Context, sev Severity, _ int, msg string) {
38+
loggerMap[sev](msg)
39+
}

sdks/go/pkg/beam/runners/prism/internal/worker/worker.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
3737
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
3838
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
39+
beamlog "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
3940
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
4041
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
4142
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
@@ -224,7 +225,6 @@ func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
224225
slog.String("transformID", l.GetTransformId()), // TODO: pull the unique name from the pipeline graph.
225226
slog.String("location", l.GetLogLocation()),
226227
slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()),
227-
slog.String(slog.MessageKey, l.GetMessage()),
228228
}
229229
if fs := l.GetCustomData().GetFields(); len(fs) > 0 {
230230
var grp []any
@@ -245,7 +245,11 @@ func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
245245
attrs = append(attrs, slog.Group("customData", grp...))
246246
}
247247

248-
slog.LogAttrs(stream.Context(), toSlogSev(l.GetSeverity()), "log from SDK worker", slog.Any("worker", wk), slog.Group("sdk", attrs...))
248+
if beamlog.LogLevel == "debug" {
249+
slog.LogAttrs(stream.Context(), toSlogSev(l.GetSeverity()), "[SDK] "+l.GetMessage(), slog.Group("sdk", attrs...), slog.Any("worker", wk))
250+
} else {
251+
slog.LogAttrs(stream.Context(), toSlogSev(l.GetSeverity()), "[SDK] "+l.GetMessage())
252+
}
249253
}
250254
}
251255
}

sdks/go/pkg/beam/runners/universal/runnerlib/job.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"context"
2020
"fmt"
2121
"io"
22+
"strings"
2223

2324
"github.com/apache/beam/sdks/v2/go/container/tools"
2425
"github.com/apache/beam/sdks/v2/go/pkg/beam"
@@ -138,7 +139,16 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID
138139
case msg.GetMessageResponse() != nil:
139140
resp := msg.GetMessageResponse()
140141

141-
text := fmt.Sprintf("%v (%v): %v", resp.GetTime(), resp.GetMessageId(), resp.GetMessageText())
142+
var b strings.Builder
143+
if resp.GetTime() != "" {
144+
fmt.Fprintf(&b, "(time=%v)", resp.GetTime())
145+
}
146+
if resp.GetMessageId() != "" {
147+
fmt.Fprintf(&b, "(id=%v)", resp.GetMessageId())
148+
}
149+
b.WriteString(resp.GetMessageText())
150+
text := b.String()
151+
142152
log.Output(ctx, messageSeverity(resp.GetImportance()), 1, text)
143153

144154
if resp.GetImportance() >= jobpb.JobMessage_JOB_MESSAGE_ERROR {

sdks/go/pkg/beam/runners/universal/universal.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/apache/beam/sdks/v2/go/pkg/beam"
2424
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
2525
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx"
26+
"google.golang.org/protobuf/encoding/prototext"
2627

2728
// Importing to get the side effect of the remote execution hook. See init().
2829
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/init"
@@ -92,7 +93,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
9293
return nil, errors.WithContextf(err, "generating model pipeline")
9394
}
9495

95-
log.Info(ctx, pipeline.String())
96+
log.Debugf(ctx, "Pipeline proto: %s", prototext.Format(pipeline))
9697

9798
opt := &runnerlib.JobOptions{
9899
Name: jobopts.GetJobName(),

sdks/go/pkg/beam/x/debug/print_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package debug
1818
import (
1919
"bytes"
2020
"log"
21+
"log/slog"
2122
"os"
2223
"strings"
2324
"testing"
@@ -92,10 +93,14 @@ func captureRunLogging(p *beam.Pipeline) string {
9293
// Pipe output to out
9394
var out bytes.Buffer
9495
log.SetOutput(&out)
96+
defer log.SetOutput(os.Stderr)
97+
98+
oldLogger := slog.Default()
99+
logHandler := slog.NewTextHandler(&out, nil)
100+
slog.SetDefault(slog.New(logHandler))
101+
defer slog.SetDefault((oldLogger))
95102

96103
ptest.Run(p)
97104

98-
// Return to original state
99-
log.SetOutput(os.Stderr)
100105
return out.String()
101106
}

0 commit comments

Comments
 (0)