Skip to content

Commit e488998

Browse files
committed
use RunAsStep directly
1 parent 000c040 commit e488998

File tree

3 files changed

+95
-303
lines changed

3 files changed

+95
-303
lines changed

dbos/system_database.go

Lines changed: 0 additions & 286 deletions
Original file line numberDiff line numberDiff line change
@@ -547,45 +547,6 @@ type listWorkflowsDBInput struct {
547547

548548
// ListWorkflows retrieves a list of workflows based on the provided filters
549549
func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ([]WorkflowStatus, error) {
550-
functionName := "DBOS.listWorkflows"
551-
552-
// Get workflow state from context
553-
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
554-
var stepID int
555-
var runAsStep bool
556-
557-
if ok && wfState != nil && !wfState.isWithinStep {
558-
runAsStep = true
559-
stepID = wfState.NextStepID()
560-
561-
// Setup a new child context for the step
562-
stepState := workflowState{
563-
workflowID: wfState.workflowID,
564-
stepID: stepID,
565-
isWithinStep: true,
566-
}
567-
ctx = context.WithValue(ctx, workflowStateKey, &stepState)
568-
569-
// Check if already executed
570-
checkInput := checkOperationExecutionDBInput{
571-
workflowID: wfState.workflowID,
572-
stepID: stepID,
573-
stepName: functionName,
574-
}
575-
recordedResult, err := s.checkOperationExecution(ctx, checkInput)
576-
if err != nil {
577-
return nil, err
578-
}
579-
if recordedResult != nil {
580-
// Return cached result
581-
workflowStatuses, ok := recordedResult.output.([]WorkflowStatus)
582-
if !ok {
583-
return nil, fmt.Errorf("invalid cached result type for ListWorkflows: %T", recordedResult.output)
584-
}
585-
return workflowStatuses, recordedResult.err
586-
}
587-
}
588-
589550
qb := newQueryBuilder()
590551

591552
// Build the base query with conditional column selection
@@ -780,21 +741,6 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) (
780741
return nil, fmt.Errorf("error iterating over workflow rows: %w", err)
781742
}
782743

783-
// Record result if in workflow
784-
if runAsStep {
785-
recordInput := recordOperationResultDBInput{
786-
workflowID: wfState.workflowID,
787-
stepID: stepID,
788-
stepName: functionName,
789-
output: workflows,
790-
err: nil,
791-
}
792-
err = s.recordOperationResult(ctx, recordInput)
793-
if err != nil {
794-
return nil, fmt.Errorf("failed to record operation result: %w", err)
795-
}
796-
}
797-
798744
return workflows, nil
799745
}
800746

@@ -836,41 +782,6 @@ func (s *sysDB) updateWorkflowOutcome(ctx context.Context, input updateWorkflowO
836782
}
837783

838784
func (s *sysDB) cancelWorkflow(ctx context.Context, workflowID string) error {
839-
functionName := "DBOS.cancelWorkflow"
840-
841-
// Get workflow state from context (optional - can be called from outside workflow)
842-
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
843-
var stepID int
844-
var runAsStep bool
845-
846-
if ok && wfState != nil && !wfState.isWithinStep {
847-
runAsStep = true
848-
stepID = wfState.NextStepID()
849-
850-
// Setup a new child context for the step
851-
stepState := workflowState{
852-
workflowID: wfState.workflowID,
853-
stepID: stepID,
854-
isWithinStep: true,
855-
}
856-
ctx = context.WithValue(ctx, workflowStateKey, &stepState)
857-
858-
// Check if already executed
859-
checkInput := checkOperationExecutionDBInput{
860-
workflowID: wfState.workflowID,
861-
stepID: stepID,
862-
stepName: functionName,
863-
}
864-
recordedResult, err := s.checkOperationExecution(ctx, checkInput)
865-
if err != nil {
866-
return err
867-
}
868-
if recordedResult != nil {
869-
// Already cancelled, return any error that was recorded
870-
return recordedResult.err
871-
}
872-
}
873-
874785
tx, err := s.pool.Begin(ctx)
875786
if err != nil {
876787
return fmt.Errorf("failed to begin transaction: %w", err)
@@ -899,20 +810,6 @@ func (s *sysDB) cancelWorkflow(ctx context.Context, workflowID string) error {
899810
if err := tx.Rollback(ctx); err != nil {
900811
return fmt.Errorf("failed to commit transaction: %w", err)
901812
}
902-
// Record result if in workflow
903-
if runAsStep {
904-
recordInput := recordOperationResultDBInput{
905-
workflowID: wfState.workflowID,
906-
stepID: stepID,
907-
stepName: functionName,
908-
output: nil,
909-
err: nil,
910-
}
911-
err = s.recordOperationResult(ctx, recordInput)
912-
if err != nil {
913-
return fmt.Errorf("failed to record operation result: %w", err)
914-
}
915-
}
916813
return nil
917814
}
918815

@@ -926,22 +823,6 @@ func (s *sysDB) cancelWorkflow(ctx context.Context, workflowID string) error {
926823
return fmt.Errorf("failed to update workflow status to CANCELLED: %w", err)
927824
}
928825

929-
// Record result if in workflow
930-
if runAsStep {
931-
recordInput := recordOperationResultDBInput{
932-
workflowID: wfState.workflowID,
933-
stepID: stepID,
934-
stepName: functionName,
935-
output: nil,
936-
err: nil,
937-
tx: tx,
938-
}
939-
err = s.recordOperationResult(ctx, recordInput)
940-
if err != nil {
941-
return fmt.Errorf("failed to record operation result: %w", err)
942-
}
943-
}
944-
945826
if err := tx.Commit(ctx); err != nil {
946827
return fmt.Errorf("failed to commit transaction: %w", err)
947828
}
@@ -969,7 +850,6 @@ func (s *sysDB) cancelAllBefore(ctx context.Context, cutoffTime time.Time) error
969850
// If desired we could funnel the errors back the caller (conductor, admin server)
970851
}
971852
}
972-
973853
return nil
974854
}
975855

@@ -1033,41 +913,6 @@ func (s *sysDB) garbageCollectWorkflows(ctx context.Context, input garbageCollec
1033913
}
1034914

1035915
func (s *sysDB) resumeWorkflow(ctx context.Context, workflowID string) error {
1036-
functionName := "DBOS.resumeWorkflow"
1037-
1038-
// Get workflow state from context (optional - can be called from outside workflow)
1039-
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
1040-
var stepID int
1041-
var runAsStep bool
1042-
1043-
if ok && wfState != nil && !wfState.isWithinStep {
1044-
runAsStep = true
1045-
stepID = wfState.NextStepID()
1046-
1047-
// Setup a new child context for the step
1048-
stepState := workflowState{
1049-
workflowID: wfState.workflowID,
1050-
stepID: stepID,
1051-
isWithinStep: true,
1052-
}
1053-
ctx = context.WithValue(ctx, workflowStateKey, &stepState)
1054-
1055-
// Check if already executed
1056-
checkInput := checkOperationExecutionDBInput{
1057-
workflowID: wfState.workflowID,
1058-
stepID: stepID,
1059-
stepName: functionName,
1060-
}
1061-
recordedResult, err := s.checkOperationExecution(ctx, checkInput)
1062-
if err != nil {
1063-
return err
1064-
}
1065-
if recordedResult != nil {
1066-
// Already resumed, return any error that was recorded
1067-
return recordedResult.err
1068-
}
1069-
}
1070-
1071916
tx, err := s.pool.Begin(ctx)
1072917
if err != nil {
1073918
return fmt.Errorf("failed to begin transaction: %w", err)
@@ -1098,21 +943,6 @@ func (s *sysDB) resumeWorkflow(ctx context.Context, workflowID string) error {
1098943

1099944
wf := wfs[0]
1100945
if wf.Status == WorkflowStatusSuccess || wf.Status == WorkflowStatusError {
1101-
// Record result if in workflow
1102-
if runAsStep {
1103-
recordInput := recordOperationResultDBInput{
1104-
workflowID: wfState.workflowID,
1105-
stepID: stepID,
1106-
stepName: functionName,
1107-
output: nil,
1108-
err: nil,
1109-
tx: tx,
1110-
}
1111-
err = s.recordOperationResult(ctx, recordInput)
1112-
if err != nil {
1113-
return fmt.Errorf("failed to record operation result: %w", err)
1114-
}
1115-
}
1116946
return nil // Workflow is complete, do nothing
1117947
}
1118948

@@ -1133,22 +963,6 @@ func (s *sysDB) resumeWorkflow(ctx context.Context, workflowID string) error {
1133963
return fmt.Errorf("failed to update workflow status to ENQUEUED: %w", err)
1134964
}
1135965

1136-
// Record result if in workflow
1137-
if runAsStep {
1138-
recordInput := recordOperationResultDBInput{
1139-
workflowID: wfState.workflowID,
1140-
stepID: stepID,
1141-
stepName: functionName,
1142-
output: nil,
1143-
err: nil,
1144-
tx: tx,
1145-
}
1146-
err = s.recordOperationResult(ctx, recordInput)
1147-
if err != nil {
1148-
return fmt.Errorf("failed to record operation result: %w", err)
1149-
}
1150-
}
1151-
1152966
if err := tx.Commit(ctx); err != nil {
1153967
return fmt.Errorf("failed to commit transaction: %w", err)
1154968
}
@@ -1164,50 +978,12 @@ type forkWorkflowDBInput struct {
1164978
}
1165979

1166980
func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (string, error) {
1167-
functionName := "DBOS.forkWorkflow"
1168-
1169981
// Generate new workflow ID if not provided
1170982
forkedWorkflowID := input.forkedWorkflowID
1171983
if forkedWorkflowID == "" {
1172984
forkedWorkflowID = uuid.New().String()
1173985
}
1174986

1175-
// Get workflow state from context (optional - can be called from outside workflow)
1176-
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
1177-
var stepID int
1178-
var runAsStep bool
1179-
1180-
if ok && wfState != nil && !wfState.isWithinStep {
1181-
runAsStep = true
1182-
stepID = wfState.NextStepID()
1183-
1184-
stepState := workflowState{
1185-
workflowID: wfState.workflowID,
1186-
stepID: stepID,
1187-
isWithinStep: true,
1188-
}
1189-
ctx = context.WithValue(ctx, workflowStateKey, &stepState)
1190-
1191-
// Check if already executed
1192-
checkInput := checkOperationExecutionDBInput{
1193-
workflowID: wfState.workflowID,
1194-
stepID: stepID,
1195-
stepName: functionName,
1196-
}
1197-
recordedResult, err := s.checkOperationExecution(ctx, checkInput)
1198-
if err != nil {
1199-
return "", err
1200-
}
1201-
if recordedResult != nil {
1202-
// Already forked, return any error that was recorded
1203-
outputString, ok := recordedResult.output.(string)
1204-
if !ok {
1205-
return "", fmt.Errorf("unexpected output type in recorded result, expected string")
1206-
}
1207-
return outputString, recordedResult.err
1208-
}
1209-
}
1210-
1211987
// Validate startStep
1212988
if input.startStep < 0 {
1213989
return "", fmt.Errorf("startStep must be >= 0, got %d", input.startStep)
@@ -1296,22 +1072,6 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (st
12961072
}
12971073
}
12981074

1299-
// Record result if in workflow
1300-
if runAsStep {
1301-
recordInput := recordOperationResultDBInput{
1302-
workflowID: wfState.workflowID,
1303-
stepID: stepID,
1304-
stepName: functionName,
1305-
output: forkedWorkflowID,
1306-
err: nil,
1307-
tx: tx,
1308-
}
1309-
err = s.recordOperationResult(ctx, recordInput)
1310-
if err != nil {
1311-
return "", fmt.Errorf("failed to record operation result: %w", err)
1312-
}
1313-
}
1314-
13151075
if err := tx.Commit(ctx); err != nil {
13161076
return "", fmt.Errorf("failed to commit transaction: %w", err)
13171077
}
@@ -1632,37 +1392,6 @@ type StepInfo struct {
16321392
}
16331393

16341394
func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error) {
1635-
functionName := "DBOS.getWorkflowSteps"
1636-
1637-
// Get workflow state from context (optional - can be called from outside workflow)
1638-
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
1639-
var stepID int
1640-
var runAsStep bool
1641-
1642-
if ok && wfState != nil && !wfState.isWithinStep {
1643-
runAsStep = true
1644-
stepID = wfState.NextStepID()
1645-
1646-
// Check if already executed
1647-
checkInput := checkOperationExecutionDBInput{
1648-
workflowID: wfState.workflowID,
1649-
stepID: stepID,
1650-
stepName: functionName,
1651-
}
1652-
recordedResult, err := s.checkOperationExecution(ctx, checkInput)
1653-
if err != nil {
1654-
return nil, err
1655-
}
1656-
if recordedResult != nil {
1657-
// Return cached result
1658-
stepInfos, ok := recordedResult.output.([]StepInfo)
1659-
if !ok {
1660-
return nil, fmt.Errorf("unexpected output type in recorded result, expected []StepInfo")
1661-
}
1662-
return stepInfos, recordedResult.err
1663-
}
1664-
}
1665-
16661395
query := `SELECT function_id, function_name, output, error, child_workflow_id
16671396
FROM dbos.operation_outputs
16681397
WHERE workflow_uuid = $1
@@ -1712,21 +1441,6 @@ func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]Step
17121441
return nil, fmt.Errorf("error iterating over step rows: %w", err)
17131442
}
17141443

1715-
// Record result if in workflow
1716-
if runAsStep {
1717-
recordInput := recordOperationResultDBInput{
1718-
workflowID: wfState.workflowID,
1719-
stepID: stepID,
1720-
stepName: functionName,
1721-
output: steps,
1722-
err: nil,
1723-
}
1724-
err = s.recordOperationResult(ctx, recordInput)
1725-
if err != nil {
1726-
return nil, fmt.Errorf("failed to record operation result: %w", err)
1727-
}
1728-
}
1729-
17301444
return steps, nil
17311445
}
17321446

0 commit comments

Comments
 (0)