Skip to content

Commit 4763d72

Browse files
committed
DBOS client interface
1 parent 2a091a2 commit 4763d72

File tree

4 files changed

+402
-229
lines changed

4 files changed

+402
-229
lines changed

dbos/client.go

Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
package dbos
2+
3+
import (
4+
"context"
5+
"encoding/gob"
6+
"errors"
7+
"fmt"
8+
"math"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
)
13+
14+
// Client provides a programmatic way to interact with your DBOS application from external code.
15+
// It manages the underlying DBOSContext and provides methods for workflow operations
16+
// without requiring direct management of the context lifecycle.
17+
type Client interface {
18+
// Enqueue enqueues a workflow to a named queue for deferred execution.
19+
// The workflow will be executed when the queue runner processes it.
20+
Enqueue(queueName, workflowName string, input any, opts ...EnqueueOption) (WorkflowHandle[any], error)
21+
22+
// ListWorkflows retrieves a list of workflows based on the provided filters.
23+
ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error)
24+
25+
// Send sends a message to another workflow.
26+
Send(destinationID string, message any, topic string) error
27+
28+
// GetEvent retrieves a key-value event from a target workflow.
29+
GetEvent(targetWorkflowID, key string, timeout time.Duration) (any, error)
30+
31+
// RetrieveWorkflow returns a handle to an existing workflow.
32+
RetrieveWorkflow(workflowID string) (WorkflowHandle[any], error)
33+
34+
// CancelWorkflow cancels a running or enqueued workflow.
35+
CancelWorkflow(workflowID string) error
36+
37+
// ResumeWorkflow resumes a workflow from its last completed step.
38+
ResumeWorkflow(workflowID string) (WorkflowHandle[any], error)
39+
40+
// ForkWorkflow creates a new workflow instance by copying an existing workflow from a specific step.
41+
ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], error)
42+
43+
// Shutdown gracefully shuts down the client and closes the system database connection.
44+
Shutdown(timeout time.Duration)
45+
}
46+
47+
type client struct {
48+
dbosCtx DBOSContext
49+
}
50+
51+
// NewClient creates a new DBOS client with the provided configuration.
52+
// The client manages its own DBOSContext internally.
53+
//
54+
// Example:
55+
//
56+
// config := dbos.Config{
57+
// DatabaseURL: "postgres://user:pass@localhost:5432/dbname",
58+
// AppName: "my-app",
59+
// }
60+
// client, err := dbos.NewClient(context.Background(), config)
61+
// if err != nil {
62+
// log.Fatal(err)
63+
// }
64+
func NewClient(ctx context.Context, config Config) (Client, error) {
65+
dbosCtx, err := NewDBOSContext(ctx, config)
66+
if err != nil {
67+
return nil, err
68+
}
69+
70+
return &client{
71+
dbosCtx: dbosCtx,
72+
}, nil
73+
}
74+
75+
// EnqueueOption is a functional option for configuring workflow enqueue parameters.
76+
type EnqueueOption func(*enqueueOptions)
77+
78+
// WithEnqueueWorkflowID sets a custom workflow ID instead of generating one automatically.
79+
func WithEnqueueWorkflowID(id string) EnqueueOption {
80+
return func(opts *enqueueOptions) {
81+
opts.workflowID = id
82+
}
83+
}
84+
85+
// WithEnqueueApplicationVersion overrides the application version for the enqueued workflow.
86+
func WithEnqueueApplicationVersion(version string) EnqueueOption {
87+
return func(opts *enqueueOptions) {
88+
opts.applicationVersion = version
89+
}
90+
}
91+
92+
// WithEnqueueDeduplicationID sets a deduplication ID for the enqueued workflow.
93+
func WithEnqueueDeduplicationID(id string) EnqueueOption {
94+
return func(opts *enqueueOptions) {
95+
opts.deduplicationID = id
96+
}
97+
}
98+
99+
// WithEnqueuePriority sets the execution priority for the enqueued workflow.
100+
func WithEnqueuePriority(priority uint) EnqueueOption {
101+
return func(opts *enqueueOptions) {
102+
opts.priority = priority
103+
}
104+
}
105+
106+
// WithEnqueueTimeout sets the maximum execution time for the enqueued workflow.
107+
func WithEnqueueTimeout(timeout time.Duration) EnqueueOption {
108+
return func(opts *enqueueOptions) {
109+
opts.workflowTimeout = timeout
110+
}
111+
}
112+
113+
type enqueueOptions struct {
114+
workflowName string
115+
workflowID string
116+
applicationVersion string
117+
deduplicationID string
118+
priority uint
119+
workflowTimeout time.Duration
120+
workflowInput any
121+
}
122+
123+
// EnqueueWorkflow enqueues a workflow to a named queue for deferred execution.
124+
func (c *client) Enqueue(queueName, workflowName string, input any, opts ...EnqueueOption) (WorkflowHandle[any], error) {
125+
// Get the concrete dbosContext to access internal fields
126+
dbosCtx, ok := c.dbosCtx.(*dbosContext)
127+
if !ok {
128+
return nil, fmt.Errorf("invalid DBOSContext type")
129+
}
130+
131+
// Process options
132+
params := &enqueueOptions{
133+
workflowName: workflowName,
134+
applicationVersion: dbosCtx.GetApplicationVersion(),
135+
workflowInput: input,
136+
}
137+
for _, opt := range opts {
138+
opt(params)
139+
}
140+
141+
workflowID := params.workflowID
142+
if workflowID == "" {
143+
workflowID = uuid.New().String()
144+
}
145+
146+
var deadline time.Time
147+
if params.workflowTimeout > 0 {
148+
deadline = time.Now().Add(params.workflowTimeout)
149+
}
150+
151+
if params.priority > uint(math.MaxInt) {
152+
return nil, fmt.Errorf("priority %d exceeds maximum allowed value %d", params.priority, math.MaxInt)
153+
}
154+
status := WorkflowStatus{
155+
Name: params.workflowName,
156+
ApplicationVersion: params.applicationVersion,
157+
Status: WorkflowStatusEnqueued,
158+
ID: workflowID,
159+
CreatedAt: time.Now(),
160+
Deadline: deadline,
161+
Timeout: params.workflowTimeout,
162+
Input: params.workflowInput,
163+
QueueName: queueName,
164+
DeduplicationID: params.deduplicationID,
165+
Priority: int(params.priority),
166+
}
167+
168+
uncancellableCtx := WithoutCancel(dbosCtx)
169+
170+
tx, err := dbosCtx.systemDB.(*sysDB).pool.Begin(uncancellableCtx)
171+
if err != nil {
172+
return nil, newWorkflowExecutionError(workflowID, fmt.Sprintf("failed to begin transaction: %v", err))
173+
}
174+
defer tx.Rollback(uncancellableCtx) // Rollback if not committed
175+
176+
// Insert workflow status with transaction
177+
insertInput := insertWorkflowStatusDBInput{
178+
status: status,
179+
tx: tx,
180+
}
181+
_, err = dbosCtx.systemDB.insertWorkflowStatus(uncancellableCtx, insertInput)
182+
if err != nil {
183+
dbosCtx.logger.Error("failed to insert workflow status", "error", err, "workflow_id", workflowID)
184+
return nil, err
185+
}
186+
187+
if err := tx.Commit(uncancellableCtx); err != nil {
188+
return nil, fmt.Errorf("failed to commit transaction: %w", err)
189+
}
190+
191+
return newWorkflowPollingHandle[any](uncancellableCtx, workflowID), nil
192+
}
193+
194+
// Enqueue adds a workflow to a named queue for later execution with type safety.
195+
// The workflow will be persisted with ENQUEUED status until picked up by a DBOS process.
196+
// This provides asynchronous workflow execution with durability guarantees.
197+
//
198+
// Parameters:
199+
// - ctx: DBOS context for the operation
200+
// - queueName: Name of the queue to enqueue the workflow to
201+
// - workflowName: Name of the registered workflow function to execute
202+
// - input: Input parameters to pass to the workflow (type P)
203+
// - opts: Optional configuration options
204+
//
205+
// Available options:
206+
// - WithEnqueueWorkflowID: Custom workflow ID (auto-generated if not provided)
207+
// - WithEnqueueApplicationVersion: Application version override
208+
// - WithEnqueueDeduplicationID: Deduplication identifier for idempotent enqueuing
209+
// - WithEnqueuePriority: Execution priority
210+
// - WithEnqueueTimeout: Maximum execution time for the workflow
211+
//
212+
// Returns a typed workflow handle that can be used to check status and retrieve results.
213+
// The handle uses polling to check workflow completion since the execution is asynchronous.
214+
//
215+
// Example usage:
216+
//
217+
// // Enqueue a workflow with string input and int output
218+
// handle, err := dbos.Enqueue[string, int](ctx, "data-processing", "ProcessDataWorkflow", "input data",
219+
// dbos.WithEnqueueTimeout(30 * time.Minute))
220+
// if err != nil {
221+
// log.Fatal(err)
222+
// }
223+
//
224+
// // Check status
225+
// status, err := handle.GetStatus()
226+
// if err != nil {
227+
// log.Printf("Failed to get status: %v", err)
228+
// }
229+
//
230+
// // Wait for completion and get result
231+
// result, err := handle.GetResult()
232+
// if err != nil {
233+
// log.Printf("Workflow failed: %v", err)
234+
// } else {
235+
// log.Printf("Result: %d", result)
236+
// }
237+
//
238+
// // Enqueue with deduplication and custom workflow ID
239+
// handle, err := dbos.Enqueue[MyInputType, MyOutputType](ctx, "my-queue", "MyWorkflow", MyInputType{Field: "value"},
240+
// dbos.WithEnqueueWorkflowID("custom-workflow-id"),
241+
// dbos.WithEnqueueDeduplicationID("unique-operation-id"))
242+
func Enqueue[P any, R any](c Client, queueName, workflowName string, input P, opts ...EnqueueOption) (WorkflowHandle[R], error) {
243+
if c == nil {
244+
return nil, errors.New("client cannot be nil")
245+
}
246+
247+
// Register the input and outputs for gob encoding
248+
var typedInput P
249+
gob.Register(typedInput)
250+
var typedOutput R
251+
gob.Register(typedOutput)
252+
253+
// Call the interface method with the same signature
254+
handle, err := c.Enqueue(queueName, workflowName, input, opts...)
255+
if err != nil {
256+
return nil, err
257+
}
258+
259+
return newWorkflowPollingHandle[R](c.(*client).dbosCtx, handle.GetWorkflowID()), nil
260+
}
261+
262+
// ListWorkflows retrieves a list of workflows based on the provided filters.
263+
func (c *client) ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error) {
264+
return c.dbosCtx.ListWorkflows(c.dbosCtx, opts...)
265+
}
266+
267+
// Send sends a message to another workflow.
268+
func (c *client) Send(destinationID string, message any, topic string) error {
269+
return c.dbosCtx.Send(c.dbosCtx, destinationID, message, topic)
270+
}
271+
272+
// GetEvent retrieves a key-value event from a target workflow.
273+
func (c *client) GetEvent(targetWorkflowID, key string, timeout time.Duration) (any, error) {
274+
return c.dbosCtx.GetEvent(c.dbosCtx, targetWorkflowID, key, timeout)
275+
}
276+
277+
// RetrieveWorkflow returns a handle to an existing workflow.
278+
func (c *client) RetrieveWorkflow(workflowID string) (WorkflowHandle[any], error) {
279+
return c.dbosCtx.RetrieveWorkflow(c.dbosCtx, workflowID)
280+
}
281+
282+
// CancelWorkflow cancels a running or enqueued workflow.
283+
func (c *client) CancelWorkflow(workflowID string) error {
284+
return c.dbosCtx.CancelWorkflow(c.dbosCtx, workflowID)
285+
}
286+
287+
// ResumeWorkflow resumes a workflow from its last completed step.
288+
func (c *client) ResumeWorkflow(workflowID string) (WorkflowHandle[any], error) {
289+
return c.dbosCtx.ResumeWorkflow(c.dbosCtx, workflowID)
290+
}
291+
292+
// ForkWorkflow creates a new workflow instance by copying an existing workflow from a specific step.
293+
func (c *client) ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], error) {
294+
return c.dbosCtx.ForkWorkflow(c.dbosCtx, input)
295+
}
296+
297+
// Shutdown gracefully shuts down the client and closes the system database connection.
298+
func (c *client) Shutdown(timeout time.Duration) {
299+
// Get the concrete dbosContext to access internal fields
300+
dbosCtx, ok := c.dbosCtx.(*dbosContext)
301+
if !ok {
302+
return
303+
}
304+
305+
// Close the system database
306+
if dbosCtx.systemDB != nil {
307+
dbosCtx.logger.Debug("Shutting down system database")
308+
dbosCtx.systemDB.shutdown(dbosCtx, timeout)
309+
}
310+
}

0 commit comments

Comments
 (0)