Skip to content

Commit 02bb44d

Browse files
committed
logger service key
1 parent 646769d commit 02bb44d

File tree

4 files changed

+20
-18
lines changed

4 files changed

+20
-18
lines changed

dbos/conductor.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func NewConductor(config ConductorConfig, dbosCtx *dbosContext) (*Conductor, err
102102
pingInterval: _PING_INTERVAL,
103103
pingTimeout: _PING_TIMEOUT,
104104
reconnectWait: _INITIAL_RECONNECT_WAIT,
105-
logger: dbosCtx.logger,
105+
logger: dbosCtx.logger.With("service", "conductor"),
106106
}
107107

108108
// Start with needsReconnect set to true so we connect on first run
@@ -114,8 +114,6 @@ func NewConductor(config ConductorConfig, dbosCtx *dbosContext) (*Conductor, err
114114
// Shutdown gracefully conductor
115115
func (c *Conductor) Shutdown(timeout time.Duration) {
116116
c.stopOnce.Do(func() {
117-
c.logger.Info("Shutting down conductor")
118-
119117
if c.pingCancel != nil {
120118
c.pingCancel()
121119
}

dbos/dbos.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
314314
initExecutor.logger.Info("System database initialized")
315315

316316
// Initialize the queue runner and register DBOS internal queue
317-
initExecutor.queueRunner = newQueueRunner()
317+
initExecutor.queueRunner = newQueueRunner(initExecutor.logger)
318318
NewWorkflowQueue(initExecutor, _DBOS_INTERNAL_QUEUE_NAME)
319319

320320
// Initialize conductor if API key is provided
@@ -414,8 +414,9 @@ func (c *dbosContext) Launch() error {
414414
// 2. Waits for the queue runner to complete processing
415415
// 3. Stops the workflow scheduler and waits for scheduled jobs to finish
416416
// 4. Shuts down the system database connection pool and notification listener
417-
// 5. Shuts down the admin server
418-
// 6. Marks the context as not launched
417+
// 5. Shuts down conductor
418+
// 6. Shuts down the admin server
419+
// 7. Marks the context as not launched
419420
//
420421
// Each step respects the provided timeout. If any component doesn't shut down within the timeout,
421422
// a warning is logged and the shutdown continues to the next component.

dbos/queue.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"encoding/base64"
77
"encoding/gob"
8+
"log/slog"
89
"math"
910
"math/rand"
1011
"time"
@@ -132,6 +133,8 @@ func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption)
132133
}
133134

134135
type queueRunner struct {
136+
logger *slog.Logger
137+
135138
// Queue runner iteration parameters
136139
baseInterval float64
137140
minInterval float64
@@ -148,7 +151,7 @@ type queueRunner struct {
148151
completionChan chan struct{}
149152
}
150153

151-
func newQueueRunner() *queueRunner {
154+
func newQueueRunner(logger *slog.Logger) *queueRunner {
152155
return &queueRunner{
153156
baseInterval: 1.0,
154157
minInterval: 1.0,
@@ -159,6 +162,7 @@ func newQueueRunner() *queueRunner {
159162
jitterMax: 1.05,
160163
workflowQueueRegistry: make(map[string]WorkflowQueue),
161164
completionChan: make(chan struct{}, 1),
165+
logger: logger.With("service", "queue_runner"),
162166
}
163167
}
164168

@@ -193,31 +197,31 @@ func (qr *queueRunner) run(ctx *dbosContext) {
193197
hasBackoffError = true
194198
}
195199
} else {
196-
ctx.logger.Error("Error dequeuing workflows from queue", "queue_name", queueName, "error", err)
200+
qr.logger.Error("Error dequeuing workflows from queue", "queue_name", queueName, "error", err)
197201
}
198202
continue
199203
}
200204

201205
if len(dequeuedWorkflows) > 0 {
202-
ctx.logger.Debug("Dequeued workflows from queue", "queue_name", queueName, "workflows", dequeuedWorkflows)
206+
qr.logger.Debug("Dequeued workflows from queue", "queue_name", queueName, "workflows", dequeuedWorkflows)
203207
}
204208
for _, workflow := range dequeuedWorkflows {
205209
// Find the workflow in the registry
206210

207211
wfName, ok := ctx.workflowCustomNametoFQN.Load(workflow.name)
208212
if !ok {
209-
ctx.logger.Error("Workflow not found in registry", "workflow_name", workflow.name)
213+
qr.logger.Error("Workflow not found in registry", "workflow_name", workflow.name)
210214
continue
211215
}
212216

213217
registeredWorkflowAny, exists := ctx.workflowRegistry.Load(wfName.(string))
214218
if !exists {
215-
ctx.logger.Error("workflow function not found in registry", "workflow_name", workflow.name)
219+
qr.logger.Error("workflow function not found in registry", "workflow_name", workflow.name)
216220
continue
217221
}
218222
registeredWorkflow, ok := registeredWorkflowAny.(workflowRegistryEntry)
219223
if !ok {
220-
ctx.logger.Error("invalid workflow registry entry type", "workflow_name", workflow.name)
224+
qr.logger.Error("invalid workflow registry entry type", "workflow_name", workflow.name)
221225
continue
222226
}
223227

@@ -226,20 +230,20 @@ func (qr *queueRunner) run(ctx *dbosContext) {
226230
if len(workflow.input) > 0 {
227231
inputBytes, err := base64.StdEncoding.DecodeString(workflow.input)
228232
if err != nil {
229-
ctx.logger.Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
233+
qr.logger.Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
230234
continue
231235
}
232236
buf := bytes.NewBuffer(inputBytes)
233237
dec := gob.NewDecoder(buf)
234238
if err := dec.Decode(&input); err != nil {
235-
ctx.logger.Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
239+
qr.logger.Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
236240
continue
237241
}
238242
}
239243

240244
_, err := registeredWorkflow.wrappedFunction(ctx, input, WithWorkflowID(workflow.id))
241245
if err != nil {
242-
ctx.logger.Error("Error running queued workflow", "error", err)
246+
qr.logger.Error("Error running queued workflow", "error", err)
243247
}
244248
}
245249
}
@@ -260,7 +264,7 @@ func (qr *queueRunner) run(ctx *dbosContext) {
260264
// Sleep with jittered interval, but allow early exit on context cancellation
261265
select {
262266
case <-ctx.Done():
263-
ctx.logger.Info("Queue runner stopping due to context cancellation", "cause", context.Cause(ctx))
267+
qr.logger.Info("Queue runner stopping due to context cancellation", "cause", context.Cause(ctx))
264268
qr.completionChan <- struct{}{}
265269
return
266270
case <-time.After(sleepDuration):

dbos/system_database.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func newSystemDatabase(ctx context.Context, databaseURL string, logger *slog.Log
243243
notificationListenerConnection: notificationListenerConnection,
244244
notificationsMap: notificationsMap,
245245
notificationLoopDone: make(chan struct{}),
246-
logger: logger,
246+
logger: logger.With("service", "system_database"),
247247
}, nil
248248
}
249249

@@ -860,7 +860,6 @@ func (s *sysDB) garbageCollectWorkflows(ctx context.Context, input garbageCollec
860860
return nil
861861
}
862862

863-
864863
func (s *sysDB) resumeWorkflow(ctx context.Context, workflowID string) error {
865864
tx, err := s.pool.Begin(ctx)
866865
if err != nil {

0 commit comments

Comments
 (0)