@@ -794,6 +794,7 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
794794
795795 // If the afterFunc has started, the workflow was cancelled and the status should be set to cancelled
796796 if stopFunc != nil && ! stopFunc () {
797+ c .logger .Info ("Workflow was cancelled. Waiting for cancel function to complete" , "workflow_id" , workflowID )
797798 // Wait for the cancel function to complete
798799 // Note this must happen before we write on the outcome channel (and signal the handler's GetResult)
799800 <- cancelFuncCompleted
@@ -817,7 +818,7 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
817818 close (outcomeChan )
818819 }()
819820
820- return newWorkflowHandle [ any ] (uncancellableCtx , workflowID , outcomeChan ), nil
821+ return newWorkflowHandle (uncancellableCtx , workflowID , outcomeChan ), nil
821822}
822823
823824/******************************/
@@ -1082,50 +1083,40 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
10821083/******* WORKFLOW COMMUNICATIONS ********/
10831084/****************************************/
10841085
1085- // GenericWorkflowSendInput defines the parameters for sending a message to another workflow.
1086- type GenericWorkflowSendInput [P any ] struct {
1087- DestinationID string // Workflow ID to send the message to
1088- Message P // Message payload (must be gob-encodable)
1089- Topic string // Optional topic for message filtering
1090- }
10911086
1092- func (c * dbosContext ) Send (_ DBOSContext , input WorkflowSendInput ) error {
1093- return c .systemDB .send (c , input )
1087+ func (c * dbosContext ) Send (_ DBOSContext , destinationID string , message any , topic string ) error {
1088+ return c .systemDB .send (c , WorkflowSendInput {
1089+ DestinationID : destinationID ,
1090+ Message : message ,
1091+ Topic : topic ,
1092+ })
10941093}
10951094
10961095// Send sends a message to another workflow with type safety.
1097- // The message type R is automatically registered for gob encoding.
1096+ // The message type P is automatically registered for gob encoding.
10981097//
10991098// Send can be called from within a workflow (as a durable step) or from outside workflows.
11001099// When called within a workflow, the send operation becomes part of the workflow's durable state.
11011100//
11021101// Example:
11031102//
1104- // err := dbos.Send(ctx, dbos.GenericWorkflowSendInput[string]{
1105- // DestinationID: "target-workflow-id",
1106- // Message: "Hello from sender",
1107- // Topic: "notifications",
1108- // })
1109- func Send [P any ](ctx DBOSContext , input GenericWorkflowSendInput [P ]) error {
1103+ // err := dbos.Send(ctx, "target-workflow-id", "Hello from sender", "notifications")
1104+ func Send [P any ](ctx DBOSContext , destinationID string , message P , topic string ) error {
11101105 if ctx == nil {
11111106 return errors .New ("ctx cannot be nil" )
11121107 }
11131108 var typedMessage P
11141109 gob .Register (typedMessage )
1115- return ctx .Send (ctx , WorkflowSendInput {
1116- DestinationID : input .DestinationID ,
1117- Message : input .Message ,
1118- Topic : input .Topic ,
1119- })
1110+ return ctx .Send (ctx , destinationID , message , topic )
11201111}
11211112
1122- // WorkflowRecvInput defines the parameters for receiving messages sent to this workflow.
1123- type WorkflowRecvInput struct {
1113+ // RecvInput defines the parameters for receiving messages sent to this workflow.
1114+ type RecvInput struct {
11241115 Topic string // Topic to listen for (empty string receives from default topic)
11251116 Timeout time.Duration // Maximum time to wait for a message
11261117}
11271118
1128- func (c * dbosContext ) Recv (_ DBOSContext , input WorkflowRecvInput ) (any , error ) {
1119+ func (c * dbosContext ) Recv (_ DBOSContext , input RecvInput ) (any , error ) {
11291120 return c .systemDB .recv (c , input )
11301121}
11311122
@@ -1138,7 +1129,7 @@ func (c *dbosContext) Recv(_ DBOSContext, input WorkflowRecvInput) (any, error)
11381129//
11391130// Example:
11401131//
1141- // message, err := dbos.Recv[string](ctx, dbos.WorkflowRecvInput {
1132+ // message, err := dbos.Recv[string](ctx, dbos.RecvInput {
11421133// Topic: "notifications",
11431134// Timeout: 30 * time.Second,
11441135// })
@@ -1147,7 +1138,7 @@ func (c *dbosContext) Recv(_ DBOSContext, input WorkflowRecvInput) (any, error)
11471138// return err
11481139// }
11491140// log.Printf("Received: %s", message)
1150- func Recv [R any ](ctx DBOSContext , input WorkflowRecvInput ) (R , error ) {
1141+ func Recv [R any ](ctx DBOSContext , input RecvInput ) (R , error ) {
11511142 if ctx == nil {
11521143 return * new (R ), errors .New ("ctx cannot be nil" )
11531144 }
@@ -1167,49 +1158,40 @@ func Recv[R any](ctx DBOSContext, input WorkflowRecvInput) (R, error) {
11671158 return typedMessage , nil
11681159}
11691160
1170- // GenericWorkflowSetEventInput defines the parameters for setting a workflow event.
1171- type GenericWorkflowSetEventInput [P any ] struct {
1172- Key string // Event key identifier
1173- Message P // Event value (must be gob-encodable)
1174- }
1175-
1176- func (c * dbosContext ) SetEvent (_ DBOSContext , input WorkflowSetEventInput ) error {
1177- return c .systemDB .setEvent (c , input )
1161+ func (c * dbosContext ) SetEvent (_ DBOSContext , key string , message any ) error {
1162+ return c .systemDB .setEvent (c , WorkflowSetEventInput {
1163+ Key : key ,
1164+ Message : message ,
1165+ })
11781166}
11791167
11801168// SetEvent sets a key-value event for the current workflow with type safety.
11811169// Events are persistent and can be retrieved by other workflows using GetEvent.
1182- // The event type R is automatically registered for gob encoding.
1170+ // The event type P is automatically registered for gob encoding.
11831171//
11841172// SetEvent can only be called from within a workflow and becomes part of the workflow's durable state.
11851173// Setting an event with the same key will overwrite the previous value.
11861174//
11871175// Example:
11881176//
1189- // err := dbos.SetEvent(ctx, dbos.GenericWorkflowSetEventInput[string]{
1190- // Key: "status",
1191- // Message: "processing-complete",
1192- // })
1193- func SetEvent [P any ](ctx DBOSContext , input GenericWorkflowSetEventInput [P ]) error {
1177+ // err := dbos.SetEvent(ctx, "status", "processing-complete")
1178+ func SetEvent [P any ](ctx DBOSContext , key string , message P ) error {
11941179 if ctx == nil {
11951180 return errors .New ("ctx cannot be nil" )
11961181 }
11971182 var typedMessage P
11981183 gob .Register (typedMessage )
1199- return ctx .SetEvent (ctx , WorkflowSetEventInput {
1200- Key : input .Key ,
1201- Message : input .Message ,
1202- })
1184+ return ctx .SetEvent (ctx , key , message )
12031185}
12041186
1205- // WorkflowGetEventInput defines the parameters for retrieving an event from a workflow.
1206- type WorkflowGetEventInput struct {
1187+ // GetEventInput defines the parameters for retrieving an event from a workflow.
1188+ type GetEventInput struct {
12071189 TargetWorkflowID string // Workflow ID to get the event from
12081190 Key string // Event key to retrieve
12091191 Timeout time.Duration // Maximum time to wait for the event to be set
12101192}
12111193
1212- func (c * dbosContext ) GetEvent (_ DBOSContext , input WorkflowGetEventInput ) (any , error ) {
1194+ func (c * dbosContext ) GetEvent (_ DBOSContext , input GetEventInput ) (any , error ) {
12131195 return c .systemDB .getEvent (c , input )
12141196}
12151197
@@ -1221,7 +1203,7 @@ func (c *dbosContext) GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any,
12211203//
12221204// Example:
12231205//
1224- // status, err := dbos.GetEvent[string](ctx, dbos.WorkflowGetEventInput {
1206+ // status, err := dbos.GetEvent[string](ctx, dbos.GetEventInput {
12251207// TargetWorkflowID: "target-workflow-id",
12261208// Key: "status",
12271209// Timeout: 30 * time.Second,
@@ -1231,7 +1213,7 @@ func (c *dbosContext) GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any,
12311213// return err
12321214// }
12331215// log.Printf("Status: %s", status)
1234- func GetEvent [R any ](ctx DBOSContext , input WorkflowGetEventInput ) (R , error ) {
1216+ func GetEvent [R any ](ctx DBOSContext , input GetEventInput ) (R , error ) {
12351217 if ctx == nil {
12361218 return * new (R ), errors .New ("ctx cannot be nil" )
12371219 }
@@ -1250,7 +1232,7 @@ func GetEvent[R any](ctx DBOSContext, input WorkflowGetEventInput) (R, error) {
12501232 return typedValue , nil
12511233}
12521234
1253- func (c * dbosContext ) Sleep (duration time.Duration ) (time.Duration , error ) {
1235+ func (c * dbosContext ) Sleep (_ DBOSContext , duration time.Duration ) (time.Duration , error ) {
12541236 return c .systemDB .sleep (c , duration )
12551237}
12561238
@@ -1269,7 +1251,7 @@ func Sleep(ctx DBOSContext, duration time.Duration) (time.Duration, error) {
12691251 if ctx == nil {
12701252 return 0 , errors .New ("ctx cannot be nil" )
12711253 }
1272- return ctx .Sleep (duration )
1254+ return ctx .Sleep (ctx , duration )
12731255}
12741256
12751257/***********************************/
@@ -1550,7 +1532,7 @@ func Enqueue[P any, R any](ctx DBOSContext, params GenericEnqueueOptions[P]) (Wo
15501532// - workflowID: The unique identifier of the workflow to cancel
15511533//
15521534// Returns an error if the workflow does not exist or if the cancellation operation fails.
1553- func (c * dbosContext ) CancelWorkflow (workflowID string ) error {
1535+ func (c * dbosContext ) CancelWorkflow (_ DBOSContext , workflowID string ) error {
15541536 return c .systemDB .cancelWorkflow (c , workflowID )
15551537}
15561538
@@ -1573,7 +1555,7 @@ func CancelWorkflow(ctx DBOSContext, workflowID string) error {
15731555 if ctx == nil {
15741556 return errors .New ("ctx cannot be nil" )
15751557 }
1576- return ctx .CancelWorkflow (workflowID )
1558+ return ctx .CancelWorkflow (ctx , workflowID )
15771559}
15781560
15791561func (c * dbosContext ) ResumeWorkflow (_ DBOSContext , workflowID string ) (WorkflowHandle [any ], error ) {
0 commit comments