Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ linters:
enable:
- bidichk
- bodyclose
- errorlint
- goprintffuncname
- govet
- importas
Expand All @@ -22,16 +21,13 @@ linters:
- testifylint
- tparallel
- unconvert
- usetesting
- wastedassign
- whitespace
- unused
# - goworkflows
disable:
- errname
- errcheck
- errorlint
- usetesting
settings:
staticcheck:
checks:
Expand Down
13 changes: 11 additions & 2 deletions internal/workflowstate/workflowstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ type WfState struct {
logger *slog.Logger
tracer trace.Tracer

clock clock.Clock
time time.Time
clock clock.Clock
time time.Time
historyLength int64
}

func NewWorkflowState(instance *core.WorkflowInstance, logger *slog.Logger, tracer trace.Tracer, clock clock.Clock) *WfState {
Expand Down Expand Up @@ -174,3 +175,11 @@ func (wf *WfState) Logger() *slog.Logger {
func (wf *WfState) Tracer() trace.Tracer {
return wf.tracer
}

func (wf *WfState) SetHistoryLength(length int64) {
wf.historyLength = length
}

func (wf *WfState) HistoryLength() int64 {
return wf.historyLength
}
57 changes: 57 additions & 0 deletions samples/workflow-info/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Workflow Info Sample

This sample demonstrates how to access workflow information during workflow execution, specifically the history length.

## What it demonstrates

- Using `workflow.InstanceExecutionDetails(ctx)` to access workflow metadata
- Tracking how the workflow history grows as events are added
- Accessing the `HistoryLength` field of `WorkflowInstanceExecutionDetails`

## Running the sample

```bash
go run .
```

## Expected Output

You should see log messages showing the history length increasing as the workflow executes:

```
Workflow started historyLength=2
Activity executed
After activity execution historyLength=5
Activity executed
After second activity historyLength=8
Workflow completed successfully!
```

## How it works

The `WorkflowInstanceExecutionDetails` struct contains information about the current workflow execution. Currently it provides:

- `HistoryLength`: The number of events in the workflow history at the current point in execution

The history length includes all events that have been added to the workflow's event history, including:
- WorkflowExecutionStarted
- WorkflowTaskStarted
- ActivityScheduled
- ActivityCompleted
- TimerScheduled
- TimerFired
- And other workflow events

This can be useful for:
- Monitoring workflow complexity
- Making decisions based on how far the workflow has progressed
- Implementing custom limits or checkpointing logic
- Debugging and understanding workflow execution

## Future extensions

The `WorkflowInstanceExecutionDetails` struct is designed to be extensible. Future additions might include:
- Execution duration
- Number of activities executed
- Number of retries
- Custom metadata
90 changes: 90 additions & 0 deletions samples/workflow-info/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/cschleiden/go-workflows/client"
"github.com/cschleiden/go-workflows/worker"
"github.com/cschleiden/go-workflows/workflow"

"github.com/cschleiden/go-workflows/backend/sqlite"
)

// Workflow demonstrates accessing workflow info including history length
func Workflow(ctx workflow.Context) error {
// Get workflow info at the start
info := workflow.InstanceExecutionDetails(ctx)
logger := workflow.Logger(ctx)
logger.Info("Workflow started", "historyLength", info.HistoryLength)

// Execute an activity
_, err := workflow.ExecuteActivity[any](ctx, workflow.DefaultActivityOptions, Activity).Get(ctx)
if err != nil {
return err
}

// Check history length again after activity
info = workflow.InstanceExecutionDetails(ctx)
logger.Info("After activity execution", "historyLength", info.HistoryLength)

// Execute another activity
_, err = workflow.ExecuteActivity[any](ctx, workflow.DefaultActivityOptions, Activity).Get(ctx)
if err != nil {
return err
}

// Check history length again
info = workflow.InstanceExecutionDetails(ctx)
logger.Info("After second activity", "historyLength", info.HistoryLength)

return nil
}

func Activity(ctx context.Context) error {
log.Println("Activity executed")
return nil
}

func main() {
ctx := context.Background()

// Create in-memory SQLite backend
b := sqlite.NewInMemoryBackend()

// Create worker
w := worker.New(b, nil)

// Register workflow and activity
w.RegisterWorkflow(Workflow)
w.RegisterActivity(Activity)

// Start worker
if err := w.Start(ctx); err != nil {
panic(err)
}

// Create client
c := client.New(b)

// Create workflow instance
wfi, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
InstanceID: "workflow-info-demo",
}, Workflow)
if err != nil {
panic(err)
}

fmt.Println("Created workflow instance:", wfi.InstanceID)

// Wait for result (10 second timeout)
err = c.WaitForWorkflowInstance(ctx, wfi, 10*time.Second)
if err != nil {
panic(err)
}

fmt.Println("Workflow completed successfully!")
fmt.Println("Check the logs above to see how the history length increased as the workflow executed.")
}
95 changes: 95 additions & 0 deletions tester/tester_workflowinfo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package tester

import (
"context"
"testing"

"github.com/cschleiden/go-workflows/workflow"
"github.com/stretchr/testify/require"
)

func Test_InstanceExecutionDetails_HistoryLength(t *testing.T) {
var capturedLength int64

workflowWithInfo := func(ctx workflow.Context) error {
info := workflow.InstanceExecutionDetails(ctx)
capturedLength = info.HistoryLength
return nil
}

tester := NewWorkflowTester[any](workflowWithInfo)
tester.Execute(context.Background())

require.True(t, tester.WorkflowFinished())
require.Positive(t, capturedLength, "History length should be greater than 0")
}

func Test_InstanceExecutionDetails_HistoryLength_WithActivity(t *testing.T) {
var lengthBeforeActivity, lengthAfterActivity int64

workflowWithActivity := func(ctx workflow.Context) (int, error) {
info := workflow.InstanceExecutionDetails(ctx)
lengthBeforeActivity = info.HistoryLength

r, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, activity1).Get(ctx)
if err != nil {
return 0, err
}

info = workflow.InstanceExecutionDetails(ctx)
lengthAfterActivity = info.HistoryLength

return r, nil
}

tester := NewWorkflowTester[int](workflowWithActivity)
tester.Registry().RegisterActivity(activity1)

tester.Execute(context.Background())

require.True(t, tester.WorkflowFinished())
require.Positive(t, lengthBeforeActivity)
require.Greater(t, lengthAfterActivity, lengthBeforeActivity, "History length should increase after activity execution")

r, err := tester.WorkflowResult()
require.NoError(t, err)
require.Equal(t, 23, r)
}

func Test_InstanceExecutionDetails_HistoryLength_MultipleSteps(t *testing.T) {
var lengths []int64

workflowMultipleSteps := func(ctx workflow.Context) error {
info := workflow.InstanceExecutionDetails(ctx)
lengths = append(lengths, info.HistoryLength)

workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, activity1).Get(ctx)

info = workflow.InstanceExecutionDetails(ctx)
lengths = append(lengths, info.HistoryLength)

workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, activity1).Get(ctx)

info = workflow.InstanceExecutionDetails(ctx)
lengths = append(lengths, info.HistoryLength)

return nil
}

tester := NewWorkflowTester[any](workflowMultipleSteps)
tester.Registry().RegisterActivity(activity1)

tester.Execute(context.Background())

require.True(t, tester.WorkflowFinished())

// The tester replays the workflow, so we'll see the lengths multiple times
// We just need to verify that the final three captures show increasing values
require.GreaterOrEqual(t, len(lengths), 3, "Should have at least 3 length captures")

// Get the last 3 values (from the final execution)
finalLengths := lengths[len(lengths)-3:]
require.Positive(t, finalLengths[0])
require.Greater(t, finalLengths[1], finalLengths[0], "History should grow after first activity")
require.Greater(t, finalLengths[2], finalLengths[1], "History should grow after second activity")
}
10 changes: 9 additions & 1 deletion workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ func (e *executor) ExecuteTask(ctx context.Context, t *backend.WorkflowTask) (*E
executedEvents[i].SequenceID = e.nextSequenceID()
}

e.workflowState.SetHistoryLength(e.lastSequenceID)

logger.Debug("Finished workflow task",
log.ExecutedEventsKey, len(executedEvents),
log.TaskLastSequenceIDKey, e.lastSequenceID,
Expand Down Expand Up @@ -269,10 +271,13 @@ func (e *executor) replayHistory(h []*history.Event) error {
return errors.New("history has older events than current state")
}

// Note: lastSequenceID is updated below after successful event execution.
// For consistent history length reporting (e.g., for workflow code), we intentionally set historyLength here before executing the event.
e.workflowState.SetHistoryLength(e.lastSequenceID + 1)

if err := e.executeEvent(event); err != nil {
return err
}

e.lastSequenceID = event.SequenceID
}

Expand All @@ -283,6 +288,9 @@ func (e *executor) executeNewEvents(newEvents []*history.Event) ([]*history.Even
e.workflowState.SetReplaying(false)

for i, event := range newEvents {
// Update history length BEFORE executing the event to reflect the event about to be added
e.workflowState.SetHistoryLength(e.lastSequenceID + int64(i) + 1)

if err := e.executeEvent(event); err != nil {
return newEvents[:i], err
}
Expand Down
Loading
Loading