Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,8 @@ For example, build a reliable billing workflow that durably waits for a notifica

```golang
func sendWorkflow(ctx dbos.DBOSContext, message string) (string, error) {
err := dbos.Send(ctx, dbos.WorkflowSendInput[string]{
DestinationID: "receiverID",
Topic: "topic",
Message: message,
})
err := dbos.Send(ctx, "receiverID", message, "topic")
return "sent", err
}

func receiveWorkflow(ctx dbos.DBOSContext, topic string) (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion dbos/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
workflowID := r.PathValue("id")
ctx.logger.Info("Cancelling workflow", "workflow_id", workflowID)

err := ctx.CancelWorkflow(workflowID)
err := ctx.CancelWorkflow(ctx, workflowID)
if err != nil {
ctx.logger.Error("Failed to cancel workflow", "workflow_id", workflowID, "error", err)
http.Error(w, fmt.Sprintf("Failed to cancel workflow: %v", err), http.StatusInternalServerError)
Expand Down
12 changes: 6 additions & 6 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@ type DBOSContext interface {
// Workflow operations
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
Send(_ DBOSContext, input WorkflowSendInput) error // Send a message to another workflow
Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) // Receive a message sent to this workflow
SetEvent(_ DBOSContext, input WorkflowSetEventInput) error // Set a key-value event for this workflow
GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any, error) // Get a key-value event from a target workflow
Sleep(duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
Recv(_ DBOSContext, input RecvInput) (any, error) // Receive a message sent to this workflow
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
GetEvent(_ DBOSContext, input GetEventInput) (any, error) // Get a key-value event from a target workflow
Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
GetStepID() (int, error) // Get the current step ID (only available within workflows)

// Workflow management
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters
CancelWorkflow(workflowID string) error // Cancel a workflow by setting its status to CANCELLED
CancelWorkflow(_ DBOSContext, workflowID string) error // Cancel a workflow by setting its status to CANCELLED
ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step
ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
Expand Down
13 changes: 4 additions & 9 deletions dbos/serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func setEventUserDefinedTypeWorkflow(ctx DBOSContext, input string) (string, err
},
}

err := SetEvent(ctx, GenericWorkflowSetEventInput[UserDefinedEventData]{Key: input, Message: eventData})
err := SetEvent(ctx, input, eventData)
if err != nil {
return "", err
}
Expand All @@ -236,7 +236,7 @@ func TestSetEventSerialize(t *testing.T) {
assert.Equal(t, "user-defined-event-set", result)

// Retrieve the event to verify it was properly serialized and can be deserialized
retrievedEvent, err := GetEvent[UserDefinedEventData](executor, WorkflowGetEventInput{
retrievedEvent, err := GetEvent[UserDefinedEventData](executor, GetEventInput{
TargetWorkflowID: setHandle.GetWorkflowID(),
Key: "user-defined-key",
Timeout: 3 * time.Second,
Expand Down Expand Up @@ -268,12 +268,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string,
}

// Send should automatically register this type with gob
// Note the explicit type parameter since compiler cannot infer UserDefinedEventData from string input
err := Send(ctx, GenericWorkflowSendInput[UserDefinedEventData]{
DestinationID: destinationID,
Topic: "user-defined-topic",
Message: sendData,
})
err := Send(ctx, destinationID, sendData, "user-defined-topic")
if err != nil {
return "", err
}
Expand All @@ -282,7 +277,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string,

func recvUserDefinedTypeWorkflow(ctx DBOSContext, input string) (UserDefinedEventData, error) {
// Receive the user-defined type message
result, err := Recv[UserDefinedEventData](ctx, WorkflowRecvInput{
result, err := Recv[UserDefinedEventData](ctx, RecvInput{
Topic: "user-defined-topic",
Timeout: 3 * time.Second,
})
Expand Down
9 changes: 4 additions & 5 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ type systemDatabase interface {

// Communication (special steps)
send(ctx context.Context, input WorkflowSendInput) error
recv(ctx context.Context, input WorkflowRecvInput) (any, error)
recv(ctx context.Context, input RecvInput) (any, error)
setEvent(ctx context.Context, input WorkflowSetEventInput) error
getEvent(ctx context.Context, input WorkflowGetEventInput) (any, error)
getEvent(ctx context.Context, input GetEventInput) (any, error)

// Timers (special steps)
sleep(ctx context.Context, duration time.Duration) (time.Duration, error)
Expand Down Expand Up @@ -642,7 +642,6 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) (
wf.Error = errors.New(*errorStr)
}

// XXX maybe set wf.Output to outputString and run deserialize out of system DB
wf.Output, err = deserialize(outputString)
if err != nil {
return nil, fmt.Errorf("failed to deserialize output: %w", err)
Expand Down Expand Up @@ -1511,7 +1510,7 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error {
}

// Recv is a special type of step that receives a message destined for a given workflow
func (s *sysDB) recv(ctx context.Context, input WorkflowRecvInput) (any, error) {
func (s *sysDB) recv(ctx context.Context, input RecvInput) (any, error) {
functionName := "DBOS.recv"

// Get workflow state from context
Expand Down Expand Up @@ -1734,7 +1733,7 @@ func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error
return nil
}

func (s *sysDB) getEvent(ctx context.Context, input WorkflowGetEventInput) (any, error) {
func (s *sysDB) getEvent(ctx context.Context, input GetEventInput) (any, error) {
functionName := "DBOS.getEvent"

// Get workflow state from context (optional for GetEvent as we can get an event from outside a workflow)
Expand Down
88 changes: 35 additions & 53 deletions dbos/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o

// If the afterFunc has started, the workflow was cancelled and the status should be set to cancelled
if stopFunc != nil && !stopFunc() {
c.logger.Info("Workflow was cancelled. Waiting for cancel function to complete", "workflow_id", workflowID)
// Wait for the cancel function to complete
// Note this must happen before we write on the outcome channel (and signal the handler's GetResult)
<-cancelFuncCompleted
Expand All @@ -817,7 +818,7 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
close(outcomeChan)
}()

return newWorkflowHandle[any](uncancellableCtx, workflowID, outcomeChan), nil
return newWorkflowHandle(uncancellableCtx, workflowID, outcomeChan), nil
}

/******************************/
Expand Down Expand Up @@ -1082,50 +1083,40 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
/******* WORKFLOW COMMUNICATIONS ********/
/****************************************/

// GenericWorkflowSendInput defines the parameters for sending a message to another workflow.
type GenericWorkflowSendInput[P any] struct {
DestinationID string // Workflow ID to send the message to
Message P // Message payload (must be gob-encodable)
Topic string // Optional topic for message filtering
}

func (c *dbosContext) Send(_ DBOSContext, input WorkflowSendInput) error {
return c.systemDB.send(c, input)
func (c *dbosContext) Send(_ DBOSContext, destinationID string, message any, topic string) error {
return c.systemDB.send(c, WorkflowSendInput{
DestinationID: destinationID,
Message: message,
Topic: topic,
})
}

// Send sends a message to another workflow with type safety.
// The message type R is automatically registered for gob encoding.
// The message type P is automatically registered for gob encoding.
//
// Send can be called from within a workflow (as a durable step) or from outside workflows.
// When called within a workflow, the send operation becomes part of the workflow's durable state.
//
// Example:
//
// err := dbos.Send(ctx, dbos.GenericWorkflowSendInput[string]{
// DestinationID: "target-workflow-id",
// Message: "Hello from sender",
// Topic: "notifications",
// })
func Send[P any](ctx DBOSContext, input GenericWorkflowSendInput[P]) error {
// err := dbos.Send(ctx, "target-workflow-id", "Hello from sender", "notifications")
func Send[P any](ctx DBOSContext, destinationID string, message P, topic string) error {
if ctx == nil {
return errors.New("ctx cannot be nil")
}
var typedMessage P
gob.Register(typedMessage)
return ctx.Send(ctx, WorkflowSendInput{
DestinationID: input.DestinationID,
Message: input.Message,
Topic: input.Topic,
})
return ctx.Send(ctx, destinationID, message, topic)
}

// WorkflowRecvInput defines the parameters for receiving messages sent to this workflow.
type WorkflowRecvInput struct {
// RecvInput defines the parameters for receiving messages sent to this workflow.
type RecvInput struct {
Topic string // Topic to listen for (empty string receives from default topic)
Timeout time.Duration // Maximum time to wait for a message
}

func (c *dbosContext) Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) {
func (c *dbosContext) Recv(_ DBOSContext, input RecvInput) (any, error) {
return c.systemDB.recv(c, input)
}

Expand All @@ -1138,7 +1129,7 @@ func (c *dbosContext) Recv(_ DBOSContext, input WorkflowRecvInput) (any, error)
//
// Example:
//
// message, err := dbos.Recv[string](ctx, dbos.WorkflowRecvInput{
// message, err := dbos.Recv[string](ctx, dbos.RecvInput{
// Topic: "notifications",
// Timeout: 30 * time.Second,
// })
Expand All @@ -1147,7 +1138,7 @@ func (c *dbosContext) Recv(_ DBOSContext, input WorkflowRecvInput) (any, error)
// return err
// }
// log.Printf("Received: %s", message)
func Recv[R any](ctx DBOSContext, input WorkflowRecvInput) (R, error) {
func Recv[R any](ctx DBOSContext, input RecvInput) (R, error) {
if ctx == nil {
return *new(R), errors.New("ctx cannot be nil")
}
Expand All @@ -1167,49 +1158,40 @@ func Recv[R any](ctx DBOSContext, input WorkflowRecvInput) (R, error) {
return typedMessage, nil
}

// GenericWorkflowSetEventInput defines the parameters for setting a workflow event.
type GenericWorkflowSetEventInput[P any] struct {
Key string // Event key identifier
Message P // Event value (must be gob-encodable)
}

func (c *dbosContext) SetEvent(_ DBOSContext, input WorkflowSetEventInput) error {
return c.systemDB.setEvent(c, input)
func (c *dbosContext) SetEvent(_ DBOSContext, key string, message any) error {
return c.systemDB.setEvent(c, WorkflowSetEventInput{
Key: key,
Message: message,
})
}

// SetEvent sets a key-value event for the current workflow with type safety.
// Events are persistent and can be retrieved by other workflows using GetEvent.
// The event type R is automatically registered for gob encoding.
// The event type P is automatically registered for gob encoding.
//
// SetEvent can only be called from within a workflow and becomes part of the workflow's durable state.
// Setting an event with the same key will overwrite the previous value.
//
// Example:
//
// err := dbos.SetEvent(ctx, dbos.GenericWorkflowSetEventInput[string]{
// Key: "status",
// Message: "processing-complete",
// })
func SetEvent[P any](ctx DBOSContext, input GenericWorkflowSetEventInput[P]) error {
// err := dbos.SetEvent(ctx, "status", "processing-complete")
func SetEvent[P any](ctx DBOSContext, key string, message P) error {
if ctx == nil {
return errors.New("ctx cannot be nil")
}
var typedMessage P
gob.Register(typedMessage)
return ctx.SetEvent(ctx, WorkflowSetEventInput{
Key: input.Key,
Message: input.Message,
})
return ctx.SetEvent(ctx, key, message)
}

// WorkflowGetEventInput defines the parameters for retrieving an event from a workflow.
type WorkflowGetEventInput struct {
// GetEventInput defines the parameters for retrieving an event from a workflow.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to make one in the pair flat, they should both be flat

type GetEventInput struct {
TargetWorkflowID string // Workflow ID to get the event from
Key string // Event key to retrieve
Timeout time.Duration // Maximum time to wait for the event to be set
}

func (c *dbosContext) GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any, error) {
func (c *dbosContext) GetEvent(_ DBOSContext, input GetEventInput) (any, error) {
return c.systemDB.getEvent(c, input)
}

Expand All @@ -1221,7 +1203,7 @@ func (c *dbosContext) GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any,
//
// Example:
//
// status, err := dbos.GetEvent[string](ctx, dbos.WorkflowGetEventInput{
// status, err := dbos.GetEvent[string](ctx, dbos.GetEventInput{
// TargetWorkflowID: "target-workflow-id",
// Key: "status",
// Timeout: 30 * time.Second,
Expand All @@ -1231,7 +1213,7 @@ func (c *dbosContext) GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any,
// return err
// }
// log.Printf("Status: %s", status)
func GetEvent[R any](ctx DBOSContext, input WorkflowGetEventInput) (R, error) {
func GetEvent[R any](ctx DBOSContext, input GetEventInput) (R, error) {
if ctx == nil {
return *new(R), errors.New("ctx cannot be nil")
}
Expand All @@ -1250,7 +1232,7 @@ func GetEvent[R any](ctx DBOSContext, input WorkflowGetEventInput) (R, error) {
return typedValue, nil
}

func (c *dbosContext) Sleep(duration time.Duration) (time.Duration, error) {
func (c *dbosContext) Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) {
return c.systemDB.sleep(c, duration)
}

Expand All @@ -1269,7 +1251,7 @@ func Sleep(ctx DBOSContext, duration time.Duration) (time.Duration, error) {
if ctx == nil {
return 0, errors.New("ctx cannot be nil")
}
return ctx.Sleep(duration)
return ctx.Sleep(ctx, duration)
}

/***********************************/
Expand Down Expand Up @@ -1550,7 +1532,7 @@ func Enqueue[P any, R any](ctx DBOSContext, params GenericEnqueueOptions[P]) (Wo
// - workflowID: The unique identifier of the workflow to cancel
//
// Returns an error if the workflow does not exist or if the cancellation operation fails.
func (c *dbosContext) CancelWorkflow(workflowID string) error {
func (c *dbosContext) CancelWorkflow(_ DBOSContext, workflowID string) error {
return c.systemDB.cancelWorkflow(c, workflowID)
}

Expand All @@ -1573,7 +1555,7 @@ func CancelWorkflow(ctx DBOSContext, workflowID string) error {
if ctx == nil {
return errors.New("ctx cannot be nil")
}
return ctx.CancelWorkflow(workflowID)
return ctx.CancelWorkflow(ctx, workflowID)
}

func (c *dbosContext) ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) {
Expand Down
Loading
Loading