From a0cda1813602324e134010b7650c789e2e087857 Mon Sep 17 00:00:00 2001 From: maxdml Date: Tue, 22 Jul 2025 17:28:26 -0700 Subject: [PATCH 01/19] WIP: send/get events --- .../000001_initial_dbos_schema.down.sql | 4 +- .../000001_initial_dbos_schema.up.sql | 17 +- dbos/system_database.go | 192 +++++++++++++++++- dbos/workflow.go | 31 +++ 4 files changed, 238 insertions(+), 6 deletions(-) diff --git a/dbos/migrations/000001_initial_dbos_schema.down.sql b/dbos/migrations/000001_initial_dbos_schema.down.sql index b037b93b..10526474 100644 --- a/dbos/migrations/000001_initial_dbos_schema.down.sql +++ b/dbos/migrations/000001_initial_dbos_schema.down.sql @@ -1,10 +1,12 @@ -- 001_initial_dbos_schema.down.sql --- Drop trigger first +-- Drop triggers first DROP TRIGGER IF EXISTS dbos_notifications_trigger ON dbos.notifications; +DROP TRIGGER IF EXISTS dbos_workflow_events_trigger ON dbos.workflow_events; -- Drop function DROP FUNCTION IF EXISTS dbos.notifications_function(); +DROP FUNCTION IF EXISTS dbos.workflow_events_function(); -- Drop tables in reverse order to respect foreign key constraints DROP TABLE IF EXISTS dbos.workflow_events; diff --git a/dbos/migrations/000001_initial_dbos_schema.up.sql b/dbos/migrations/000001_initial_dbos_schema.up.sql index 4da3b693..94afdcce 100644 --- a/dbos/migrations/000001_initial_dbos_schema.up.sql +++ b/dbos/migrations/000001_initial_dbos_schema.up.sql @@ -91,4 +91,19 @@ CREATE TABLE dbos.workflow_events ( PRIMARY KEY (workflow_uuid, key), FOREIGN KEY (workflow_uuid) REFERENCES dbos.workflow_status(workflow_uuid) ON UPDATE CASCADE ON DELETE CASCADE -); \ No newline at end of file +); + +-- Create events function +CREATE OR REPLACE FUNCTION dbos.workflow_events_function() RETURNS TRIGGER AS $$ +DECLARE + payload text := NEW.workflow_uuid || '::' || NEW.key; +BEGIN + PERFORM pg_notify('dbos_workflow_events_channel', payload); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Create events trigger +CREATE TRIGGER 
dbos_workflow_events_trigger +AFTER INSERT ON dbos.workflow_events +FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function(); \ No newline at end of file diff --git a/dbos/system_database.go b/dbos/system_database.go index a6ea0d6d..742187eb 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -39,6 +39,8 @@ type SystemDatabase interface { GetWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error) Send(ctx context.Context, input WorkflowSendInput) error Recv(ctx context.Context, input WorkflowRecvInput) (any, error) + SetEvent(ctx context.Context, input WorkflowSetEventInput) error + GetEvent(ctx context.Context, input WorkflowGetEventInput) (any, error) } type systemDatabase struct { @@ -157,7 +159,7 @@ func NewSystemDatabase(databaseURL string) (SystemDatabase, error) { return nil, fmt.Errorf("failed to parse database URL: %v", err) } config.OnNotification = func(c *pgconn.PgConn, n *pgconn.Notification) { - if n.Channel == "dbos_notifications_channel" { + if n.Channel == "dbos_notifications_channel" || n.Channel == "dbos_workflow_events_channel" { // Check if an entry exists in the map, indexed by the payload // If yes, send a signal to the channel so the listener can wake up if ch, exists := notificationsMap.Load(n.Payload); exists { @@ -971,17 +973,17 @@ func (s *systemDatabase) GetWorkflowSteps(ctx context.Context, workflowID string /****************************************/ func (s *systemDatabase) notificationListenerLoop(ctx context.Context) { - mrr := s.notificationListenerConnection.Exec(ctx, "LISTEN dbos_notifications_channel") + mrr := s.notificationListenerConnection.Exec(ctx, "LISTEN dbos_notifications_channel; LISTEN dbos_workflow_events_channel") results, err := mrr.ReadAll() if err != nil { - getLogger().Error("Failed to listen on dbos_notifications_channel", "error", err) + getLogger().Error("Failed to listen on notification channels", "error", err) return } mrr.Close() for _, result := range results { if 
result.Err != nil { - getLogger().Error("Error listening on dbos_notifications_channel", "error", result.Err) + getLogger().Error("Error listening on notification channels", "error", result.Err) return } } @@ -1231,6 +1233,188 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any return message, nil } +func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInput) error { + functionName := "DBOS.setEvent" + + // Get workflow state from context + workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) + if !ok || workflowState == nil { + return newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?") + } + + if workflowState.isWithinStep { + return newStepExecutionError(workflowState.WorkflowID, functionName, "cannot call SetEvent within a step") + } + + stepID := workflowState.NextStepID() + + tx, err := s.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback(ctx) + + // Check if operation was already executed and do nothing if so + checkInput := checkOperationExecutionDBInput{ + workflowID: workflowState.WorkflowID, + operationID: stepID, + functionName: functionName, + tx: tx, + } + recordedResult, err := s.CheckOperationExecution(ctx, checkInput) + if err != nil { + return err + } + if recordedResult != nil { + return nil + } + + // Serialize the message. It must have been registered with encoding/gob by the user if not a basic type. 
+ messageString, err := serialize(input.Message) + if err != nil { + return fmt.Errorf("failed to serialize message: %w", err) + } + + // Insert or update the event using UPSERT + insertQuery := `INSERT INTO dbos.workflow_events (workflow_uuid, key, value) + VALUES ($1, $2, $3) + ON CONFLICT (workflow_uuid, key) + DO UPDATE SET value = EXCLUDED.value` + + _, err = tx.Exec(ctx, insertQuery, workflowState.WorkflowID, input.Key, messageString) + if err != nil { + return fmt.Errorf("failed to insert/update workflow event: %w", err) + } + + // Record the operation result + recordInput := recordOperationResultDBInput{ + workflowID: workflowState.WorkflowID, + operationID: stepID, + operationName: functionName, + output: nil, + err: nil, + tx: tx, + } + + err = s.RecordOperationResult(ctx, recordInput) + if err != nil { + return fmt.Errorf("failed to record operation result: %w", err) + } + + // Commit transaction + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + return nil +} + +func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInput) (any, error) { + functionName := "DBOS.getEvent" + + // Get workflow state from context (optional for GetEvent as we can get an event from outside a workflow) + workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) + var stepID int + var isInWorkflow bool + + if ok && workflowState != nil { + isInWorkflow = true + if workflowState.isWithinStep { + return nil, newStepExecutionError(workflowState.WorkflowID, functionName, "cannot call GetEvent within a step") + } + stepID = workflowState.NextStepID() + + // Check if operation was already executed (only if in workflow) + checkInput := checkOperationExecutionDBInput{ + workflowID: workflowState.WorkflowID, + operationID: stepID, + functionName: functionName, + } + recordedResult, err := s.CheckOperationExecution(ctx, checkInput) + if err != nil { + return nil, err + } + if recordedResult != nil { 
+ return recordedResult.output, recordedResult.err + } + } + + // Create notification payload and channel + payload := fmt.Sprintf("%s::%s", input.TargetWorkflowID, input.Key) + c := make(chan bool, 1) + _, loaded := s.notificationsMap.LoadOrStore(payload, c) + if loaded { + close(c) + // TODO: consider adding a specific error type for this case to help users handle it + getLogger().Error("GetEvent already called for target workflow", "target_workflow_id", input.TargetWorkflowID, "key", input.Key) + return nil, fmt.Errorf("GetEvent already called for target workflow %s and key %s", input.TargetWorkflowID, input.Key) + } + defer func() { + s.notificationsMap.Delete(payload) + close(c) + }() + + // Check if the event already exists in the database + query := `SELECT value FROM dbos.workflow_events WHERE workflow_uuid = $1 AND key = $2` + var value any + var valueString *string + + row := s.pool.QueryRow(ctx, query, input.TargetWorkflowID, input.Key) + err := row.Scan(&valueString) + if err != nil && err != pgx.ErrNoRows { + return nil, fmt.Errorf("failed to query workflow event: %w", err) + } + + if err == pgx.ErrNoRows || valueString == nil { // XXX valueString should never be `nil` + // Wait for notification with timeout + select { + case <-c: + // Received notification + case <-time.After(input.Timeout): + // Timeout reached + case <-ctx.Done(): + return nil, fmt.Errorf("context cancelled while waiting for event: %w", ctx.Err()) + } + + // Query the database again after waiting + row = s.pool.QueryRow(ctx, query, input.TargetWorkflowID, input.Key) + err = row.Scan(&valueString) + if err != nil { + if err == pgx.ErrNoRows { + value = nil // Event still doesn't exist + } else { + return nil, fmt.Errorf("failed to query workflow event after wait: %w", err) + } + } + } + + // Deserialize the value if it exists XXX valueString should never be `nil`s + if valueString != nil { + value, err = deserialize(valueString) + if err != nil { + return nil, fmt.Errorf("failed to 
deserialize event value: %w", err) + } + } + + // Record the operation result if this is called within a workflow + if isInWorkflow { + recordInput := recordOperationResultDBInput{ + workflowID: workflowState.WorkflowID, + operationID: stepID, + operationName: functionName, + output: value, + err: nil, + } + + err = s.RecordOperationResult(ctx, recordInput) + if err != nil { + return nil, fmt.Errorf("failed to record operation result: %w", err) + } + } + + return value, nil +} + /*******************************/ /******* QUEUES ********/ /*******************************/ diff --git a/dbos/workflow.go b/dbos/workflow.go index 0e15d7fa..fc696555 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -744,6 +744,37 @@ func Recv[R any](ctx context.Context, input WorkflowRecvInput) (R, error) { return typedMessage, nil } +type WorkflowSetEventInput struct { + Key string + Message any +} + +func SetEvent(ctx context.Context, input WorkflowSetEventInput) error { + return dbos.systemDB.SetEvent(ctx, input) +} + +type WorkflowGetEventInput struct { + TargetWorkflowID string + Key string + Timeout time.Duration +} + +func GetEvent[R any](ctx context.Context, input WorkflowGetEventInput) (R, error) { + value, err := dbos.systemDB.GetEvent(ctx, input) + if err != nil { + return *new(R), err + } + if value == nil { + return *new(R), nil + } + // Type check + typedValue, ok := value.(R) + if !ok { + return *new(R), newWorkflowUnexpectedResultType("", fmt.Sprintf("%T", new(R)), fmt.Sprintf("%T", value)) + } + return typedValue, nil +} + /***********************************/ /******* WORKFLOW MANAGEMENT *******/ /***********************************/ From d89fe165c12729234ada8f51e6593b348501f173 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 18:51:20 -0700 Subject: [PATCH 02/19] useful info --- dbos/dbos.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index f56fd54d..ef3d0be7 100644 --- a/dbos/dbos.go +++ 
b/dbos/dbos.go @@ -211,10 +211,13 @@ func Launch() error { } // Run a round of recovery on the local executor - _, err := recoverPendingWorkflows(context.Background(), []string{_EXECUTOR_ID}) // XXX maybe use the queue runner context here to allow Shutdown to cancel it? + recoveryHandles, err := recoverPendingWorkflows(context.Background(), []string{_EXECUTOR_ID}) // XXX maybe use the queue runner context here to allow Shutdown to cancel it? if err != nil { return newInitializationError(fmt.Sprintf("failed to recover pending workflows during launch: %v", err)) } + if len(recoveryHandles) > 0 { + logger.Info("Recovered pending workflows", "count", len(recoveryHandles)) + } logger.Info("DBOS initialized", "app_version", _APP_VERSION, "executor_id", _EXECUTOR_ID) return nil From a4e57f211e2f5f57c994db53dc95dfca6f3c8cec Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 18:52:28 -0700 Subject: [PATCH 03/19] change the notification system to use a condition variable. Channels work fine with 1 consumer <-> 1 producer, but set/get event accepts multiple consumers --- dbos/system_database.go | 71 ++++++++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 29 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 742187eb..8cf28d18 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -161,13 +161,9 @@ func NewSystemDatabase(databaseURL string) (SystemDatabase, error) { config.OnNotification = func(c *pgconn.PgConn, n *pgconn.Notification) { if n.Channel == "dbos_notifications_channel" || n.Channel == "dbos_workflow_events_channel" { // Check if an entry exists in the map, indexed by the payload - // If yes, send a signal to the channel so the listener can wake up - if ch, exists := notificationsMap.Load(n.Payload); exists { - select { - case ch.(chan bool) <- true: // Send a signal to wake up the listener - default: - getLogger().Warn("notification channel for payload is full, skipping", "payload",
n.Payload) - } + // If yes, broadcast on the condition variable so listeners can wake up + if cond, exists := notificationsMap.Load(n.Payload); exists { + cond.(*sync.Cond).Broadcast() } } } @@ -1143,18 +1139,17 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any // First check if there's already a receiver for this workflow/topic to avoid unnecessary database load payload := fmt.Sprintf("%s::%s", destinationID, topic) - c := make(chan bool, 1) // Make it buffered to allow the notification listener to post a signal even if the receiver has not reached its select statement yet - _, loaded := s.notificationsMap.LoadOrStore(payload, c) + cond := sync.NewCond(&sync.Mutex{}) + _, loaded := s.notificationsMap.LoadOrStore(payload, cond) if loaded { - close(c) getLogger().Error("Receive already called for workflow", "destination_id", destinationID) return nil, newWorkflowConflictIDError(destinationID) } defer func() { - // Clean up the channel after we're done + // Clean up the condition variable after we're done and broadcast to wake up any waiting goroutines // XXX We should handle panics in this function and make sure we call this. Not a problem for now as panic will crash the importing package. + cond.Broadcast() s.notificationsMap.Delete(payload) - close(c) }() // Now check if there is already a message available in the database. @@ -1166,15 +1161,23 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any return false, fmt.Errorf("failed to check message: %w", err) } if !exists { - // Listen for notifications on the channel + // Wait for notifications using condition variable with timeout pattern // XXX should we prevent zero or negative timeouts? 
- getLogger().Debug("Waiting for notification on channel", "payload", payload) + getLogger().Debug("Waiting for notification on condition variable", "payload", payload) + + done := make(chan struct{}) + go func() { + cond.L.Lock() + defer cond.L.Unlock() + cond.Wait() + close(done) + }() + select { - case <-c: - getLogger().Debug("Received notification on channel", "payload", payload) + case <-done: + getLogger().Debug("Received notification on condition variable", "payload", payload) case <-time.After(input.Timeout): - // If we reach the timeout, we can check if there is a message in the database, and if not return nil. - getLogger().Warn("Timeout reached for channel", "payload", payload, "timeout", input.Timeout) + getLogger().Warn("Recv() timeout reached", "payload", payload, "timeout", input.Timeout) } } @@ -1339,19 +1342,20 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp } } - // Create notification payload and channel + // Create notification payload and condition variable payload := fmt.Sprintf("%s::%s", input.TargetWorkflowID, input.Key) - c := make(chan bool, 1) - _, loaded := s.notificationsMap.LoadOrStore(payload, c) + cond := sync.NewCond(&sync.Mutex{}) + existingCond, loaded := s.notificationsMap.LoadOrStore(payload, cond) if loaded { - close(c) - // TODO: consider adding a specific error type for this case to help users handle it - getLogger().Error("GetEvent already called for target workflow", "target_workflow_id", input.TargetWorkflowID, "key", input.Key) - return nil, fmt.Errorf("GetEvent already called for target workflow %s and key %s", input.TargetWorkflowID, input.Key) + // Reuse the existing condition variable + cond = existingCond.(*sync.Cond) } + + // Defer broadcast to ensure any waiting goroutines eventually unlock defer func() { + cond.Broadcast() + // Clean up the condition variable after we're done s.notificationsMap.Delete(payload) - close(c) }() // Check if the event already exists in the database 
@@ -1366,12 +1370,21 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp } if err == pgx.ErrNoRows || valueString == nil { // XXX valueString should never be `nil` - // Wait for notification with timeout + // Wait for notification with timeout using condition variable + done := make(chan struct{}) + go func() { + cond.L.Lock() + defer cond.L.Unlock() + cond.Wait() + close(done) + }() + select { - case <-c: + case <-done: // Received notification case <-time.After(input.Timeout): // Timeout reached + getLogger().Warn("GetEvent() timeout reached", "target_workflow_id", input.TargetWorkflowID, "key", input.Key, "timeout", input.Timeout) case <-ctx.Done(): return nil, fmt.Errorf("context cancelled while waiting for event: %w", ctx.Err()) } @@ -1388,7 +1401,7 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp } } - // Deserialize the value if it exists XXX valueString should never be `nil`s + // Deserialize the value if it exists if valueString != nil { value, err = deserialize(valueString) if err != nil { From 473428506405afd9fd3ef5f50ce07f9db23c1cb4 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 18:52:51 -0700 Subject: [PATCH 04/19] tests for set/get events --- dbos/workflows_test.go | 400 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 358 insertions(+), 42 deletions(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 00c025ab..0e3b3554 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "strings" + "sync" "testing" "time" @@ -1085,48 +1086,6 @@ func TestSendRecv(t *testing.T) { if time.Since(start) > 5*time.Second { t.Fatalf("receive workflow took too long to complete, expected < 5s, got %v", time.Since(start)) } - - // Send and receive again the same workflows to verify idempotency - _, err = sendWf(context.Background(), sendWorkflowInput{ - DestinationID: receiveHandle.GetWorkflowID(), - Topic: 
"test-topic", - }, WithWorkflowID(handle.GetWorkflowID())) - if err != nil { - t.Fatalf("failed to send message with same workflow ID: %v", err) - } - receiveHandle2, err := receiveWf(context.Background(), "test-topic", WithWorkflowID(receiveHandle.GetWorkflowID())) - if err != nil { - t.Fatalf("failed to start receive workflow with same ID: %v", err) - } - result, err = receiveHandle2.GetResult(context.Background()) - if err != nil { - t.Fatalf("failed to get result from receive workflow with same ID: %v", err) - } - if result != "message1-message2-message3" { - t.Fatalf("expected received message to be 'message1-message2-message3', got '%s'", result) - } - - // Get steps for both workflows and verify we have the expected number - sendSteps, err := dbos.systemDB.GetWorkflowSteps(context.Background(), handle.GetWorkflowID()) - if err != nil { - t.Fatalf("failed to get steps for send workflow: %v", err) - } - receiveSteps, err := dbos.systemDB.GetWorkflowSteps(context.Background(), receiveHandle.GetWorkflowID()) - if err != nil { - t.Fatalf("failed to get steps for receive workflow: %v", err) - } - - // Verify the number of steps matches the number of send() and recv() calls - // sendWorkflow has 3 Send() calls, receiveWorkflow has 3 Recv() calls - expectedSendSteps := 3 - expectedReceiveSteps := 3 - - if len(sendSteps) != expectedSendSteps { - t.Fatalf("expected %d steps in send workflow, got %d", expectedSendSteps, len(sendSteps)) - } - if len(receiveSteps) != expectedReceiveSteps { - t.Fatalf("expected %d steps in receive workflow, got %d", expectedReceiveSteps, len(receiveSteps)) - } }) t.Run("SendRecvCustomStruct", func(t *testing.T) { @@ -1317,3 +1276,360 @@ func TestSendRecv(t *testing.T) { } }) } + +var ( + setEventWf = WithWorkflow(setEventWorkflow) + getEventWf = WithWorkflow(getEventWorkflow) + setTwoEventsWf = WithWorkflow(setTwoEventsWorkflow) + setEventIdempotencyWf = WithWorkflow(setEventIdempotencyWorkflow) + getEventIdempotencyWf = 
WithWorkflow(getEventIdempotencyWorkflow) + setEventIdempotencyEvent = NewEvent() + getEventStartIdempotencyEvent = NewEvent() + getEventStopIdempotencyEvent = NewEvent() + setSecondEventSignal = NewEvent() +) + +type setEventWorkflowInput struct { + Key string + Message string +} + +func setEventWorkflow(ctx context.Context, input setEventWorkflowInput) (string, error) { + err := SetEvent(ctx, WorkflowSetEventInput{Key: input.Key, Message: input.Message}) + if err != nil { + return "", err + } + return "event-set", nil +} + +func getEventWorkflow(ctx context.Context, input setEventWorkflowInput) (string, error) { + result, err := GetEvent[string](ctx, WorkflowGetEventInput{ + TargetWorkflowID: input.Key, // Reusing Key field as target workflow ID + Key: input.Message, // Reusing Message field as event key + Timeout: 3 * time.Second, + }) + if err != nil { + return "", err + } + return result, nil +} + +func setTwoEventsWorkflow(ctx context.Context, input setEventWorkflowInput) (string, error) { + // Set the first event + err := SetEvent(ctx, WorkflowSetEventInput{Key: "event1", Message: "first-event-message"}) + if err != nil { + return "", err + } + + // Wait for external signal before setting the second event + setSecondEventSignal.Wait() + + // Set the second event + err = SetEvent(ctx, WorkflowSetEventInput{Key: "event2", Message: "second-event-message"}) + if err != nil { + return "", err + } + + return "two-events-set", nil +} + +func setEventIdempotencyWorkflow(ctx context.Context, input setEventWorkflowInput) (string, error) { + err := SetEvent(ctx, WorkflowSetEventInput{Key: input.Key, Message: input.Message}) + if err != nil { + return "", err + } + setEventIdempotencyEvent.Wait() + return "idempotent-set-completed", nil +} + +func getEventIdempotencyWorkflow(ctx context.Context, input setEventWorkflowInput) (string, error) { + result, err := GetEvent[string](ctx, WorkflowGetEventInput{ + TargetWorkflowID: input.Key, + Key: input.Message, + Timeout: 3 * 
time.Second, + }) + if err != nil { + return "", err + } + getEventStartIdempotencyEvent.Set() + getEventStopIdempotencyEvent.Wait() + return result, nil +} + +func TestSetGetEvent(t *testing.T) { + setupDBOS(t) + + t.Run("SetGetEventFromWorkflow", func(t *testing.T) { + // Clear the signal event before starting + setSecondEventSignal.Clear() + + // Start the workflow that sets two events + setHandle, err := setTwoEventsWf(context.Background(), setEventWorkflowInput{ + Key: "test-workflow", + Message: "unused", + }) + if err != nil { + t.Fatalf("failed to start set two events workflow: %v", err) + } + + // Start a workflow to get the first event + getFirstEventHandle, err := getEventWf(context.Background(), setEventWorkflowInput{ + Key: setHandle.GetWorkflowID(), // Target workflow ID + Message: "event1", // Event key + }) + if err != nil { + t.Fatalf("failed to start get first event workflow: %v", err) + } + + // Verify we can get the first event + firstMessage, err := getFirstEventHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from first event workflow: %v", err) + } + if firstMessage != "first-event-message" { + t.Fatalf("expected first message to be 'first-event-message', got '%s'", firstMessage) + } + + // Signal the workflow to set the second event + setSecondEventSignal.Set() + + // Start a workflow to get the second event + getSecondEventHandle, err := getEventWf(context.Background(), setEventWorkflowInput{ + Key: setHandle.GetWorkflowID(), // Target workflow ID + Message: "event2", // Event key + }) + if err != nil { + t.Fatalf("failed to start get second event workflow: %v", err) + } + + // Verify we can get the second event + secondMessage, err := getSecondEventHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from second event workflow: %v", err) + } + if secondMessage != "second-event-message" { + t.Fatalf("expected second message to be 'second-event-message', got '%s'", 
secondMessage) + } + + // Wait for the workflow to complete + result, err := setHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from set two events workflow: %v", err) + } + if result != "two-events-set" { + t.Fatalf("expected result to be 'two-events-set', got '%s'", result) + } + }) + + t.Run("GetEventFromOutsideWorkflow", func(t *testing.T) { + // Start a workflow that sets an event + setHandle, err := setEventWf(context.Background(), setEventWorkflowInput{ + Key: "test-key", + Message: "test-message", + }) + if err != nil { + t.Fatalf("failed to start set event workflow: %v", err) + } + + // Wait for the event to be set + _, err = setHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from set event workflow: %v", err) + } + + // Start a workflow that gets the event from outside the original workflow + message, err := GetEvent[string](context.Background(), WorkflowGetEventInput{ + TargetWorkflowID: setHandle.GetWorkflowID(), + Key: "test-key", + Timeout: 3 * time.Second, + }) + if err != nil { + t.Fatalf("failed to get event from outside workflow: %v", err) + } + if message != "test-message" { + t.Fatalf("expected received message to be 'test-message', got '%s'", message) + } + }) + + t.Run("GetEventTimeout", func(t *testing.T) { + // Try to get an event from a non-existent workflow + nonExistentID := uuid.NewString() + message, err := GetEvent[string](context.Background(), WorkflowGetEventInput{ + TargetWorkflowID: nonExistentID, + Key: "test-key", + Timeout: 3 * time.Second, + }) + if err != nil { + t.Fatal("failed to get event from non-existent workflow:", err) + } + if message != "" { + t.Fatalf("expected empty result on timeout, got '%s'", message) + } + + // Try to get an event from an existing workflow but with a key that doesn't exist + setHandle, err := setEventWf(context.Background(), setEventWorkflowInput{ + Key: "test-key", + Message: "test-message", + }) + if err != nil 
{ + t.Fatal("failed to set event:", err) + } + _, err = setHandle.GetResult(context.Background()) + if err != nil { + t.Fatal("failed to get result from set event workflow:", err) + } + message, err = GetEvent[string](context.Background(), WorkflowGetEventInput{ + TargetWorkflowID: setHandle.GetWorkflowID(), + Key: "non-existent-key", + Timeout: 3 * time.Second, + }) + if err != nil { + t.Fatal("failed to get event with non-existent key:", err) + } + if message != "" { + t.Fatalf("expected empty result on timeout with non-existent key, got '%s'", message) + } + }) + + t.Run("SetGetEventMustRunInsideWorkflows", func(t *testing.T) { + ctx := context.Background() + + // Attempt to run SetEvent outside of a workflow context + err := SetEvent(ctx, WorkflowSetEventInput{Key: "test-key", Message: "test-message"}) + if err == nil { + t.Fatal("expected error when running SetEvent outside of workflow context, but got none") + } + + // Check the error type + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected error to be of type *DBOSError, got %T", err) + } + + if dbosErr.Code != StepExecutionError { + t.Fatalf("expected error code to be StepExecutionError, got %v", dbosErr.Code) + } + + // Test the specific message from the error + expectedMessagePart := "workflow state not found in context: are you running this step within a workflow?" 
+ if !strings.Contains(err.Error(), expectedMessagePart) { + t.Fatalf("expected error message to contain %q, but got %q", expectedMessagePart, err.Error()) + } + }) + + t.Run("SetGetEventIdempotency", func(t *testing.T) { + // Start the set event workflow + setHandle, err := setEventIdempotencyWf(context.Background(), setEventWorkflowInput{ + Key: "idempotency-key", + Message: "idempotency-message", + }) + if err != nil { + t.Fatalf("failed to start set event idempotency workflow: %v", err) + } + + // Start the get event workflow + getHandle, err := getEventIdempotencyWf(context.Background(), setEventWorkflowInput{ + Key: setHandle.GetWorkflowID(), + Message: "idempotency-key", + }) + if err != nil { + t.Fatalf("failed to start get event idempotency workflow: %v", err) + } + + // Wait for the get event workflow to signal it has received the event + getEventStartIdempotencyEvent.Wait() + + // Attempt recovering both workflows. Each should have exactly 1 step. + recoveredHandles, err := recoverPendingWorkflows(context.Background(), []string{"local"}) + if err != nil { + t.Fatalf("failed to recover pending workflows: %v", err) + } + if len(recoveredHandles) != 2 { + t.Fatalf("expected 2 recovered handles, got %d", len(recoveredHandles)) + } + + // Verify step counts + setSteps, err := dbos.systemDB.GetWorkflowSteps(context.Background(), setHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get steps for set event idempotency workflow: %v", err) + } + if len(setSteps) != 1 { + t.Fatalf("expected 1 step in set event idempotency workflow, got %d", len(setSteps)) + } + + getSteps, err := dbos.systemDB.GetWorkflowSteps(context.Background(), getHandle.GetWorkflowID()) + if err != nil { + t.Fatalf("failed to get steps for get event idempotency workflow: %v", err) + } + if len(getSteps) != 1 { + t.Fatalf("expected 1 step in get event idempotency workflow, got %d", len(getSteps)) + } + + // Complete the workflows + setEventIdempotencyEvent.Set() + 
getEventStopIdempotencyEvent.Set() + + setResult, err := setHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from set event idempotency workflow: %v", err) + } + if setResult != "idempotent-set-completed" { + t.Fatalf("expected result to be 'idempotent-set-completed', got '%s'", setResult) + } + + getResult, err := getHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from get event idempotency workflow: %v", err) + } + if getResult != "idempotency-message" { + t.Fatalf("expected result to be 'idempotency-message', got '%s'", getResult) + } + }) + + t.Run("ConcurrentGetEvent", func(t *testing.T) { + // Set event + setHandle, err := setEventWf(context.Background(), setEventWorkflowInput{ + Key: "concurrent-event-key", + Message: "concurrent-event-message", + }) + if err != nil { + t.Fatalf("failed to start set event workflow: %v", err) + } + + // Wait for the set event workflow to complete + _, err = setHandle.GetResult(context.Background()) + if err != nil { + t.Fatalf("failed to get result from set event workflow: %v", err) + } + // Start a few goroutines that'll concurrently get the event + numGoroutines := 5 + var wg sync.WaitGroup + errors := make(chan error, numGoroutines) + wg.Add(numGoroutines) + for range numGoroutines { + go func() { + defer wg.Done() + res, err := GetEvent[string](context.Background(), WorkflowGetEventInput{ + TargetWorkflowID: setHandle.GetWorkflowID(), + Key: "concurrent-event-key", + Timeout: 10 * time.Second, + }) + if err != nil { + errors <- fmt.Errorf("failed to get event in goroutine: %v", err) + return + } + if res != "concurrent-event-message" { + errors <- fmt.Errorf("expected result in goroutine to be 'concurrent-event-message', got '%s'", res) + return + } + }() + } + wg.Wait() + close(errors) + + // Check for any errors from goroutines + for err := range errors { + t.Fatal(err) + } + }) +} From dec1e915d2977b92924de3545e9a6ffb697fd62c Mon Sep 17 
00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 20:17:00 -0700 Subject: [PATCH 05/19] add a test for concurrent receive error behavior --- dbos/workflows_test.go | 109 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 0e3b3554..74e151a0 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -970,6 +970,7 @@ func TestScheduledWorkflows(t *testing.T) { var ( sendWf = WithWorkflow(sendWorkflow) receiveWf = WithWorkflow(receiveWorkflow) + receiveWfCoordinated = WithWorkflow(receiveWorkflowCoordinated) sendStructWf = WithWorkflow(sendStructWorkflow) receiveStructWf = WithWorkflow(receiveStructWorkflow) sendIdempotencyWf = WithWorkflow(sendIdempotencyWorkflow) @@ -977,6 +978,9 @@ var ( recvIdempotencyWf = WithWorkflow(receiveIdempotencyWorkflow) receiveIdempotencyStartEvent = NewEvent() receiveIdempotencyStopEvent = NewEvent() + numConcurrentRecvWfs = 5 + concurrentRecvReadyEvents = make([]*Event, numConcurrentRecvWfs) + concurrentRecvStartEvent = NewEvent() ) type sendWorkflowInput struct { @@ -1018,6 +1022,26 @@ func receiveWorkflow(ctx context.Context, topic string) (string, error) { return msg1 + "-" + msg2 + "-" + msg3, nil } +func receiveWorkflowCoordinated(ctx context.Context, input struct { + Topic string + i int +}) (string, error) { + // Signal that this workflow has started and is ready + concurrentRecvReadyEvents[input.i].Set() + + // Wait for the coordination event before starting to receive + + concurrentRecvStartEvent.Wait() + + // Do a single Recv call with timeout + msg, err := Recv[string](ctx, WorkflowRecvInput{Topic: input.Topic, Timeout: 3 * time.Second}) + fmt.Println(err) + if err != nil { + return "", err + } + return msg, nil +} + func sendStructWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) { testStruct := sendRecvType{Value: "test-struct-value"} err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, 
Topic: input.Topic, Message: testStruct}) @@ -1275,6 +1299,91 @@ func TestSendRecv(t *testing.T) { t.Fatalf("expected result to be 'idempotent-send-completed', got '%s'", result) } }) + + t.Run("ConcurrentRecv", func(t *testing.T) { + // Test concurrent receivers - only 1 should timeout, others should get errors + receiveTopic := "concurrent-recv-topic" + + // Start multiple concurrent receive workflows - no messages will be sent + numReceivers := 5 + var wg sync.WaitGroup + results := make(chan string, numReceivers) + errors := make(chan error, numReceivers) + receiverHandles := make([]WorkflowHandle[string], numReceivers) + + // Start all receivers - they will signal when ready and wait for coordination + for i := range numReceivers { + concurrentRecvReadyEvents[i] = NewEvent() + receiveHandle, err := receiveWfCoordinated(context.Background(), struct { + Topic string + i int + }{ + Topic: receiveTopic, + i: i, + }, WithWorkflowID("concurrent-recv-wfid")) + if err != nil { + t.Fatalf("failed to start receive workflow %d: %v", i, err) + } + receiverHandles[i] = receiveHandle + } + + // Wait for all workflows to signal they are ready + for i := range numReceivers { + concurrentRecvReadyEvents[i].Wait() + } + + // Now unblock all receivers simultaneously so they race to the Recv call + concurrentRecvStartEvent.Set() + + // Collect results from all receivers concurrently + // Only 1 should timeout (winner of the CV), others should get errors + wg.Add(numReceivers) + for i := range numReceivers { + go func(index int) { + defer wg.Done() + result, err := receiverHandles[index].GetResult(context.Background()) + if err != nil { + errors <- err + } else { + results <- result + } + }(i) + } + + wg.Wait() + close(results) + close(errors) + + // Count timeout results and errors + timeoutCount := 0 + errorCount := 0 + + for result := range results { + if result == "" { + // Empty string indicates a timeout - only 1 receiver should get this + timeoutCount++ + } + } + + for err 
:= range errors { + t.Logf("Receiver error (expected): %v", err) + errorCount++ + } + + // Verify that exactly 1 receiver timed out and 4 got errors + if timeoutCount != 1 { + t.Fatalf("expected exactly 1 receiver to timeout, got %d timeouts", timeoutCount) + } + + if errorCount != 4 { + t.Fatalf("expected exactly 4 receivers to get errors, got %d errors", errorCount) + } + + // Ensure total results match expected + if timeoutCount+errorCount != numReceivers { + t.Fatalf("expected total results (%d) to equal number of receivers (%d)", timeoutCount+errorCount, numReceivers) + } + }) } var ( From 4ca8b7e88637990695083fb8fe2272b232a11839 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 20:30:40 -0700 Subject: [PATCH 06/19] comment to clarify the durable check --- dbos/system_database.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbos/system_database.go b/dbos/system_database.go index 8cf28d18..f479765e 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1038,6 +1038,7 @@ func (s *systemDatabase) Send(ctx context.Context, input WorkflowSendInput) erro return err } if recordedResult != nil { + // when hitting this case, recordedResult will be &{ } return nil } @@ -1269,6 +1270,7 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp return err } if recordedResult != nil { + // when hitting this case, recordedResult will be &{ } return nil } From 0bfabde114899a99ddcf352a74ba57638c58a52b Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 20:40:31 -0700 Subject: [PATCH 07/19] fix txn usage in Recv() --- dbos/system_database.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index f479765e..51d37538 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1113,19 +1113,12 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any topic = input.Topic } - tx, err := 
s.pool.Begin(ctx) - if err != nil { - return nil, fmt.Errorf("failed to begin transaction: %w", err) - } - defer tx.Rollback(ctx) - // Check if operation was already executed // XXX this might not need to be in the transaction checkInput := checkOperationExecutionDBInput{ workflowID: destinationID, operationID: stepID, functionName: functionName, - tx: tx, } recordedResult, err := s.CheckOperationExecution(ctx, checkInput) if err != nil { @@ -1157,7 +1150,7 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any // If not, we'll wait for a notification and timeout var exists bool query := `SELECT EXISTS (SELECT 1 FROM dbos.notifications WHERE destination_uuid = $1 AND topic = $2)` - err = tx.QueryRow(ctx, query, destinationID, topic).Scan(&exists) + err = s.pool.QueryRow(ctx, query, destinationID, topic).Scan(&exists) if err != nil { return false, fmt.Errorf("failed to check message: %w", err) } @@ -1183,6 +1176,11 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any } // Find the oldest message and delete it atomically + tx, err := s.pool.Begin(ctx) + if err != nil { + return nil, fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback(ctx) query = ` WITH oldest_entry AS ( SELECT destination_uuid, topic, message, created_at_epoch_ms From 0c527a90425a0566df6b0ea5c466556785c157b1 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 20:52:07 -0700 Subject: [PATCH 08/19] should wait --- dbos/workflows_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 74e151a0..e8d678ea 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -1647,6 +1647,7 @@ func TestSetGetEvent(t *testing.T) { // Wait for the get event workflow to signal it has received the event getEventStartIdempotencyEvent.Wait() + getEventStartIdempotencyEvent.Clear() // Attempt recovering both workflows. Each should have exactly 1 step. 
recoveredHandles, err := recoverPendingWorkflows(context.Background(), []string{"local"}) @@ -1657,6 +1658,8 @@ func TestSetGetEvent(t *testing.T) { t.Fatalf("expected 2 recovered handles, got %d", len(recoveredHandles)) } + getEventStartIdempotencyEvent.Wait() + // Verify step counts setSteps, err := dbos.systemDB.GetWorkflowSteps(context.Background(), setHandle.GetWorkflowID()) if err != nil { From b2fc931c13449c40a0bf953cc3f8cfa6d54e4c13 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 21:07:37 -0700 Subject: [PATCH 09/19] do not export process config --- dbos/dbos.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index ef3d0be7..06daa504 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -71,11 +71,11 @@ type Config struct { AdminServer bool } -// ProcessConfig merges configuration from two sources in order of precedence: +// processConfig merges configuration from two sources in order of precedence: // 1. programmatic configuration // 2. environment variables // Finally, it applies default values if needed. 
-func ProcessConfig(inputConfig *Config) (*Config, error) { +func processConfig(inputConfig *Config) (*Config, error) { // First check required fields if len(inputConfig.DatabaseURL) == 0 { return nil, fmt.Errorf("missing required config field: databaseURL") @@ -133,7 +133,7 @@ func Initialize(inputConfig Config) error { } // Load & process the configuration - config, err := ProcessConfig(&inputConfig) + config, err := processConfig(&inputConfig) if err != nil { return newInitializationError(err.Error()) } From f4a64a5aa452f5c66363e23bf6576babeb24a548 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 21:08:12 -0700 Subject: [PATCH 10/19] add a GetWorkflowID package function that accepts a context and returns a workflow ID if any --- dbos/system_database.go | 8 ++++---- dbos/workflow.go | 23 ++++++++++++++++------- dbos/workflows_test.go | 41 ++++++++++++++++++++--------------------- 3 files changed, 40 insertions(+), 32 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 51d37538..63ccab60 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1009,7 +1009,7 @@ func (s *systemDatabase) Send(ctx context.Context, input WorkflowSendInput) erro functionName := "DBOS.send" // Get workflow state from context - workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) + workflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) if !ok || workflowState == nil { return newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?") } @@ -1095,7 +1095,7 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any // Get workflow state from context // XXX these checks might be better suited for outside of the system db code. We'll see when we implement the client. 
- workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) + workflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) if !ok || workflowState == nil { return nil, newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?") } @@ -1239,7 +1239,7 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp functionName := "DBOS.setEvent" // Get workflow state from context - workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) + workflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) if !ok || workflowState == nil { return newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?") } @@ -1316,7 +1316,7 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp functionName := "DBOS.getEvent" // Get workflow state from context (optional for GetEvent as we can get an event from outside a workflow) - workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) + workflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) var stepID int var isInWorkflow bool diff --git a/dbos/workflow.go b/dbos/workflow.go index fc696555..475c715d 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -98,7 +98,7 @@ func (h *workflowHandle[R]) GetResult(ctx context.Context) (R, error) { return *new(R), errors.New("workflow result channel is already closed. 
Did you call GetResult() twice on the same workflow handle?") } // If we are calling GetResult inside a workflow, record the result as a step result - parentWorkflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) + parentWorkflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) isChildWorkflow := ok && parentWorkflowState != nil if isChildWorkflow { encodedOutput, encErr := serialize(outcome.result) @@ -153,7 +153,7 @@ func (h *workflowPollingHandle[R]) GetResult(ctx context.Context) (R, error) { return *new(R), newWorkflowUnexpectedResultType(h.workflowID, fmt.Sprintf("%T", new(R)), fmt.Sprintf("%T", result)) } // If we are calling GetResult inside a workflow, record the result as a step result - parentWorkflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) + parentWorkflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) isChildWorkflow := ok && parentWorkflowState != nil if isChildWorkflow { encodedOutput, encErr := serialize(typedResult) @@ -334,7 +334,7 @@ func WithWorkflow[P any, R any](fn WorkflowFunc[P, R], opts ...workflowRegistrat type contextKey string // TODO this should be a private type, once we have proper getter for a workflow state -const WorkflowStateKey contextKey = "workflowState" +const workflowStateKey contextKey = "workflowState" type WorkflowFunc[P any, R any] func(ctx context.Context, input P) (R, error) type WorkflowWrapperFunc[P any, R any] func(ctx context.Context, input P, opts ...workflowOption) (WorkflowHandle[R], error) @@ -399,7 +399,7 @@ func runAsWorkflow[P any, R any](ctx context.Context, fn WorkflowFunc[P, R], inp dbosWorkflowContext := context.Background() // Check if we are within a workflow (and thus a child workflow) - parentWorkflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) + parentWorkflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) isChildWorkflow := ok && parentWorkflowState != nil // TODO Check if cancelled @@ -516,7 +516,7 @@ func runAsWorkflow[P any, R 
any](ctx context.Context, fn WorkflowFunc[P, R], inp } // Run the function in a goroutine - augmentUserContext := context.WithValue(ctx, WorkflowStateKey, workflowState) + augmentUserContext := context.WithValue(ctx, workflowStateKey, workflowState) go func() { result, err := fn(augmentUserContext, input) status := WorkflowStatusSuccess @@ -614,7 +614,7 @@ func RunAsStep[P any, R any](ctx context.Context, fn StepFunc[P, R], input P, op } // Get workflow state from context - workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) + workflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) if !ok || workflowState == nil { return *new(R), newStepExecutionError("", operationName, "workflow state not found in context: are you running this step within a workflow?") } @@ -646,7 +646,7 @@ func RunAsStep[P any, R any](ctx context.Context, fn StepFunc[P, R], input P, op stepCounter: workflowState.stepCounter, isWithinStep: true, } - stepCtx := context.WithValue(ctx, WorkflowStateKey, &stepState) + stepCtx := context.WithValue(ctx, workflowStateKey, &stepState) stepOutput, stepError := fn(stepCtx, input) @@ -779,6 +779,15 @@ func GetEvent[R any](ctx context.Context, input WorkflowGetEventInput) (R, error /******* WORKFLOW MANAGEMENT *******/ /***********************************/ +// GetWorkflowID retrieves the workflow ID from the context if called within a DBOS workflow +func GetWorkflowID(ctx context.Context) (string, error) { + workflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) + if !ok || workflowState == nil { + return "", errors.New("not within a DBOS workflow context") + } + return workflowState.WorkflowID, nil +} + func RetrieveWorkflow[R any](workflowID string) (workflowPollingHandle[R], error) { ctx := context.Background() workflowStatus, err := dbos.systemDB.ListWorkflows(ctx, listWorkflowsDBInput{ diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index e8d678ea..75c3c1a2 100644 --- a/dbos/workflows_test.go +++ 
b/dbos/workflows_test.go @@ -455,24 +455,23 @@ func TestSteps(t *testing.T) { var ( childWf = WithWorkflow(func(ctx context.Context, i int) (string, error) { - workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) - if !ok { - return "", fmt.Errorf("workflow state not found in context") + workflowID, err := GetWorkflowID(ctx) + if err != nil { + return "", fmt.Errorf("failed to get workflow ID: %v", err) } - fmt.Println("childWf workflow state:", workflowState) - expectedCurrentID := fmt.Sprintf("%s-%d", workflowState.WorkflowID, i) - if workflowState.WorkflowID != expectedCurrentID { - return "", fmt.Errorf("expected parentWf workflow ID to be %s, got %s", expectedCurrentID, workflowState.WorkflowID) + expectedCurrentID := fmt.Sprintf("%s-%d", workflowID, i) + if workflowID != expectedCurrentID { + return "", fmt.Errorf("expected parentWf workflow ID to be %s, got %s", expectedCurrentID, workflowID) } // XXX right now the steps of a child workflow start with an incremented step ID, because the first step ID is allocated to the child workflow return RunAsStep(ctx, simpleStep, "") }) parentWf = WithWorkflow(func(ctx context.Context, i int) (string, error) { - workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) - if !ok { - return "", fmt.Errorf("workflow state not found in context") + workflowID, err := GetWorkflowID(ctx) + if err != nil { + return "", fmt.Errorf("failed to get workflow ID: %v", err) } - fmt.Println("parentWf workflow state:", workflowState) + fmt.Println("parentWf workflow ID:", workflowID) childHandle, err := childWf(ctx, i) if err != nil { @@ -480,14 +479,14 @@ var ( } // Check this wf ID is built correctly - expectedParentID := fmt.Sprintf("%s-%d", workflowState.WorkflowID, i) - if workflowState.WorkflowID != expectedParentID { - return "", fmt.Errorf("expected parentWf workflow ID to be %s, got %s", expectedParentID, workflowState.WorkflowID) + expectedParentID := fmt.Sprintf("%s-%d", workflowID, i) + if workflowID != 
expectedParentID { + return "", fmt.Errorf("expected parentWf workflow ID to be %s, got %s", expectedParentID, workflowID) } // Verify child workflow ID follows the pattern: parentID-functionID childWorkflowID := childHandle.GetWorkflowID() - expectedChildID := fmt.Sprintf("%s-%d", workflowState.WorkflowID, i) + expectedChildID := fmt.Sprintf("%s-%d", workflowID, i) if childWorkflowID != expectedChildID { return "", fmt.Errorf("expected childWf ID to be %s, got %s", expectedChildID, childWorkflowID) } @@ -495,11 +494,11 @@ var ( }) grandParentWf = WithWorkflow(func(ctx context.Context, _ string) (string, error) { for i := range 3 { - workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) - if !ok { - return "", fmt.Errorf("workflow state not found in context") + workflowID, err := GetWorkflowID(ctx) + if err != nil { + return "", fmt.Errorf("failed to get workflow ID: %v", err) } - fmt.Println("grandParentWf workflow state:", workflowState) + fmt.Println("grandParentWf workflow ID:", workflowID) childHandle, err := parentWf(ctx, i) if err != nil { @@ -507,14 +506,14 @@ var ( } // The handle should a direct handle - _, ok = childHandle.(*workflowHandle[string]) + _, ok := childHandle.(*workflowHandle[string]) if !ok { return "", fmt.Errorf("expected childHandle to be of type *workflowHandle[string], got %T", childHandle) } // Verify child workflow ID follows the pattern: parentID-functionID childWorkflowID := childHandle.GetWorkflowID() - expectedPrefix := fmt.Sprintf("%s-%d", workflowState.WorkflowID, i) + expectedPrefix := fmt.Sprintf("%s-%d", workflowID, i) if childWorkflowID != expectedPrefix { return "", fmt.Errorf("expected parentWf workflow ID to be %s, got %s", expectedPrefix, childWorkflowID) } From f38ea48275c75f76908377823ddba5cd2c776524 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 21:14:31 -0700 Subject: [PATCH 11/19] operation -> step --- dbos/system_database.go | 127 ++++++++++++++++++++-------------------- dbos/workflow.go | 
40 ++++++------- 2 files changed, 84 insertions(+), 83 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 63ccab60..03cec832 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -646,12 +646,12 @@ func (s *systemDatabase) AwaitWorkflowResult(ctx context.Context, workflowID str } type recordOperationResultDBInput struct { - workflowID string - operationID int - operationName string - output any - err error - tx pgx.Tx + workflowID string + stepID int + stepName string + output any + err error + tx pgx.Tx } func (s *systemDatabase) RecordOperationResult(ctx context.Context, input recordOperationResultDBInput) error { @@ -675,18 +675,18 @@ func (s *systemDatabase) RecordOperationResult(ctx context.Context, input record if input.tx != nil { commandTag, err = input.tx.Exec(ctx, query, input.workflowID, - input.operationID, + input.stepID, outputString, errorString, - input.operationName, + input.stepName, ) } else { commandTag, err = s.pool.Exec(ctx, query, input.workflowID, - input.operationID, + input.stepID, outputString, errorString, - input.operationName, + input.stepName, ) } @@ -716,8 +716,8 @@ func (s *systemDatabase) RecordOperationResult(ctx context.Context, input record type recordChildWorkflowDBInput struct { parentWorkflowID string childWorkflowID string - functionID int - functionName string + stepID int + stepName string tx pgx.Tx } @@ -732,15 +732,15 @@ func (s *systemDatabase) RecordChildWorkflow(ctx context.Context, input recordCh if input.tx != nil { commandTag, err = input.tx.Exec(ctx, query, input.parentWorkflowID, - input.functionID, - input.functionName, + input.stepID, + input.stepName, input.childWorkflowID, ) } else { commandTag, err = s.pool.Exec(ctx, query, input.parentWorkflowID, - input.functionID, - input.functionName, + input.stepID, + input.stepName, input.childWorkflowID, ) } @@ -750,7 +750,7 @@ func (s *systemDatabase) RecordChildWorkflow(ctx context.Context, input recordCh if pgErr, ok 
:= err.(*pgconn.PgError); ok && pgErr.Code == "23505" { return fmt.Errorf( "child workflow %s already registered for parent workflow %s (operation ID: %d)", - input.childWorkflowID, input.parentWorkflowID, input.functionID) + input.childWorkflowID, input.parentWorkflowID, input.stepID) } return fmt.Errorf("failed to record child workflow: %w", err) } @@ -782,7 +782,7 @@ func (s *systemDatabase) CheckChildWorkflow(ctx context.Context, workflowID stri type recordChildGetResultDBInput struct { parentWorkflowID string childWorkflowID string - operationID int + stepID int output string err error } @@ -801,7 +801,7 @@ func (s *systemDatabase) RecordChildGetResult(ctx context.Context, input recordC _, err := s.pool.Exec(ctx, query, input.parentWorkflowID, - input.operationID, + input.stepID, "DBOS.getResult", input.output, errorString, @@ -823,10 +823,10 @@ type recordedResult struct { } type checkOperationExecutionDBInput struct { - workflowID string - operationID int - functionName string - tx pgx.Tx + workflowID string + stepID int + stepName string + tx pgx.Tx } func (s *systemDatabase) CheckOperationExecution(ctx context.Context, input checkOperationExecutionDBInput) (*recordedResult, error) { @@ -848,7 +848,7 @@ func (s *systemDatabase) CheckOperationExecution(ctx context.Context, input chec workflowStatusQuery := `SELECT status FROM dbos.workflow_status WHERE workflow_uuid = $1` // Second query: Retrieve operation outputs if they exist - operationOutputQuery := `SELECT output, error, function_name + stepOutputQuery := `SELECT output, error, function_name FROM dbos.operation_outputs WHERE workflow_uuid = $1 AND function_id = $2` @@ -873,7 +873,7 @@ func (s *systemDatabase) CheckOperationExecution(ctx context.Context, input chec var errorStr *string var recordedFunctionName string - err = tx.QueryRow(ctx, operationOutputQuery, input.workflowID, input.operationID).Scan(&outputString, &errorStr, &recordedFunctionName) + err = tx.QueryRow(ctx, stepOutputQuery, 
input.workflowID, input.stepID).Scan(&outputString, &errorStr, &recordedFunctionName) // If there are no operation outputs, return nil if err != nil { @@ -884,8 +884,8 @@ func (s *systemDatabase) CheckOperationExecution(ctx context.Context, input chec } // If the provided and recorded function name are different, throw an exception - if input.functionName != recordedFunctionName { - return nil, newUnexpectedStepError(input.workflowID, input.operationID, input.functionName, recordedFunctionName) + if input.stepName != recordedFunctionName { + return nil, newUnexpectedStepError(input.workflowID, input.stepID, input.stepName, recordedFunctionName) } output, err := deserialize(outputString) @@ -1028,10 +1028,10 @@ func (s *systemDatabase) Send(ctx context.Context, input WorkflowSendInput) erro // Check if operation was already executed and do nothing if so checkInput := checkOperationExecutionDBInput{ - workflowID: workflowState.WorkflowID, - operationID: stepID, - functionName: functionName, - tx: tx, + workflowID: workflowState.WorkflowID, + stepID: stepID, + stepName: functionName, + tx: tx, } recordedResult, err := s.CheckOperationExecution(ctx, checkInput) if err != nil { @@ -1068,12 +1068,12 @@ func (s *systemDatabase) Send(ctx context.Context, input WorkflowSendInput) erro // Record the operation result recordInput := recordOperationResultDBInput{ - workflowID: workflowState.WorkflowID, - operationID: stepID, - operationName: functionName, - output: nil, - err: nil, - tx: tx, + workflowID: workflowState.WorkflowID, + stepID: stepID, + stepName: functionName, + output: nil, + err: nil, + tx: tx, } err = s.RecordOperationResult(ctx, recordInput) @@ -1116,15 +1116,16 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any // Check if operation was already executed // XXX this might not need to be in the transaction checkInput := checkOperationExecutionDBInput{ - workflowID: destinationID, - operationID: stepID, - functionName: 
functionName, + workflowID: destinationID, + stepID: stepID, + stepName: functionName, } recordedResult, err := s.CheckOperationExecution(ctx, checkInput) if err != nil { return nil, fmt.Errorf("failed to check operation execution: %w", err) } if recordedResult != nil { + // XXX should we simply return recordedResult.output, recordedResult.err? if recordedResult.output != nil { return recordedResult.output, nil } @@ -1217,11 +1218,11 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any // Record the operation result recordInput := recordOperationResultDBInput{ - workflowID: destinationID, - operationID: stepID, - operationName: functionName, - output: message, - tx: tx, + workflowID: destinationID, + stepID: stepID, + stepName: functionName, + output: message, + tx: tx, } err = s.RecordOperationResult(ctx, recordInput) if err != nil { @@ -1258,10 +1259,10 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp // Check if operation was already executed and do nothing if so checkInput := checkOperationExecutionDBInput{ - workflowID: workflowState.WorkflowID, - operationID: stepID, - functionName: functionName, - tx: tx, + workflowID: workflowState.WorkflowID, + stepID: stepID, + stepName: functionName, + tx: tx, } recordedResult, err := s.CheckOperationExecution(ctx, checkInput) if err != nil { @@ -1291,12 +1292,12 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp // Record the operation result recordInput := recordOperationResultDBInput{ - workflowID: workflowState.WorkflowID, - operationID: stepID, - operationName: functionName, - output: nil, - err: nil, - tx: tx, + workflowID: workflowState.WorkflowID, + stepID: stepID, + stepName: functionName, + output: nil, + err: nil, + tx: tx, } err = s.RecordOperationResult(ctx, recordInput) @@ -1329,9 +1330,9 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp // Check if operation was already 
executed (only if in workflow) checkInput := checkOperationExecutionDBInput{ - workflowID: workflowState.WorkflowID, - operationID: stepID, - functionName: functionName, + workflowID: workflowState.WorkflowID, + stepID: stepID, + stepName: functionName, } recordedResult, err := s.CheckOperationExecution(ctx, checkInput) if err != nil { @@ -1412,11 +1413,11 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp // Record the operation result if this is called within a workflow if isInWorkflow { recordInput := recordOperationResultDBInput{ - workflowID: workflowState.WorkflowID, - operationID: stepID, - operationName: functionName, - output: value, - err: nil, + workflowID: workflowState.WorkflowID, + stepID: stepID, + stepName: functionName, + output: value, + err: nil, } err = s.RecordOperationResult(ctx, recordInput) diff --git a/dbos/workflow.go b/dbos/workflow.go index 475c715d..9ca9ac8e 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -108,7 +108,7 @@ func (h *workflowHandle[R]) GetResult(ctx context.Context) (R, error) { recordGetResultInput := recordChildGetResultDBInput{ parentWorkflowID: parentWorkflowState.WorkflowID, childWorkflowID: h.workflowID, - operationID: parentWorkflowState.NextStepID(), + stepID: parentWorkflowState.NextStepID(), output: encodedOutput, err: outcome.err, } @@ -163,7 +163,7 @@ func (h *workflowPollingHandle[R]) GetResult(ctx context.Context) (R, error) { recordGetResultInput := recordChildGetResultDBInput{ parentWorkflowID: parentWorkflowState.WorkflowID, childWorkflowID: h.workflowID, - operationID: parentWorkflowState.NextStepID(), + stepID: parentWorkflowState.NextStepID(), output: encodedOutput, err: err, } @@ -483,8 +483,8 @@ func runAsWorkflow[P any, R any](ctx context.Context, fn WorkflowFunc[P, R], inp childInput := recordChildWorkflowDBInput{ parentWorkflowID: parentWorkflowState.WorkflowID, childWorkflowID: workflowStatus.ID, - functionName: 
runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name(), // Will need to test this - functionID: stepID, + stepName: runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name(), // Will need to test this + stepID: stepID, tx: tx, } err = dbos.systemDB.RecordChildWorkflow(dbosWorkflowContext, childInput) @@ -600,7 +600,7 @@ func RunAsStep[P any, R any](ctx context.Context, fn StepFunc[P, R], input P, op return *new(R), newStepExecutionError("", "", "step function cannot be nil") } - operationName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() + stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() // Apply options to build params with defaults params := StepParams{ @@ -616,7 +616,7 @@ func RunAsStep[P any, R any](ctx context.Context, fn StepFunc[P, R], input P, op // Get workflow state from context workflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) if !ok || workflowState == nil { - return *new(R), newStepExecutionError("", operationName, "workflow state not found in context: are you running this step within a workflow?") + return *new(R), newStepExecutionError("", stepName, "workflow state not found in context: are you running this step within a workflow?") } // If within a step, just run the function directly @@ -625,16 +625,16 @@ func RunAsStep[P any, R any](ctx context.Context, fn StepFunc[P, R], input P, op } // Get next step ID - operationID := workflowState.NextStepID() + stepID := workflowState.NextStepID() // Check the step is cancelled, has already completed, or is called with a different name recordedOutput, err := dbos.systemDB.CheckOperationExecution(ctx, checkOperationExecutionDBInput{ - workflowID: workflowState.WorkflowID, - operationID: operationID, - functionName: runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name(), + workflowID: workflowState.WorkflowID, + stepID: stepID, + stepName: runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name(), }) if err != nil { - return *new(R), 
newStepExecutionError(workflowState.WorkflowID, operationName, fmt.Sprintf("checking operation execution: %v", err)) + return *new(R), newStepExecutionError(workflowState.WorkflowID, stepName, fmt.Sprintf("checking operation execution: %v", err)) } if recordedOutput != nil { return recordedOutput.output.(R), recordedOutput.err @@ -663,12 +663,12 @@ func RunAsStep[P any, R any](ctx context.Context, fn StepFunc[P, R], input P, op delay = time.Duration(math.Min(exponentialDelay, float64(params.MaxInterval))) } - getLogger().Error("step failed, retrying", "step_name", operationName, "retry", retry, "max_retries", params.MaxRetries, "delay", delay, "error", stepError) + getLogger().Error("step failed, retrying", "step_name", stepName, "retry", retry, "max_retries", params.MaxRetries, "delay", delay, "error", stepError) // Wait before retry select { case <-ctx.Done(): - return *new(R), newStepExecutionError(workflowState.WorkflowID, operationName, fmt.Sprintf("context cancelled during retry: %v", ctx.Err())) + return *new(R), newStepExecutionError(workflowState.WorkflowID, stepName, fmt.Sprintf("context cancelled during retry: %v", ctx.Err())) case <-time.After(delay): // Continue to retry } @@ -686,7 +686,7 @@ func RunAsStep[P any, R any](ctx context.Context, fn StepFunc[P, R], input P, op // If max retries reached, create MaxStepRetriesExceeded error if retry == params.MaxRetries { - stepError = newMaxStepRetriesExceededError(workflowState.WorkflowID, operationName, params.MaxRetries, joinedErrors) + stepError = newMaxStepRetriesExceededError(workflowState.WorkflowID, stepName, params.MaxRetries, joinedErrors) break } } @@ -694,15 +694,15 @@ func RunAsStep[P any, R any](ctx context.Context, fn StepFunc[P, R], input P, op // Record the final result dbInput := recordOperationResultDBInput{ - workflowID: workflowState.WorkflowID, - operationName: operationName, - operationID: operationID, - err: stepError, - output: stepOutput, + workflowID: workflowState.WorkflowID, + 
stepName: stepName, + stepID: stepID, + err: stepError, + output: stepOutput, } recErr := dbos.systemDB.RecordOperationResult(ctx, dbInput) if recErr != nil { - return *new(R), newStepExecutionError(workflowState.WorkflowID, operationName, fmt.Sprintf("recording step outcome: %v", recErr)) + return *new(R), newStepExecutionError(workflowState.WorkflowID, stepName, fmt.Sprintf("recording step outcome: %v", recErr)) } return stepOutput, stepError From 571a6cab6b1f86f21e35290ac4da6e12a3cb79f3 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 21:14:55 -0700 Subject: [PATCH 12/19] do not export TypedErasedWorkflowWrapperFunc --- dbos/system_database.go | 1 - dbos/workflow.go | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 03cec832..d6ef91b3 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1125,7 +1125,6 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any return nil, fmt.Errorf("failed to check operation execution: %w", err) } if recordedResult != nil { - // XXX should we simply return recordedResult.output, recordedResult.err? 
if recordedResult.output != nil { return recordedResult.output, nil } diff --git a/dbos/workflow.go b/dbos/workflow.go index 9ca9ac8e..2da63bdf 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -200,10 +200,10 @@ func (h *workflowPollingHandle[R]) GetWorkflowID() string { /**********************************/ /******* WORKFLOW REGISTRY *******/ /**********************************/ -type TypedErasedWorkflowWrapperFunc func(ctx context.Context, input any, opts ...workflowOption) (WorkflowHandle[any], error) +type typedErasedWorkflowWrapperFunc func(ctx context.Context, input any, opts ...workflowOption) (WorkflowHandle[any], error) type workflowRegistryEntry struct { - wrappedFunction TypedErasedWorkflowWrapperFunc + wrappedFunction typedErasedWorkflowWrapperFunc maxRetries int } @@ -211,7 +211,7 @@ var registry = make(map[string]workflowRegistryEntry) var regMutex sync.RWMutex // Register adds a workflow function to the registry (thread-safe, only once per name) -func registerWorkflow(fqn string, fn TypedErasedWorkflowWrapperFunc, maxRetries int) { +func registerWorkflow(fqn string, fn typedErasedWorkflowWrapperFunc, maxRetries int) { regMutex.Lock() defer regMutex.Unlock() From 2930e11ffd37a3ed94ea2ffbaabde48db5191b89 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 21:15:06 -0700 Subject: [PATCH 13/19] cleanup tess --- dbos/workflows_test.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 75c3c1a2..63ff0210 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -312,7 +312,6 @@ var stepIdempotencyCounter int func stepIdempotencyTest(ctx context.Context, input string) (string, error) { stepIdempotencyCounter++ - fmt.Println("Executing idempotency step:", stepIdempotencyCounter) return input, nil } @@ -471,7 +470,6 @@ var ( if err != nil { return "", fmt.Errorf("failed to get workflow ID: %v", err) } - fmt.Println("parentWf workflow ID:", workflowID) childHandle, err := 
childWf(ctx, i) if err != nil { @@ -498,7 +496,6 @@ var ( if err != nil { return "", fmt.Errorf("failed to get workflow ID: %v", err) } - fmt.Println("grandParentWf workflow ID:", workflowID) childHandle, err := parentWf(ctx, i) if err != nil { @@ -893,7 +890,6 @@ var ( counter1Ch = make(chan time.Time, 100) _ = WithWorkflow(func(ctx context.Context, scheduledTime time.Time) (string, error) { startTime := time.Now() - // fmt.Println("scheduled time:", scheduledTime, "current time:", startTime) counter++ if counter == 10 { return "", fmt.Errorf("counter reached 100, stopping workflow") @@ -988,7 +984,6 @@ type sendWorkflowInput struct { } func sendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) { - fmt.Println("Starting send workflow with input:", input) err := Send(ctx, WorkflowSendInput{DestinationID: input.DestinationID, Topic: input.Topic, Message: "message1"}) if err != nil { return "", err @@ -1001,7 +996,6 @@ func sendWorkflow(ctx context.Context, input sendWorkflowInput) (string, error) if err != nil { return "", err } - fmt.Println("Sending message on topic:", input.Topic, "to destination:", input.DestinationID) return "", nil } @@ -1034,7 +1028,6 @@ func receiveWorkflowCoordinated(ctx context.Context, input struct { // Do a single Recv call with timeout msg, err := Recv[string](ctx, WorkflowRecvInput{Topic: input.Topic, Timeout: 3 * time.Second}) - fmt.Println(err) if err != nil { return "", err } From 926f08a593f06ba3e8f58e645943671935792429 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 21:36:10 -0700 Subject: [PATCH 14/19] add missing file --- dbos/dbos_test.go | 87 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 dbos/dbos_test.go diff --git a/dbos/dbos_test.go b/dbos/dbos_test.go new file mode 100644 index 00000000..9a639415 --- /dev/null +++ b/dbos/dbos_test.go @@ -0,0 +1,87 @@ +package dbos + +import ( + "context" + "encoding/hex" + "maps" + "testing" +) + +func 
TestConfigValidationErrorTypes(t *testing.T) { + databaseURL := getDatabaseURL(t) + + t.Run("FailsWithoutAppName", func(t *testing.T) { + config := Config{ + DatabaseURL: databaseURL, + } + + err := Initialize(config) + if err == nil { + t.Fatal("expected error when app name is missing, but got none") + } + + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected DBOSError, got %T", err) + } + + if dbosErr.Code != InitializationError { + t.Fatalf("expected InitializationError code, got %v", dbosErr.Code) + } + + expectedMsg := "Error initializing DBOS Transact: missing required config field: appName" + if dbosErr.Message != expectedMsg { + t.Fatalf("expected error message '%s', got '%s'", expectedMsg, dbosErr.Message) + } + }) + + t.Run("FailsWithoutDatabaseURL", func(t *testing.T) { + config := Config{ + AppName: "test-app", + } + + err := Initialize(config) + if err == nil { + t.Fatal("expected error when database URL is missing, but got none") + } + + dbosErr, ok := err.(*DBOSError) + if !ok { + t.Fatalf("expected DBOSError, got %T", err) + } + + if dbosErr.Code != InitializationError { + t.Fatalf("expected InitializationError code, got %v", dbosErr.Code) + } + + expectedMsg := "Error initializing DBOS Transact: missing required config field: databaseURL" + if dbosErr.Message != expectedMsg { + t.Fatalf("expected error message '%s', got '%s'", expectedMsg, dbosErr.Message) + } + }) +} +func TestAppVersion(t *testing.T) { + if _, err := hex.DecodeString(_APP_VERSION); err != nil { + t.Fatalf("APP_VERSION is not a valid hex string: %v", err) + } + + // Save the original registry content + originalRegistry := make(map[string]workflowRegistryEntry) + maps.Copy(originalRegistry, registry) + + // Restore the registry after the test + defer func() { + registry = originalRegistry + }() + + // Replace the registry and verify the hash is different + registry = make(map[string]workflowRegistryEntry) + + WithWorkflow(func(ctx context.Context, input string) (string, 
error) { + return "new-registry-workflow-" + input, nil + }) + hash2 := computeApplicationVersion() + if _APP_VERSION == hash2 { + t.Fatalf("APP_VERSION hash did not change after replacing registry") + } +} From 79ea93fe807fe44b77f31876fb0173d8a86dc274 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 23 Jul 2025 21:36:33 -0700 Subject: [PATCH 15/19] stop publishing workflow state struct --- dbos/system_database.go | 56 ++++++++++++++++----------------- dbos/workflow.go | 70 ++++++++++++++++++++--------------------- 2 files changed, 63 insertions(+), 63 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index d6ef91b3..c4dd46ef 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1009,16 +1009,16 @@ func (s *systemDatabase) Send(ctx context.Context, input WorkflowSendInput) erro functionName := "DBOS.send" // Get workflow state from context - workflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) - if !ok || workflowState == nil { + wfState, ok := ctx.Value(workflowStateKey).(*workflowState) + if !ok || wfState == nil { return newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?") } - if workflowState.isWithinStep { - return newStepExecutionError(workflowState.WorkflowID, functionName, "cannot call Send within a step") + if wfState.isWithinStep { + return newStepExecutionError(wfState.workflowID, functionName, "cannot call Send within a step") } - stepID := workflowState.NextStepID() + stepID := wfState.NextStepID() tx, err := s.pool.Begin(ctx) if err != nil { @@ -1028,7 +1028,7 @@ func (s *systemDatabase) Send(ctx context.Context, input WorkflowSendInput) erro // Check if operation was already executed and do nothing if so checkInput := checkOperationExecutionDBInput{ - workflowID: workflowState.WorkflowID, + workflowID: wfState.workflowID, stepID: stepID, stepName: functionName, tx: tx, @@ -1068,7 +1068,7 @@ func (s *systemDatabase) Send(ctx 
context.Context, input WorkflowSendInput) erro // Record the operation result recordInput := recordOperationResultDBInput{ - workflowID: workflowState.WorkflowID, + workflowID: wfState.workflowID, stepID: stepID, stepName: functionName, output: nil, @@ -1095,17 +1095,17 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any // Get workflow state from context // XXX these checks might be better suited for outside of the system db code. We'll see when we implement the client. - workflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) - if !ok || workflowState == nil { + wfState, ok := ctx.Value(workflowStateKey).(*workflowState) + if !ok || wfState == nil { return nil, newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?") } - if workflowState.isWithinStep { - return nil, newStepExecutionError(workflowState.WorkflowID, functionName, "cannot call Recv within a step") + if wfState.isWithinStep { + return nil, newStepExecutionError(wfState.workflowID, functionName, "cannot call Recv within a step") } - stepID := workflowState.NextStepID() - destinationID := workflowState.WorkflowID + stepID := wfState.NextStepID() + destinationID := wfState.workflowID // Set default topic if not provided topic := _DBOS_NULL_TOPIC @@ -1239,16 +1239,16 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp functionName := "DBOS.setEvent" // Get workflow state from context - workflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) - if !ok || workflowState == nil { + wfState, ok := ctx.Value(workflowStateKey).(*workflowState) + if !ok || wfState == nil { return newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?") } - if workflowState.isWithinStep { - return newStepExecutionError(workflowState.WorkflowID, functionName, "cannot call SetEvent within a step") + if 
wfState.isWithinStep { + return newStepExecutionError(wfState.workflowID, functionName, "cannot call SetEvent within a step") } - stepID := workflowState.NextStepID() + stepID := wfState.NextStepID() tx, err := s.pool.Begin(ctx) if err != nil { @@ -1258,7 +1258,7 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp // Check if operation was already executed and do nothing if so checkInput := checkOperationExecutionDBInput{ - workflowID: workflowState.WorkflowID, + workflowID: wfState.workflowID, stepID: stepID, stepName: functionName, tx: tx, @@ -1284,14 +1284,14 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp ON CONFLICT (workflow_uuid, key) DO UPDATE SET value = EXCLUDED.value` - _, err = tx.Exec(ctx, insertQuery, workflowState.WorkflowID, input.Key, messageString) + _, err = tx.Exec(ctx, insertQuery, wfState.workflowID, input.Key, messageString) if err != nil { return fmt.Errorf("failed to insert/update workflow event: %w", err) } // Record the operation result recordInput := recordOperationResultDBInput{ - workflowID: workflowState.WorkflowID, + workflowID: wfState.workflowID, stepID: stepID, stepName: functionName, output: nil, @@ -1316,20 +1316,20 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp functionName := "DBOS.getEvent" // Get workflow state from context (optional for GetEvent as we can get an event from outside a workflow) - workflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) + wfState, ok := ctx.Value(workflowStateKey).(*workflowState) var stepID int var isInWorkflow bool - if ok && workflowState != nil { + if ok && wfState != nil { isInWorkflow = true - if workflowState.isWithinStep { - return nil, newStepExecutionError(workflowState.WorkflowID, functionName, "cannot call GetEvent within a step") + if wfState.isWithinStep { + return nil, newStepExecutionError(wfState.workflowID, functionName, "cannot call GetEvent within a step") } - 
stepID = workflowState.NextStepID() + stepID = wfState.NextStepID() // Check if operation was already executed (only if in workflow) checkInput := checkOperationExecutionDBInput{ - workflowID: workflowState.WorkflowID, + workflowID: wfState.workflowID, stepID: stepID, stepName: functionName, } @@ -1412,7 +1412,7 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp // Record the operation result if this is called within a workflow if isInWorkflow { recordInput := recordOperationResultDBInput{ - workflowID: workflowState.WorkflowID, + workflowID: wfState.workflowID, stepID: stepID, stepName: functionName, output: value, diff --git a/dbos/workflow.go b/dbos/workflow.go index 2da63bdf..51f6b1ba 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -54,16 +54,16 @@ type WorkflowStatus struct { Priority int `json:"priority"` } -// WorkflowState holds the runtime state for a workflow execution +// workflowState holds the runtime state for a workflow execution // TODO: this should be an internal type. Workflows should have aptly named getters to access the state -type WorkflowState struct { - WorkflowID string +type workflowState struct { + workflowID string stepCounter int isWithinStep bool } // NextStepID returns the next step ID and increments the counter -func (ws *WorkflowState) NextStepID() int { +func (ws *workflowState) NextStepID() int { ws.stepCounter++ return ws.stepCounter } @@ -98,15 +98,15 @@ func (h *workflowHandle[R]) GetResult(ctx context.Context) (R, error) { return *new(R), errors.New("workflow result channel is already closed. 
Did you call GetResult() twice on the same workflow handle?") } // If we are calling GetResult inside a workflow, record the result as a step result - parentWorkflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) + parentWorkflowState, ok := ctx.Value(workflowStateKey).(*workflowState) isChildWorkflow := ok && parentWorkflowState != nil if isChildWorkflow { encodedOutput, encErr := serialize(outcome.result) if encErr != nil { - return *new(R), newWorkflowExecutionError(parentWorkflowState.WorkflowID, fmt.Sprintf("serializing child workflow result: %v", encErr)) + return *new(R), newWorkflowExecutionError(parentWorkflowState.workflowID, fmt.Sprintf("serializing child workflow result: %v", encErr)) } recordGetResultInput := recordChildGetResultDBInput{ - parentWorkflowID: parentWorkflowState.WorkflowID, + parentWorkflowID: parentWorkflowState.workflowID, childWorkflowID: h.workflowID, stepID: parentWorkflowState.NextStepID(), output: encodedOutput, @@ -153,15 +153,15 @@ func (h *workflowPollingHandle[R]) GetResult(ctx context.Context) (R, error) { return *new(R), newWorkflowUnexpectedResultType(h.workflowID, fmt.Sprintf("%T", new(R)), fmt.Sprintf("%T", result)) } // If we are calling GetResult inside a workflow, record the result as a step result - parentWorkflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) + parentWorkflowState, ok := ctx.Value(workflowStateKey).(*workflowState) isChildWorkflow := ok && parentWorkflowState != nil if isChildWorkflow { encodedOutput, encErr := serialize(typedResult) if encErr != nil { - return *new(R), newWorkflowExecutionError(parentWorkflowState.WorkflowID, fmt.Sprintf("serializing child workflow result: %v", encErr)) + return *new(R), newWorkflowExecutionError(parentWorkflowState.workflowID, fmt.Sprintf("serializing child workflow result: %v", encErr)) } recordGetResultInput := recordChildGetResultDBInput{ - parentWorkflowID: parentWorkflowState.WorkflowID, + parentWorkflowID: parentWorkflowState.workflowID, 
childWorkflowID: h.workflowID, stepID: parentWorkflowState.NextStepID(), output: encodedOutput, @@ -399,7 +399,7 @@ func runAsWorkflow[P any, R any](ctx context.Context, fn WorkflowFunc[P, R], inp dbosWorkflowContext := context.Background() // Check if we are within a workflow (and thus a child workflow) - parentWorkflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) + parentWorkflowState, ok := ctx.Value(workflowStateKey).(*workflowState) isChildWorkflow := ok && parentWorkflowState != nil // TODO Check if cancelled @@ -409,7 +409,7 @@ func runAsWorkflow[P any, R any](ctx context.Context, fn WorkflowFunc[P, R], inp if params.workflowID == "" { if isChildWorkflow { stepID := parentWorkflowState.NextStepID() - workflowID = fmt.Sprintf("%s-%d", parentWorkflowState.WorkflowID, stepID) + workflowID = fmt.Sprintf("%s-%d", parentWorkflowState.workflowID, stepID) } else { workflowID = uuid.New().String() } @@ -419,9 +419,9 @@ func runAsWorkflow[P any, R any](ctx context.Context, fn WorkflowFunc[P, R], inp // If this is a child workflow that has already been recorded in operations_output, return directly a polling handle if isChildWorkflow { - childWorkflowID, err := dbos.systemDB.CheckChildWorkflow(dbosWorkflowContext, parentWorkflowState.WorkflowID, parentWorkflowState.stepCounter) + childWorkflowID, err := dbos.systemDB.CheckChildWorkflow(dbosWorkflowContext, parentWorkflowState.workflowID, parentWorkflowState.stepCounter) if err != nil { - return nil, newWorkflowExecutionError(parentWorkflowState.WorkflowID, fmt.Sprintf("checking child workflow: %v", err)) + return nil, newWorkflowExecutionError(parentWorkflowState.workflowID, fmt.Sprintf("checking child workflow: %v", err)) } if childWorkflowID != nil { return &workflowPollingHandle[R]{workflowID: *childWorkflowID}, nil @@ -481,7 +481,7 @@ func runAsWorkflow[P any, R any](ctx context.Context, fn WorkflowFunc[P, R], inp // Get the step ID that was used for generating the child workflow ID stepID := 
parentWorkflowState.stepCounter childInput := recordChildWorkflowDBInput{ - parentWorkflowID: parentWorkflowState.WorkflowID, + parentWorkflowID: parentWorkflowState.workflowID, childWorkflowID: workflowStatus.ID, stepName: runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name(), // Will need to test this stepID: stepID, @@ -489,7 +489,7 @@ func runAsWorkflow[P any, R any](ctx context.Context, fn WorkflowFunc[P, R], inp } err = dbos.systemDB.RecordChildWorkflow(dbosWorkflowContext, childInput) if err != nil { - return nil, newWorkflowExecutionError(parentWorkflowState.WorkflowID, fmt.Sprintf("recording child workflow: %v", err)) + return nil, newWorkflowExecutionError(parentWorkflowState.workflowID, fmt.Sprintf("recording child workflow: %v", err)) } } @@ -510,13 +510,13 @@ func runAsWorkflow[P any, R any](ctx context.Context, fn WorkflowFunc[P, R], inp } // Create workflow state to track step execution - workflowState := &WorkflowState{ - WorkflowID: workflowStatus.ID, + wfState := &workflowState{ + workflowID: workflowStatus.ID, stepCounter: -1, } // Run the function in a goroutine - augmentUserContext := context.WithValue(ctx, workflowStateKey, workflowState) + augmentUserContext := context.WithValue(ctx, workflowStateKey, wfState) go func() { result, err := fn(augmentUserContext, input) status := WorkflowStatusSuccess @@ -614,36 +614,36 @@ func RunAsStep[P any, R any](ctx context.Context, fn StepFunc[P, R], input P, op } // Get workflow state from context - workflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) - if !ok || workflowState == nil { + wfState, ok := ctx.Value(workflowStateKey).(*workflowState) + if !ok || wfState == nil { return *new(R), newStepExecutionError("", stepName, "workflow state not found in context: are you running this step within a workflow?") } // If within a step, just run the function directly - if workflowState.isWithinStep { + if wfState.isWithinStep { return fn(ctx, input) } // Get next step ID - stepID := 
workflowState.NextStepID() + stepID := wfState.NextStepID() // Check the step is cancelled, has already completed, or is called with a different name recordedOutput, err := dbos.systemDB.CheckOperationExecution(ctx, checkOperationExecutionDBInput{ - workflowID: workflowState.WorkflowID, + workflowID: wfState.workflowID, stepID: stepID, stepName: runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name(), }) if err != nil { - return *new(R), newStepExecutionError(workflowState.WorkflowID, stepName, fmt.Sprintf("checking operation execution: %v", err)) + return *new(R), newStepExecutionError(wfState.workflowID, stepName, fmt.Sprintf("checking operation execution: %v", err)) } if recordedOutput != nil { return recordedOutput.output.(R), recordedOutput.err } // Execute step with retry logic if MaxRetries > 0 - stepState := WorkflowState{ - WorkflowID: workflowState.WorkflowID, - stepCounter: workflowState.stepCounter, + stepState := workflowState{ + workflowID: wfState.workflowID, + stepCounter: wfState.stepCounter, isWithinStep: true, } stepCtx := context.WithValue(ctx, workflowStateKey, &stepState) @@ -668,7 +668,7 @@ func RunAsStep[P any, R any](ctx context.Context, fn StepFunc[P, R], input P, op // Wait before retry select { case <-ctx.Done(): - return *new(R), newStepExecutionError(workflowState.WorkflowID, stepName, fmt.Sprintf("context cancelled during retry: %v", ctx.Err())) + return *new(R), newStepExecutionError(wfState.workflowID, stepName, fmt.Sprintf("context cancelled during retry: %v", ctx.Err())) case <-time.After(delay): // Continue to retry } @@ -686,7 +686,7 @@ func RunAsStep[P any, R any](ctx context.Context, fn StepFunc[P, R], input P, op // If max retries reached, create MaxStepRetriesExceeded error if retry == params.MaxRetries { - stepError = newMaxStepRetriesExceededError(workflowState.WorkflowID, stepName, params.MaxRetries, joinedErrors) + stepError = newMaxStepRetriesExceededError(wfState.workflowID, stepName, params.MaxRetries, joinedErrors) 
break } } @@ -694,7 +694,7 @@ func RunAsStep[P any, R any](ctx context.Context, fn StepFunc[P, R], input P, op // Record the final result dbInput := recordOperationResultDBInput{ - workflowID: workflowState.WorkflowID, + workflowID: wfState.workflowID, stepName: stepName, stepID: stepID, err: stepError, @@ -702,7 +702,7 @@ func RunAsStep[P any, R any](ctx context.Context, fn StepFunc[P, R], input P, op } recErr := dbos.systemDB.RecordOperationResult(ctx, dbInput) if recErr != nil { - return *new(R), newStepExecutionError(workflowState.WorkflowID, stepName, fmt.Sprintf("recording step outcome: %v", recErr)) + return *new(R), newStepExecutionError(wfState.workflowID, stepName, fmt.Sprintf("recording step outcome: %v", recErr)) } return stepOutput, stepError @@ -781,11 +781,11 @@ func GetEvent[R any](ctx context.Context, input WorkflowGetEventInput) (R, error // GetWorkflowID retrieves the workflow ID from the context if called within a DBOS workflow func GetWorkflowID(ctx context.Context) (string, error) { - workflowState, ok := ctx.Value(workflowStateKey).(*WorkflowState) - if !ok || workflowState == nil { + wfState, ok := ctx.Value(workflowStateKey).(*workflowState) + if !ok || wfState == nil { return "", errors.New("not within a DBOS workflow context") } - return workflowState.WorkflowID, nil + return wfState.workflowID, nil } func RetrieveWorkflow[R any](workflowID string) (workflowPollingHandle[R], error) { From 5bb12d27efe31438fa6cf7cc6751ff69a4f836b5 Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 24 Jul 2025 08:53:53 -0700 Subject: [PATCH 16/19] newline --- dbos/migrations/000001_initial_dbos_schema.up.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbos/migrations/000001_initial_dbos_schema.up.sql b/dbos/migrations/000001_initial_dbos_schema.up.sql index 94afdcce..b2e6142a 100644 --- a/dbos/migrations/000001_initial_dbos_schema.up.sql +++ b/dbos/migrations/000001_initial_dbos_schema.up.sql @@ -106,4 +106,4 @@ $$ LANGUAGE plpgsql; -- 
Create events trigger CREATE TRIGGER dbos_workflow_events_trigger AFTER INSERT ON dbos.workflow_events -FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function(); \ No newline at end of file +FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function(); From 71adb6bed565c43e07a65db2a84aa8e325c126d7 Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 24 Jul 2025 08:54:07 -0700 Subject: [PATCH 17/19] only the creator of the CV should delete it from the map --- dbos/system_database.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index c4dd46ef..52ab975f 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1354,8 +1354,10 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp // Defer broadcast to ensure any waiting goroutines eventually unlock defer func() { cond.Broadcast() - // Clean up the condition variable after we're done - s.notificationsMap.Delete(payload) + // Clean up the condition variable after we're done, if we created it + if !loaded { + s.notificationsMap.Delete(payload) + } }() // Check if the event already exists in the database From 72a2a8f5eadba42bdc2a8b7bd03e70df439ee100 Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 24 Jul 2025 10:12:32 -0700 Subject: [PATCH 18/19] fix merge conflict --- dbos/system_database.go | 66 ----------------------------------------- dbos/workflows_test.go | 4 --- 2 files changed, 70 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 002d39be..52ab975f 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1116,15 +1116,9 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any // Check if operation was already executed // XXX this might not need to be in the transaction checkInput := checkOperationExecutionDBInput{ -<<<<<<< HEAD workflowID: destinationID, stepID: stepID, stepName: functionName, -======= - workflowID: 
destinationID, - operationID: stepID, - functionName: functionName, ->>>>>>> origin/main } recordedResult, err := s.CheckOperationExecution(ctx, checkInput) if err != nil { @@ -1245,7 +1239,6 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp functionName := "DBOS.setEvent" // Get workflow state from context -<<<<<<< HEAD wfState, ok := ctx.Value(workflowStateKey).(*workflowState) if !ok || wfState == nil { return newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?") @@ -1256,18 +1249,6 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp } stepID := wfState.NextStepID() -======= - workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) - if !ok || workflowState == nil { - return newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?") - } - - if workflowState.isWithinStep { - return newStepExecutionError(workflowState.WorkflowID, functionName, "cannot call SetEvent within a step") - } - - stepID := workflowState.NextStepID() ->>>>>>> origin/main tx, err := s.pool.Begin(ctx) if err != nil { @@ -1277,17 +1258,10 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp // Check if operation was already executed and do nothing if so checkInput := checkOperationExecutionDBInput{ -<<<<<<< HEAD workflowID: wfState.workflowID, stepID: stepID, stepName: functionName, tx: tx, -======= - workflowID: workflowState.WorkflowID, - operationID: stepID, - functionName: functionName, - tx: tx, ->>>>>>> origin/main } recordedResult, err := s.CheckOperationExecution(ctx, checkInput) if err != nil { @@ -1310,32 +1284,19 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp ON CONFLICT (workflow_uuid, key) DO UPDATE SET value = EXCLUDED.value` -<<<<<<< HEAD _, err = tx.Exec(ctx, insertQuery, wfState.workflowID, 
input.Key, messageString) -======= - _, err = tx.Exec(ctx, insertQuery, workflowState.WorkflowID, input.Key, messageString) ->>>>>>> origin/main if err != nil { return fmt.Errorf("failed to insert/update workflow event: %w", err) } // Record the operation result recordInput := recordOperationResultDBInput{ -<<<<<<< HEAD workflowID: wfState.workflowID, stepID: stepID, stepName: functionName, output: nil, err: nil, tx: tx, -======= - workflowID: workflowState.WorkflowID, - operationID: stepID, - operationName: functionName, - output: nil, - err: nil, - tx: tx, ->>>>>>> origin/main } err = s.RecordOperationResult(ctx, recordInput) @@ -1355,7 +1316,6 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp functionName := "DBOS.getEvent" // Get workflow state from context (optional for GetEvent as we can get an event from outside a workflow) -<<<<<<< HEAD wfState, ok := ctx.Value(workflowStateKey).(*workflowState) var stepID int var isInWorkflow bool @@ -1372,24 +1332,6 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp workflowID: wfState.workflowID, stepID: stepID, stepName: functionName, -======= - workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState) - var stepID int - var isInWorkflow bool - - if ok && workflowState != nil { - isInWorkflow = true - if workflowState.isWithinStep { - return nil, newStepExecutionError(workflowState.WorkflowID, functionName, "cannot call GetEvent within a step") - } - stepID = workflowState.NextStepID() - - // Check if operation was already executed (only if in workflow) - checkInput := checkOperationExecutionDBInput{ - workflowID: workflowState.WorkflowID, - operationID: stepID, - functionName: functionName, ->>>>>>> origin/main } recordedResult, err := s.CheckOperationExecution(ctx, checkInput) if err != nil { @@ -1472,19 +1414,11 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp // Record the operation result if this is called 
within a workflow if isInWorkflow { recordInput := recordOperationResultDBInput{ -<<<<<<< HEAD workflowID: wfState.workflowID, stepID: stepID, stepName: functionName, output: value, err: nil, -======= - workflowID: workflowState.WorkflowID, - operationID: stepID, - operationName: functionName, - output: value, - err: nil, ->>>>>>> origin/main } err = s.RecordOperationResult(ctx, recordInput) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index ed5cf465..63ff0210 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -1028,10 +1028,6 @@ func receiveWorkflowCoordinated(ctx context.Context, input struct { // Do a single Recv call with timeout msg, err := Recv[string](ctx, WorkflowRecvInput{Topic: input.Topic, Timeout: 3 * time.Second}) -<<<<<<< HEAD -======= - fmt.Println(err) ->>>>>>> origin/main if err != nil { return "", err } From 43d1cf909be7176eb5eaa25475fb492358fff4fd Mon Sep 17 00:00:00 2001 From: maxdml Date: Thu, 24 Jul 2025 10:31:22 -0700 Subject: [PATCH 19/19] perform DBOS migrations in an isolated schema migration table --- dbos/system_database.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 52ab975f..b2c652f4 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -5,6 +5,7 @@ import ( "embed" "errors" "fmt" + "net/url" "strings" "sync" "time" @@ -98,7 +99,8 @@ func createDatabaseIfNotExists(databaseURL string) error { //go:embed migrations/*.sql var migrationFiles embed.FS -// TODO: must use the systemdb name +const _DBOS_MIGRATION_TABLE = "dbos_schema_migrations" + func runMigrations(databaseURL string) error { // Change the driver to pgx5 databaseURL = "pgx5://" + strings.TrimPrefix(databaseURL, "postgres://") @@ -109,6 +111,20 @@ func runMigrations(databaseURL string) error { return newInitializationError(fmt.Sprintf("failed to create migration source: %v", err)) } + // Add custom migration table name to avoid 
conflicts with user migrations + // Parse the URL to properly determine where to add the query parameter + parsedURL, err := url.Parse(databaseURL) + if err != nil { + return newInitializationError(fmt.Sprintf("failed to parse database URL: %v", err)) + } + + // Check if query parameters already exist + separator := "?" + if parsedURL.RawQuery != "" { + separator = "&" + } + databaseURL += separator + "x-migrations-table=" + _DBOS_MIGRATION_TABLE + // Create migrator m, err := migrate.NewWithSourceInstance("iofs", d, databaseURL) if err != nil {