Skip to content

Commit b1be73e

Browse files
authored
decoupling the event transport interface from ce sdk-go (#159)
Signed-off-by: Wei Liu <[email protected]>
1 parent b492d8b commit b1be73e

37 files changed

+420
-497
lines changed

pkg/cloudevents/clients/addon/client_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"testing"
77
"time"
88

9-
"github.com/cloudevents/sdk-go/v2/protocol/gochan"
109
jsonpatch "github.com/evanphx/json-patch/v5"
1110

1211
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -64,11 +63,14 @@ func TestPatch(t *testing.T) {
6463

6564
for _, c := range cases {
6665
t.Run(c.name, func(t *testing.T) {
66+
ctx, cancel := context.WithCancel(context.Background())
67+
defer cancel()
68+
6769
watcherStore := store.NewAgentInformerWatcherStore[*addonapiv1alpha1.ManagedClusterAddOn]()
6870

69-
ceClientOpt := fake.NewAgentOptions(gochan.New(), nil, c.clusterName, c.clusterName+"agent")
71+
ceClientOpt := fake.NewAgentOptions(fake.NewEventChan(), c.clusterName, c.clusterName+"agent")
7072
ceClient, err := clients.NewCloudEventAgentClient(
71-
context.Background(),
73+
ctx,
7274
ceClientOpt,
7375
store.NewAgentWatcherStoreLister(watcherStore),
7476
statushash.StatusHash,
@@ -89,7 +91,7 @@ func TestPatch(t *testing.T) {
8991
watcherStore.SetInformer(informer)
9092

9193
if _, err = addonClientSet.AddonV1alpha1().ManagedClusterAddOns(c.clusterName).Patch(
92-
context.Background(),
94+
ctx,
9395
c.addon.Name,
9496
types.MergePatchType,
9597
c.patch,

pkg/cloudevents/clients/cluster/client_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"testing"
77
"time"
88

9-
"github.com/cloudevents/sdk-go/v2/protocol/gochan"
109
jsonpatch "github.com/evanphx/json-patch/v5"
1110

1211
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -57,7 +56,7 @@ func TestCreate(t *testing.T) {
5756
defer cancel()
5857

5958
watcherStore := store.NewAgentInformerWatcherStore[*clusterv1.ManagedCluster]()
60-
ceClientOpt := fake.NewAgentOptions(gochan.New(), nil, "cluster1", "cluster1-agent")
59+
ceClientOpt := fake.NewAgentOptions(fake.NewEventChan(), "cluster1", "cluster1-agent")
6160
ceClient, err := clients.NewCloudEventAgentClient(
6261
ctx,
6362
ceClientOpt,
@@ -116,7 +115,7 @@ func TestPatch(t *testing.T) {
116115
defer cancel()
117116

118117
watcherStore := store.NewAgentInformerWatcherStore[*clusterv1.ManagedCluster]()
119-
ceClientOpt := fake.NewAgentOptions(gochan.New(), nil, "cluster1", "test-agent")
118+
ceClientOpt := fake.NewAgentOptions(fake.NewEventChan(), "cluster1", "test-agent")
120119
ceClient, err := clients.NewCloudEventAgentClient(
121120
ctx,
122121
ceClientOpt,

pkg/cloudevents/clients/csr/client_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77

88
v1 "open-cluster-management.io/api/cluster/v1"
99

10-
"github.com/cloudevents/sdk-go/v2/protocol/gochan"
11-
1210
certificatev1 "k8s.io/api/certificates/v1"
1311
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1412
"k8s.io/apimachinery/pkg/runtime"
@@ -61,7 +59,7 @@ func TestCreate(t *testing.T) {
6159
defer cancel()
6260

6361
watcherStore := store.NewAgentInformerWatcherStore[*certificatev1.CertificateSigningRequest]()
64-
ceClientOpt := fake.NewAgentOptions(gochan.New(), nil, "cluster1", "cluster1-agent")
62+
ceClientOpt := fake.NewAgentOptions(fake.NewEventChan(), "cluster1", "cluster1-agent")
6563
ceClient, err := clients.NewCloudEventAgentClient(
6664
ctx,
6765
ceClientOpt,

pkg/cloudevents/clients/event/client_test.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"encoding/json"
66
"testing"
77

8-
"github.com/cloudevents/sdk-go/v2/protocol/gochan"
9-
108
eventsv1 "k8s.io/api/events/v1"
119
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1210
"k8s.io/apimachinery/pkg/types"
@@ -34,9 +32,12 @@ func TestCreate(t *testing.T) {
3432

3533
for _, c := range cases {
3634
t.Run(c.name, func(t *testing.T) {
35+
ctx, cancel := context.WithCancel(context.Background())
36+
defer cancel()
37+
3738
ceClient, err := clients.NewCloudEventAgentClient(
38-
context.Background(),
39-
fake.NewAgentOptions(gochan.New(), nil, c.clusterName, c.clusterName+"agent"),
39+
ctx,
40+
fake.NewAgentOptions(fake.NewEventChan(), c.clusterName, c.clusterName+"agent"),
4041
nil,
4142
statushash.StatusHash,
4243
NewEventCodec())
@@ -89,9 +90,12 @@ func TestPatch(t *testing.T) {
8990

9091
for _, c := range cases {
9192
t.Run(c.name, func(t *testing.T) {
93+
ctx, cancel := context.WithCancel(context.Background())
94+
defer cancel()
95+
9296
ceClient, err := clients.NewCloudEventAgentClient(
93-
context.Background(),
94-
fake.NewAgentOptions(gochan.New(), nil, c.clusterName, c.clusterName+"agent"),
97+
ctx,
98+
fake.NewAgentOptions(fake.NewEventChan(), c.clusterName, c.clusterName+"agent"),
9599
nil,
96100
statushash.StatusHash,
97101
NewEventCodec())
@@ -101,7 +105,7 @@ func TestPatch(t *testing.T) {
101105

102106
evtClient := NewEventClient(ceClient).WithNamespace(c.clusterName)
103107

104-
if _, err := evtClient.Patch(context.Background(),
108+
if _, err := evtClient.Patch(ctx,
105109
c.event.Name,
106110
types.StrategicMergePatchType,
107111
c.patch,

pkg/cloudevents/clients/lease/client_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"context"
55
"testing"
66

7-
"github.com/cloudevents/sdk-go/v2/protocol/gochan"
8-
97
coordv1 "k8s.io/api/coordination/v1"
108
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
119

@@ -35,10 +33,13 @@ func TestUpdate(t *testing.T) {
3533

3634
for _, c := range cases {
3735
t.Run(c.name, func(t *testing.T) {
36+
ctx, cancel := context.WithCancel(context.Background())
37+
defer cancel()
38+
3839
leaseWatchStore := store.NewSimpleStore[*coordv1.Lease]()
3940
ceClient, err := clients.NewCloudEventAgentClient(
40-
context.Background(),
41-
fake.NewAgentOptions(gochan.New(), nil, c.clusterName, c.clusterName+"agent"),
41+
ctx,
42+
fake.NewAgentOptions(fake.NewEventChan(), c.clusterName, c.clusterName+"agent"),
4243
store.NewAgentWatcherStoreLister(leaseWatchStore),
4344
statushash.StatusHash,
4445
NewLeaseCodec())
@@ -81,6 +82,9 @@ func TestGet(t *testing.T) {
8182

8283
for _, c := range cases {
8384
t.Run(c.name, func(t *testing.T) {
85+
ctx, cancel := context.WithCancel(context.Background())
86+
defer cancel()
87+
8488
leaseWatchStore := store.NewSimpleStore[*coordv1.Lease]()
8589
for _, lease := range c.leases {
8690
if err := leaseWatchStore.Add(lease); err != nil {
@@ -89,8 +93,8 @@ func TestGet(t *testing.T) {
8993
}
9094

9195
ceClient, err := clients.NewCloudEventAgentClient(
92-
context.Background(),
93-
fake.NewAgentOptions(gochan.New(), nil, "cluster1", "cluster1-agent"),
96+
ctx,
97+
fake.NewAgentOptions(fake.NewEventChan(), "cluster1", "cluster1-agent"),
9498
store.NewAgentWatcherStoreLister(leaseWatchStore),
9599
statushash.StatusHash,
96100
NewLeaseCodec())

pkg/cloudevents/clients/options/generic.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (o *GenericClientOptions[T]) AgentClient(ctx context.Context) (generic.Clou
111111
o.watcherStore = store.NewAgentInformerWatcherStore[T]()
112112
}
113113

114-
options, err := builder.BuildCloudEventsAgentOptions(o.config, o.clusterName, o.clientID)
114+
options, err := builder.BuildCloudEventsAgentOptions(o.config, o.clusterName, o.clientID, o.codec.EventDataType())
115115
if err != nil {
116116
return nil, err
117117
}
@@ -182,7 +182,7 @@ func (o *GenericClientOptions[T]) SourceClient(ctx context.Context) (generic.Clo
182182
return nil, fmt.Errorf("a watcher store is required")
183183
}
184184

185-
options, err := builder.BuildCloudEventsSourceOptions(o.config, o.clientID, o.sourceID)
185+
options, err := builder.BuildCloudEventsSourceOptions(o.config, o.clientID, o.sourceID, o.codec.EventDataType())
186186
if err != nil {
187187
return nil, err
188188
}

pkg/cloudevents/generic/clients/agentclient.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,9 @@ func NewCloudEventAgentClient[T generic.ResourceObject](
4848
) (generic.CloudEventsClient[T], error) {
4949
baseClient := &baseClient{
5050
clientID: agentOptions.AgentID,
51-
cloudEventsOptions: agentOptions.CloudEventsOptions,
51+
transport: agentOptions.CloudEventsTransport,
5252
cloudEventsRateLimiter: utils.NewRateLimiter(agentOptions.EventRateLimit),
5353
reconnectedChan: make(chan struct{}),
54-
dataType: codec.EventDataType(),
5554
}
5655

5756
if err := baseClient.connect(ctx); err != nil {

pkg/cloudevents/generic/clients/agentclient_test.go

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"time"
99

1010
cloudevents "github.com/cloudevents/sdk-go/v2"
11-
"github.com/cloudevents/sdk-go/v2/protocol/gochan"
1211
"github.com/stretchr/testify/require"
1312

1413
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -61,7 +60,7 @@ func TestAgentResync(t *testing.T) {
6160
lister := generictesting.NewMockResourceLister(c.resources...)
6261
agent, err := NewCloudEventAgentClient(
6362
ctx,
64-
fake.NewAgentOptions(gochan.New(), nil, c.clusterName, testAgentName),
63+
fake.NewAgentOptions(fake.NewEventChan(), c.clusterName, testAgentName),
6564
lister, generictesting.StatusHash,
6665
generictesting.NewMockResourceCodec(),
6766
)
@@ -72,12 +71,19 @@ func TestAgentResync(t *testing.T) {
7271
stop := make(chan bool)
7372

7473
go func() {
75-
cloudEventsClient := agent.(*CloudEventAgentClient[*generictesting.MockResource]).cloudEventsClient
76-
err = cloudEventsClient.StartReceiver(ctx, func(event cloudevents.Event) {
77-
eventChan <- receiveEvent{event: event}
74+
transport := agent.(*CloudEventAgentClient[*generictesting.MockResource]).transport
75+
err = transport.Receive(ctx, func(event cloudevents.Event) {
76+
select {
77+
case eventChan <- receiveEvent{event: event}:
78+
case <-ctx.Done():
79+
return
80+
}
7881
})
79-
if err != nil {
80-
eventChan <- receiveEvent{err: err}
82+
if err != nil && err != context.Canceled {
83+
select {
84+
case eventChan <- receiveEvent{err: err}:
85+
case <-ctx.Done():
86+
}
8187
}
8288
stop <- true
8389
}()
@@ -132,10 +138,10 @@ func TestAgentPublish(t *testing.T) {
132138
t.Run(c.name, func(t *testing.T) {
133139
ctx, cancel := context.WithCancel(context.Background())
134140

135-
agentOptions := fake.NewAgentOptions(gochan.New(), nil, c.clusterName, testAgentName)
141+
agentOptions := fake.NewAgentOptions(fake.NewEventChan(), c.clusterName, testAgentName)
136142
lister := generictesting.NewMockResourceLister()
137143
agent, err := NewCloudEventAgentClient(
138-
context.TODO(),
144+
ctx,
139145
agentOptions,
140146
lister,
141147
generictesting.StatusHash,
@@ -147,12 +153,19 @@ func TestAgentPublish(t *testing.T) {
147153
eventChan := make(chan receiveEvent)
148154
stop := make(chan bool)
149155
go func() {
150-
cloudEventsClient := agent.(*CloudEventAgentClient[*generictesting.MockResource]).cloudEventsClient
151-
err = cloudEventsClient.StartReceiver(ctx, func(event cloudevents.Event) {
152-
eventChan <- receiveEvent{event: event}
156+
cloudEventsClient := agent.(*CloudEventAgentClient[*generictesting.MockResource]).transport
157+
err = cloudEventsClient.Receive(ctx, func(event cloudevents.Event) {
158+
select {
159+
case eventChan <- receiveEvent{event: event}:
160+
case <-ctx.Done():
161+
return
162+
}
153163
})
154-
if err != nil {
155-
eventChan <- receiveEvent{err: err}
164+
if err != nil && err != context.Canceled {
165+
select {
166+
case eventChan <- receiveEvent{err: err}:
167+
case <-ctx.Done():
168+
}
156169
}
157170
stop <- true
158171
}()
@@ -293,7 +306,7 @@ func TestStatusResyncResponse(t *testing.T) {
293306
ctx, cancel := context.WithCancel(context.Background())
294307
defer cancel()
295308

296-
agentOptions := fake.NewAgentOptions(gochan.New(), nil, c.clusterName, testAgentName)
309+
agentOptions := fake.NewAgentOptions(fake.NewEventChan(), c.clusterName, testAgentName)
297310
lister := generictesting.NewMockResourceLister(c.resources...)
298311
agent, err := NewCloudEventAgentClient(
299312
ctx,
@@ -310,8 +323,8 @@ func TestStatusResyncResponse(t *testing.T) {
310323
mutex := &sync.Mutex{}
311324

312325
go func() {
313-
cloudEventsClient := agent.(*CloudEventAgentClient[*generictesting.MockResource]).cloudEventsClient
314-
_ = cloudEventsClient.StartReceiver(ctx, func(event cloudevents.Event) {
326+
cloudEventsClient := agent.(*CloudEventAgentClient[*generictesting.MockResource]).transport
327+
_ = cloudEventsClient.Receive(ctx, func(event cloudevents.Event) {
315328
mutex.Lock()
316329
defer mutex.Unlock()
317330
receivedEvents = append(receivedEvents, event)
@@ -511,7 +524,7 @@ func TestReceiveResourceSpec(t *testing.T) {
511524

512525
for _, c := range cases {
513526
t.Run(c.name, func(t *testing.T) {
514-
agentOptions := fake.NewAgentOptions(gochan.New(), nil, c.clusterName, testAgentName)
527+
agentOptions := fake.NewAgentOptions(fake.NewEventChan(), c.clusterName, testAgentName)
515528
lister := generictesting.NewMockResourceLister(c.resources...)
516529
agent, err := NewCloudEventAgentClient(
517530
context.TODO(),

0 commit comments

Comments
 (0)