diff --git a/README.md b/README.md index b128bda2..39e773ed 100644 --- a/README.md +++ b/README.md @@ -241,15 +241,12 @@ For example, build a reliable billing workflow that durably waits for a notifica ```golang func sendWorkflow(ctx dbos.DBOSContext, message string) (string, error) { - err := dbos.Send(ctx, dbos.WorkflowSendInput[string]{ - DestinationID: "receiverID", - Topic: "topic", - Message: message, - }) + err := dbos.Send(ctx, "receiverID", message, "topic") + return "sent", err } func receiveWorkflow(ctx dbos.DBOSContext, topic string) (string, error) { - return dbos.Recv[string](ctx, dbos.WorkflowRecvInput{Topic: topic, Timeout: 48 * time.Hour}) + return dbos.Recv[string](ctx, topic, 48 * time.Hour) } // Start a receiver in the background diff --git a/dbos/admin_server.go b/dbos/admin_server.go index ff99afe0..a95dc62a 100644 --- a/dbos/admin_server.go +++ b/dbos/admin_server.go @@ -412,7 +412,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer { workflowID := r.PathValue("id") ctx.logger.Info("Cancelling workflow", "workflow_id", workflowID) - err := ctx.CancelWorkflow(workflowID) + err := ctx.CancelWorkflow(ctx, workflowID) if err != nil { ctx.logger.Error("Failed to cancel workflow", "workflow_id", workflowID, "error", err) http.Error(w, fmt.Sprintf("Failed to cancel workflow: %v", err), http.StatusInternalServerError) diff --git a/dbos/dbos.go b/dbos/dbos.go index cb464468..501dca60 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -77,18 +77,18 @@ type DBOSContext interface { // Workflow operations RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution - Send(_ DBOSContext, input WorkflowSendInput) error // Send a message to another workflow - Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) // Receive a message sent to this workflow - SetEvent(_ DBOSContext, input WorkflowSetEventInput) error // Set a key-value event for this workflow - GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any, error) // Get a key-value event from a target workflow - Sleep(duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery + Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow + Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow + SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow + GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow + Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows) GetStepID() (int, error) // Get the current step ID (only available within workflows) // Workflow management RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters - CancelWorkflow(workflowID string) error // Cancel a workflow by setting its status to CANCELLED + CancelWorkflow(_ DBOSContext, workflowID string) error // Cancel a workflow by setting its status to CANCELLED ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria diff --git a/dbos/serialization_test.go b/dbos/serialization_test.go index 9eed3d62..180117b3 100644 --- a/dbos/serialization_test.go +++ b/dbos/serialization_test.go @@ -212,7 +212,7 @@ func setEventUserDefinedTypeWorkflow(ctx DBOSContext, input string) (string, err }, } - err := SetEvent(ctx, GenericWorkflowSetEventInput[UserDefinedEventData]{Key: input, Message: eventData}) + err := SetEvent(ctx, input, eventData) if err != nil { return "", err } @@ -236,11 +236,7 @@ func TestSetEventSerialize(t *testing.T) { assert.Equal(t, "user-defined-event-set", result) // Retrieve the event to verify it was properly serialized and can be deserialized - retrievedEvent, err := GetEvent[UserDefinedEventData](executor, WorkflowGetEventInput{ - TargetWorkflowID: setHandle.GetWorkflowID(), - Key: "user-defined-key", - Timeout: 3 * time.Second, - }) + retrievedEvent, err := GetEvent[UserDefinedEventData](executor, setHandle.GetWorkflowID(), "user-defined-key", 3 * time.Second) require.NoError(t, err) // Verify the retrieved data matches what we set @@ -268,12 +264,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string, } // Send should automatically register this type with gob - // Note the explicit type parameter since compiler cannot infer UserDefinedEventData from string input - err := Send(ctx, GenericWorkflowSendInput[UserDefinedEventData]{ - DestinationID: destinationID, - Topic: "user-defined-topic", - Message: sendData, - }) + err := Send(ctx, destinationID, sendData, "user-defined-topic") if err != nil { return "", err } @@ -282,10 +273,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string, func recvUserDefinedTypeWorkflow(ctx DBOSContext, input string) (UserDefinedEventData, error) { // Receive the user-defined type message - result, err := Recv[UserDefinedEventData](ctx, WorkflowRecvInput{ - Topic: "user-defined-topic", - Timeout: 3 * time.Second, - }) + result, err := Recv[UserDefinedEventData](ctx, "user-defined-topic", 3 * time.Second) return result, err } diff --git a/dbos/system_database.go b/dbos/system_database.go index 2388ceab..4de79579 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -50,9 +50,9 @@ type systemDatabase interface { // Communication (special steps) send(ctx context.Context, input WorkflowSendInput) error - recv(ctx context.Context, input WorkflowRecvInput) (any, error) + recv(ctx context.Context, input recvInput) (any, error) setEvent(ctx context.Context, input WorkflowSetEventInput) error - getEvent(ctx context.Context, input WorkflowGetEventInput) (any, error) + getEvent(ctx context.Context, input getEventInput) (any, error) // Timers (special steps) sleep(ctx context.Context, duration time.Duration) (time.Duration, error) @@ -642,7 +642,6 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( wf.Error = errors.New(*errorStr) } - // XXX maybe set wf.Output to outputString and run deserialize out of system DB wf.Output, err = deserialize(outputString) if err != nil { return nil, fmt.Errorf("failed to deserialize output: %w", err) @@ -1511,7 +1510,7 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error { } // Recv is a special type of step that receives a message destined for a given workflow -func (s *sysDB) recv(ctx context.Context, input WorkflowRecvInput) (any, error) { +func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) { functionName := "DBOS.recv" // Get workflow state from context @@ -1734,7 +1733,7 @@ func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error return nil } -func (s *sysDB) getEvent(ctx context.Context, input WorkflowGetEventInput) (any, error) { +func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error) { functionName := "DBOS.getEvent" // Get workflow state from context (optional for GetEvent as we can get an event from outside a workflow) diff --git a/dbos/workflow.go b/dbos/workflow.go index c6828003..d13fcefe 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -794,6 +794,7 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o // If the afterFunc has started, the workflow was cancelled and the status should be set to cancelled if stopFunc != nil && !stopFunc() { + c.logger.Info("Workflow was cancelled. Waiting for cancel function to complete", "workflow_id", workflowID) // Wait for the cancel function to complete // Note this must happen before we write on the outcome channel (and signal the handler's GetResult) <-cancelFuncCompleted @@ -817,7 +818,7 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o close(outcomeChan) }() - return newWorkflowHandle[any](uncancellableCtx, workflowID, outcomeChan), nil + return newWorkflowHandle(uncancellableCtx, workflowID, outcomeChan), nil } /******************************/ @@ -1082,50 +1083,43 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) /******* WORKFLOW COMMUNICATIONS ********/ /****************************************/ -// GenericWorkflowSendInput defines the parameters for sending a message to another workflow. -type GenericWorkflowSendInput[P any] struct { - DestinationID string // Workflow ID to send the message to - Message P // Message payload (must be gob-encodable) - Topic string // Optional topic for message filtering -} - -func (c *dbosContext) Send(_ DBOSContext, input WorkflowSendInput) error { - return c.systemDB.send(c, input) +func (c *dbosContext) Send(_ DBOSContext, destinationID string, message any, topic string) error { + return c.systemDB.send(c, WorkflowSendInput{ + DestinationID: destinationID, + Message: message, + Topic: topic, + }) } // Send sends a message to another workflow with type safety. -// The message type R is automatically registered for gob encoding. +// The message type P is automatically registered for gob encoding. // // Send can be called from within a workflow (as a durable step) or from outside workflows. // When called within a workflow, the send operation becomes part of the workflow's durable state. // // Example: // -// err := dbos.Send(ctx, dbos.GenericWorkflowSendInput[string]{ -// DestinationID: "target-workflow-id", -// Message: "Hello from sender", -// Topic: "notifications", -// }) -func Send[P any](ctx DBOSContext, input GenericWorkflowSendInput[P]) error { +// err := dbos.Send(ctx, "target-workflow-id", "Hello from sender", "notifications") +func Send[P any](ctx DBOSContext, destinationID string, message P, topic string) error { if ctx == nil { return errors.New("ctx cannot be nil") } var typedMessage P gob.Register(typedMessage) - return ctx.Send(ctx, WorkflowSendInput{ - DestinationID: input.DestinationID, - Message: input.Message, - Topic: input.Topic, - }) + return ctx.Send(ctx, destinationID, message, topic) } -// WorkflowRecvInput defines the parameters for receiving messages sent to this workflow. -type WorkflowRecvInput struct { +// recvInput defines the parameters for receiving messages sent to this workflow. +type recvInput struct { Topic string // Topic to listen for (empty string receives from default topic) Timeout time.Duration // Maximum time to wait for a message } -func (c *dbosContext) Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) { +func (c *dbosContext) Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) { + input := recvInput{ + Topic: topic, + Timeout: timeout, + } return c.systemDB.recv(c, input) } @@ -1138,20 +1132,17 @@ func (c *dbosContext) Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) // // Example: // -// message, err := dbos.Recv[string](ctx, dbos.WorkflowRecvInput{ -// Topic: "notifications", -// Timeout: 30 * time.Second, -// }) +// message, err := dbos.Recv[string](ctx, "notifications", 30 * time.Second) // if err != nil { // // Handle timeout or error // return err // } // log.Printf("Received: %s", message) -func Recv[R any](ctx DBOSContext, input WorkflowRecvInput) (R, error) { +func Recv[R any](ctx DBOSContext, topic string, timeout time.Duration) (R, error) { if ctx == nil { return *new(R), errors.New("ctx cannot be nil") } - msg, err := ctx.Recv(ctx, input) + msg, err := ctx.Recv(ctx, topic, timeout) if err != nil { return *new(R), err } @@ -1167,49 +1158,45 @@ func Recv[R any](ctx DBOSContext, input WorkflowRecvInput) (R, error) { return typedMessage, nil } -// GenericWorkflowSetEventInput defines the parameters for setting a workflow event. -type GenericWorkflowSetEventInput[P any] struct { - Key string // Event key identifier - Message P // Event value (must be gob-encodable) -} - -func (c *dbosContext) SetEvent(_ DBOSContext, input WorkflowSetEventInput) error { - return c.systemDB.setEvent(c, input) +func (c *dbosContext) SetEvent(_ DBOSContext, key string, message any) error { + return c.systemDB.setEvent(c, WorkflowSetEventInput{ + Key: key, + Message: message, + }) } // SetEvent sets a key-value event for the current workflow with type safety. // Events are persistent and can be retrieved by other workflows using GetEvent. -// The event type R is automatically registered for gob encoding. +// The event type P is automatically registered for gob encoding. // // SetEvent can only be called from within a workflow and becomes part of the workflow's durable state. // Setting an event with the same key will overwrite the previous value. // // Example: // -// err := dbos.SetEvent(ctx, dbos.GenericWorkflowSetEventInput[string]{ -// Key: "status", -// Message: "processing-complete", -// }) -func SetEvent[P any](ctx DBOSContext, input GenericWorkflowSetEventInput[P]) error { +// err := dbos.SetEvent(ctx, "status", "processing-complete") +func SetEvent[P any](ctx DBOSContext, key string, message P) error { if ctx == nil { return errors.New("ctx cannot be nil") } var typedMessage P gob.Register(typedMessage) - return ctx.SetEvent(ctx, WorkflowSetEventInput{ - Key: input.Key, - Message: input.Message, - }) + return ctx.SetEvent(ctx, key, message) } -// WorkflowGetEventInput defines the parameters for retrieving an event from a workflow. -type WorkflowGetEventInput struct { +// getEventInput defines the parameters for retrieving an event from a workflow. +type getEventInput struct { TargetWorkflowID string // Workflow ID to get the event from Key string // Event key to retrieve Timeout time.Duration // Maximum time to wait for the event to be set } -func (c *dbosContext) GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any, error) { +func (c *dbosContext) GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) { + input := getEventInput{ + TargetWorkflowID: targetWorkflowID, + Key: key, + Timeout: timeout, + } return c.systemDB.getEvent(c, input) } @@ -1221,21 +1208,17 @@ func (c *dbosContext) GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any, // // Example: // -// status, err := dbos.GetEvent[string](ctx, dbos.WorkflowGetEventInput{ -// TargetWorkflowID: "target-workflow-id", -// Key: "status", -// Timeout: 30 * time.Second, -// }) +// status, err := dbos.GetEvent[string](ctx, "target-workflow-id", "status", 30 * time.Second) // if err != nil { // // Handle timeout or error // return err // } // log.Printf("Status: %s", status) -func GetEvent[R any](ctx DBOSContext, input WorkflowGetEventInput) (R, error) { +func GetEvent[R any](ctx DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (R, error) { if ctx == nil { return *new(R), errors.New("ctx cannot be nil") } - value, err := ctx.GetEvent(ctx, input) + value, err := ctx.GetEvent(ctx, targetWorkflowID, key, timeout) if err != nil { return *new(R), err } @@ -1250,7 +1233,7 @@ func GetEvent[R any](ctx DBOSContext, input WorkflowGetEventInput) (R, error) { return typedValue, nil } -func (c *dbosContext) Sleep(duration time.Duration) (time.Duration, error) { +func (c *dbosContext) Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) { return c.systemDB.sleep(c, duration) } @@ -1269,7 +1252,7 @@ func Sleep(ctx DBOSContext, duration time.Duration) (time.Duration, error) { if ctx == nil { return 0, errors.New("ctx cannot be nil") } - return ctx.Sleep(duration) + return ctx.Sleep(ctx, duration) } /***********************************/ @@ -1550,7 +1533,7 @@ func Enqueue[P any, R any](ctx DBOSContext, params GenericEnqueueOptions[P]) (Wo // - workflowID: The unique identifier of the workflow to cancel // // Returns an error if the workflow does not exist or if the cancellation operation fails. -func (c *dbosContext) CancelWorkflow(workflowID string) error { +func (c *dbosContext) CancelWorkflow(_ DBOSContext, workflowID string) error { return c.systemDB.cancelWorkflow(c, workflowID) } @@ -1573,7 +1556,7 @@ func CancelWorkflow(ctx DBOSContext, workflowID string) error { if ctx == nil { return errors.New("ctx cannot be nil") } - return ctx.CancelWorkflow(workflowID) + return ctx.CancelWorkflow(ctx, workflowID) } func (c *dbosContext) ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) { diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 7ca205fd..12fb8288 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -1292,19 +1292,15 @@ type sendWorkflowInput struct { } func sendWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error) { - err := Send(ctx, GenericWorkflowSendInput[string]{ - DestinationID: input.DestinationID, - Topic: input.Topic, - Message: "message1", - }) + err := Send(ctx, input.DestinationID, "message1", input.Topic) if err != nil { return "", err } - err = Send(ctx, GenericWorkflowSendInput[string]{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message2"}) + err = Send(ctx, input.DestinationID, "message2", input.Topic) if err != nil { return "", err } - err = Send(ctx, GenericWorkflowSendInput[string]{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message3"}) + err = Send(ctx, input.DestinationID, "message3", input.Topic) if err != nil { return "", err } @@ -1312,15 +1308,15 @@ func sendWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error) { } func receiveWorkflow(ctx DBOSContext, topic string) (string, error) { - msg1, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 10 * time.Second}) + msg1, err := Recv[string](ctx, topic, 10 * time.Second) if err != nil { return "", err } - msg2, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 10 * time.Second}) + msg2, err := Recv[string](ctx, topic, 10 * time.Second) if err != nil { return "", err } - msg3, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 10 * time.Second}) + msg3, err := Recv[string](ctx, topic, 10 * time.Second) if err != nil { return "", err } @@ -1339,7 +1335,7 @@ func receiveWorkflowCoordinated(ctx DBOSContext, input struct { concurrentRecvStartEvent.Wait() // Do a single Recv call with timeout - msg, err := Recv[string](ctx, WorkflowRecvInput{Topic: input.Topic, Timeout: 3 * time.Second}) + msg, err := Recv[string](ctx, input.Topic, 3 * time.Second) if err != nil { return "", err } @@ -1348,16 +1344,16 @@ func receiveWorkflowCoordinated(ctx DBOSContext, input struct { func sendStructWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error) { testStruct := sendRecvType{Value: "test-struct-value"} - err := Send(ctx, GenericWorkflowSendInput[sendRecvType]{DestinationID: input.DestinationID, Topic: input.Topic, Message: testStruct}) + err := Send(ctx, input.DestinationID, testStruct, input.Topic) return "", err } func receiveStructWorkflow(ctx DBOSContext, topic string) (sendRecvType, error) { - return Recv[sendRecvType](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second}) + return Recv[sendRecvType](ctx, topic, 3 * time.Second) } func sendIdempotencyWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error) { - err := Send(ctx, GenericWorkflowSendInput[string]{DestinationID: input.DestinationID, Topic: input.Topic, Message: "m1"}) + err := Send(ctx, input.DestinationID, "m1", input.Topic) if err != nil { return "", err } @@ -1366,7 +1362,7 @@ func sendIdempotencyWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, } func receiveIdempotencyWorkflow(ctx DBOSContext, topic string) (string, error) { - msg, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second}) + msg, err := Recv[string](ctx, topic, 3 * time.Second) if err != nil { // Unlock the test in this case receiveIdempotencyStartEvent.Set() @@ -1378,11 +1374,7 @@ func receiveIdempotencyWorkflow(ctx DBOSContext, topic string) (string, error) { } func stepThatCallsSend(ctx context.Context, input sendWorkflowInput) (string, error) { - err := Send(ctx.(DBOSContext), GenericWorkflowSendInput[string]{ - DestinationID: input.DestinationID, - Topic: input.Topic, - Message: "message-from-step", - }) + err := Send(ctx.(DBOSContext), input.DestinationID, "message-from-step", input.Topic) if err != nil { return "", err } @@ -1519,7 +1511,7 @@ func TestSendRecv(t *testing.T) { t.Run("RecvMustRunInsideWorkflows", func(t *testing.T) { // Attempt to run Recv outside of a workflow context - _, err := Recv[string](dbosCtx, WorkflowRecvInput{Topic: "test-topic", Timeout: 1 * time.Second}) + _, err := Recv[string](dbosCtx, "test-topic", 1 * time.Second) require.Error(t, err, "expected error when running Recv outside of workflow context, but got none") // Check the error type @@ -1539,11 +1531,7 @@ func TestSendRecv(t *testing.T) { // Send messages from outside a workflow context for i := range 3 { - err = Send(dbosCtx, GenericWorkflowSendInput[string]{ - DestinationID: receiveHandle.GetWorkflowID(), - Topic: "outside-workflow-topic", - Message: fmt.Sprintf("message%d", i+1), - }) + err = Send(dbosCtx, receiveHandle.GetWorkflowID(), fmt.Sprintf("message%d", i+1), "outside-workflow-topic") require.NoError(t, err, "failed to send message%d from outside workflow", i+1) } @@ -1726,7 +1714,7 @@ type setEventWorkflowInput struct { } func setEventWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, error) { - err := SetEvent(ctx, GenericWorkflowSetEventInput[string](input)) + err := SetEvent(ctx, input.Key, input.Message) if err != nil { return "", err } @@ -1734,11 +1722,7 @@ func setEventWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, err } func getEventWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, error) { - result, err := GetEvent[string](ctx, WorkflowGetEventInput{ - TargetWorkflowID: input.Key, // Reusing Key field as target workflow ID - Key: input.Message, // Reusing Message field as event key - Timeout: 3 * time.Second, - }) + result, err := GetEvent[string](ctx, input.Key, input.Message, 3 * time.Second) if err != nil { return "", err } @@ -1747,7 +1731,7 @@ func getEventWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, err func setTwoEventsWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, error) { // Set the first event - err := SetEvent(ctx, GenericWorkflowSetEventInput[string]{Key: "event1", Message: "first-event-message"}) + err := SetEvent(ctx, "event1", "first-event-message") if err != nil { return "", err } @@ -1756,7 +1740,7 @@ func setTwoEventsWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, setSecondEventSignal.Wait() // Set the second event - err = SetEvent(ctx, GenericWorkflowSetEventInput[string]{Key: "event2", Message: "second-event-message"}) + err = SetEvent(ctx, "event2", "second-event-message") if err != nil { return "", err } @@ -1765,7 +1749,7 @@ func setTwoEventsWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, } func setEventIdempotencyWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, error) { - err := SetEvent(ctx, GenericWorkflowSetEventInput[string](input)) + err := SetEvent(ctx, input.Key, input.Message) if err != nil { return "", err } @@ -1775,11 +1759,7 @@ func setEventIdempotencyWorkflow(ctx DBOSContext, input setEventWorkflowInput) ( } func getEventIdempotencyWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, error) { - result, err := GetEvent[string](ctx, WorkflowGetEventInput{ - TargetWorkflowID: input.Key, - Key: input.Message, - Timeout: 3 * time.Second, - }) + result, err := GetEvent[string](ctx, input.Key, input.Message, 3 * time.Second) if err != nil { return "", err } @@ -1989,11 +1969,7 @@ func TestSetGetEvent(t *testing.T) { } // Start a workflow that gets the event from outside the original workflow - message, err := GetEvent[string](dbosCtx, WorkflowGetEventInput{ - TargetWorkflowID: setHandle.GetWorkflowID(), - Key: "test-key", - Timeout: 3 * time.Second, - }) + message, err := GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "test-key", 3 * time.Second) if err != nil { t.Fatalf("failed to get event from outside workflow: %v", err) } @@ -2018,11 +1994,7 @@ func TestSetGetEvent(t *testing.T) { t.Run("GetEventTimeout", func(t *testing.T) { // Try to get an event from a non-existent workflow nonExistentID := uuid.NewString() - message, err := GetEvent[string](dbosCtx, WorkflowGetEventInput{ - TargetWorkflowID: nonExistentID, - Key: "test-key", - Timeout: 3 * time.Second, - }) + message, err := GetEvent[string](dbosCtx, nonExistentID, "test-key", 3 * time.Second) require.NoError(t, err, "failed to get event from non-existent workflow") if message != "" { t.Fatalf("expected empty result on timeout, got '%s'", message) @@ -2036,11 +2008,7 @@ func TestSetGetEvent(t *testing.T) { require.NoError(t, err, "failed to set event") _, err = setHandle.GetResult() require.NoError(t, err, "failed to get result from set event workflow") - message, err = GetEvent[string](dbosCtx, WorkflowGetEventInput{ - TargetWorkflowID: setHandle.GetWorkflowID(), - Key: "non-existent-key", - Timeout: 3 * time.Second, - }) + message, err = GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "non-existent-key", 3 * time.Second) require.NoError(t, err, "failed to get event with non-existent key") if message != "" { t.Fatalf("expected empty result on timeout with non-existent key, got '%s'", message) @@ -2049,7 +2017,7 @@ func TestSetGetEvent(t *testing.T) { t.Run("SetGetEventMustRunInsideWorkflows", func(t *testing.T) { // Attempt to run SetEvent outside of a workflow context - err := SetEvent(dbosCtx, GenericWorkflowSetEventInput[string]{Key: "test-key", Message: "test-message"}) + err := SetEvent(dbosCtx, "test-key", "test-message") require.Error(t, err, "expected error when running SetEvent outside of workflow context, but got none") // Check the error type @@ -2187,11 +2155,7 @@ func TestSetGetEvent(t *testing.T) { for range numGoroutines { go func() { defer wg.Done() - res, err := GetEvent[string](dbosCtx, WorkflowGetEventInput{ - TargetWorkflowID: setHandle.GetWorkflowID(), - Key: "concurrent-event-key", - Timeout: 10 * time.Second, - }) + res, err := GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "concurrent-event-key", 10 * time.Second) if err != nil { errors <- fmt.Errorf("failed to get event in goroutine: %v", err) return @@ -2555,11 +2519,7 @@ func TestWorkflowTimeout(t *testing.T) { } func notificationWaiterWorkflow(ctx DBOSContext, pairID int) (string, error) { - result, err := GetEvent[string](ctx, WorkflowGetEventInput{ - TargetWorkflowID: fmt.Sprintf("notification-setter-%d", pairID), - Key: "event-key", - Timeout: 10 * time.Second, - }) + result, err := GetEvent[string](ctx, fmt.Sprintf("notification-setter-%d", pairID), "event-key", 10 * time.Second) if err != nil { return "", err } @@ -2567,10 +2527,7 @@ func notificationWaiterWorkflow(ctx DBOSContext, pairID int) (string, error) { } func notificationSetterWorkflow(ctx DBOSContext, pairID int) (string, error) { - err := SetEvent(ctx, GenericWorkflowSetEventInput[string]{ - Key: "event-key", - Message: fmt.Sprintf("notification-message-%d", pairID), - }) + err := SetEvent(ctx, "event-key", fmt.Sprintf("notification-message-%d", pairID)) if err != nil { return "", err } @@ -2578,10 +2535,7 @@ func notificationSetterWorkflow(ctx DBOSContext, pairID int) (string, error) { } func sendRecvReceiverWorkflow(ctx DBOSContext, pairID int) (string, error) { - result, err := Recv[string](ctx, WorkflowRecvInput{ - Topic: "send-recv-topic", - Timeout: 10 * time.Second, - }) + result, err := Recv[string](ctx, "send-recv-topic", 10 * time.Second) if err != nil { return "", err } @@ -2589,11 +2543,7 @@ func sendRecvReceiverWorkflow(ctx DBOSContext, pairID int) (string, error) { } func sendRecvSenderWorkflow(ctx DBOSContext, pairID int) (string, error) { - err := Send(ctx, GenericWorkflowSendInput[string]{ - DestinationID: fmt.Sprintf("send-recv-receiver-%d", pairID), - Topic: "send-recv-topic", - Message: fmt.Sprintf("send-recv-message-%d", pairID), - }) + err := Send(ctx, fmt.Sprintf("send-recv-receiver-%d", pairID), fmt.Sprintf("send-recv-message-%d", pairID), "send-recv-topic") if err != nil { return "", err }