Skip to content

Commit d30f87c

Browse files
authored
DBOS client interface (#127)
Note that we need to retain a package-level `Enqueue` method, that is generic and performs gob encoding registration.
1 parent fc9aa09 commit d30f87c

File tree

6 files changed

+417
-241
lines changed

6 files changed

+417
-241
lines changed

dbos/client.go

Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
package dbos
2+
3+
import (
4+
"context"
5+
"encoding/gob"
6+
"errors"
7+
"fmt"
8+
"math"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
)
13+
14+
// Client provides a programmatic way to interact with your DBOS application from external code.
15+
// It manages the underlying DBOSContext and provides methods for workflow operations
16+
// without requiring direct management of the context lifecycle.
17+
type Client interface {
18+
Enqueue(queueName, workflowName string, input any, opts ...EnqueueOption) (WorkflowHandle[any], error)
19+
ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error)
20+
Send(destinationID string, message any, topic string) error
21+
GetEvent(targetWorkflowID, key string, timeout time.Duration) (any, error)
22+
RetrieveWorkflow(workflowID string) (WorkflowHandle[any], error)
23+
CancelWorkflow(workflowID string) error
24+
ResumeWorkflow(workflowID string) (WorkflowHandle[any], error)
25+
ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], error)
26+
Shutdown(timeout time.Duration) // Simply close the system DB connection pool
27+
}
28+
29+
type client struct {
30+
dbosCtx DBOSContext
31+
}
32+
33+
// NewClient creates a new DBOS client with the provided configuration.
34+
// The client manages its own DBOSContext internally.
35+
//
36+
// Example:
37+
//
38+
// config := dbos.Config{
39+
// DatabaseURL: "postgres://user:pass@localhost:5432/dbname",
40+
// AppName: "my-app",
41+
// }
42+
// client, err := dbos.NewClient(context.Background(), config)
43+
// if err != nil {
44+
// log.Fatal(err)
45+
// }
46+
func NewClient(ctx context.Context, config Config) (Client, error) {
47+
dbosCtx, err := NewDBOSContext(ctx, config)
48+
if err != nil {
49+
return nil, err
50+
}
51+
52+
return &client{
53+
dbosCtx: dbosCtx,
54+
}, nil
55+
}
56+
57+
// EnqueueOption is a functional option for configuring workflow enqueue parameters.
58+
type EnqueueOption func(*enqueueOptions)
59+
60+
// WithEnqueueWorkflowID sets a custom workflow ID instead of generating one automatically.
61+
func WithEnqueueWorkflowID(id string) EnqueueOption {
62+
return func(opts *enqueueOptions) {
63+
opts.workflowID = id
64+
}
65+
}
66+
67+
// WithEnqueueApplicationVersion overrides the application version for the enqueued workflow.
68+
func WithEnqueueApplicationVersion(version string) EnqueueOption {
69+
return func(opts *enqueueOptions) {
70+
opts.applicationVersion = version
71+
}
72+
}
73+
74+
// WithEnqueueDeduplicationID sets a deduplication ID for the enqueued workflow.
75+
func WithEnqueueDeduplicationID(id string) EnqueueOption {
76+
return func(opts *enqueueOptions) {
77+
opts.deduplicationID = id
78+
}
79+
}
80+
81+
// WithEnqueuePriority sets the execution priority for the enqueued workflow.
82+
func WithEnqueuePriority(priority uint) EnqueueOption {
83+
return func(opts *enqueueOptions) {
84+
opts.priority = priority
85+
}
86+
}
87+
88+
// WithEnqueueTimeout sets the maximum execution time for the enqueued workflow.
89+
func WithEnqueueTimeout(timeout time.Duration) EnqueueOption {
90+
return func(opts *enqueueOptions) {
91+
opts.workflowTimeout = timeout
92+
}
93+
}
94+
95+
type enqueueOptions struct {
96+
workflowName string
97+
workflowID string
98+
applicationVersion string
99+
deduplicationID string
100+
priority uint
101+
workflowTimeout time.Duration
102+
workflowInput any
103+
}
104+
105+
// EnqueueWorkflow enqueues a workflow to a named queue for deferred execution.
106+
func (c *client) Enqueue(queueName, workflowName string, input any, opts ...EnqueueOption) (WorkflowHandle[any], error) {
107+
// Get the concrete dbosContext to access internal fields
108+
dbosCtx, ok := c.dbosCtx.(*dbosContext)
109+
if !ok {
110+
return nil, fmt.Errorf("invalid DBOSContext type")
111+
}
112+
113+
// Process options
114+
params := &enqueueOptions{
115+
workflowName: workflowName,
116+
applicationVersion: dbosCtx.GetApplicationVersion(),
117+
workflowInput: input,
118+
}
119+
for _, opt := range opts {
120+
opt(params)
121+
}
122+
123+
workflowID := params.workflowID
124+
if workflowID == "" {
125+
workflowID = uuid.New().String()
126+
}
127+
128+
var deadline time.Time
129+
if params.workflowTimeout > 0 {
130+
deadline = time.Now().Add(params.workflowTimeout)
131+
}
132+
133+
if params.priority > uint(math.MaxInt) {
134+
return nil, fmt.Errorf("priority %d exceeds maximum allowed value %d", params.priority, math.MaxInt)
135+
}
136+
status := WorkflowStatus{
137+
Name: params.workflowName,
138+
ApplicationVersion: params.applicationVersion,
139+
Status: WorkflowStatusEnqueued,
140+
ID: workflowID,
141+
CreatedAt: time.Now(),
142+
Deadline: deadline,
143+
Timeout: params.workflowTimeout,
144+
Input: params.workflowInput,
145+
QueueName: queueName,
146+
DeduplicationID: params.deduplicationID,
147+
Priority: int(params.priority),
148+
}
149+
150+
uncancellableCtx := WithoutCancel(dbosCtx)
151+
152+
tx, err := dbosCtx.systemDB.(*sysDB).pool.Begin(uncancellableCtx)
153+
if err != nil {
154+
return nil, newWorkflowExecutionError(workflowID, fmt.Sprintf("failed to begin transaction: %v", err))
155+
}
156+
defer tx.Rollback(uncancellableCtx) // Rollback if not committed
157+
158+
// Insert workflow status with transaction
159+
insertInput := insertWorkflowStatusDBInput{
160+
status: status,
161+
tx: tx,
162+
}
163+
_, err = dbosCtx.systemDB.insertWorkflowStatus(uncancellableCtx, insertInput)
164+
if err != nil {
165+
dbosCtx.logger.Error("failed to insert workflow status", "error", err, "workflow_id", workflowID)
166+
return nil, err
167+
}
168+
169+
if err := tx.Commit(uncancellableCtx); err != nil {
170+
return nil, fmt.Errorf("failed to commit transaction: %w", err)
171+
}
172+
173+
return newWorkflowPollingHandle[any](uncancellableCtx, workflowID), nil
174+
}
175+
176+
// Enqueue adds a workflow to a named queue for later execution with type safety.
177+
// The workflow will be persisted with ENQUEUED status until picked up by a DBOS process.
178+
// This provides asynchronous workflow execution with durability guarantees.
179+
//
180+
// Parameters:
181+
// - c: Client instance for the operation
182+
// - queueName: Name of the queue to enqueue the workflow to
183+
// - workflowName: Name of the registered workflow function to execute
184+
// - input: Input parameters to pass to the workflow (type P)
185+
// - opts: Optional configuration options
186+
//
187+
// Available options:
188+
// - WithEnqueueWorkflowID: Custom workflow ID (auto-generated if not provided)
189+
// - WithEnqueueApplicationVersion: Application version override
190+
// - WithEnqueueDeduplicationID: Deduplication identifier for idempotent enqueuing
191+
// - WithEnqueuePriority: Execution priority
192+
// - WithEnqueueTimeout: Maximum execution time for the workflow
193+
//
194+
// Returns a typed workflow handle that can be used to check status and retrieve results.
195+
// The handle uses polling to check workflow completion since the execution is asynchronous.
196+
//
197+
// Example usage:
198+
//
199+
// // Enqueue a workflow with string input and int output
200+
// handle, err := dbos.Enqueue[string, int](client, "data-processing", "ProcessDataWorkflow", "input data",
201+
// dbos.WithEnqueueTimeout(30 * time.Minute))
202+
// if err != nil {
203+
// log.Fatal(err)
204+
// }
205+
//
206+
// // Check status
207+
// status, err := handle.GetStatus()
208+
// if err != nil {
209+
// log.Printf("Failed to get status: %v", err)
210+
// }
211+
//
212+
// // Wait for completion and get result
213+
// result, err := handle.GetResult()
214+
// if err != nil {
215+
// log.Printf("Workflow failed: %v", err)
216+
// } else {
217+
// log.Printf("Result: %d", result)
218+
// }
219+
//
220+
// // Enqueue with deduplication and custom workflow ID
221+
// handle, err := dbos.Enqueue[MyInputType, MyOutputType](client, "my-queue", "MyWorkflow", MyInputType{Field: "value"},
222+
// dbos.WithEnqueueWorkflowID("custom-workflow-id"),
223+
// dbos.WithEnqueueDeduplicationID("unique-operation-id"))
224+
func Enqueue[P any, R any](c Client, queueName, workflowName string, input P, opts ...EnqueueOption) (WorkflowHandle[R], error) {
225+
if c == nil {
226+
return nil, errors.New("client cannot be nil")
227+
}
228+
229+
// Register the input and outputs for gob encoding
230+
var typedInput P
231+
gob.Register(typedInput)
232+
var typedOutput R
233+
gob.Register(typedOutput)
234+
235+
// Call the interface method with the same signature
236+
handle, err := c.Enqueue(queueName, workflowName, input, opts...)
237+
if err != nil {
238+
return nil, err
239+
}
240+
241+
return newWorkflowPollingHandle[R](c.(*client).dbosCtx, handle.GetWorkflowID()), nil
242+
}
243+
244+
// ListWorkflows retrieves a list of workflows based on the provided filters.
245+
func (c *client) ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error) {
246+
return c.dbosCtx.ListWorkflows(c.dbosCtx, opts...)
247+
}
248+
249+
// Send sends a message to another workflow.
250+
func (c *client) Send(destinationID string, message any, topic string) error {
251+
return c.dbosCtx.Send(c.dbosCtx, destinationID, message, topic)
252+
}
253+
254+
// GetEvent retrieves a key-value event from a target workflow.
255+
func (c *client) GetEvent(targetWorkflowID, key string, timeout time.Duration) (any, error) {
256+
return c.dbosCtx.GetEvent(c.dbosCtx, targetWorkflowID, key, timeout)
257+
}
258+
259+
// RetrieveWorkflow returns a handle to an existing workflow.
260+
func (c *client) RetrieveWorkflow(workflowID string) (WorkflowHandle[any], error) {
261+
return c.dbosCtx.RetrieveWorkflow(c.dbosCtx, workflowID)
262+
}
263+
264+
// CancelWorkflow cancels a running or enqueued workflow.
265+
func (c *client) CancelWorkflow(workflowID string) error {
266+
return c.dbosCtx.CancelWorkflow(c.dbosCtx, workflowID)
267+
}
268+
269+
// ResumeWorkflow resumes a workflow from its last completed step.
270+
func (c *client) ResumeWorkflow(workflowID string) (WorkflowHandle[any], error) {
271+
return c.dbosCtx.ResumeWorkflow(c.dbosCtx, workflowID)
272+
}
273+
274+
// ForkWorkflow creates a new workflow instance by copying an existing workflow from a specific step.
275+
func (c *client) ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], error) {
276+
return c.dbosCtx.ForkWorkflow(c.dbosCtx, input)
277+
}
278+
279+
// Shutdown gracefully shuts down the client and closes the system database connection.
280+
func (c *client) Shutdown(timeout time.Duration) {
281+
// Get the concrete dbosContext to access internal fields
282+
dbosCtx, ok := c.dbosCtx.(*dbosContext)
283+
if !ok {
284+
return
285+
}
286+
287+
// Close the system database
288+
if dbosCtx.systemDB != nil {
289+
dbosCtx.logger.Debug("Shutting down system database")
290+
dbosCtx.systemDB.shutdown(dbosCtx, timeout)
291+
}
292+
}

0 commit comments

Comments
 (0)