Skip to content

Commit 416db9b

Browse files
authored
Accept a user provided logger (#19)
This PR allows users to inject their existing logger to DBOS, for us to reuse. # Logging landscape Go as four major loggers: * log/slog https://pkg.go.dev/log/slog * https://pkg.go.dev/go.uber.org/zap * logrus (in maintenance mode but widely popular) https://github.com/sirupsen/logrus * zerolog (up and coming) https://github.com/rs/zerolog # Why log/slog It is design to be extensible with a custom Handler interface, and, fortunately, a long time golang contributor provided two libraries to convert a logrus or a zerolog logger into a slog instance: * https://github.com/samber/slog-logrus * https://github.com/samber/slog-zerolog Zap has its own Handler compatibility layer. These provide a simple way for a user to create a slog logger from their existing logger and retain their configuration. # example usage ```golang func main() { //SLOG logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelDebug, })).With("logger", "dbos-slog") err := dbos.Launch(dbos.WithLogger(logger)) if err != nil { panic(err) } dbos.Shutdown() // ZAP zapLogger, err := zap.NewProduction() if err != nil { panic(err) } defer zapLogger.Sync() logger = slog.New(zapslog.NewHandler(zapLogger.Core(), zapslog.WithName("dbos-zap"))) err = dbos.Launch(dbos.WithLogger(logger)) if err != nil { panic(err) } dbos.Shutdown() // LOGRUS logrusLogger := logrus.New() logger = slog.New(sloglogrus.Option{ Level: slog.LevelDebug, Logger: logrusLogger, }.NewLogrusHandler()) err = dbos.Launch(dbos.WithLogger(logger)) if err != nil { panic(err) } dbos.Shutdown() // ZEROLOG zerologLogger := zerolog.New(zerolog.ConsoleWriter{ Out: os.Stdout, }).With().Timestamp().Logger() logger = slog.New(slogzerolog.Option{ Level: slog.LevelDebug, Logger: &zerologLogger, }.NewZerologHandler()) err = dbos.Launch(dbos.WithLogger(logger)) if err != nil { panic(err) } dbos.Shutdown() } ``` <img width="775" height="766" alt="Screenshot 2025-07-17 at 16 54 52" src="https://github.com/user-attachments/assets/e7a9165a-ac72-4e03-96cc-72f07df18ba0" />
1 parent d3658fc commit 416db9b

File tree

9 files changed

+174
-70
lines changed

9 files changed

+174
-70
lines changed

dbos/dbos.go

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"crypto/sha256"
66
"encoding/hex"
77
"fmt"
8+
"log/slog"
89
"os"
910
"reflect"
1011
"runtime"
@@ -56,42 +57,74 @@ type Executor interface {
5657

5758
var workflowScheduler *cron.Cron
5859

59-
// DBOS represents the main DBOS instance
6060
type executor struct {
6161
systemDB SystemDatabase
6262
queueRunnerCtx context.Context
6363
queueRunnerCancelFunc context.CancelFunc
6464
queueRunnerDone chan struct{}
6565
}
6666

67-
// New creates a new DBOS instance with an initialized system database
6867
var dbos *executor
6968

7069
func getExecutor() *executor {
7170
if dbos == nil {
72-
fmt.Println("warning: DBOS instance not initiliazed")
7371
return nil
7472
}
7573
return dbos
7674
}
7775

78-
func Launch() error {
76+
var logger *slog.Logger
77+
78+
func getLogger() *slog.Logger {
79+
if dbos == nil {
80+
fmt.Println("warning: DBOS instance not initialized, using default logger")
81+
return slog.New(slog.NewTextHandler(os.Stderr, nil))
82+
}
83+
if logger == nil {
84+
fmt.Println("warning: DBOS logger is nil, using default logger")
85+
return slog.New(slog.NewTextHandler(os.Stderr, nil))
86+
}
87+
return logger
88+
}
89+
90+
type config struct {
91+
logger *slog.Logger
92+
}
93+
94+
type LaunchOption func(*config)
95+
96+
func WithLogger(logger *slog.Logger) LaunchOption {
97+
return func(config *config) {
98+
config.logger = logger
99+
}
100+
}
101+
102+
func Launch(options ...LaunchOption) error {
79103
if dbos != nil {
80104
fmt.Println("warning: DBOS instance already initialized, skipping re-initialization")
81105
return NewInitializationError("DBOS already initialized")
82106
}
83107

108+
config := &config{
109+
logger: slog.New(slog.NewTextHandler(os.Stderr, nil)),
110+
}
111+
for _, option := range options {
112+
option(config)
113+
}
114+
115+
logger = config.logger
116+
84117
// Initialize with environment variables, providing defaults if not set
85118
APP_VERSION = os.Getenv("DBOS__APPVERSION")
86119
if APP_VERSION == "" {
87120
APP_VERSION = computeApplicationVersion()
88-
fmt.Println("DBOS: DBOS__APPVERSION not set, using computed hash")
121+
logger.Info("DBOS__APPVERSION not set, using computed hash")
89122
}
90123

91124
EXECUTOR_ID = os.Getenv("DBOS__VMID")
92125
if EXECUTOR_ID == "" {
93126
EXECUTOR_ID = "local"
94-
fmt.Printf("DBOS: DBOS__VMID not set, using default: %s\n", EXECUTOR_ID)
127+
logger.Info("DBOS__VMID not set, using default", "executor_id", EXECUTOR_ID)
95128
}
96129

97130
APP_ID = os.Getenv("DBOS__APPID")
@@ -101,7 +134,7 @@ func Launch() error {
101134
if err != nil {
102135
return NewInitializationError(fmt.Sprintf("failed to create system database: %v", err))
103136
}
104-
fmt.Println("DBOS: System database initialized")
137+
logger.Info("System database initialized")
105138

106139
systemDB.Launch(context.Background())
107140

@@ -120,12 +153,12 @@ func Launch() error {
120153
defer close(dbos.queueRunnerDone)
121154
queueRunner(ctx)
122155
}()
123-
fmt.Println("DBOS: Queue runner started")
156+
logger.Info("Queue runner started")
124157

125158
// Start the workflow scheduler if it has been initialized
126159
if workflowScheduler != nil {
127160
workflowScheduler.Start()
128-
fmt.Println("DBOS: Workflow scheduler started")
161+
logger.Info("Workflow scheduler started")
129162
}
130163

131164
// Run a round of recovery on the local executor
@@ -134,23 +167,25 @@ func Launch() error {
134167
return NewInitializationError(fmt.Sprintf("failed to recover pending workflows during launch: %v", err))
135168
}
136169

137-
fmt.Printf("DBOS: Initialized with APP_VERSION=%s, EXECUTOR_ID=%s\n", APP_VERSION, EXECUTOR_ID)
170+
logger.Info("DBOS initialized", "app_version", APP_VERSION, "executor_id", EXECUTOR_ID)
138171
return nil
139172
}
140173

141174
// Close closes the DBOS instance and its resources
142175
func Shutdown() {
143176
if dbos == nil {
144-
fmt.Println("warning: DBOS instance is nil, cannot destroy")
177+
fmt.Println("DBOS instance is nil, cannot destroy")
145178
return
146179
}
147180

181+
// XXX is there a way to ensure all workflows goroutine are done before closing?
182+
148183
// Cancel the context to stop the queue runner
149184
if dbos.queueRunnerCancelFunc != nil {
150185
dbos.queueRunnerCancelFunc()
151186
// Wait for queue runner to finish
152187
<-dbos.queueRunnerDone
153-
fmt.Println("DBOS: Queue runner stopped")
188+
getLogger().Info("Queue runner stopped")
154189
}
155190

156191
if workflowScheduler != nil {
@@ -161,9 +196,9 @@ func Shutdown() {
161196

162197
select {
163198
case <-ctx.Done():
164-
fmt.Println("DBOS: All scheduled jobs completed")
199+
getLogger().Info("All scheduled jobs completed")
165200
case <-timeoutCtx.Done():
166-
fmt.Println("DBOS: Timeout waiting for jobs to complete (5s)")
201+
getLogger().Warn("Timeout waiting for jobs to complete", "timeout", "5s")
167202
}
168203
}
169204

@@ -172,5 +207,9 @@ func Shutdown() {
172207
dbos.systemDB = nil
173208
}
174209

210+
if logger != nil {
211+
logger = nil
212+
}
213+
175214
dbos = nil
176215
}

dbos/logger_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package dbos
2+
3+
import (
4+
"bytes"
5+
"log/slog"
6+
"strings"
7+
"testing"
8+
)
9+
10+
func TestLogger(t *testing.T) {
11+
12+
t.Run("Default logger", func(t *testing.T) {
13+
err := Launch() // Launch with default logger
14+
if err != nil {
15+
t.Fatalf("Failed to launch with default logger: %v", err)
16+
}
17+
t.Cleanup(func() {
18+
Shutdown()
19+
})
20+
21+
if logger == nil {
22+
t.Fatal("Logger is nil")
23+
}
24+
25+
// Test logger access
26+
logger.Info("Test message from default logger")
27+
28+
})
29+
30+
t.Run("Custom logger", func(t *testing.T) {
31+
// Test with custom slog logger
32+
var buf bytes.Buffer
33+
slogLogger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{
34+
Level: slog.LevelDebug,
35+
}))
36+
37+
// Add some context to the slog logger
38+
slogLogger = slogLogger.With("service", "dbos-test", "environment", "test")
39+
40+
err := Launch(WithLogger(slogLogger))
41+
if err != nil {
42+
t.Fatalf("Failed to launch with custom logger: %v", err)
43+
}
44+
t.Cleanup(func() {
45+
Shutdown()
46+
})
47+
48+
if logger == nil {
49+
t.Fatal("Logger is nil")
50+
}
51+
52+
// Test that we can use the logger and it maintains context
53+
logger.Info("Test message from custom logger", "test_key", "test_value")
54+
55+
// Check that our custom logger was used and captured the output
56+
logOutput := buf.String()
57+
if !strings.Contains(logOutput, "service=dbos-test") {
58+
t.Errorf("Expected log output to contain service=dbos-test, got: %s", logOutput)
59+
}
60+
if !strings.Contains(logOutput, "environment=test") {
61+
t.Errorf("Expected log output to contain environment=test, got: %s", logOutput)
62+
}
63+
if !strings.Contains(logOutput, "test_key=test_value") {
64+
t.Errorf("Expected log output to contain test_key=test_value, got: %s", logOutput)
65+
}
66+
})
67+
}

dbos/queue.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"context"
66
"encoding/base64"
77
"encoding/gob"
8-
"fmt"
98
"math"
109
"math/rand"
1110
"time"
@@ -71,7 +70,7 @@ func WithMaxTasksPerIteration(maxTasks uint) QueueOption {
7170
// NewWorkflowQueue creates a new workflow queue with optional configuration
7271
func NewWorkflowQueue(name string, options ...QueueOption) WorkflowQueue {
7372
if getExecutor() != nil {
74-
fmt.Println("warning: NewWorkflowQueue called after DBOS initialization, dynamic registration is not supported")
73+
getLogger().Warn("NewWorkflowQueue called after DBOS initialization, dynamic registration is not supported")
7574
return WorkflowQueue{}
7675
}
7776
if _, exists := workflowQueueRegistry[name]; exists {
@@ -120,6 +119,7 @@ func queueRunner(ctx context.Context) {
120119

121120
// Iterate through all queues in the registry
122121
for queueName, queue := range workflowQueueRegistry {
122+
getLogger().Debug("Processing queue", "queue_name", queueName)
123123
// Call DequeueWorkflows for each queue
124124
dequeuedWorkflows, err := getExecutor().systemDB.DequeueWorkflows(runnerContext, queue)
125125
if err != nil {
@@ -131,20 +131,20 @@ func queueRunner(ctx context.Context) {
131131
hasBackoffError = true
132132
}
133133
} else {
134-
fmt.Printf("Error dequeuing workflows from queue '%s': %v\n", queueName, err)
134+
getLogger().Error("Error dequeuing workflows from queue", "queue_name", queueName, "error", err)
135135
}
136136
continue
137137
}
138138

139139
// Print what was dequeued
140140
if len(dequeuedWorkflows) > 0 {
141-
//fmt.Printf("Dequeued %d workflows from queue '%s': %v\n", len(dequeuedWorkflows), queueName, dequeuedWorkflows)
141+
getLogger().Debug("Dequeued workflows from queue", "queue_name", queueName, "workflows", dequeuedWorkflows)
142142
}
143143
for _, workflow := range dequeuedWorkflows {
144144
// Find the workflow in the registry
145145
registeredWorkflow, exists := registry[workflow.name]
146146
if !exists {
147-
fmt.Println("Error: workflow function not found in registry:", workflow.name)
147+
getLogger().Error("workflow function not found in registry", "workflow_name", workflow.name)
148148
continue
149149
}
150150

@@ -153,20 +153,20 @@ func queueRunner(ctx context.Context) {
153153
if len(workflow.input) > 0 {
154154
inputBytes, err := base64.StdEncoding.DecodeString(workflow.input)
155155
if err != nil {
156-
fmt.Printf("failed to decode input for workflow %s: %v\n", workflow.id, err)
156+
getLogger().Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
157157
continue
158158
}
159159
buf := bytes.NewBuffer(inputBytes)
160160
dec := gob.NewDecoder(buf)
161161
if err := dec.Decode(&input); err != nil {
162-
fmt.Printf("failed to decode input for workflow %s: %v\n", workflow.id, err)
162+
getLogger().Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
163163
continue
164164
}
165165
}
166166

167167
_, err := registeredWorkflow.wrappedFunction(runnerContext, input, WithWorkflowID(workflow.id))
168168
if err != nil {
169-
fmt.Println("Error recovering workflow:", err)
169+
getLogger().Error("Error recovering workflow", "error", err)
170170
}
171171
}
172172
}
@@ -187,7 +187,7 @@ func queueRunner(ctx context.Context) {
187187
// Sleep with jittered interval, but allow early exit on context cancellation
188188
select {
189189
case <-ctx.Done():
190-
fmt.Println("Queue runner stopping due to context cancellation")
190+
getLogger().Info("Queue runner stopping due to context cancellation")
191191
return
192192
case <-time.After(sleepDuration):
193193
// Continue to next iteration

dbos/recovery.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package dbos
22

33
import (
44
"context"
5-
"fmt"
65
"strings"
76
)
87

@@ -21,7 +20,7 @@ func recoverPendingWorkflows(ctx context.Context, executorIDs []string) ([]Workf
2120
for _, workflow := range pendingWorkflows {
2221
if inputStr, ok := workflow.Input.(string); ok {
2322
if strings.Contains(inputStr, "Failed to decode") {
24-
fmt.Println("Skipping workflow recovery due to input decoding failure:", workflow.ID, "Name:", workflow.Name)
23+
getLogger().Warn("Skipping workflow recovery due to input decoding failure", "workflow_id", workflow.ID, "name", workflow.Name)
2524
continue
2625
}
2726
}
@@ -30,7 +29,7 @@ func recoverPendingWorkflows(ctx context.Context, executorIDs []string) ([]Workf
3029
if workflow.QueueName != "" {
3130
cleared, err := getExecutor().systemDB.ClearQueueAssignment(ctx, workflow.ID)
3231
if err != nil {
33-
fmt.Println("Error clearing queue assignment for workflow:", workflow.ID, "Name:", workflow.Name, "Error:", err)
32+
getLogger().Error("Error clearing queue assignment for workflow", "workflow_id", workflow.ID, "name", workflow.Name, "error", err)
3433
continue
3534
}
3635
if cleared {
@@ -41,7 +40,7 @@ func recoverPendingWorkflows(ctx context.Context, executorIDs []string) ([]Workf
4140

4241
registeredWorkflow, exists := registry[workflow.Name]
4342
if !exists {
44-
fmt.Println(NewWorkflowFunctionNotFoundError(workflow.ID, fmt.Sprintf("Workflow function %s not found in registry", workflow.Name)))
43+
getLogger().Error("Workflow function not found in registry", "workflow_id", workflow.ID, "name", workflow.Name)
4544
continue
4645
}
4746

0 commit comments

Comments
 (0)