Skip to content

Commit 31346f0

Browse files
authored
Introduce interface for topic event subscriber (#661)
* Introduce interface for topic event subscriber I introduced the TopicEventSubscriber interface to allow for subscribers to be implemented as structs. This will allow subscribers to be initialized with any settings or dependencies needed to process incoming events. I converted TopicEventHandler to implement TopicEventSubscriber. I revised TopicRegistrar to store TopicEventSubscribers instead of TopicEventHandlers. Resolves #660 Signed-off-by: Michael Collins <[email protected]> * Update go-service documentation I added examples of how to use the TopicEventSubscriber interface to create a new subscriber for both HTTP and gRPC services. Signed-off-by: Michael Collins <[email protected]> --------- Signed-off-by: Michael Collins <[email protected]>
1 parent 921a6a7 commit 31346f0

File tree

7 files changed

+107
-16
lines changed

7 files changed

+107
-16
lines changed

daprdocs/content/en/go-sdk-docs/go-service/grpc-service.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,35 @@ if err != nil {
8383
}
8484
```
8585

86+
You can also create a custom type that implements the `TopicEventSubscriber` interface to handle your events:
87+
88+
```go
89+
type EventHandler struct {
90+
// any data or references that your event handler needs.
91+
}
92+
93+
func (h *EventHandler) Handle(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
94+
log.Printf("event - PubsubName:%s, Topic:%s, ID:%s, Data: %v", e.PubsubName, e.Topic, e.ID, e.Data)
95+
// do something with the event
96+
return true, nil
97+
}
98+
```
99+
100+
The `EventHandler` can then be added using the `AddTopicEventSubscriber` method:
101+
102+
```go
103+
sub := &common.Subscription{
104+
PubsubName: "messages",
105+
Topic: "topic1",
106+
}
107+
eventHandler := &EventHandler{
108+
// initialize any fields
109+
}
110+
if err := s.AddTopicEventSubscriber(sub, eventHandler); err != nil {
111+
log.Fatalf("error adding topic subscription: %v", err)
112+
}
113+
```
114+
86115
### Service Invocation Handler
87116
To handle service invocations you will need to add at least one service invocation handler before starting the service:
88117

daprdocs/content/en/go-sdk-docs/go-service/http-service.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,35 @@ if err != nil {
7878
}
7979
```
8080

81+
You can also create a custom type that implements the `TopicEventSubscriber` interface to handle your events:
82+
83+
```go
84+
type EventHandler struct {
85+
// any data or references that your event handler needs.
86+
}
87+
88+
func (h *EventHandler) Handle(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
89+
log.Printf("event - PubsubName:%s, Topic:%s, ID:%s, Data: %v", e.PubsubName, e.Topic, e.ID, e.Data)
90+
// do something with the event
91+
return true, nil
92+
}
93+
```
94+
95+
The `EventHandler` can then be added using the `AddTopicEventSubscriber` method:
96+
97+
```go
98+
sub := &common.Subscription{
99+
PubsubName: "messages",
100+
Topic: "topic1",
101+
}
102+
eventHandler := &EventHandler{
103+
// initialize any fields
104+
}
105+
if err := s.AddTopicEventSubscriber(sub, eventHandler); err != nil {
106+
log.Fatalf("error adding topic subscription: %v", err)
107+
}
108+
```
109+
81110
### Service Invocation Handler
82111
To handle service invocations you will need to add at least one service invocation handler before starting the service:
83112

service/common/service.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,23 @@ const (
2727
)
2828

2929
// Service represents Dapr callback service.
30+
//
31+
//nolint:interfacebloat
3032
type Service interface {
3133
// AddHealthCheckHandler sets a health check handler, name: http (router) and grpc (invalid).
3234
AddHealthCheckHandler(name string, fn HealthCheckHandler) error
3335
// AddServiceInvocationHandler appends provided service invocation handler with its name to the service.
3436
AddServiceInvocationHandler(name string, fn ServiceInvocationHandler) error
3537
// AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service.
36-
// Note, retries are only considered when there is an error. Lack of error is considered as a success
38+
// Note, retries are only considered when there is an error. Lack of error is considered as a success.
3739
AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error
40+
// AddTopicEventSubscriber appends the provided subscriber with its topic and optional metadata to the service.
41+
// Note, retries are only considered when there is an error. Lack of error is considered as a success.
42+
AddTopicEventSubscriber(sub *Subscription, subscriber TopicEventSubscriber) error
3843
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service.
3944
AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error
4045
// RegisterActorImplFactory Register a new actor to actor runtime of go sdk
41-
// Deprecated: use RegisterActorImplFactoryContext instead
46+
// Deprecated: use RegisterActorImplFactoryContext instead.
4247
RegisterActorImplFactory(f actor.Factory, opts ...config.Option)
4348
// RegisterActorImplFactoryContext Register a new actor to actor runtime of go sdk
4449
RegisterActorImplFactoryContext(f actor.FactoryContext, opts ...config.Option)
@@ -49,7 +54,7 @@ type Service interface {
4954
Start() error
5055
// Stop stops the previously started service.
5156
Stop() error
52-
// Gracefully stops the previous started service
57+
// Gracefully stops the previous started service.
5358
GracefulStop() error
5459
}
5560

@@ -60,3 +65,13 @@ type (
6065
JobEventHandler func(ctx context.Context, in *JobEvent) error
6166
HealthCheckHandler func(context.Context) error
6267
)
68+
69+
type TopicEventSubscriber interface {
70+
Handle(ctx context.Context, e *TopicEvent) (retry bool, err error)
71+
}
72+
73+
// Handle converts TopicEventHandler into an adapter that implements
74+
// TopicEventSubscriber.
75+
func (h TopicEventHandler) Handle(ctx context.Context, e *TopicEvent) (retry bool, err error) {
76+
return h(ctx, e)
77+
}

service/grpc/topic.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,20 @@ import (
3131

3232
// AddTopicEventHandler appends provided event handler with topic name to the service.
3333
func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error {
34+
if fn == nil {
35+
return errors.New("topic handler required")
36+
}
37+
38+
return s.AddTopicEventSubscriber(sub, fn)
39+
}
40+
41+
// AddTopicEventSubscriber appends the provided subscriber to the service.
42+
func (s *Server) AddTopicEventSubscriber(sub *common.Subscription, subscriber common.TopicEventSubscriber) error {
3443
if sub == nil {
3544
return errors.New("subscription required")
3645
}
3746

38-
return s.topicRegistrar.AddSubscription(sub, fn)
47+
return s.topicRegistrar.AddSubscription(sub, subscriber)
3948
}
4049

4150
// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to.
@@ -142,7 +151,7 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq
142151
in.GetPath(), in.GetPubsubName(), in.GetTopic(),
143152
)
144153
}
145-
retry, err := h(ctx, e)
154+
retry, err := h.Handle(ctx, e)
146155
if err == nil {
147156
return &runtimev1pb.TopicEventResponse{Status: runtimev1pb.TopicEventResponse_SUCCESS}, nil
148157
}

service/http/topic.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,15 @@ func (s *Server) registerBaseHandler() {
241241

242242
// AddTopicEventHandler appends provided event handler with it's name to the service.
243243
func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error {
244+
if fn == nil {
245+
return errors.New("topic handler required")
246+
}
247+
248+
return s.AddTopicEventSubscriber(sub, fn)
249+
}
250+
251+
// AddTopicEventSubscriber appends the provided subscriber to the service.
252+
func (s *Server) AddTopicEventSubscriber(sub *common.Subscription, subscriber common.TopicEventSubscriber) error {
244253
if sub == nil {
245254
return errors.New("subscription required")
246255
}
@@ -249,7 +258,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
249258
if sub.Route == "" {
250259
return errors.New("handler route name")
251260
}
252-
if err := s.topicRegistrar.AddSubscription(sub, fn); err != nil {
261+
if err := s.topicRegistrar.AddSubscription(sub, subscriber); err != nil {
253262
return err
254263
}
255264

@@ -306,7 +315,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
306315
w.WriteHeader(http.StatusOK)
307316

308317
// execute user handler
309-
retry, err := fn(r.Context(), &te)
318+
retry, err := subscriber.Handle(r.Context(), &te)
310319
if err == nil {
311320
writeStatus(w, common.SubscriptionResponseStatusSuccess)
312321
return

service/internal/topicregistrar.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ type TopicRegistrar map[string]*TopicRegistration
1414
// TopicRegistration encapsulates the subscription and handlers.
1515
type TopicRegistration struct {
1616
Subscription *TopicSubscription
17-
DefaultHandler common.TopicEventHandler
18-
RouteHandlers map[string]common.TopicEventHandler
17+
DefaultHandler common.TopicEventSubscriber
18+
RouteHandlers map[string]common.TopicEventSubscriber
1919
}
2020

21-
func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.TopicEventHandler) error {
21+
func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.TopicEventSubscriber) error {
2222
if sub.Topic == "" {
2323
return errors.New("topic name required")
2424
}
@@ -40,7 +40,7 @@ func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.Topi
4040
if !ok {
4141
ts = &TopicRegistration{
4242
Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic),
43-
RouteHandlers: make(map[string]common.TopicEventHandler),
43+
RouteHandlers: make(map[string]common.TopicEventSubscriber),
4444
DefaultHandler: nil,
4545
}
4646
ts.Subscription.SetMetadata(sub.Metadata)

service/internal/topicregistrar_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ import (
1313
)
1414

1515
func TestTopicRegistrarValidation(t *testing.T) {
16-
fn := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
16+
fn := common.TopicEventHandler(func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
1717
return false, nil
18-
}
18+
})
1919
tests := map[string]struct {
2020
sub common.Subscription
21-
fn common.TopicEventHandler
21+
fn common.TopicEventSubscriber
2222
err string
2323
}{
2424
"pubsub required": {
@@ -75,9 +75,9 @@ func TestTopicRegistrarValidation(t *testing.T) {
7575
}
7676

7777
func TestTopicAddSubscriptionMetadata(t *testing.T) {
78-
handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
78+
handler := common.TopicEventHandler(func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
7979
return false, nil
80-
}
80+
})
8181
topicRegistrar := internal.TopicRegistrar{}
8282
sub := &common.Subscription{
8383
PubsubName: "pubsubname",

0 commit comments

Comments
 (0)