Skip to content

Commit 9a65746

Browse files
committed
more lenient test
1 parent cc9d76e commit 9a65746

File tree

3 files changed

+13
-12
lines changed

3 files changed

+13
-12
lines changed

dbos/dbos.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,11 @@ var logger *slog.Logger
7979
func getLogger() *slog.Logger {
8080
if dbos == nil {
8181
fmt.Println("warning: DBOS instance not initialized, using default logger")
82-
return slog.Default()
82+
return slog.New(slog.NewTextHandler(os.Stderr, nil))
8383
}
8484
if logger == nil {
8585
fmt.Println("warning: DBOS logger is nil, using default logger")
86-
return slog.Default()
86+
return slog.New(slog.NewTextHandler(os.Stderr, nil))
8787
}
8888
return logger
8989
}
@@ -107,7 +107,7 @@ func Launch(options ...LaunchOption) error {
107107
}
108108

109109
config := &config{
110-
logger: slog.Default(),
110+
logger: slog.New(slog.NewTextHandler(os.Stderr, nil)),
111111
}
112112
for _, option := range options {
113113
option(config)
@@ -142,16 +142,16 @@ func Launch(options ...LaunchOption) error {
142142
// Create context with cancel function for queue runner
143143
ctx, cancel := context.WithCancel(context.Background())
144144

145+
// Create the internal workflow queue
146+
NewWorkflowQueue(DBOS_INTERNAL_QUEUE_NAME)
147+
145148
dbos = &executor{
146149
systemDB: systemDB,
147150
queueRunnerCtx: ctx,
148151
queueRunnerCancelFunc: cancel,
149152
queueRunnerDone: make(chan struct{}),
150153
}
151154

152-
// Create the internal workflow queue
153-
NewWorkflowQueue(DBOS_INTERNAL_QUEUE_NAME)
154-
155155
// Start the queue runner in a goroutine
156156
go func() {
157157
defer close(dbos.queueRunnerDone)

dbos/queue.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ func queueRunner(ctx context.Context) {
118118

119119
// Iterate through all queues in the registry
120120
for queueName, queue := range workflowQueueRegistry {
121+
getLogger().Debug("Processing queue", "queue_name", queueName)
121122
// Call DequeueWorkflows for each queue
122123
dequeuedWorkflows, err := getExecutor().systemDB.DequeueWorkflows(runnerContext, queue)
123124
if err != nil {
@@ -136,7 +137,7 @@ func queueRunner(ctx context.Context) {
136137

137138
// Print what was dequeued
138139
if len(dequeuedWorkflows) > 0 {
139-
//fmt.Printf("Dequeued %d workflows from queue '%s': %v\n", len(dequeuedWorkflows), queueName, dequeuedWorkflows)
140+
getLogger().Debug("Dequeued workflows from queue", "queue_name", queueName, "workflows", dequeuedWorkflows)
140141
}
141142
for _, workflow := range dequeuedWorkflows {
142143
// Find the workflow in the registry

dbos/workflows_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -973,16 +973,16 @@ func TestScheduledWorkflows(t *testing.T) {
973973
currentCounter := counter
974974
workflowScheduler.Stop()
975975
time.Sleep(3 * time.Second) // Wait a bit to ensure no more executions
976-
if counter >= currentCounter+1 {
977-
t.Fatalf("Scheduled workflow continued executing after stopping scheduler: %d (expected < %d)", counter, currentCounter+1)
976+
if counter >= currentCounter+2 {
977+
t.Fatalf("Scheduled workflow continued executing after stopping scheduler: %d (expected < %d)", counter, currentCounter+2)
978978
}
979979
})
980980
}
981981

982982
var (
983-
sendWf = WithWorkflow(sendWorkflow)
984-
receiveWf = WithWorkflow(receiveWorkflow)
985-
sendStructWf = WithWorkflow(sendStructWorkflow)
983+
sendWf = WithWorkflow(sendWorkflow)
984+
receiveWf = WithWorkflow(receiveWorkflow)
985+
sendStructWf = WithWorkflow(sendStructWorkflow)
986986
receiveStructWf = WithWorkflow(receiveStructWorkflow)
987987
)
988988

0 commit comments

Comments
 (0)