From a0cda1813602324e134010b7650c789e2e087857 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 22 Jul 2025 17:28:26 -0700 Subject: [PATCH 01/10] WIP: send/get events --- .../000001_initial_dbos_schema.down.sql | 4 +- .../000001_initial_dbos_schema.up.sql | 17 +- dbos/system_database.go | 192 +++++++++++++++++- dbos/workflow.go | 31 +++ 4 files changed, 238 insertions(+), 6 deletions(-) diff --git a/dbos/migrations/000001_initial_dbos_schema.down.sql b/dbos/migrations/000001_initial_dbos_schema.down.sql index b037b93b..10526474 100644 --- a/dbos/migrations/000001_initial_dbos_schema.down.sql +++ b/dbos/migrations/000001_initial_dbos_schema.down.sql @@ -1,10 +1,12 @@ -- 001_initial_dbos_schema.down.sql --- Drop trigger first +-- Drop triggers first DROP TRIGGER IF EXISTS dbos_notifications_trigger ON dbos.notifications; +DROP TRIGGER IF EXISTS dbos_workflow_events_trigger ON dbos.workflow_events; -- Drop function DROP FUNCTION IF EXISTS dbos.notifications_function(); +DROP FUNCTION IF EXISTS dbos.workflow_events_function(); -- Drop tables in reverse order to respect foreign key constraints DROP TABLE IF EXISTS dbos.workflow_events; diff --git a/dbos/migrations/000001_initial_dbos_schema.up.sql b/dbos/migrations/000001_initial_dbos_schema.up.sql index 4da3b693..94afdcce 100644 --- a/dbos/migrations/000001_initial_dbos_schema.up.sql +++ b/dbos/migrations/000001_initial_dbos_schema.up.sql @@ -91,4 +91,19 @@ CREATE TABLE dbos.workflow_events ( PRIMARY KEY (workflow_uuid, key), FOREIGN KEY (workflow_uuid) REFERENCES dbos.workflow_status(workflow_uuid) ON UPDATE CASCADE ON DELETE CASCADE -); \ No newline at end of file +); + +-- Create events function +CREATE OR REPLACE FUNCTION dbos.workflow_events_function() RETURNS TRIGGER AS $$ +DECLARE + payload text := NEW.workflow_uuid || '::' || NEW.key; +BEGIN + PERFORM pg_notify('dbos_workflow_events_channel', payload); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Create events trigger +CREATE TRIGGER dbos_workflow_events_trigger +AFTER INSERT ON dbos.workflow_events +FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function(); \ No newline at end of file diff --git a/dbos/system_database.go b/dbos/system_database.go index a6ea0d6d..742187eb 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -39,6 +39,8 @@ type SystemDatabase interface { GetWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error) Send(ctx context.Context, input WorkflowSendInput) error Recv(ctx context.Context, input WorkflowRecvInput) (any, error) + SetEvent(ctx context.Context, input WorkflowSetEventInput) error + GetEvent(ctx context.Context, input WorkflowGetEventInput) (any, error) } type systemDatabase struct { @@ -157,7 +159,7 @@ func NewSystemDatabase(databaseURL string) (SystemDatabase, error) { return nil, fmt.Errorf("failed to parse database URL: %v", err) } config.OnNotification = func(c *pgconn.PgConn, n *pgconn.Notification) { - if n.Channel == "dbos_notifications_channel" { + if n.Channel == "dbos_notifications_channel" || n.Channel == "dbos_workflow_events_channel" { // Check if an entry exists in the map, indexed by the payload // If yes, send a signal to the channel so the listener can wake up if ch, exists := notificationsMap.Load(n.Payload); exists { @@ -971,17 +973,17 @@ func (s *systemDatabase) GetWorkflowSteps(ctx context.Context, workflowID string /****************************************/ func (s *systemDatabase) notificationListenerLoop(ctx context.Context) { - mrr := s.notificationListenerConnection.Exec(ctx, "LISTEN dbos_notifications_channel") + mrr := s.notificationListenerConnection.Exec(ctx, "LISTEN dbos_notifications_channel; LISTEN dbos_workflow_events_channel") results, err := mrr.ReadAll() if err != nil { - getLogger().Error("Failed to listen on dbos_notifications_channel", "error", err) + getLogger().Error("Failed to listen on notification channels", "error", err) return } mrr.Close() for _, result := range results { if result.Err != nil { - getLogger().Error("Error listening on dbos_notifications_channel", "error", result.Err) + getLogger().Error("Error listening on notification channels", "error", result.Err) return } } @@ -1231,6 +1233,188 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any return message, nil } +func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInput) error { + functionName := "DBOS.setEvent" + + // Get workflow state from context + workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) + if !ok || workflowState == nil { + return newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?") + } + + if workflowState.isWithinStep { + return newStepExecutionError(workflowState.WorkflowID, functionName, "cannot call SetEvent within a step") + } + + stepID := workflowState.NextStepID() + + tx, err := s.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback(ctx) + + // Check if operation was already executed and do nothing if so + checkInput := checkOperationExecutionDBInput{ + workflowID: workflowState.WorkflowID, + operationID: stepID, + functionName: functionName, + tx: tx, + } + recordedResult, err := s.CheckOperationExecution(ctx, checkInput) + if err != nil { + return err + } + if recordedResult != nil { + return nil + } + + // Serialize the message. It must have been registered with encoding/gob by the user if not a basic type. + messageString, err := serialize(input.Message) + if err != nil { + return fmt.Errorf("failed to serialize message: %w", err) + } + + // Insert or update the event using UPSERT + insertQuery := `INSERT INTO dbos.workflow_events (workflow_uuid, key, value) + VALUES ($1, $2, $3) + ON CONFLICT (workflow_uuid, key) + DO UPDATE SET value = EXCLUDED.value` + + _, err = tx.Exec(ctx, insertQuery, workflowState.WorkflowID, input.Key, messageString) + if err != nil { + return fmt.Errorf("failed to insert/update workflow event: %w", err) + } + + // Record the operation result + recordInput := recordOperationResultDBInput{ + workflowID: workflowState.WorkflowID, + operationID: stepID, + operationName: functionName, + output: nil, + err: nil, + tx: tx, + } + + err = s.RecordOperationResult(ctx, recordInput) + if err != nil { + return fmt.Errorf("failed to record operation result: %w", err) + } + + // Commit transaction + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + return nil +} + +func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInput) (any, error) { + functionName := "DBOS.getEvent" + + // Get workflow state from context (optional for GetEvent as we can get an event from outside a workflow) + workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) + var stepID int + var isInWorkflow bool + + if ok && workflowState != nil { + isInWorkflow = true + if workflowState.isWithinStep { + return nil, newStepExecutionError(workflowState.WorkflowID, functionName, "cannot call GetEvent within a step") + } + stepID = workflowState.NextStepID() + + // Check if operation was already executed (only if in workflow) + checkInput := checkOperationExecutionDBInput{ + workflowID: workflowState.WorkflowID, + operationID: stepID, + functionName: functionName, + } + recordedResult, err := s.CheckOperationExecution(ctx, checkInput) + if err != nil { + return nil, err + } + if recordedResult != nil { + return recordedResult.output, recordedResult.err + } + } + + // Create notification payload and channel + payload := fmt.Sprintf("%s::%s", input.TargetWorkflowID, input.Key) + c := make(chan bool, 1) + _, loaded := s.notificationsMap.LoadOrStore(payload, c) + if loaded { + close(c) + // TODO: consider adding a specific error type for this case to help users handle it + getLogger().Error("GetEvent already called for target workflow", "target_workflow_id", input.TargetWorkflowID, "key", input.Key) + return nil, fmt.Errorf("GetEvent already called for target workflow %s and key %s", input.TargetWorkflowID, input.Key) + } + defer func() { + s.notificationsMap.Delete(payload) + close(c) + }() + + // Check if the event already exists in the database + query := `SELECT value FROM dbos.workflow_events WHERE workflow_uuid = $1 AND key = $2` + var value any + var valueString *string + + row := s.pool.QueryRow(ctx, query, input.TargetWorkflowID, input.Key) + err := row.Scan(&valueString) + if err != nil && err != pgx.ErrNoRows { + return nil, fmt.Errorf("failed to query workflow event: %w", err) + } + + if err == pgx.ErrNoRows || valueString == nil { // XXX valueString should never be `nil` + // Wait for notification with timeout + select { + case <-c: + // Received notification + case <-time.After(input.Timeout): + // Timeout reached + case <-ctx.Done(): + return nil, fmt.Errorf("context cancelled while waiting for event: %w", ctx.Err()) + } + + // Query the database again after waiting + row = s.pool.QueryRow(ctx, query, input.TargetWorkflowID, input.Key) + err = row.Scan(&valueString) + if err != nil { + if err == pgx.ErrNoRows { + value = nil // Event still doesn't exist + } else { + return nil, fmt.Errorf("failed to query workflow event after wait: %w", err) + } + } + } + + // Deserialize the value if it exists XXX valueString should never be `nil`s + if valueString != nil { + value, err = deserialize(valueString) + if err != nil { + return nil, fmt.Errorf("failed to deserialize event value: %w", err) + } + } + + // Record the operation result if this is called within a workflow + if isInWorkflow { + recordInput := recordOperationResultDBInput{ + workflowID: workflowState.WorkflowID, + operationID: stepID, + operationName: functionName, + output: value, + err: nil, + } + + err = s.RecordOperationResult(ctx, recordInput) + if err != nil { + return nil, fmt.Errorf("failed to record operation result: %w", err) + } + } + + return value, nil +} + /*******************************/ /******* QUEUES ********/ /*******************************/ diff --git a/dbos/workflow.go b/dbos/workflow.go index 0e15d7fa..fc696555 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -744,6 +744,37 @@ func Recv[R any](ctx context.Context, input WorkflowRecvInput) (R, error) { return typedMessage, nil } +type WorkflowSetEventInput struct { + Key string + Message any +} + +func SetEvent(ctx context.Context, input WorkflowSetEventInput) error { + return dbos.systemDB.SetEvent(ctx, input) +} + +type WorkflowGetEventInput struct { + TargetWorkflowID string + Key string + Timeout time.Duration +} + +func GetEvent[R any](ctx context.Context, input WorkflowGetEventInput) (R, error) { + value, err := dbos.systemDB.GetEvent(ctx, input) + if err != nil { + return *new(R), err + } + if value == nil { + return *new(R), nil + } + // Type check + typedValue, ok := value.(R) + if !ok { + return *new(R), newWorkflowUnexpectedResultType("", fmt.Sprintf("%T", new(R)), fmt.Sprintf("%T", value)) + } + return typedValue, nil +} + /***********************************/ /******* WORKFLOW MANAGEMENT *******/ /***********************************/ From d89fe165c12729234ada8f51e6593b348501f173 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 18:51:20 -0700 Subject: [PATCH 02/10] useful info --- dbos/dbos.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index f56fd54d..ef3d0be7 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -211,10 +211,13 @@ func Launch() error { } // Run a round of recovery on the local executor - _, err := recoverPendingWorkflows(context.Background(), []string{_EXECUTOR_ID}) // XXX maybe use the queue runner context here to allow Shutdown to cancel it? + recoveryHandles, err := recoverPendingWorkflows(context.Background(), []string{_EXECUTOR_ID}) // XXX maybe use the queue runner context here to allow Shutdown to cancel it? if err != nil { return newInitializationError(fmt.Sprintf("failed to recover pending workflows during launch: %v", err)) } + if len(recoveryHandles) > 0 { + logger.Info("Recovered pending workflows", "count", len(recoveryHandles)) + } logger.Info("DBOS initialized", "app_version", _APP_VERSION, "executor_id", _EXECUTOR_ID) return nil From a4e57f211e2f5f57c994db53dc95dfca6f3c8cec Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 18:52:28 -0700 Subject: [PATCH 03/10] change the notification system to use a condition variable. Channels work fine work 1 consumer <-> 1 producer, but set/get event accepts multiple consumers --- dbos/system_database.go | 71 ++++++++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 29 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 742187eb..8cf28d18 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -161,13 +161,9 @@ func NewSystemDatabase(databaseURL string) (SystemDatabase, error) { config.OnNotification = func(c *pgconn.PgConn, n *pgconn.Notification) { if n.Channel == "dbos_notifications_channel" || n.Channel == "dbos_workflow_events_channel" { // Check if an entry exists in the map, indexed by the payload - // If yes, send a signal to the channel so the listener can wake up - if ch, exists := notificationsMap.Load(n.Payload); exists { - select { - case ch.(chan bool) <- true: // Send a signal to wake up the listener - default: - getLogger().Warn("notification channel for payload is full, skipping", "payload", n.Payload) - } + // If yes, broadcast on the condition variable so listeners can wake up + if cond, exists := notificationsMap.Load(n.Payload); exists { + cond.(*sync.Cond).Broadcast() } } } @@ -1143,18 +1139,17 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any // First check if there's already a receiver for this workflow/topic to avoid unnecessary database load payload := fmt.Sprintf("%s::%s", destinationID, topic) - c := make(chan bool, 1) // Make it buffered to allow the notification listener to post a signal even if the receiver has not reached its select statement yet - _, loaded := s.notificationsMap.LoadOrStore(payload, c) + cond := sync.NewCond(&sync.Mutex{}) + _, loaded := s.notificationsMap.LoadOrStore(payload, cond) if loaded { - close(c) getLogger().Error("Receive already called for workflow", "destination_id", destinationID) return nil, newWorkflowConflictIDError(destinationID) } defer func() { - // Clean up the channel after we're done + // Clean up the condition variable after we're done and broadcast to wake up any waiting goroutines // XXX We should handle panics in this function and make sure we call this. Not a problem for now as panic will crash the importing package. + cond.Broadcast() s.notificationsMap.Delete(payload) - close(c) }() // Now check if there is already a message available in the database. @@ -1166,15 +1161,23 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any return false, fmt.Errorf("failed to check message: %w", err) } if !exists { - // Listen for notifications on the channel + // Wait for notifications using condition variable with timeout pattern // XXX should we prevent zero or negative timeouts? - getLogger().Debug("Waiting for notification on channel", "payload", payload) + getLogger().Debug("Waiting for notification on condition variable", "payload", payload) + + done := make(chan struct{}) + go func() { + cond.L.Lock() + defer cond.L.Unlock() + cond.Wait() + close(done) + }() + select { - case <-c: - getLogger().Debug("Received notification on channel", "payload", payload) + case <-done: + getLogger().Debug("Received notification on condition variable", "payload", payload) case <-time.After(input.Timeout): - // If we reach the timeout, we can check if there is a message in the database, and if not return nil. - getLogger().Warn("Timeout reached for channel", "payload", payload, "timeout", input.Timeout) + getLogger().Warn("Recv() timeout reached", "payload", payload, "timeout", input.Timeout) } } @@ -1339,19 +1342,20 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp } } - // Create notification payload and channel + // Create notification payload and condition variable payload := fmt.Sprintf("%s::%s", input.TargetWorkflowID, input.Key) - c := make(chan bool, 1) - _, loaded := s.notificationsMap.LoadOrStore(payload, c) + cond := sync.NewCond(&sync.Mutex{}) + existingCond, loaded := s.notificationsMap.LoadOrStore(payload, cond) if loaded { - close(c) - // TODO: consider adding a specific error type for this case to help users handle it - getLogger().Error("GetEvent already called for target workflow", "target_workflow_id", input.TargetWorkflowID, "key", input.Key) - return nil, fmt.Errorf("GetEvent already called for target workflow %s and key %s", input.TargetWorkflowID, input.Key) + // Reuse the existing condition variable + cond = existingCond.(*sync.Cond) } + + // Defer broadcast to ensure any waiting goroutines eventually unlock defer func() { + cond.Broadcast() + // Clean up the condition variable after we're done s.notificationsMap.Delete(payload) - close(c) }() // Check if the event already exists in the database @@ -1366,12 +1370,21 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp } if err == pgx.ErrNoRows || valueString == nil { // XXX valueString should never be `nil` - // Wait for notification with timeout + // Wait for notification with timeout using condition variable + done := make(chan struct{}) + go func() { + cond.L.Lock() + defer cond.L.Unlock() + cond.Wait() + close(done) + }() + select { - case <-c: + case <-done: // Received notification case <-time.After(input.Timeout): // Timeout reached + getLogger().Warn("GetEvent() timeout reached", "target_workflow_id", input.TargetWorkflowID, "key", input.Key, "timeout", input.Timeout) case <-ctx.Done(): return nil, fmt.Errorf("context cancelled while waiting for event: %w", ctx.Err()) } @@ -1388,7 +1401,7 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp } } - // Deserialize the value if it exists XXX valueString should never be `nil`s + // Deserialize the value if it exists if valueString != nil { value, err = deserialize(valueString) if err != nil { From 473428506405afd9fd3ef5f50ce07f9db23c1cb4 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 18:52:51 -0700 Subject: [PATCH 04/10] tests for set/get events --- dbos/workflows_test.go | 400 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 358 insertions(+), 42 deletions(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 00c025ab..0e3b3554 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "strings" + "sync" "testing" "time" @@ -1085,48 +1086,6 @@ func TestSendRecv(t *testing.T) { if time.Since(start) > 5*time.Second { t.Fatalf("receive workflow took too long to complete, expected < 5s, got %v", time.Since(start)) } - - // Send and receive again the same workflows to verify idempotency - _, err = sendWf(context.Background(), sendWorkflowInput{ - DestinationID: receiveHandle.GetWorkflowID(), - Topic: "test-topic", - }, WithWorkflowID(handle.GetWorkflowID())) - if err != nil { - t.Fatalf("failed to send message with same workflow ID: %v", err) - } - receiveHandle2, err := receiveWf(context.Background(), "test-topic", WithWorkflowID(receiveHandle.GetWorkflowID())) - if err != nil { - t.Fatalf("failed to start receive workflow with same ID: %v", err) - } - result, err = receiveHandle2.GetResult(context.Background()) - if err != nil { - t.Fatalf("failed to get result from receive workflow with same ID: %v", err) - } - if result != "message1-message2-message3" { - t.Fatalf("expected received message to be 'message1-message2-message3', got '%s'", result) - } - - // Get steps for both workflows and verify we have the expected number - sendSteps, err := dbos.systemDB.GetWorkflowSteps(context.Background(), handle.GetWorkflowID()) - if err != nil { - t.Fatalf("failed to get steps for send workflow: %v", err) - } - receiveSteps, err := dbos.systemDB.GetWorkflowSteps(context.Background(), receiveHandle.GetWorkflowID()) - if err != nil { - t.Fatalf("failed to get steps for receive workflow: %v", err) - } - - // Verify the number of steps matches the number of send() and recv() calls - // sendWorkflow has 3 Send() calls, receiveWorkflow has 3 Recv() calls - expectedSendSteps := 3 - expectedReceiveSteps := 3 - - if len(sendSteps) != expectedSendSteps { - t.Fatalf("expected %d steps in send workflow, got %d", expectedSendSteps, len(sendSteps)) - } - if len(receiveSteps) != expectedReceiveSteps { - t.Fatalf("expected %d steps in receive workflow, got %d", expectedReceiveSteps, len(receiveSteps)) - } }) t.Run("SendRecvCustomStruct", func(t *testing.T) { @@ -1317,3 +1276,360 @@ func TestSendRecv(t *testing.T) { } }) } + +var ( + setEventWf = WithWorkflow(setEventWorkflow) + getEventWf = WithWorkflow(getEventWorkflow) + setTwoEventsWf = WithWorkflow(setTwoEventsWorkflow) + setEventIdempotencyWf = WithWorkflow(setEventIdempotencyWorkflow) + getEventIdempotencyWf = WithWorkflow(getEventIdempotencyWorkflow) + setEventIdempotencyEvent = NewEvent() + getEventStartIdempotencyEvent = NewEvent() + getEventStopIdempotencyEvent = NewEvent() + setSecondEventSignal = NewEvent() +) + +type setEventWorkflowInput struct { + Key string + Message string +} + +func setEventWorkflow(ctx context.Context, input setEventWorkflowInput) (string, error) { + err := SetEvent(ctx, WorkflowSetEventInput{Key: input.Key, Message: input.Message}) + if err != nil { + return "", err + } + return "event-set", nil +} + +func getEventWorkflow(ctx context.Context, input setEventWorkflowInput) (string, error) { + result, err := GetEvent[string](ctx, WorkflowGetEventInput{ + TargetWorkflowID: input.Key, // Reusing Key field as target workflow ID + Key: input.Message, // Reusing Message field as event key + Timeout: 3 * time.Second, + }) + if err != nil { + return "", err + } + return result, nil +} + +func setTwoEventsWorkflow(ctx context.Context, input setEventWorkflowInput) (string, error) { + // Set the first event + err := SetEvent(ctx, WorkflowSetEventInput{Key: "event1", Message: "first-event-message"}) + if err != nil { + return "", err + } + + // Wait for external signal before setting the second event + setSecondEventSignal.Wait() + + // Set the second event + err = SetEvent(ctx, WorkflowSetEventInput{Key: "event2", Message: "second-event-message"}) + if err != nil { + return "", err + } + + return "two-events-set", nil +} + +func setEventIdempotencyWorkflow(ctx context.Context, input setEventWorkflowInput) (string, error) { + err := SetEvent(ctx, WorkflowSetEventInput{Key: input.Key, Message: input.Message}) + if err != nil { + return "", err + } + setEventIdempotencyEvent.Wait() + return "idempotent-set-completed", nil +} + +func getEventIdempotencyWorkflow(ctx context.Context, input setEventWorkflowInput) (string, error) { + result, err := GetEvent[string](ctx, WorkflowGetEventInput{ + TargetWorkflowID: input.Key, + Key: input.Message, + Timeout: 3 * time.Second, + }) + if err != nil { + return "", err + } + getEventStartIdempotencyEvent.Set() + getEventStopIdempotencyEvent.Wait() + return result, nil +} + +func TestSetGetEvent(t *testing.T) { + setupDBOS(t) + + t.Run("SetGetEventFromWorkflow", func(t *testing.T) { + // Clear the signal event before starting + setSecondEventSignal.Clear() + + // Start the workflow that sets two events + setHandle, err := setTwoEventsWf(context.Background(), setEventWorkflowInput{ + Key: "test-workflow", + Message: "unused", + }) + if err != nil { + t.Fatalf("failed to start set two events workflow: %v", err) + } + + // Start a workflow to get the first event + getFirstEventHandle, err := getEventWf(context.Background(), setEventWorkflowInput{ + Key: setHandle.GetWorkflowID(), // Target workflow ID + Message: "event1", // Event key + }) + if err != nil { + t.Fatalf("failed to start get first event workflow: %v", err) + } + + // Verify we can get the first event + firstMessage, err := getFirstEventHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from first event workflow: %v", err) + } + if firstMessage != "first-event-message" { + t.Fatalf("expected first message to be 'first-event-message', got '%s'", firstMessage) + } + + // Signal the workflow to set the second event + setSecondEventSignal.Set() + + // Start a workflow to get the second event + getSecondEventHandle, err := getEventWf(context.Background(), setEventWorkflowInput{ + Key: setHandle.GetWorkflowID(), // Target workflow ID + Message: "event2", // Event key + }) + if err != nil { + t.Fatalf("failed to start get second event workflow: %v", err) + } + + // Verify we can get the second event + secondMessage, err := getSecondEventHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from second event workflow: %v", err) + } + if secondMessage != "second-event-message" { + t.Fatalf("expected second message to be 'second-event-message', got '%s'", secondMessage) + } + + // Wait for the workflow to complete + result, err := setHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from set two events workflow: %v", err) + } + if result != "two-events-set" { + t.Fatalf("expected result to be 'two-events-set', got '%s'", result) + } + }) + + t.Run("GetEventFromOutsideWorkflow", func(t *testing.T) { + // Start a workflow that sets an event + setHandle, err := setEventWf(context.Background(), setEventWorkflowInput{ + Key: "test-key", + Message: "test-message", + }) + if err != nil { + t.Fatalf("failed to start set event workflow: %v", err) + } + + // Wait for the event to be set + _, err = setHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from set event workflow: %v", err) + } + + // Start a workflow that gets the event from outside the original workflow + message, err := GetEvent[string](context.Background(), WorkflowGetEventInput{ + TargetWorkflowID: setHandle.GetWorkflowID(), + Key: "test-key", + Timeout: 3 * time.Second, + }) + if err != nil { + t.Fatalf("failed to get event from outside workflow: %v", err) + } + if message != "test-message" { + t.Fatalf("expected received message to be 'test-message', got '%s'", message) + } + }) + + t.Run("GetEventTimeout", func(t *testing.T) { + // Try to get an event from a non-existent workflow + nonExistentID := uuid.NewString() + message, err := GetEvent[string](context.Background(), WorkflowGetEventInput{ + TargetWorkflowID: nonExistentID, + Key: "test-key", + Timeout: 3 * time.Second, + }) + if err != nil { + t.Fatal("failed to get event from non-existent workflow:", err) + } + if message != "" { + t.Fatalf("expected empty result on timeout, got '%s'", message) + } + + // Try to get an event from an existing workflow but with a key that doesn't exist + setHandle, err := setEventWf(context.Background(), setEventWorkflowInput{ + Key: "test-key", + Message: "test-message", + }) + if err != nil { + t.Fatal("failed to set event:", err) + } + _, err = setHandle.GetResult(context.Background()) + if err != nil { + t.Fatal("failed to get result from set event workflow:", err) + } + message, err = GetEvent[string](context.Background(), WorkflowGetEventInput{ + TargetWorkflowID: setHandle.GetWorkflowID(), + Key: "non-existent-key", + Timeout: 3 * time.Second, + }) + if err != nil { + t.Fatal("failed to get event with non-existent key:", err) + } + if message != "" { + t.Fatalf("expected empty result on timeout with non-existent key, got '%s'", message) + } + }) + + t.Run("SetGetEventMustRunInsideWorkflows", func(t *testing.T) { + ctx := context.Background() + + // Attempt to run SetEvent outside of a workflow context + err := SetEvent(ctx, WorkflowSetEventInput{Key: "test-key", Message: "test-message"}) + if err == nil { + t.Fatal("expected error when running SetEvent outside of workflow context, but got none") + } + + // Check the error type + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected error to be of type *DBOSError, got %T", err) + } + + if dbosErr.Code != StepExecutionError { + t.Fatalf("expected error code to be StepExecutionError, got %v", dbosErr.Code) + } + + // Test the specific message from the error + expectedMessagePart := "workflow state not found in context: are you running this step within a workflow?" + if !strings.Contains(err.Error(), expectedMessagePart) { + t.Fatalf("expected error message to contain %q, but got %q", expectedMessagePart, err.Error()) + } + }) + + t.Run("SetGetEventIdempotency", func(t *testing.T) { + // Start the set event workflow + setHandle, err := setEventIdempotencyWf(context.Background(), setEventWorkflowInput{ + Key: "idempotency-key", + Message: "idempotency-message", + }) + if err != nil { + t.Fatalf("failed to start set event idempotency workflow: %v", err) + } + + // Start the get event workflow + getHandle, err := getEventIdempotencyWf(context.Background(), setEventWorkflowInput{ + Key: setHandle.GetWorkflowID(), + Message: "idempotency-key", + }) + if err != nil { + t.Fatalf("failed to start get event idempotency workflow: %v", err) + } + + // Wait for the get event workflow to signal it has received the event + getEventStartIdempotencyEvent.Wait() + + // Attempt recovering both workflows. Each should have exactly 1 step. + recoveredHandles, err := recoverPendingWorkflows(context.Background(), []string{"local"}) + if err != nil { + t.Fatalf("failed to recover pending workflows: %v", err) + } + if len(recoveredHandles) != 2 { + t.Fatalf("expected 2 recovered handles, got %d", len(recoveredHandles)) + } + + // Verify step counts + setSteps, err := dbos.systemDB.GetWorkflowSteps(context.Background(), setHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get steps for set event idempotency workflow: %v", err) + } + if len(setSteps) != 1 { + t.Fatalf("expected 1 step in set event idempotency workflow, got %d", len(setSteps)) + } + + getSteps, err := dbos.systemDB.GetWorkflowSteps(context.Background(), getHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get steps for get event idempotency workflow: %v", err) + } + if len(getSteps) != 1 { + t.Fatalf("expected 1 step in get event idempotency workflow, got %d", len(getSteps)) + } + + // Complete the workflows + setEventIdempotencyEvent.Set() + getEventStopIdempotencyEvent.Set() + + setResult, err := setHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from set event idempotency workflow: %v", err) + } + if setResult != "idempotent-set-completed" { + t.Fatalf("expected result to be 'idempotent-set-completed', got '%s'", setResult) + } + + getResult, err := getHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from get event idempotency workflow: %v", err) + } + if getResult != "idempotency-message" { + t.Fatalf("expected result to be 'idempotency-message', got '%s'", getResult) + } + }) + + t.Run("ConcurrentGetEvent", func(t *testing.T) { + // Set event + setHandle, err := setEventWf(context.Background(), setEventWorkflowInput{ + Key: "concurrent-event-key", + Message: "concurrent-event-message", + }) + if err != nil { + t.Fatalf("failed to start set event workflow: %v", err) + } + + // Wait for the set event workflow to complete + _, err = setHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from set event workflow: %v", err) + } + // Start a few goroutines that'll concurrently get the event + numGoroutines := 5 + var wg sync.WaitGroup + errors := make(chan error, numGoroutines) + wg.Add(numGoroutines) + for range numGoroutines { + go func() { + defer wg.Done() + res, err := GetEvent[string](context.Background(), WorkflowGetEventInput{ + TargetWorkflowID: setHandle.GetWorkflowID(), + Key: "concurrent-event-key", + Timeout: 10 * time.Second, + }) + if err != nil { + errors <- fmt.Errorf("failed to get event in goroutine: %v", err) + return + } + if res != "concurrent-event-message" { + errors <- fmt.Errorf("expected result in goroutine to be 'concurrent-event-message', got '%s'", res) + return + } + }() + } + wg.Wait() + close(errors) + + // Check for any errors from goroutines + for err := range errors { + t.Fatal(err) + } + }) +} From dec1e915d2977b92924de3545e9a6ffb697fd62c Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 20:17:00 -0700 Subject: [PATCH 05/10] add a test for concurrent receive error behavior --- dbos/workflows_test.go | 109 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 0e3b3554..74e151a0 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -970,6 +970,7 @@ func TestScheduledWorkflows(t *testing.T) { var ( sendWf = WithWorkflow(sendWorkflow) receiveWf = WithWorkflow(receiveWorkflow) + receiveWfCoordinated = WithWorkflow(receiveWorkflowCoordinated) sendStructWf = WithWorkflow(sendStructWorkflow) receiveStructWf = WithWorkflow(receiveStructWorkflow) sendIdempotencyWf = WithWorkflow(sendIdempotencyWorkflow) @@ -977,6 +978,9 @@ var ( recvIdempotencyWf = WithWorkflow(receiveIdempotencyWorkflow) receiveIdempotencyStartEvent = NewEvent() receiveIdempotencyStopEvent = NewEvent() + numConcurrentRecvWfs = 5 + concurrentRecvReadyEvents = make([]*Event, numConcurrentRecvWfs) + concurrentRecvStartEvent = NewEvent() ) type sendWorkflowInput struct { @@ -1018,6 +1022,26 @@ func receiveWorkflow(ctx context.Context, topic string) (string, error) { return msg1 + "-" + msg2 + "-" + msg3, nil } +func receiveWorkflowCoordinated(ctx context.Context, input struct { + Topic string + i int +}) (string, error) { + // Signal that this workflow has started and is ready + concurrentRecvReadyEvents[input.i].Set() + + // Wait for the coordination event before starting to receive + + concurrentRecvStartEvent.Wait() + + // Do a single Recv call with timeout + msg, err := Recv[string](ctx, WorkflowRecvInput{Topic: input.Topic, Timeout: 3 * time.Second}) + fmt.Println(err) + if err != nil { + return "", err + } + return msg, nil +} + func sendStructWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) { testStruct := sendRecvType{Value: "test-struct-value"} err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: testStruct}) @@ -1275,6 +1299,91 @@ func TestSendRecv(t *testing.T) { t.Fatalf("expected result to be 'idempotent-send-completed', got '%s'", result) } }) + + t.Run("ConcurrentRecv", func(t *testing.T) { + // Test concurrent receivers - only 1 should timeout, others should get errors + receiveTopic := "concurrent-recv-topic" + + // Start multiple concurrent receive workflows - no messages will be sent + numReceivers := 5 + var wg sync.WaitGroup + results := make(chan string, numReceivers) + errors := make(chan error, numReceivers) + receiverHandles := make([]WorkflowHandle[string], numReceivers) + + // Start all receivers - they will signal when ready and wait for coordination + for i := range numReceivers { + concurrentRecvReadyEvents[i] = NewEvent() + receiveHandle, err := receiveWfCoordinated(context.Background(), struct { + Topic string + i int + }{ + Topic: receiveTopic, + i: i, + }, WithWorkflowID("concurrent-recv-wfid")) + if err != nil { + t.Fatalf("failed to start receive workflow %d: %v", i, err) + } + receiverHandles[i] = receiveHandle + } + + // Wait for all workflows to signal they are ready + for i := range numReceivers { + concurrentRecvReadyEvents[i].Wait() + } + + // Now unblock all receivers simultaneously so they race to the Recv call + concurrentRecvStartEvent.Set() + + // Collect results from all receivers concurrently + // Only 1 should timeout (winner of the CV), others should get errors + wg.Add(numReceivers) + for i := range numReceivers { + go func(index int) { + defer wg.Done() + result, err := receiverHandles[index].GetResult(context.Background()) + if err != nil { + errors <- err + } else { + results <- result + } + }(i) + } + + wg.Wait() + close(results) + close(errors) + + // Count timeout results and errors + timeoutCount := 0 + errorCount := 0 + + for result := range results { + if result == "" { + // Empty string indicates a timeout - only 1 receiver should get this + timeoutCount++ + } + } + + for err := range errors { + t.Logf("Receiver error (expected): %v", err) + errorCount++ + } + + // Verify that exactly 1 receiver timed out and 4 got errors + if timeoutCount != 1 { + t.Fatalf("expected exactly 1 receiver to timeout, got %d timeouts", timeoutCount) + } + + if errorCount != 4 { + t.Fatalf("expected exactly 4 receivers to get errors, got %d errors", errorCount) + } + + // Ensure total results match expected + if timeoutCount+errorCount != numReceivers { + t.Fatalf("expected total results (%d) to equal number of receivers (%d)", timeoutCount+errorCount, numReceivers) + } + }) } var ( From 4ca8b7e88637990695083fb8fe2272b232a11839 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 20:30:40 -0700 Subject: [PATCH 06/10] comment to clarify the durable check --- dbos/system_database.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbos/system_database.go b/dbos/system_database.go index 8cf28d18..f479765e 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1038,6 +1038,7 @@ func (s *systemDatabase) Send(ctx context.Context, input WorkflowSendInput) erro return err } if recordedResult != nil { + // when hitting this case, recordedResult will be &{ } return nil } @@ -1269,6 +1270,7 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp return err } if recordedResult != nil { + // when hitting this case, recordedResult will be &{ } return nil } From 0bfabde114899a99ddcf352a74ba57638c58a52b Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 20:40:31 -0700 Subject: [PATCH 07/10] fix txn usage in Recv() --- dbos/system_database.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index f479765e..51d37538 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1113,19 +1113,12 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any topic = input.Topic } - tx, err := s.pool.Begin(ctx) - if err != nil { - return nil, fmt.Errorf("failed to begin transaction: %w", err) - } - defer tx.Rollback(ctx) - // Check if operation was already executed // XXX this might not need to be in the transaction checkInput := checkOperationExecutionDBInput{ workflowID: destinationID, operationID: stepID, functionName: functionName, - tx: tx, } recordedResult, err := s.CheckOperationExecution(ctx, checkInput) if err != nil { @@ -1157,7 +1150,7 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any // If not, we'll wait for a notification and timeout var exists bool query := `SELECT EXISTS (SELECT 1 FROM dbos.notifications WHERE destination_uuid = $1 AND topic = $2)` - err = tx.QueryRow(ctx, query, destinationID, topic).Scan(&exists) + err = s.pool.QueryRow(ctx, query, destinationID, topic).Scan(&exists) if err != nil { return false, fmt.Errorf("failed to check message: %w", err) } @@ -1183,6 +1176,11 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any } // Find the oldest message and delete it atomically + tx, err := s.pool.Begin(ctx) + if err != nil { + return nil, fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback(ctx) query = ` WITH oldest_entry AS ( SELECT destination_uuid, topic, message, created_at_epoch_ms From 0c527a90425a0566df6b0ea5c466556785c157b1 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 20:52:07 -0700 Subject: [PATCH 08/10] should wait --- dbos/workflows_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 74e151a0..e8d678ea 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -1647,6 +1647,7 @@ func TestSetGetEvent(t *testing.T) { // Wait for the get event workflow to signal it has received the event getEventStartIdempotencyEvent.Wait() + getEventStartIdempotencyEvent.Clear() // Attempt recovering both workflows. Each should have exactly 1 step. recoveredHandles, err := recoverPendingWorkflows(context.Background(), []string{"local"}) @@ -1657,6 +1658,8 @@ func TestSetGetEvent(t *testing.T) { t.Fatalf("expected 2 recovered handles, got %d", len(recoveredHandles)) } + getEventStartIdempotencyEvent.Wait() + // Verify step counts setSteps, err := dbos.systemDB.GetWorkflowSteps(context.Background(), setHandle.GetWorkflowID()) if err != nil { From 3fd5d9900d054b732eed9a24ae350b2952ed8fa1 Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 24 Jul 2025 08:54:07 -0700 Subject: [PATCH 09/10] only the creator of the CV should delete it from the map --- dbos/system_database.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 51d37538..43a4f9fd 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1354,8 +1354,10 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp // Defer broadcast to ensure any waiting goroutines eventually unlock defer func() { cond.Broadcast() - // Clean up the condition variable after we're done - s.notificationsMap.Delete(payload) + // Clean up the condition variable after we're done, if we created it + if !loaded { + s.notificationsMap.Delete(payload) + } }() // Check if the event already exists in the database From 303871c556648d7f695479c7722f7f771af47729 Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 24 Jul 2025 08:53:53 -0700 Subject: [PATCH 10/10] newline --- dbos/migrations/000001_initial_dbos_schema.up.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbos/migrations/000001_initial_dbos_schema.up.sql b/dbos/migrations/000001_initial_dbos_schema.up.sql index 94afdcce..b2e6142a 100644 --- a/dbos/migrations/000001_initial_dbos_schema.up.sql +++ b/dbos/migrations/000001_initial_dbos_schema.up.sql @@ -106,4 +106,4 @@ $$ LANGUAGE plpgsql; -- Create events trigger CREATE TRIGGER dbos_workflow_events_trigger AFTER INSERT ON dbos.workflow_events -FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function(); \ No newline at end of file +FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function();