Skip to content

Commit 7b1e585

Browse files
authored
Merge branch 'main' into feat/register-workflowhandle
2 parents 58af042 + 28d50f8 commit 7b1e585

File tree

14 files changed

+758
-267
lines changed

14 files changed

+758
-267
lines changed

cmd/dbos/migrate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func init() {
2929

3030
func runMigrate(cmd *cobra.Command, args []string) error {
3131
// Get database URL
32-
dbURL, err := getDBURL(cmd)
32+
dbURL, err := getDBURL()
3333
if err != nil {
3434
return err
3535
}

cmd/dbos/reset.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func runReset(cmd *cobra.Command, args []string) error {
3434
}
3535

3636
// Get database URL
37-
dbURL, err := getDBURL(cmd)
37+
dbURL, err := getDBURL()
3838
if err != nil {
3939
return err
4040
}

cmd/dbos/utils.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"os"
1010

1111
"github.com/dbos-inc/dbos-transact-golang/dbos"
12-
"github.com/spf13/cobra"
1312
"github.com/spf13/viper"
1413
)
1514

@@ -43,7 +42,7 @@ func maskPassword(dbURL string) string {
4342
}
4443

4544
// getDBURL resolves the database URL from flag, config, or environment variable
46-
func getDBURL(_ *cobra.Command) (string, error) {
45+
func getDBURL() (string, error) {
4746
var resolvedURL string
4847
var source string
4948

cmd/dbos/workflow.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func init() {
8585

8686
func runWorkflowList(cmd *cobra.Command, args []string) error {
8787
// Get database URL
88-
dbURL, err := getDBURL(cmd)
88+
dbURL, err := getDBURL()
8989
if err != nil {
9090
return err
9191
}
@@ -192,7 +192,7 @@ func runWorkflowGet(cmd *cobra.Command, args []string) error {
192192
workflowID := args[0]
193193

194194
// Get database URL
195-
dbURL, err := getDBURL(cmd)
195+
dbURL, err := getDBURL()
196196
if err != nil {
197197
return err
198198
}
@@ -227,7 +227,7 @@ func runWorkflowSteps(cmd *cobra.Command, args []string) error {
227227
workflowID := args[0]
228228

229229
// Get database URL
230-
dbURL, err := getDBURL(cmd)
230+
dbURL, err := getDBURL()
231231
if err != nil {
232232
return err
233233
}
@@ -259,7 +259,7 @@ func runWorkflowCancel(cmd *cobra.Command, args []string) error {
259259
workflowID := args[0]
260260

261261
// Get database URL
262-
dbURL, err := getDBURL(cmd)
262+
dbURL, err := getDBURL()
263263
if err != nil {
264264
return err
265265
}
@@ -285,7 +285,7 @@ func runWorkflowResume(cmd *cobra.Command, args []string) error {
285285
workflowID := args[0]
286286

287287
// Get database URL
288-
dbURL, err := getDBURL(cmd)
288+
dbURL, err := getDBURL()
289289
if err != nil {
290290
return err
291291
}
@@ -318,7 +318,7 @@ func runWorkflowFork(cmd *cobra.Command, args []string) error {
318318
workflowID := args[0]
319319

320320
// Get database URL
321-
dbURL, err := getDBURL(cmd)
321+
dbURL, err := getDBURL()
322322
if err != nil {
323323
return err
324324
}

dbos/client.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package dbos
22

33
import (
44
"context"
5-
"encoding/gob"
65
"errors"
76
"fmt"
87
"log/slog"
@@ -32,6 +31,7 @@ type Client interface {
3231
CancelWorkflow(workflowID string) error
3332
ResumeWorkflow(workflowID string) (WorkflowHandle[any], error)
3433
ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], error)
34+
GetWorkflowSteps(workflowID string) ([]StepInfo, error)
3535
Shutdown(timeout time.Duration) // Simply close the system DB connection pool
3636
}
3737

@@ -241,10 +241,16 @@ func Enqueue[P any, R any](c Client, queueName, workflowName string, input P, op
241241
}
242242

243243
// Register the input and outputs for gob encoding
244+
var logger *slog.Logger
245+
if cl, ok := c.(*client); ok {
246+
if ctx, ok := cl.dbosCtx.(*dbosContext); ok {
247+
logger = ctx.logger
248+
}
249+
}
244250
var typedInput P
245-
gob.Register(typedInput)
251+
safeGobRegister(typedInput, logger)
246252
var typedOutput R
247-
gob.Register(typedOutput)
253+
safeGobRegister(typedOutput, logger)
248254

249255
// Call the interface method with the same signature
250256
handle, err := c.Enqueue(queueName, workflowName, input, opts...)
@@ -290,6 +296,11 @@ func (c *client) ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], err
290296
return c.dbosCtx.ForkWorkflow(c.dbosCtx, input)
291297
}
292298

299+
// GetWorkflowSteps retrieves the execution steps of a workflow.
300+
func (c *client) GetWorkflowSteps(workflowID string) ([]StepInfo, error) {
301+
return c.dbosCtx.GetWorkflowSteps(c.dbosCtx, workflowID)
302+
}
303+
293304
// Shutdown gracefully shuts down the client and closes the system database connection.
294305
func (c *client) Shutdown(timeout time.Duration) {
295306
// Get the concrete dbosContext to access internal fields

dbos/client_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -928,3 +928,69 @@ func TestListWorkflows(t *testing.T) {
928928
// Verify all queue entries are cleaned up
929929
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after list workflows tests")
930930
}
931+
932+
func TestGetWorkflowSteps(t *testing.T) {
933+
// Setup server context
934+
serverCtx := setupDBOS(t, true, true)
935+
936+
// Create queue for communication
937+
queue := NewWorkflowQueue(serverCtx, "get-workflow-steps-queue")
938+
939+
// Workflow with one step
940+
stepFunction := func(ctx context.Context) (string, error) {
941+
return "abc", nil
942+
}
943+
944+
testWorkflow := func(ctx DBOSContext, input string) (string, error) {
945+
result, err := RunAsStep(ctx, stepFunction, WithStepName("TestStep"))
946+
if err != nil {
947+
return "", err
948+
}
949+
return result, nil
950+
}
951+
RegisterWorkflow(serverCtx, testWorkflow, WithWorkflowName("TestWorkflow"))
952+
953+
// Launch server
954+
err := Launch(serverCtx)
955+
require.NoError(t, err)
956+
957+
// Setup client
958+
databaseURL := getDatabaseURL()
959+
config := ClientConfig{
960+
DatabaseURL: databaseURL,
961+
}
962+
client, err := NewClient(context.Background(), config)
963+
require.NoError(t, err)
964+
t.Cleanup(func() {
965+
if client != nil {
966+
client.Shutdown(30 * time.Second)
967+
}
968+
})
969+
970+
// Enqueue and run the workflow
971+
workflowID := "test-get-workflow-steps"
972+
handle, err := Enqueue[string, string](client, queue.Name, "TestWorkflow", "test-input", WithEnqueueWorkflowID(workflowID))
973+
require.NoError(t, err)
974+
975+
// Wait for workflow to complete
976+
result, err := handle.GetResult()
977+
require.NoError(t, err)
978+
assert.Equal(t, "abc", result)
979+
980+
// Test GetWorkflowSteps with loadOutput = true
981+
stepsWithOutput, err := client.GetWorkflowSteps(workflowID)
982+
require.NoError(t, err)
983+
require.Len(t, stepsWithOutput, 1, "expected exactly 1 step")
984+
985+
step := stepsWithOutput[0]
986+
assert.Equal(t, 0, step.StepID, "expected step ID to be 0")
987+
assert.Equal(t, "TestStep", step.StepName, "expected step name to be set")
988+
assert.Nil(t, step.Error, "expected no error in step")
989+
assert.Equal(t, "", step.ChildWorkflowID, "expected no child workflow ID")
990+
991+
// Verify the output wasn't loaded
992+
require.Nil(t, step.Output, "expected output not to be loaded")
993+
994+
// Verify all queue entries are cleaned up
995+
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after get workflow steps test")
996+
}

dbos/dbos.go

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package dbos
33
import (
44
"context"
55
"crypto/sha256"
6-
"encoding/gob"
76
"encoding/hex"
87
"errors"
98
"fmt"
@@ -121,12 +120,13 @@ type DBOSContext interface {
121120
GetStepID() (int, error) // Get the current step ID (only available within workflows)
122121

123122
// Workflow management
124-
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
125-
CancelWorkflow(_ DBOSContext, workflowID string) error // Cancel a workflow by setting its status to CANCELLED
126-
ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
127-
ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step
128-
ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
129-
GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) // Get the execution steps of a workflow
123+
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
124+
CancelWorkflow(_ DBOSContext, workflowID string) error // Cancel a workflow by setting its status to CANCELLED
125+
ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
126+
ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step
127+
ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
128+
GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) // Get the execution steps of a workflow
129+
ListRegisteredWorkflows(_ DBOSContext, opts ...ListRegisteredWorkflowsOption) ([]WorkflowRegistryEntry, error) // List registered workflows with filtering options
130130

131131
// Accessors
132132
GetApplicationVersion() string // Get the application version for this context
@@ -159,7 +159,7 @@ type dbosContext struct {
159159
workflowsWg *sync.WaitGroup
160160

161161
// Workflow registry - read-mostly sync.Map since registration happens only before launch
162-
workflowRegistry *sync.Map // map[string]workflowRegistryEntry
162+
workflowRegistry *sync.Map // map[string]WorkflowRegistryEntry
163163
workflowCustomNametoFQN *sync.Map // Maps fully qualified workflow names to custom names. Usefor when client enqueues a workflow by name because registry is indexed by FQN.
164164

165165
// Workflow scheduler
@@ -194,7 +194,8 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext {
194194
}
195195
// Will do nothing if the concrete type is not dbosContext
196196
if dbosCtx, ok := ctx.(*dbosContext); ok {
197-
return &dbosContext{
197+
launched := dbosCtx.launched.Load()
198+
childCtx := &dbosContext{
198199
ctx: context.WithValue(dbosCtx.ctx, key, val), // Spawn a new child context with the value set
199200
logger: dbosCtx.logger,
200201
systemDB: dbosCtx.systemDB,
@@ -205,6 +206,8 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext {
205206
executorID: dbosCtx.executorID,
206207
applicationID: dbosCtx.applicationID,
207208
}
209+
childCtx.launched.Store(launched)
210+
return childCtx
208211
}
209212
return nil
210213
}
@@ -217,7 +220,10 @@ func WithoutCancel(ctx DBOSContext) DBOSContext {
217220
return nil
218221
}
219222
if dbosCtx, ok := ctx.(*dbosContext); ok {
220-
return &dbosContext{
223+
launched := dbosCtx.launched.Load()
224+
// Create a new context that is not canceled when the parent is canceled
225+
// but retains all other values
226+
childCtx := &dbosContext{
221227
ctx: context.WithoutCancel(dbosCtx.ctx),
222228
logger: dbosCtx.logger,
223229
systemDB: dbosCtx.systemDB,
@@ -228,6 +234,8 @@ func WithoutCancel(ctx DBOSContext) DBOSContext {
228234
executorID: dbosCtx.executorID,
229235
applicationID: dbosCtx.applicationID,
230236
}
237+
childCtx.launched.Store(launched)
238+
return childCtx
231239
}
232240
return nil
233241
}
@@ -240,8 +248,9 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C
240248
return nil, func() {}
241249
}
242250
if dbosCtx, ok := ctx.(*dbosContext); ok {
251+
launched := dbosCtx.launched.Load()
243252
newCtx, cancelFunc := context.WithTimeoutCause(dbosCtx.ctx, timeout, errors.New("DBOS context timeout"))
244-
return &dbosContext{
253+
childCtx := &dbosContext{
245254
ctx: newCtx,
246255
logger: dbosCtx.logger,
247256
systemDB: dbosCtx.systemDB,
@@ -251,7 +260,9 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C
251260
applicationVersion: dbosCtx.applicationVersion,
252261
executorID: dbosCtx.executorID,
253262
applicationID: dbosCtx.applicationID,
254-
}, cancelFunc
263+
}
264+
childCtx.launched.Store(launched)
265+
return childCtx, cancelFunc
255266
}
256267
return nil, func() {}
257268
}
@@ -275,6 +286,34 @@ func (c *dbosContext) GetApplicationID() string {
275286
return c.applicationID
276287
}
277288

289+
// ListRegisteredWorkflows returns information about registered workflows with their registration parameters.
290+
// Supports filtering using functional options.
291+
func (c *dbosContext) ListRegisteredWorkflows(_ DBOSContext, opts ...ListRegisteredWorkflowsOption) ([]WorkflowRegistryEntry, error) {
292+
// Initialize parameters with defaults
293+
params := &listRegisteredWorkflowsOptions{}
294+
295+
// Apply all provided options
296+
for _, opt := range opts {
297+
opt(params)
298+
}
299+
300+
// Get all registered workflows and apply filters
301+
var filteredWorkflows []WorkflowRegistryEntry
302+
c.workflowRegistry.Range(func(key, value interface{}) bool {
303+
workflow := value.(WorkflowRegistryEntry)
304+
305+
// Filter by scheduled only
306+
if params.scheduledOnly && workflow.CronSchedule == "" {
307+
return true
308+
}
309+
310+
filteredWorkflows = append(filteredWorkflows, workflow)
311+
return true
312+
})
313+
314+
return filteredWorkflows, nil
315+
}
316+
278317
// NewDBOSContext creates a new DBOS context with the provided configuration.
279318
// The context must be launched with Launch() for workflow execution and should be shut down with Shutdown().
280319
// This function initializes the DBOS system database, sets up the queue sub-system, and prepares the workflow registry.
@@ -317,15 +356,14 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
317356

318357
// Register types we serialize with gob
319358
var t time.Time
320-
gob.Register(t)
359+
safeGobRegister(t, initExecutor.logger)
321360
var ws []WorkflowStatus
322-
gob.Register(ws)
361+
safeGobRegister(ws, initExecutor.logger)
323362
var si []StepInfo
324-
gob.Register(si)
325363
var h workflowHandle[any]
326-
gob.Register(h)
364+
safeGobRegister(h, initExecutor.logger)
327365
var ph workflowPollingHandle[any]
328-
gob.Register(ph)
366+
safeGobRegister(ph, initExecutor.logger)
329367

330368
// Initialize global variables from processed config (already handles env vars and defaults)
331369
initExecutor.applicationVersion = config.ApplicationVersion

dbos/queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func (qr *queueRunner) run(ctx *dbosContext) {
221221
qr.logger.Error("workflow function not found in registry", "workflow_name", workflow.name)
222222
continue
223223
}
224-
registeredWorkflow, ok := registeredWorkflowAny.(workflowRegistryEntry)
224+
registeredWorkflow, ok := registeredWorkflowAny.(WorkflowRegistryEntry)
225225
if !ok {
226226
qr.logger.Error("invalid workflow registry entry type", "workflow_name", workflow.name)
227227
continue

dbos/recovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
4848
ctx.logger.Error("Workflow function not found in registry", "workflow_id", workflow.ID, "name", workflow.Name)
4949
continue
5050
}
51-
registeredWorkflow, ok := registeredWorkflowAny.(workflowRegistryEntry)
51+
registeredWorkflow, ok := registeredWorkflowAny.(WorkflowRegistryEntry)
5252
if !ok {
5353
ctx.logger.Error("invalid workflow registry entry type", "workflow_id", workflow.ID, "name", workflow.Name)
5454
continue

0 commit comments

Comments
 (0)