Skip to content

Commit 6d2218e

Browse files
committed
Merge remote-tracking branch 'origin/main' into workflow-context-getters
2 parents 71adb6b + 1d134da commit 6d2218e

File tree

2 files changed

+70
-0
lines changed

2 files changed

+70
-0
lines changed

dbos/system_database.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,9 +1116,15 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any
11161116
// Check if operation was already executed
11171117
// XXX this might not need to be in the transaction
11181118
checkInput := checkOperationExecutionDBInput{
1119+
<<<<<<< HEAD
11191120
workflowID: destinationID,
11201121
stepID: stepID,
11211122
stepName: functionName,
1123+
=======
1124+
workflowID: destinationID,
1125+
operationID: stepID,
1126+
functionName: functionName,
1127+
>>>>>>> origin/main
11221128
}
11231129
recordedResult, err := s.CheckOperationExecution(ctx, checkInput)
11241130
if err != nil {
@@ -1239,6 +1245,7 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp
12391245
functionName := "DBOS.setEvent"
12401246

12411247
// Get workflow state from context
1248+
<<<<<<< HEAD
12421249
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
12431250
if !ok || wfState == nil {
12441251
return newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?")
@@ -1249,6 +1256,18 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp
12491256
}
12501257

12511258
stepID := wfState.NextStepID()
1259+
=======
1260+
workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState)
1261+
if !ok || workflowState == nil {
1262+
return newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?")
1263+
}
1264+
1265+
if workflowState.isWithinStep {
1266+
return newStepExecutionError(workflowState.WorkflowID, functionName, "cannot call SetEvent within a step")
1267+
}
1268+
1269+
stepID := workflowState.NextStepID()
1270+
>>>>>>> origin/main
12521271

12531272
tx, err := s.pool.Begin(ctx)
12541273
if err != nil {
@@ -1258,10 +1277,17 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp
12581277

12591278
// Check if operation was already executed and do nothing if so
12601279
checkInput := checkOperationExecutionDBInput{
1280+
<<<<<<< HEAD
12611281
workflowID: wfState.workflowID,
12621282
stepID: stepID,
12631283
stepName: functionName,
12641284
tx: tx,
1285+
=======
1286+
workflowID: workflowState.WorkflowID,
1287+
operationID: stepID,
1288+
functionName: functionName,
1289+
tx: tx,
1290+
>>>>>>> origin/main
12651291
}
12661292
recordedResult, err := s.CheckOperationExecution(ctx, checkInput)
12671293
if err != nil {
@@ -1284,19 +1310,32 @@ func (s *systemDatabase) SetEvent(ctx context.Context, input WorkflowSetEventInp
12841310
ON CONFLICT (workflow_uuid, key)
12851311
DO UPDATE SET value = EXCLUDED.value`
12861312

1313+
<<<<<<< HEAD
12871314
_, err = tx.Exec(ctx, insertQuery, wfState.workflowID, input.Key, messageString)
1315+
=======
1316+
_, err = tx.Exec(ctx, insertQuery, workflowState.WorkflowID, input.Key, messageString)
1317+
>>>>>>> origin/main
12881318
if err != nil {
12891319
return fmt.Errorf("failed to insert/update workflow event: %w", err)
12901320
}
12911321

12921322
// Record the operation result
12931323
recordInput := recordOperationResultDBInput{
1324+
<<<<<<< HEAD
12941325
workflowID: wfState.workflowID,
12951326
stepID: stepID,
12961327
stepName: functionName,
12971328
output: nil,
12981329
err: nil,
12991330
tx: tx,
1331+
=======
1332+
workflowID: workflowState.WorkflowID,
1333+
operationID: stepID,
1334+
operationName: functionName,
1335+
output: nil,
1336+
err: nil,
1337+
tx: tx,
1338+
>>>>>>> origin/main
13001339
}
13011340

13021341
err = s.RecordOperationResult(ctx, recordInput)
@@ -1316,6 +1355,7 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp
13161355
functionName := "DBOS.getEvent"
13171356

13181357
// Get workflow state from context (optional for GetEvent as we can get an event from outside a workflow)
1358+
<<<<<<< HEAD
13191359
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
13201360
var stepID int
13211361
var isInWorkflow bool
@@ -1332,6 +1372,24 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp
13321372
workflowID: wfState.workflowID,
13331373
stepID: stepID,
13341374
stepName: functionName,
1375+
=======
1376+
workflowState, ok := ctx.Value(WorkflowStateKey).(*WorkflowState)
1377+
var stepID int
1378+
var isInWorkflow bool
1379+
1380+
if ok && workflowState != nil {
1381+
isInWorkflow = true
1382+
if workflowState.isWithinStep {
1383+
return nil, newStepExecutionError(workflowState.WorkflowID, functionName, "cannot call GetEvent within a step")
1384+
}
1385+
stepID = workflowState.NextStepID()
1386+
1387+
// Check if operation was already executed (only if in workflow)
1388+
checkInput := checkOperationExecutionDBInput{
1389+
workflowID: workflowState.WorkflowID,
1390+
operationID: stepID,
1391+
functionName: functionName,
1392+
>>>>>>> origin/main
13351393
}
13361394
recordedResult, err := s.CheckOperationExecution(ctx, checkInput)
13371395
if err != nil {
@@ -1414,11 +1472,19 @@ func (s *systemDatabase) GetEvent(ctx context.Context, input WorkflowGetEventInp
14141472
// Record the operation result if this is called within a workflow
14151473
if isInWorkflow {
14161474
recordInput := recordOperationResultDBInput{
1475+
<<<<<<< HEAD
14171476
workflowID: wfState.workflowID,
14181477
stepID: stepID,
14191478
stepName: functionName,
14201479
output: value,
14211480
err: nil,
1481+
=======
1482+
workflowID: workflowState.WorkflowID,
1483+
operationID: stepID,
1484+
operationName: functionName,
1485+
output: value,
1486+
err: nil,
1487+
>>>>>>> origin/main
14221488
}
14231489

14241490
err = s.RecordOperationResult(ctx, recordInput)

dbos/workflows_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,6 +1028,10 @@ func receiveWorkflowCoordinated(ctx context.Context, input struct {
10281028

10291029
// Do a single Recv call with timeout
10301030
msg, err := Recv[string](ctx, WorkflowRecvInput{Topic: input.Topic, Timeout: 3 * time.Second})
1031+
<<<<<<< HEAD
1032+
=======
1033+
fmt.Println(err)
1034+
>>>>>>> origin/main
10311035
if err != nil {
10321036
return "", err
10331037
}

0 commit comments

Comments
 (0)