Skip to content

Commit 87a5732

Browse files
authored
feat: implement WatchEvents for argoKubeWorkflowServiceClient. Fixes argoproj#6173 (argoproj#6816)
Signed-off-by: NikeNano <[email protected]>
1 parent 543366f commit 87a5732

File tree

2 files changed

+51
-10
lines changed

2 files changed

+51
-10
lines changed

pkg/apiclient/argo-kube-workflow-service-client.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package apiclient
22

33
import (
44
"context"
5-
"errors"
65
"io"
76

87
"google.golang.org/grpc"
@@ -30,7 +29,7 @@ func (c *argoKubeWorkflowServiceClient) ListWorkflows(ctx context.Context, req *
3029
}
3130

3231
func (c *argoKubeWorkflowServiceClient) WatchWorkflows(ctx context.Context, req *workflowpkg.WatchWorkflowsRequest, _ ...grpc.CallOption) (workflowpkg.WorkflowService_WatchWorkflowsClient, error) {
33-
intermediary := newWatchIntermediary(ctx)
32+
intermediary := newWorkflowWatchIntermediary(ctx)
3433
go func() {
3534
defer intermediary.cancel()
3635
err := c.delegate.WatchWorkflows(req, intermediary)
@@ -43,8 +42,18 @@ func (c *argoKubeWorkflowServiceClient) WatchWorkflows(ctx context.Context, req
4342
return intermediary, nil
4443
}
4544

46-
func (c *argoKubeWorkflowServiceClient) WatchEvents(context.Context, *workflowpkg.WatchEventsRequest, ...grpc.CallOption) (workflowpkg.WorkflowService_WatchEventsClient, error) {
47-
return nil, errors.New("not implemented")
45+
func (c *argoKubeWorkflowServiceClient) WatchEvents(ctx context.Context, req *workflowpkg.WatchEventsRequest, _ ...grpc.CallOption) (workflowpkg.WorkflowService_WatchEventsClient, error) {
46+
intermediary := newEventWatchIntermediary(ctx)
47+
go func() {
48+
defer intermediary.cancel()
49+
err := c.delegate.WatchEvents(req, intermediary)
50+
if err != nil {
51+
intermediary.error <- err
52+
} else {
53+
intermediary.error <- io.EOF
54+
}
55+
}()
56+
return intermediary, nil
4857
}
4958

5059
func (c *argoKubeWorkflowServiceClient) DeleteWorkflow(ctx context.Context, req *workflowpkg.WorkflowDeleteRequest, _ ...grpc.CallOption) (*workflowpkg.WorkflowDeleteResponse, error) {

pkg/apiclient/watch-intermediary.go

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,52 @@ import (
55

66
"google.golang.org/grpc/metadata"
77

8+
v1 "k8s.io/api/core/v1"
9+
810
workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
911
)
1012

11-
type watchIntermediary struct {
13+
type workflowWatchIntermediary struct {
1214
abstractIntermediary
1315
events chan *workflowpkg.WorkflowWatchEvent
1416
}
1517

16-
func (w watchIntermediary) Send(e *workflowpkg.WorkflowWatchEvent) error {
18+
func (w workflowWatchIntermediary) Send(e *workflowpkg.WorkflowWatchEvent) error {
19+
w.events <- e
20+
return nil
21+
}
22+
23+
func (w workflowWatchIntermediary) Recv() (*workflowpkg.WorkflowWatchEvent, error) {
24+
select {
25+
case e := <-w.error:
26+
return nil, e
27+
case event := <-w.events:
28+
return event, nil
29+
}
30+
}
31+
32+
func (w *workflowWatchIntermediary) SendHeader(metadata.MD) error {
33+
// We invoke `SendHeader` in order to eagerly flush headers to allow us to send period
34+
// keepalives when using HTTP/1 and Server Sent Events, so we need to implement this here,
35+
// though we don't use the meta for anything.
36+
return nil
37+
}
38+
39+
func newWorkflowWatchIntermediary(ctx context.Context) *workflowWatchIntermediary {
40+
return &workflowWatchIntermediary{newAbstractIntermediary(ctx), make(chan *workflowpkg.WorkflowWatchEvent)}
41+
}
42+
43+
type eventWatchIntermediary struct {
44+
abstractIntermediary
45+
events chan *v1.Event
46+
}
47+
48+
func (w eventWatchIntermediary) Send(e *v1.Event) error {
1749
w.events <- e
1850
return nil
1951
}
2052

21-
func (w watchIntermediary) Recv() (*workflowpkg.WorkflowWatchEvent, error) {
53+
func (w eventWatchIntermediary) Recv() (*v1.Event, error) {
2254
select {
2355
case e := <-w.error:
2456
return nil, e
@@ -27,13 +59,13 @@ func (w watchIntermediary) Recv() (*workflowpkg.WorkflowWatchEvent, error) {
2759
}
2860
}
2961

30-
func (w *watchIntermediary) SendHeader(metadata.MD) error {
62+
func (w *eventWatchIntermediary) SendHeader(metadata.MD) error {
3163
// We invoke `SendHeader` in order to eagerly flush headers to allow us to send period
3264
// keepalives when using HTTP/1 and Server Sent Events, so we need to implement this here,
3365
// though we don't use the meta for anything.
3466
return nil
3567
}
3668

37-
func newWatchIntermediary(ctx context.Context) *watchIntermediary {
38-
return &watchIntermediary{newAbstractIntermediary(ctx), make(chan *workflowpkg.WorkflowWatchEvent)}
69+
func newEventWatchIntermediary(ctx context.Context) *eventWatchIntermediary {
70+
return &eventWatchIntermediary{newAbstractIntermediary(ctx), make(chan *v1.Event)}
3971
}

0 commit comments

Comments
 (0)