Skip to content

Commit d4434eb

Browse files
authored
Add WaitForWorkflowInstance method
1 parent 8ba2afd commit d4434eb

File tree

8 files changed

+319
-11
lines changed

8 files changed

+319
-11
lines changed

backend/backend.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ import (
88
"github.com/cschleiden/go-workflows/workflow"
99
)
1010

11+
type WorkflowState int
12+
13+
const (
14+
WorkflowStateActive WorkflowState = iota
15+
WorkflowStateFinished
16+
)
17+
1118
//go:generate mockery --name=Backend --inpackage
1219
type Backend interface {
1320
// CreateWorkflowInstance creates a new workflow instance
@@ -16,6 +23,12 @@ type Backend interface {
1623
// CancelWorkflowInstance cancels a running workflow instance
1724
CancelWorkflowInstance(ctx context.Context, instance workflow.Instance) error
1825

26+
// GetWorkflowInstanceState returns the state of the given workflow instance
27+
GetWorkflowInstanceState(ctx context.Context, instance workflow.Instance) (WorkflowState, error)
28+
29+
// GetWorkflowInstanceHistory returns the full workflow history for the given instance
30+
GetWorkflowInstanceHistory(ctx context.Context, instance workflow.Instance) ([]history.Event, error)
31+
1932
// SignalWorkflow signals a running workflow instance
2033
SignalWorkflow(ctx context.Context, instanceID string, event history.Event) error
2134

backend/mock_Backend.go

Lines changed: 44 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,77 @@ func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance work
120120
return tx.Commit()
121121
}
122122

123+
func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance workflow.Instance) ([]history.Event, error) {
124+
tx, err := b.db.BeginTx(ctx, nil)
125+
if err != nil {
126+
return nil, err
127+
}
128+
defer tx.Rollback()
129+
130+
historyEvents, err := tx.QueryContext(
131+
ctx,
132+
"SELECT event_id, instance_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `history` WHERE instance_id = ? ORDER BY id",
133+
instance.GetInstanceID(),
134+
)
135+
if err != nil {
136+
return nil, errors.Wrap(err, "could not get history")
137+
}
138+
139+
h := make([]history.Event, 0)
140+
141+
for historyEvents.Next() {
142+
var instanceID string
143+
var attributes []byte
144+
145+
historyEvent := history.Event{}
146+
147+
if err := historyEvents.Scan(
148+
&historyEvent.ID,
149+
&instanceID,
150+
&historyEvent.Type,
151+
&historyEvent.Timestamp,
152+
&historyEvent.ScheduleEventID,
153+
&attributes,
154+
&historyEvent.VisibleAt,
155+
); err != nil {
156+
return nil, errors.Wrap(err, "could not scan event")
157+
}
158+
159+
a, err := history.DeserializeAttributes(historyEvent.Type, attributes)
160+
if err != nil {
161+
return nil, errors.Wrap(err, "could not deserialize attributes")
162+
}
163+
164+
historyEvent.Attributes = a
165+
166+
h = append(h, historyEvent)
167+
}
168+
169+
return h, nil
170+
}
171+
172+
func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance workflow.Instance) (backend.WorkflowState, error) {
173+
row := b.db.QueryRowContext(
174+
ctx,
175+
"SELECT completed_at FROM instances WHERE instance_id = ? AND execution_id = ?",
176+
instance.GetInstanceID(),
177+
instance.GetExecutionID(),
178+
)
179+
180+
var completedAt sql.NullTime
181+
if err := row.Scan(&completedAt); err != nil {
182+
if err == sql.ErrNoRows {
183+
return backend.WorkflowStateActive, errors.New("could not find workflow instance")
184+
}
185+
}
186+
187+
if completedAt.Valid {
188+
return backend.WorkflowStateFinished, nil
189+
}
190+
191+
return backend.WorkflowStateActive, nil
192+
}
193+
123194
func createInstance(ctx context.Context, tx *sql.Tx, wfi workflow.Instance) error {
124195
var parentInstanceID *string
125196
var parentEventID *int

backend/sqlite/schema.sql

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ CREATE TABLE IF NOT EXISTS `instances` (
88
`locked_until` DATETIME NULL,
99
`sticky_until` DATETIME NULL,
1010
`worker` TEXT NULL
11-
1211
);
1312

1413
CREATE INDEX IF NOT EXISTS `idx_instances_locked_until_completed_at` ON `instances` (`locked_until`, `sticky_until`, `completed_at`, `worker`);

backend/sqlite/sqlite.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,43 @@ func (sb *sqliteBackend) CancelWorkflowInstance(ctx context.Context, instance wo
146146
return tx.Commit()
147147
}
148148

149+
func (sb *sqliteBackend) GetWorkflowInstanceHistory(ctx context.Context, instance workflow.Instance) ([]history.Event, error) {
150+
tx, err := sb.db.BeginTx(ctx, nil)
151+
if err != nil {
152+
return nil, err
153+
}
154+
defer tx.Rollback()
155+
156+
h, err := getHistory(ctx, tx, instance.GetInstanceID())
157+
if err != nil {
158+
return nil, errors.Wrap(err, "could not get workflow history")
159+
}
160+
161+
return h, nil
162+
}
163+
164+
func (s *sqliteBackend) GetWorkflowInstanceState(ctx context.Context, instance workflow.Instance) (backend.WorkflowState, error) {
165+
row := s.db.QueryRowContext(
166+
ctx,
167+
"SELECT completed_at FROM instances WHERE id = ? AND execution_id = ?",
168+
instance.GetInstanceID(),
169+
instance.GetExecutionID(),
170+
)
171+
172+
var completedAt sql.NullTime
173+
if err := row.Scan(&completedAt); err != nil {
174+
if err == sql.ErrNoRows {
175+
return backend.WorkflowStateActive, errors.New("could not find workflow instance")
176+
}
177+
}
178+
179+
if completedAt.Valid {
180+
return backend.WorkflowStateFinished, nil
181+
}
182+
183+
return backend.WorkflowStateActive, nil
184+
}
185+
149186
func (sb *sqliteBackend) SignalWorkflow(ctx context.Context, instanceID string, event history.Event) error {
150187
tx, err := sb.db.BeginTx(ctx, nil)
151188
if err != nil {

client/client.go

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"time"
66

7+
"github.com/benbjohnson/clock"
78
"github.com/cschleiden/go-workflows/backend"
89
a "github.com/cschleiden/go-workflows/internal/args"
910
"github.com/cschleiden/go-workflows/internal/converter"
@@ -15,6 +16,9 @@ import (
1516
"github.com/pkg/errors"
1617
)
1718

19+
var ErrWorkflowCanceled = errors.New("workflow canceled")
20+
var ErrWorkflowTerminated = errors.New("workflow terminated")
21+
1822
type WorkflowInstanceOptions struct {
1923
InstanceID string
2024
}
@@ -24,16 +28,20 @@ type Client interface {
2428

2529
CancelWorkflowInstance(ctx context.Context, instance workflow.Instance) error
2630

31+
WaitForWorkflowInstance(ctx context.Context, instance workflow.Instance, timeout time.Duration) error
32+
2733
SignalWorkflow(ctx context.Context, instanceID string, name string, arg interface{}) error
2834
}
2935

3036
type client struct {
3137
backend backend.Backend
38+
clock clock.Clock
3239
}
3340

3441
func New(backend backend.Backend) Client {
3542
return &client{
3643
backend: backend,
44+
clock: clock.New(),
3745
}
3846
}
3947

@@ -44,7 +52,7 @@ func (c *client) CreateWorkflowInstance(ctx context.Context, options WorkflowIns
4452
}
4553

4654
startedEvent := history.NewHistoryEvent(
47-
time.Now(),
55+
c.clock.Now(),
4856
history.EventType_WorkflowExecutionStarted,
4957
&history.ExecutionStartedAttributes{
5058
Name: fn.Name(wf),
@@ -76,7 +84,7 @@ func (c *client) SignalWorkflow(ctx context.Context, instanceID string, name str
7684
}
7785

7886
event := history.NewHistoryEvent(
79-
time.Now(),
87+
c.clock.Now(),
8088
history.EventType_SignalReceived,
8189
&history.SignalReceivedAttributes{
8290
Name: name,
@@ -86,3 +94,79 @@ func (c *client) SignalWorkflow(ctx context.Context, instanceID string, name str
8694

8795
return c.backend.SignalWorkflow(ctx, instanceID, event)
8896
}
97+
98+
func (c *client) WaitForWorkflowInstance(ctx context.Context, instance workflow.Instance, timeout time.Duration) error {
99+
if timeout == 0 {
100+
timeout = time.Second * 20
101+
}
102+
103+
ticker := c.clock.Ticker(time.Second)
104+
defer ticker.Stop()
105+
106+
ctx, cancel := c.clock.WithTimeout(ctx, timeout)
107+
defer cancel()
108+
109+
for {
110+
s, err := c.backend.GetWorkflowInstanceState(ctx, instance)
111+
if err != nil {
112+
return errors.Wrap(err, "could not get workflow state")
113+
}
114+
115+
if s == backend.WorkflowStateFinished {
116+
return nil
117+
}
118+
119+
ticker.Reset(time.Second)
120+
select {
121+
case <-ticker.C:
122+
continue
123+
124+
case <-ctx.Done():
125+
return errors.New("workflow did not finish in specified timeout")
126+
}
127+
}
128+
}
129+
130+
func GetWorkflowResult[T any](ctx context.Context, c Client, instance workflow.Instance, timeout time.Duration) (T, error) {
131+
// Zero result
132+
var z T
133+
134+
if err := c.WaitForWorkflowInstance(ctx, instance, timeout); err != nil {
135+
return z, errors.Wrap(err, "workflow did not finish in time")
136+
}
137+
138+
ic := c.(*client)
139+
b := ic.backend
140+
141+
h, err := b.GetWorkflowInstanceHistory(ctx, instance)
142+
if err != nil {
143+
return z, errors.Wrap(err, "could not get workflow history")
144+
}
145+
146+
// Iterate over history backwards
147+
for i := len(h) - 1; i >= 0; i-- {
148+
event := h[i]
149+
switch event.Type {
150+
case history.EventType_WorkflowExecutionFinished:
151+
a := event.Attributes.(*history.ExecutionCompletedAttributes)
152+
if a.Error != "" {
153+
return z, errors.New(a.Error)
154+
}
155+
156+
var r T
157+
if err := converter.DefaultConverter.From(a.Result, &r); err != nil {
158+
return z, errors.Wrap(err, "could not convert result")
159+
}
160+
161+
return r, nil
162+
163+
case history.EventType_WorkflowExecutionCanceled:
164+
return z, ErrWorkflowCanceled
165+
166+
case history.EventType_WorkflowExecutionTerminated:
167+
return z, ErrWorkflowTerminated
168+
}
169+
}
170+
171+
return z, errors.New("workflow finished, but could not find result event")
172+
}

0 commit comments

Comments
 (0)