Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 53 additions & 14 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"log/slog"
"os"
"reflect"
"runtime"
Expand Down Expand Up @@ -56,42 +57,74 @@ type Executor interface {

var workflowScheduler *cron.Cron

// DBOS represents the main DBOS instance
type executor struct {
systemDB SystemDatabase
queueRunnerCtx context.Context
queueRunnerCancelFunc context.CancelFunc
queueRunnerDone chan struct{}
}

// New creates a new DBOS instance with an initialized system database
var dbos *executor

func getExecutor() *executor {
if dbos == nil {
fmt.Println("warning: DBOS instance not initiliazed")
return nil
}
return dbos
}

func Launch() error {
var logger *slog.Logger

func getLogger() *slog.Logger {
if dbos == nil {
fmt.Println("warning: DBOS instance not initialized, using default logger")
return slog.New(slog.NewTextHandler(os.Stderr, nil))
}
if logger == nil {
fmt.Println("warning: DBOS logger is nil, using default logger")
return slog.New(slog.NewTextHandler(os.Stderr, nil))
}
return logger
}
Comment on lines +78 to +88
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A simple gate to not panic


type config struct {
logger *slog.Logger
}

type LaunchOption func(*config)

func WithLogger(logger *slog.Logger) LaunchOption {
return func(config *config) {
config.logger = logger
}
}

func Launch(options ...LaunchOption) error {
if dbos != nil {
fmt.Println("warning: DBOS instance already initialized, skipping re-initialization")
return NewInitializationError("DBOS already initialized")
}

config := &config{
logger: slog.New(slog.NewTextHandler(os.Stderr, nil)),
}
for _, option := range options {
option(config)
}

logger = config.logger

// Initialize with environment variables, providing defaults if not set
APP_VERSION = os.Getenv("DBOS__APPVERSION")
if APP_VERSION == "" {
APP_VERSION = computeApplicationVersion()
fmt.Println("DBOS: DBOS__APPVERSION not set, using computed hash")
logger.Info("DBOS__APPVERSION not set, using computed hash")
}

EXECUTOR_ID = os.Getenv("DBOS__VMID")
if EXECUTOR_ID == "" {
EXECUTOR_ID = "local"
fmt.Printf("DBOS: DBOS__VMID not set, using default: %s\n", EXECUTOR_ID)
logger.Info("DBOS__VMID not set, using default", "executor_id", EXECUTOR_ID)
}

APP_ID = os.Getenv("DBOS__APPID")
Expand All @@ -101,7 +134,7 @@ func Launch() error {
if err != nil {
return NewInitializationError(fmt.Sprintf("failed to create system database: %v", err))
}
fmt.Println("DBOS: System database initialized")
logger.Info("System database initialized")

systemDB.Launch(context.Background())

Expand All @@ -120,12 +153,12 @@ func Launch() error {
defer close(dbos.queueRunnerDone)
queueRunner(ctx)
}()
fmt.Println("DBOS: Queue runner started")
logger.Info("Queue runner started")

// Start the workflow scheduler if it has been initialized
if workflowScheduler != nil {
workflowScheduler.Start()
fmt.Println("DBOS: Workflow scheduler started")
logger.Info("Workflow scheduler started")
}

// Run a round of recovery on the local executor
Expand All @@ -134,23 +167,25 @@ func Launch() error {
return NewInitializationError(fmt.Sprintf("failed to recover pending workflows during launch: %v", err))
}

fmt.Printf("DBOS: Initialized with APP_VERSION=%s, EXECUTOR_ID=%s\n", APP_VERSION, EXECUTOR_ID)
logger.Info("DBOS initialized", "app_version", APP_VERSION, "executor_id", EXECUTOR_ID)
return nil
}

// Close closes the DBOS instance and its resources
func Shutdown() {
if dbos == nil {
fmt.Println("warning: DBOS instance is nil, cannot destroy")
fmt.Println("DBOS instance is nil, cannot destroy")
return
}

// XXX is there a way to ensure all workflows goroutine are done before closing?

// Cancel the context to stop the queue runner
if dbos.queueRunnerCancelFunc != nil {
dbos.queueRunnerCancelFunc()
// Wait for queue runner to finish
<-dbos.queueRunnerDone
fmt.Println("DBOS: Queue runner stopped")
getLogger().Info("Queue runner stopped")
}

if workflowScheduler != nil {
Expand All @@ -161,9 +196,9 @@ func Shutdown() {

select {
case <-ctx.Done():
fmt.Println("DBOS: All scheduled jobs completed")
getLogger().Info("All scheduled jobs completed")
case <-timeoutCtx.Done():
fmt.Println("DBOS: Timeout waiting for jobs to complete (5s)")
getLogger().Warn("Timeout waiting for jobs to complete", "timeout", "5s")
}
}

Expand All @@ -172,5 +207,9 @@ func Shutdown() {
dbos.systemDB = nil
}

if logger != nil {
logger = nil
}

dbos = nil
}
67 changes: 67 additions & 0 deletions dbos/logger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package dbos

import (
"bytes"
"log/slog"
"strings"
"testing"
)

func TestLogger(t *testing.T) {

t.Run("Default logger", func(t *testing.T) {
err := Launch() // Launch with default logger
if err != nil {
t.Fatalf("Failed to launch with default logger: %v", err)
}
t.Cleanup(func() {
Shutdown()
})

if logger == nil {
t.Fatal("Logger is nil")
}

// Test logger access
logger.Info("Test message from default logger")

})

t.Run("Custom logger", func(t *testing.T) {
// Test with custom slog logger
var buf bytes.Buffer
slogLogger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{
Level: slog.LevelDebug,
}))

// Add some context to the slog logger
slogLogger = slogLogger.With("service", "dbos-test", "environment", "test")

err := Launch(WithLogger(slogLogger))
if err != nil {
t.Fatalf("Failed to launch with custom logger: %v", err)
}
t.Cleanup(func() {
Shutdown()
})

if logger == nil {
t.Fatal("Logger is nil")
}

// Test that we can use the logger and it maintains context
logger.Info("Test message from custom logger", "test_key", "test_value")

// Check that our custom logger was used and captured the output
logOutput := buf.String()
if !strings.Contains(logOutput, "service=dbos-test") {
t.Errorf("Expected log output to contain service=dbos-test, got: %s", logOutput)
}
if !strings.Contains(logOutput, "environment=test") {
t.Errorf("Expected log output to contain environment=test, got: %s", logOutput)
}
if !strings.Contains(logOutput, "test_key=test_value") {
t.Errorf("Expected log output to contain test_key=test_value, got: %s", logOutput)
}
})
}
18 changes: 9 additions & 9 deletions dbos/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"encoding/base64"
"encoding/gob"
"fmt"
"math"
"math/rand"
"time"
Expand Down Expand Up @@ -71,7 +70,7 @@ func WithMaxTasksPerIteration(maxTasks uint) QueueOption {
// NewWorkflowQueue creates a new workflow queue with optional configuration
func NewWorkflowQueue(name string, options ...QueueOption) WorkflowQueue {
if getExecutor() != nil {
fmt.Println("warning: NewWorkflowQueue called after DBOS initialization, dynamic registration is not supported")
getLogger().Warn("NewWorkflowQueue called after DBOS initialization, dynamic registration is not supported")
return WorkflowQueue{}
}
if _, exists := workflowQueueRegistry[name]; exists {
Expand Down Expand Up @@ -120,6 +119,7 @@ func queueRunner(ctx context.Context) {

// Iterate through all queues in the registry
for queueName, queue := range workflowQueueRegistry {
getLogger().Debug("Processing queue", "queue_name", queueName)
// Call DequeueWorkflows for each queue
dequeuedWorkflows, err := getExecutor().systemDB.DequeueWorkflows(runnerContext, queue)
if err != nil {
Expand All @@ -131,20 +131,20 @@ func queueRunner(ctx context.Context) {
hasBackoffError = true
}
} else {
fmt.Printf("Error dequeuing workflows from queue '%s': %v\n", queueName, err)
getLogger().Error("Error dequeuing workflows from queue", "queue_name", queueName, "error", err)
}
continue
}

// Print what was dequeued
if len(dequeuedWorkflows) > 0 {
//fmt.Printf("Dequeued %d workflows from queue '%s': %v\n", len(dequeuedWorkflows), queueName, dequeuedWorkflows)
getLogger().Debug("Dequeued workflows from queue", "queue_name", queueName, "workflows", dequeuedWorkflows)
}
for _, workflow := range dequeuedWorkflows {
// Find the workflow in the registry
registeredWorkflow, exists := registry[workflow.name]
if !exists {
fmt.Println("Error: workflow function not found in registry:", workflow.name)
getLogger().Error("workflow function not found in registry", "workflow_name", workflow.name)
continue
}

Expand All @@ -153,20 +153,20 @@ func queueRunner(ctx context.Context) {
if len(workflow.input) > 0 {
inputBytes, err := base64.StdEncoding.DecodeString(workflow.input)
if err != nil {
fmt.Printf("failed to decode input for workflow %s: %v\n", workflow.id, err)
getLogger().Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
continue
}
buf := bytes.NewBuffer(inputBytes)
dec := gob.NewDecoder(buf)
if err := dec.Decode(&input); err != nil {
fmt.Printf("failed to decode input for workflow %s: %v\n", workflow.id, err)
getLogger().Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
continue
}
}

_, err := registeredWorkflow.wrappedFunction(runnerContext, input, WithWorkflowID(workflow.id))
if err != nil {
fmt.Println("Error recovering workflow:", err)
getLogger().Error("Error recovering workflow", "error", err)
}
}
}
Expand All @@ -187,7 +187,7 @@ func queueRunner(ctx context.Context) {
// Sleep with jittered interval, but allow early exit on context cancellation
select {
case <-ctx.Done():
fmt.Println("Queue runner stopping due to context cancellation")
getLogger().Info("Queue runner stopping due to context cancellation")
return
case <-time.After(sleepDuration):
// Continue to next iteration
Expand Down
7 changes: 3 additions & 4 deletions dbos/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dbos

import (
"context"
"fmt"
"strings"
)

Expand All @@ -21,7 +20,7 @@ func recoverPendingWorkflows(ctx context.Context, executorIDs []string) ([]Workf
for _, workflow := range pendingWorkflows {
if inputStr, ok := workflow.Input.(string); ok {
if strings.Contains(inputStr, "Failed to decode") {
fmt.Println("Skipping workflow recovery due to input decoding failure:", workflow.ID, "Name:", workflow.Name)
getLogger().Warn("Skipping workflow recovery due to input decoding failure", "workflow_id", workflow.ID, "name", workflow.Name)
continue
}
}
Expand All @@ -30,7 +29,7 @@ func recoverPendingWorkflows(ctx context.Context, executorIDs []string) ([]Workf
if workflow.QueueName != "" {
cleared, err := getExecutor().systemDB.ClearQueueAssignment(ctx, workflow.ID)
if err != nil {
fmt.Println("Error clearing queue assignment for workflow:", workflow.ID, "Name:", workflow.Name, "Error:", err)
getLogger().Error("Error clearing queue assignment for workflow", "workflow_id", workflow.ID, "name", workflow.Name, "error", err)
continue
}
if cleared {
Expand All @@ -41,7 +40,7 @@ func recoverPendingWorkflows(ctx context.Context, executorIDs []string) ([]Workf

registeredWorkflow, exists := registry[workflow.Name]
if !exists {
fmt.Println(NewWorkflowFunctionNotFoundError(workflow.ID, fmt.Sprintf("Workflow function %s not found in registry", workflow.Name)))
getLogger().Error("Workflow function not found in registry", "workflow_id", workflow.ID, "name", workflow.Name)
continue
}

Expand Down
Loading