Skip to content

Commit 2a27b62

Browse files
authored
func param WatchOrchestrationRuntimeStatus (#47)
Update WatchOrchestrationRuntimeStatus to use a func condition param to reduce go routine count. Signed-off-by: joshvanl <[email protected]>
1 parent 99d7903 commit 2a27b62

File tree

8 files changed

+33
-72
lines changed

8 files changed

+33
-72
lines changed

backend/backend.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ type Backend interface {
8787

8888
// WatchOrchestrationRuntimeStatus is a streaming API to watch for changes to
8989
// the OrchestrtionMetadata, receiving events as and when the state changes.
90+
// When the given condition is true, returns.
9091
// Used over polling the metadata.
91-
WatchOrchestrationRuntimeStatus(ctx context.Context, id api.InstanceID, ch chan<- *OrchestrationMetadata) error
92+
WatchOrchestrationRuntimeStatus(ctx context.Context, id api.InstanceID, condition func(*OrchestrationMetadata) bool) error
9293

9394
// GetOrchestrationMetadata gets the metadata associated with the given orchestration instance ID.
9495
//

backend/client.go

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package backend
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
76
"time"
87

@@ -15,7 +14,6 @@ import (
1514
"github.com/dapr/durabletask-go/api"
1615
"github.com/dapr/durabletask-go/api/helpers"
1716
"github.com/dapr/durabletask-go/api/protos"
18-
"github.com/dapr/kit/concurrency"
1917
)
2018

2119
type TaskHubClient interface {
@@ -116,31 +114,13 @@ func (c *backendClient) WaitForOrchestrationCompletion(ctx context.Context, id a
116114
}
117115

118116
func (c *backendClient) waitForOrchestrationCondition(ctx context.Context, id api.InstanceID, condition func(metadata *OrchestrationMetadata) bool) (*OrchestrationMetadata, error) {
119-
ch := make(chan *protos.OrchestrationMetadata)
120117
var metadata *protos.OrchestrationMetadata
121-
err := concurrency.NewRunnerManager(
122-
func(ctx context.Context) error {
123-
return c.be.WatchOrchestrationRuntimeStatus(ctx, id, ch)
124-
},
125-
func(ctx context.Context) error {
126-
for {
127-
select {
128-
case <-ctx.Done():
129-
return ctx.Err()
130-
case metadata = <-ch:
131-
if condition(metadata) {
132-
return nil
133-
}
134-
}
135-
}
136-
},
137-
).Run(ctx)
138-
139-
if err != nil || ctx.Err() != nil {
140-
return nil, errors.Join(err, ctx.Err())
141-
}
118+
err := c.be.WatchOrchestrationRuntimeStatus(ctx, id, func(m *OrchestrationMetadata) bool {
119+
metadata = m
120+
return condition(m)
121+
})
142122

143-
return metadata, nil
123+
return metadata, err
144124
}
145125

146126
// TerminateOrchestration enqueues a message to terminate a running orchestration, causing it to stop receiving new events and

backend/executor.go

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"github.com/dapr/durabletask-go/api"
2222
"github.com/dapr/durabletask-go/api/helpers"
2323
"github.com/dapr/durabletask-go/api/protos"
24-
"github.com/dapr/kit/concurrency"
2524
)
2625

2726
var emptyCompleteTaskResponse = &protos.CompleteTaskResponse{}
@@ -622,28 +621,13 @@ func (g *grpcExecutor) WaitForInstanceStart(ctx context.Context, req *protos.Get
622621
func (g *grpcExecutor) waitForInstance(ctx context.Context, req *protos.GetInstanceRequest, condition func(*OrchestrationMetadata) bool) (*protos.GetInstanceResponse, error) {
623622
iid := api.InstanceID(req.InstanceId)
624623

625-
ch := make(chan *protos.OrchestrationMetadata)
626624
var metadata *protos.OrchestrationMetadata
627-
err := concurrency.NewRunnerManager(
628-
func(ctx context.Context) error {
629-
return g.backend.WatchOrchestrationRuntimeStatus(ctx, iid, ch)
630-
},
631-
func(ctx context.Context) error {
632-
for {
633-
select {
634-
case <-ctx.Done():
635-
return ctx.Err()
636-
case metadata = <-ch:
637-
if condition(metadata) {
638-
return nil
639-
}
640-
}
641-
}
642-
},
643-
).Run(ctx)
644-
645-
if err != nil || ctx.Err() != nil {
646-
return nil, errors.Join(err, ctx.Err())
625+
err := g.backend.WatchOrchestrationRuntimeStatus(ctx, iid, func(m *OrchestrationMetadata) bool {
626+
metadata = m
627+
return condition(m)
628+
})
629+
if err != nil {
630+
return nil, err
647631
}
648632

649633
if metadata == nil {

backend/postgres/postgres.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func (be *postgresBackend) NextOrchestrationWorkItem(ctx context.Context) (*back
155155
}
156156
}
157157

158-
func (be *postgresBackend) WatchOrchestrationRuntimeStatus(ctx context.Context, id api.InstanceID, ch chan<- *backend.OrchestrationMetadata) error {
158+
func (be *postgresBackend) WatchOrchestrationRuntimeStatus(ctx context.Context, id api.InstanceID, fn func(*backend.OrchestrationMetadata) bool) error {
159159
b := backoff.ExponentialBackOff{
160160
InitialInterval: 100 * time.Millisecond,
161161
MaxInterval: 10 * time.Second,
@@ -181,10 +181,8 @@ func (be *postgresBackend) WatchOrchestrationRuntimeStatus(ctx context.Context,
181181
return err
182182
}
183183

184-
select {
185-
case <-ctx.Done():
186-
return ctx.Err()
187-
case ch <- meta:
184+
if fn(meta) {
185+
return nil
188186
}
189187
}
190188
}

backend/sqlite/sqlite.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ func (be *sqliteBackend) AddNewOrchestrationEvent(ctx context.Context, iid api.I
644644
return nil
645645
}
646646

647-
func (be *sqliteBackend) WatchOrchestrationRuntimeStatus(ctx context.Context, id api.InstanceID, ch chan<- *backend.OrchestrationMetadata) error {
647+
func (be *sqliteBackend) WatchOrchestrationRuntimeStatus(ctx context.Context, id api.InstanceID, fn func(*backend.OrchestrationMetadata) bool) error {
648648
b := backoff.ExponentialBackOff{
649649
InitialInterval: 100 * time.Millisecond,
650650
MaxInterval: 10 * time.Second,
@@ -670,10 +670,8 @@ func (be *sqliteBackend) WatchOrchestrationRuntimeStatus(ctx context.Context, id
670670
return err
671671
}
672672

673-
select {
674-
case <-ctx.Done():
675-
return ctx.Err()
676-
case ch <- meta:
673+
if fn(meta) {
674+
return nil
677675
}
678676
}
679677
}

tests/mocks/Backend.go

Lines changed: 12 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/mocks/Executor.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/mocks/TaskWorker.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)