Skip to content

Commit 386849f

Browse files
committed
1:1 mapping from DBOSContext interface and package level methods
1 parent 45e468d commit 386849f

File tree

5 files changed

+142
-23
lines changed

5 files changed

+142
-23
lines changed

dbos/client_test.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ func TestCancelResume(t *testing.T) {
269269
}
270270

271271
// Cancel the workflow
272-
err = clientCtx.CancelWorkflow(workflowID)
272+
err = CancelWorkflow(clientCtx, workflowID)
273273
if err != nil {
274274
t.Fatalf("failed to cancel workflow: %v", err)
275275
}
@@ -368,7 +368,7 @@ func TestCancelResume(t *testing.T) {
368368
time.Sleep(500 * time.Millisecond)
369369

370370
// Cancel the workflow before timeout expires
371-
err = clientCtx.CancelWorkflow(workflowID)
371+
err = CancelWorkflow(clientCtx, workflowID)
372372
if err != nil {
373373
t.Fatalf("failed to cancel workflow: %v", err)
374374
}
@@ -447,7 +447,7 @@ func TestCancelResume(t *testing.T) {
447447
nonExistentWorkflowID := "non-existent-workflow-id"
448448

449449
// Try to cancel a non-existent workflow
450-
err := clientCtx.CancelWorkflow(nonExistentWorkflowID)
450+
err := CancelWorkflow(clientCtx, nonExistentWorkflowID)
451451
if err == nil {
452452
t.Fatal("expected error when canceling non-existent workflow, but got none")
453453
}
@@ -848,7 +848,7 @@ func TestListWorkflows(t *testing.T) {
848848
}
849849

850850
// Test 1: List all workflows (no filters)
851-
allWorkflows, err := clientCtx.ListWorkflows()
851+
allWorkflows, err := ListWorkflows(clientCtx)
852852
if err != nil {
853853
t.Fatalf("failed to list all workflows: %v", err)
854854
}
@@ -858,7 +858,7 @@ func TestListWorkflows(t *testing.T) {
858858

859859
// Test 2: Filter by workflow IDs
860860
expectedIDs := workflowIDs[:3]
861-
specificWorkflows, err := clientCtx.ListWorkflows(WithWorkflowIDs(expectedIDs))
861+
specificWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDs(expectedIDs))
862862
if err != nil {
863863
t.Fatalf("failed to list workflows by IDs: %v", err)
864864
}
@@ -877,7 +877,7 @@ func TestListWorkflows(t *testing.T) {
877877
}
878878

879879
// Test 3: Filter by workflow ID prefix
880-
batchWorkflows, err := clientCtx.ListWorkflows(WithWorkflowIDPrefix("test-batch-"))
880+
batchWorkflows, err := ListWorkflows(clientCtx, WithWorkflowIDPrefix("test-batch-"))
881881
if err != nil {
882882
t.Fatalf("failed to list workflows by prefix: %v", err)
883883
}
@@ -892,7 +892,7 @@ func TestListWorkflows(t *testing.T) {
892892
}
893893

894894
// Test 4: Filter by status - SUCCESS
895-
successWorkflows, err := clientCtx.ListWorkflows(
895+
successWorkflows, err := ListWorkflows(clientCtx,
896896
WithWorkflowIDPrefix("test-"), // Only our test workflows
897897
WithStatus([]WorkflowStatusType{WorkflowStatusSuccess}))
898898
if err != nil {
@@ -909,7 +909,7 @@ func TestListWorkflows(t *testing.T) {
909909
}
910910

911911
// Test 5: Filter by status - ERROR
912-
errorWorkflows, err := clientCtx.ListWorkflows(
912+
errorWorkflows, err := ListWorkflows(clientCtx,
913913
WithWorkflowIDPrefix("test-"),
914914
WithStatus([]WorkflowStatusType{WorkflowStatusError}))
915915
if err != nil {
@@ -927,7 +927,7 @@ func TestListWorkflows(t *testing.T) {
927927

928928
// Test 6: Filter by time range - first 5 workflows (start to start+500ms)
929929
firstHalfTime := testStartTime.Add(500 * time.Millisecond)
930-
firstHalfWorkflows, err := clientCtx.ListWorkflows(
930+
firstHalfWorkflows, err := ListWorkflows(clientCtx,
931931
WithWorkflowIDPrefix("test-"),
932932
WithEndTime(firstHalfTime))
933933
if err != nil {
@@ -938,7 +938,7 @@ func TestListWorkflows(t *testing.T) {
938938
}
939939

940940
// Test 6b: Filter by time range - last 5 workflows (start+500ms to end)
941-
secondHalfWorkflows, err := clientCtx.ListWorkflows(
941+
secondHalfWorkflows, err := ListWorkflows(clientCtx,
942942
WithWorkflowIDPrefix("test-"),
943943
WithStartTime(firstHalfTime))
944944
if err != nil {
@@ -949,15 +949,15 @@ func TestListWorkflows(t *testing.T) {
949949
}
950950

951951
// Test 7: Test sorting order (ascending - default)
952-
ascWorkflows, err := clientCtx.ListWorkflows(
952+
ascWorkflows, err := ListWorkflows(clientCtx,
953953
WithWorkflowIDPrefix("test-"),
954954
WithSortDesc(false))
955955
if err != nil {
956956
t.Fatalf("failed to list workflows ascending: %v", err)
957957
}
958958

959959
// Test 8: Test sorting order (descending)
960-
descWorkflows, err := clientCtx.ListWorkflows(
960+
descWorkflows, err := ListWorkflows(clientCtx,
961961
WithWorkflowIDPrefix("test-"),
962962
WithSortDesc(true))
963963
if err != nil {
@@ -991,7 +991,7 @@ func TestListWorkflows(t *testing.T) {
991991
}
992992

993993
// Test 9: Test limit and offset
994-
limitedWorkflows, err := clientCtx.ListWorkflows(
994+
limitedWorkflows, err := ListWorkflows(clientCtx,
995995
WithWorkflowIDPrefix("test-"),
996996
WithLimit(5))
997997
if err != nil {
@@ -1008,7 +1008,7 @@ func TestListWorkflows(t *testing.T) {
10081008
}
10091009
}
10101010

1011-
offsetWorkflows, err := clientCtx.ListWorkflows(
1011+
offsetWorkflows, err := ListWorkflows(clientCtx,
10121012
WithWorkflowIDPrefix("test-"),
10131013
WithOffset(5),
10141014
WithLimit(3))
@@ -1027,7 +1027,7 @@ func TestListWorkflows(t *testing.T) {
10271027
}
10281028

10291029
// Test 10: Test input/output loading
1030-
noDataWorkflows, err := clientCtx.ListWorkflows(
1030+
noDataWorkflows, err := ListWorkflows(clientCtx,
10311031
WithWorkflowIDs(workflowIDs[:2]),
10321032
WithLoadInput(false),
10331033
WithLoadOutput(false))

dbos/dbos.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ type DBOSContext interface {
8282
GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any, error) // Get a key-value event from a target workflow
8383
Sleep(duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
8484
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
85+
GetStepID() (int, error) // Get the current step ID (only available within workflows)
8586

8687
// Workflow management
8788
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow

dbos/serialization_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func TestWorkflowEncoding(t *testing.T) {
103103
}
104104

105105
// Test results from ListWorkflows
106-
workflows, err := executor.ListWorkflows(WithWorkflowIDs(
106+
workflows, err := ListWorkflows(executor, WithWorkflowIDs(
107107
[]string{directHandle.GetWorkflowID()},
108108
))
109109
if err != nil {
@@ -220,7 +220,7 @@ func TestWorkflowEncoding(t *testing.T) {
220220
}
221221

222222
// Test results from ListWorkflows
223-
workflows, err := executor.ListWorkflows(WithWorkflowIDs(
223+
workflows, err := ListWorkflows(executor, WithWorkflowIDs(
224224
[]string{directHandle.GetWorkflowID()},
225225
))
226226
if err != nil {

dbos/workflow.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1186,6 +1186,24 @@ func (c *dbosContext) Sleep(duration time.Duration) (time.Duration, error) {
11861186
return c.systemDB.sleep(c, duration)
11871187
}
11881188

1189+
// Sleep pauses workflow execution for the specified duration.
1190+
// This is a durable sleep - if the workflow is recovered during the sleep period,
1191+
// it will continue sleeping for the remaining time.
1192+
// Returns the actual duration slept.
1193+
//
1194+
// Example:
1195+
//
1196+
// actualDuration, err := dbos.Sleep(ctx, 5*time.Second)
1197+
// if err != nil {
1198+
// return err
1199+
// }
1200+
func Sleep(ctx DBOSContext, duration time.Duration) (time.Duration, error) {
1201+
if ctx == nil {
1202+
return 0, errors.New("ctx cannot be nil")
1203+
}
1204+
return ctx.Sleep(duration)
1205+
}
1206+
11891207
/***********************************/
11901208
/******* WORKFLOW MANAGEMENT *******/
11911209
/***********************************/
@@ -1208,6 +1226,42 @@ func (c *dbosContext) GetStepID() (int, error) {
12081226
return wfState.stepID, nil
12091227
}
12101228

1229+
// GetWorkflowID retrieves the workflow ID from the context if called within a DBOS workflow.
1230+
// Returns an error if not called from within a workflow context.
1231+
//
1232+
// Example:
1233+
//
1234+
// workflowID, err := dbos.GetWorkflowID(ctx)
1235+
// if err != nil {
1236+
// log.Printf("Not in a workflow context: %v", err)
1237+
// } else {
1238+
// log.Printf("Current workflow ID: %s", workflowID)
1239+
// }
1240+
func GetWorkflowID(ctx DBOSContext) (string, error) {
1241+
if ctx == nil {
1242+
return "", errors.New("ctx cannot be nil")
1243+
}
1244+
return ctx.GetWorkflowID()
1245+
}
1246+
1247+
// GetStepID retrieves the current step ID from the context if called within a DBOS workflow.
1248+
// Returns -1 and an error if not called from within a workflow context.
1249+
//
1250+
// Example:
1251+
//
1252+
// stepID, err := dbos.GetStepID(ctx)
1253+
// if err != nil {
1254+
// log.Printf("Not in a workflow context: %v", err)
1255+
// } else {
1256+
// log.Printf("Current step ID: %d", stepID)
1257+
// }
1258+
func GetStepID(ctx DBOSContext) (int, error) {
1259+
if ctx == nil {
1260+
return -1, errors.New("ctx cannot be nil")
1261+
}
1262+
return ctx.GetStepID()
1263+
}
1264+
12111265
func (c *dbosContext) RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) {
12121266
workflowStatus, err := c.systemDB.listWorkflows(c, listWorkflowsDBInput{
12131267
workflowIDs: []string{workflowID},
@@ -1428,6 +1482,28 @@ func (c *dbosContext) CancelWorkflow(workflowID string) error {
14281482
return c.systemDB.cancelWorkflow(c, workflowID)
14291483
}
14301484

1485+
// CancelWorkflow cancels a running or enqueued workflow by setting its status to CANCELLED.
1486+
// Once cancelled, the workflow will stop executing. Currently executing steps will not be interrupted.
1487+
//
1488+
// Parameters:
1489+
// - ctx: DBOS context for the operation
1490+
// - workflowID: The unique identifier of the workflow to cancel
1491+
//
1492+
// Returns an error if the workflow does not exist or if the cancellation operation fails.
1493+
//
1494+
// Example:
1495+
//
1496+
// err := dbos.CancelWorkflow(ctx, "workflow-to-cancel")
1497+
// if err != nil {
1498+
// log.Printf("Failed to cancel workflow: %v", err)
1499+
// }
1500+
func CancelWorkflow(ctx DBOSContext, workflowID string) error {
1501+
if ctx == nil {
1502+
return errors.New("ctx cannot be nil")
1503+
}
1504+
return ctx.CancelWorkflow(workflowID)
1505+
}
1506+
14311507
func (c *dbosContext) ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) {
14321508
err := c.systemDB.resumeWorkflow(c, workflowID)
14331509
if err != nil {
@@ -1826,3 +1902,45 @@ func (c *dbosContext) ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStat
18261902

18271903
return workflows, nil
18281904
}
1905+
1906+
// ListWorkflows retrieves a list of workflows based on the provided filters.
1907+
//
1908+
// The function supports filtering by workflow IDs, status, time ranges, names, application versions,
1909+
// authenticated users, workflow ID prefixes, and more. It also supports pagination through
1910+
// limit/offset parameters and sorting control (ascending by default, or descending with WithSortDesc).
1911+
//
1912+
// By default, both input and output data are loaded for each workflow. This can be controlled
1913+
// using WithLoadInput(false) and WithLoadOutput(false) options for better performance when
1914+
// the data is not needed.
1915+
//
1916+
// Parameters:
1917+
// - ctx: DBOS context for the operation
1918+
// - opts: Functional options to configure the query filters and parameters
1919+
//
1920+
// Returns a slice of WorkflowStatus structs containing the workflow information.
1921+
//
1922+
// Example usage:
1923+
//
1924+
// // List all successful workflows from the last 24 hours
1925+
// workflows, err := dbos.ListWorkflows(ctx,
1926+
// dbos.WithStatus([]dbos.WorkflowStatusType{dbos.WorkflowStatusSuccess}),
1927+
// dbos.WithStartTime(time.Now().Add(-24*time.Hour)),
1928+
// dbos.WithLimit(100))
1929+
// if err != nil {
1930+
// log.Fatal(err)
1931+
// }
1932+
//
1933+
// // List workflows by specific IDs without loading input/output data
1934+
// workflows, err := dbos.ListWorkflows(ctx,
1935+
// dbos.WithWorkflowIDs([]string{"workflow1", "workflow2"}),
1936+
// dbos.WithLoadInput(false),
1937+
// dbos.WithLoadOutput(false))
1938+
// if err != nil {
1939+
// log.Fatal(err)
1940+
// }
1941+
func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) {
1942+
if ctx == nil {
1943+
return nil, errors.New("ctx cannot be nil")
1944+
}
1945+
return ctx.ListWorkflows(opts...)
1946+
}

dbos/workflows_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ func TestChildWorkflow(t *testing.T) {
541541

542542
// Create child workflows with executor
543543
childWf := func(dbosCtx DBOSContext, input Inheritance) (string, error) {
544-
workflowID, err := dbosCtx.GetWorkflowID()
544+
workflowID, err := GetWorkflowID(dbosCtx)
545545
if err != nil {
546546
return "", fmt.Errorf("failed to get workflow ID: %w", err)
547547
}
@@ -557,7 +557,7 @@ func TestChildWorkflow(t *testing.T) {
557557
RegisterWorkflow(dbosCtx, childWf)
558558

559559
parentWf := func(ctx DBOSContext, input Inheritance) (string, error) {
560-
workflowID, err := ctx.GetWorkflowID()
560+
workflowID, err := GetWorkflowID(ctx)
561561
if err != nil {
562562
return "", fmt.Errorf("failed to get workflow ID: %w", err)
563563
}
@@ -624,7 +624,7 @@ func TestChildWorkflow(t *testing.T) {
624624
RegisterWorkflow(dbosCtx, parentWf)
625625

626626
grandParentWf := func(ctx DBOSContext, r int) (string, error) {
627-
workflowID, err := ctx.GetWorkflowID()
627+
workflowID, err := GetWorkflowID(ctx)
628628
if err != nil {
629629
return "", fmt.Errorf("failed to get workflow ID: %w", err)
630630
}
@@ -1064,7 +1064,7 @@ var (
10641064

10651065
func deadLetterQueueWorkflow(ctx DBOSContext, input string) (int, error) {
10661066
recoveryCount++
1067-
wfid, err := ctx.GetWorkflowID()
1067+
wfid, err := GetWorkflowID(ctx)
10681068
if err != nil {
10691069
return 0, fmt.Errorf("failed to get workflow ID: %v", err)
10701070
}
@@ -2528,7 +2528,7 @@ var (
25282528
)
25292529

25302530
func sleepRecoveryWorkflow(dbosCtx DBOSContext, duration time.Duration) (time.Duration, error) {
2531-
result, err := dbosCtx.Sleep(duration)
2531+
result, err := Sleep(dbosCtx, duration)
25322532
if err != nil {
25332533
return 0, err
25342534
}
@@ -2603,7 +2603,7 @@ func TestSleep(t *testing.T) {
26032603

26042604
t.Run("SleepCannotBeCalledOutsideWorkflow", func(t *testing.T) {
26052605
// Attempt to call Sleep outside of a workflow context
2606-
_, err := dbosCtx.Sleep(1 * time.Second)
2606+
_, err := Sleep(dbosCtx, 1*time.Second)
26072607
if err == nil {
26082608
t.Fatal("expected error when calling Sleep outside of workflow context, but got none")
26092609
}

0 commit comments

Comments
 (0)