Skip to content

Commit 4bf803e

Browse files
committed
Make the workflow history length available
1 parent c7163a7 commit 4bf803e

File tree

7 files changed

+643
-3
lines changed

7 files changed

+643
-3
lines changed

internal/workflowstate/workflowstate.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,9 @@ type WfState struct {
6565
logger *slog.Logger
6666
tracer trace.Tracer
6767

68-
clock clock.Clock
69-
time time.Time
68+
clock clock.Clock
69+
time time.Time
70+
historyLength int64
7071
}
7172

7273
func NewWorkflowState(instance *core.WorkflowInstance, logger *slog.Logger, tracer trace.Tracer, clock clock.Clock) *WfState {
@@ -174,3 +175,11 @@ func (wf *WfState) Logger() *slog.Logger {
174175
func (wf *WfState) Tracer() trace.Tracer {
175176
return wf.tracer
176177
}
178+
179+
func (wf *WfState) SetHistoryLength(length int64) {
180+
wf.historyLength = length
181+
}
182+
183+
func (wf *WfState) HistoryLength() int64 {
184+
return wf.historyLength
185+
}

samples/workflow-info/README.md

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Workflow Info Sample
2+
3+
This sample demonstrates how to access workflow information during workflow execution, specifically the history length.
4+
5+
## What it demonstrates
6+
7+
- Using `workflow.InstanceExecutionDetails(ctx)` to access workflow metadata
8+
- Tracking how the workflow history grows as events are added
9+
- Accessing the `HistoryLength` field of `WorkflowInstanceExecutionDetails`
10+
11+
## Running the sample
12+
13+
```bash
14+
go run .
15+
```
16+
17+
## Expected Output
18+
19+
You should see log messages showing the history length increasing as the workflow executes:
20+
21+
```
22+
Workflow started historyLength=2
23+
Activity executed
24+
After activity execution historyLength=5
25+
Activity executed
26+
After second activity historyLength=8
27+
Workflow completed successfully!
28+
```
29+
30+
## How it works
31+
32+
The `WorkflowInstanceExecutionDetails` struct contains information about the current workflow execution. Currently it provides:
33+
34+
- `HistoryLength`: The number of events in the workflow history at the current point in execution
35+
36+
The history length includes all events that have been added to the workflow's event history, including:
37+
- WorkflowExecutionStarted
38+
- WorkflowTaskStarted
39+
- ActivityScheduled
40+
- ActivityCompleted
41+
- TimerScheduled
42+
- TimerFired
43+
- And other workflow events
44+
45+
This can be useful for:
46+
- Monitoring workflow complexity
47+
- Making decisions based on how far the workflow has progressed
48+
- Implementing custom limits or checkpointing logic
49+
- Debugging and understanding workflow execution
50+
51+
## Future extensions
52+
53+
The `WorkflowInstanceExecutionDetails` struct is designed to be extensible. Future additions might include:
54+
- Execution duration
55+
- Number of activities executed
56+
- Number of retries
57+
- Custom metadata

samples/workflow-info/main.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
"github.com/cschleiden/go-workflows/client"
10+
"github.com/cschleiden/go-workflows/worker"
11+
"github.com/cschleiden/go-workflows/workflow"
12+
13+
"github.com/cschleiden/go-workflows/backend/sqlite"
14+
)
15+
16+
// Workflow demonstrates accessing workflow info including history length
17+
func Workflow(ctx workflow.Context) error {
18+
// Get workflow info at the start
19+
info := workflow.InstanceExecutionDetails(ctx)
20+
logger := workflow.Logger(ctx)
21+
logger.Info("Workflow started", "historyLength", info.HistoryLength)
22+
23+
// Execute an activity
24+
_, err := workflow.ExecuteActivity[any](ctx, workflow.DefaultActivityOptions, Activity).Get(ctx)
25+
if err != nil {
26+
return err
27+
}
28+
29+
// Check history length again after activity
30+
info = workflow.InstanceExecutionDetails(ctx)
31+
logger.Info("After activity execution", "historyLength", info.HistoryLength)
32+
33+
// Execute another activity
34+
_, err = workflow.ExecuteActivity[any](ctx, workflow.DefaultActivityOptions, Activity).Get(ctx)
35+
if err != nil {
36+
return err
37+
}
38+
39+
// Check history length again
40+
info = workflow.InstanceExecutionDetails(ctx)
41+
logger.Info("After second activity", "historyLength", info.HistoryLength)
42+
43+
return nil
44+
}
45+
46+
func Activity(ctx context.Context) error {
47+
log.Println("Activity executed")
48+
return nil
49+
}
50+
51+
func main() {
52+
ctx := context.Background()
53+
54+
// Create in-memory SQLite backend
55+
b := sqlite.NewInMemoryBackend()
56+
57+
// Create worker
58+
w := worker.New(b, nil)
59+
60+
// Register workflow and activity
61+
w.RegisterWorkflow(Workflow)
62+
w.RegisterActivity(Activity)
63+
64+
// Start worker
65+
if err := w.Start(ctx); err != nil {
66+
panic(err)
67+
}
68+
69+
// Create client
70+
c := client.New(b)
71+
72+
// Create workflow instance
73+
wfi, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
74+
InstanceID: "workflow-info-demo",
75+
}, Workflow)
76+
if err != nil {
77+
panic(err)
78+
}
79+
80+
fmt.Println("Created workflow instance:", wfi.InstanceID)
81+
82+
// Wait for result (10 second timeout)
83+
err = c.WaitForWorkflowInstance(ctx, wfi, 10*time.Second)
84+
if err != nil {
85+
panic(err)
86+
}
87+
88+
fmt.Println("Workflow completed successfully!")
89+
fmt.Println("Check the logs above to see how the history length increased as the workflow executed.")
90+
}

tester/tester_workflowinfo_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package tester
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/cschleiden/go-workflows/workflow"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func Test_InstanceExecutionDetails_HistoryLength(t *testing.T) {
12+
var capturedLength int64
13+
14+
workflowWithInfo := func(ctx workflow.Context) error {
15+
info := workflow.InstanceExecutionDetails(ctx)
16+
capturedLength = info.HistoryLength
17+
return nil
18+
}
19+
20+
tester := NewWorkflowTester[any](workflowWithInfo)
21+
tester.Execute(context.Background())
22+
23+
require.True(t, tester.WorkflowFinished())
24+
require.Greater(t, capturedLength, int64(0), "History length should be greater than 0")
25+
}
26+
27+
func Test_InstanceExecutionDetails_HistoryLength_WithActivity(t *testing.T) {
28+
var lengthBeforeActivity, lengthAfterActivity int64
29+
30+
workflowWithActivity := func(ctx workflow.Context) (int, error) {
31+
info := workflow.InstanceExecutionDetails(ctx)
32+
lengthBeforeActivity = info.HistoryLength
33+
34+
r, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, activity1).Get(ctx)
35+
if err != nil {
36+
return 0, err
37+
}
38+
39+
info = workflow.InstanceExecutionDetails(ctx)
40+
lengthAfterActivity = info.HistoryLength
41+
42+
return r, nil
43+
}
44+
45+
tester := NewWorkflowTester[int](workflowWithActivity)
46+
tester.Registry().RegisterActivity(activity1)
47+
48+
tester.Execute(context.Background())
49+
50+
require.True(t, tester.WorkflowFinished())
51+
require.Greater(t, lengthBeforeActivity, int64(0))
52+
require.Greater(t, lengthAfterActivity, lengthBeforeActivity, "History length should increase after activity execution")
53+
54+
r, err := tester.WorkflowResult()
55+
require.NoError(t, err)
56+
require.Equal(t, 23, r)
57+
}
58+
59+
func Test_InstanceExecutionDetails_HistoryLength_MultipleSteps(t *testing.T) {
60+
var lengths []int64
61+
62+
workflowMultipleSteps := func(ctx workflow.Context) error {
63+
info := workflow.InstanceExecutionDetails(ctx)
64+
lengths = append(lengths, info.HistoryLength)
65+
66+
workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, activity1).Get(ctx)
67+
68+
info = workflow.InstanceExecutionDetails(ctx)
69+
lengths = append(lengths, info.HistoryLength)
70+
71+
workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, activity1).Get(ctx)
72+
73+
info = workflow.InstanceExecutionDetails(ctx)
74+
lengths = append(lengths, info.HistoryLength)
75+
76+
return nil
77+
}
78+
79+
tester := NewWorkflowTester[any](workflowMultipleSteps)
80+
tester.Registry().RegisterActivity(activity1)
81+
82+
tester.Execute(context.Background())
83+
84+
require.True(t, tester.WorkflowFinished())
85+
86+
// The tester replays the workflow, so we'll see the lengths multiple times
87+
// We just need to verify that the final three captures show increasing values
88+
require.GreaterOrEqual(t, len(lengths), 3, "Should have at least 3 length captures")
89+
90+
// Get the last 3 values (from the final execution)
91+
finalLengths := lengths[len(lengths)-3:]
92+
require.Greater(t, finalLengths[0], int64(0))
93+
require.Greater(t, finalLengths[1], finalLengths[0], "History should grow after first activity")
94+
require.Greater(t, finalLengths[2], finalLengths[1], "History should grow after second activity")
95+
}

workflow/executor/executor.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@ func (e *executor) ExecuteTask(ctx context.Context, t *backend.WorkflowTask) (*E
209209
executedEvents[i].SequenceID = e.nextSequenceID()
210210
}
211211

212+
e.workflowState.SetHistoryLength(e.lastSequenceID)
213+
212214
logger.Debug("Finished workflow task",
213215
log.ExecutedEventsKey, len(executedEvents),
214216
log.TaskLastSequenceIDKey, e.lastSequenceID,
@@ -269,10 +271,13 @@ func (e *executor) replayHistory(h []*history.Event) error {
269271
return errors.New("history has older events than current state")
270272
}
271273

274+
// Note: lastSequenceID is updated below after successful event execution.
275+
// For consistent history length reporting (e.g., for workflow code), we intentionally set historyLength here before executing the event.
276+
e.workflowState.SetHistoryLength(e.lastSequenceID + 1)
277+
272278
if err := e.executeEvent(event); err != nil {
273279
return err
274280
}
275-
276281
e.lastSequenceID = event.SequenceID
277282
}
278283

@@ -283,6 +288,9 @@ func (e *executor) executeNewEvents(newEvents []*history.Event) ([]*history.Even
283288
e.workflowState.SetReplaying(false)
284289

285290
for i, event := range newEvents {
291+
// Update history length BEFORE executing the event to reflect the event about to be added
292+
e.workflowState.SetHistoryLength(e.lastSequenceID + int64(i) + 1)
293+
286294
if err := e.executeEvent(event); err != nil {
287295
return newEvents[:i], err
288296
}

0 commit comments

Comments
 (0)