Commit ecea49a
runtime: subscription engine refactor [1]
This change is the first in a series of refactors to the subscription engine, in preparation for adding Actor Subscriptions as detailed [here](dapr/proposals#78). To implement this new subscription type, the engine needs a hierarchical reordering of its internal data structures and flow, so that adding the new implementation is sane and maintainable.

This first change introduces a new `Postman` interface that is used to send messages to subscription sinks: HTTP, gRPC, or streaming channels. Actors will be added as another sink in a future change. Shared types have been moved to a new `pkg/runtime/subscription/todo` package.

Appreciate that there will be further changes made to this code path.

Signed-off-by: joshvanl <[email protected]>
1 parent fffaca6 commit ecea49a

File tree: 21 files changed (+2295 −1928 lines)

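The `Postman` interface itself is not among the hunks excerpted below, so here is a sketch of its likely shape, inferred from the call sites in this commit. Only `postman.Interface`, `DeliverBulk`, and the `DelivererBulkRequest` field names appear verbatim in the diff; the single-message method name `Deliver`, its signature, and the request field types are assumptions.

```go
// Sketch only: inferred from call sites in this commit, not the actual file.
package postman

import (
	"context"

	contribpubsub "github.com/dapr/components-contrib/pubsub"
	rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
	"github.com/dapr/dapr/pkg/runtime/subscription/todo"
)

// Interface abstracts the sink a subscription delivers messages to: the HTTP
// app channel, the gRPC app channel, or a streaming gRPC connection. Per
// dapr/proposals#78, actors would become a fourth sink in a later change.
type Interface interface {
	// Deliver is assumed: some single-message method must exist for the
	// non-bulk path, but its name and signature are not visible here.
	Deliver(ctx context.Context, msg *rtpubsub.SubscribedMessage) error
	// DeliverBulk appears verbatim in the bulkresiliency.go hunk below.
	DeliverBulk(ctx context.Context, req *DelivererBulkRequest) error
}

// DelivererBulkRequest carries everything a sink needs for one bulk delivery.
// Field names are taken verbatim from the bulkresiliency.go hunk below; the
// field types are inferred from the arguments passed there.
type DelivererBulkRequest struct {
	BulkSubCallData      *todo.BulkSubscribeCallData
	BulkSubMsg           *todo.BulkSubscribedMessage
	BulkSubResiliencyRes *todo.BulkSubscribeResiliencyRes
	BulkResponses        *[]contribpubsub.BulkSubscribeResponseEntry
	RawPayload           bool
	DeadLetterTopic      string
}
```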

pkg/runtime/processor/subscriber/subscriber.go
Lines changed: 25 additions & 2 deletions

```diff
@@ -32,6 +32,10 @@ import (
 	"github.com/dapr/dapr/pkg/runtime/compstore"
 	rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
 	"github.com/dapr/dapr/pkg/runtime/subscription"
+	"github.com/dapr/dapr/pkg/runtime/subscription/postman"
+	postmangrpc "github.com/dapr/dapr/pkg/runtime/subscription/postman/grpc"
+	"github.com/dapr/dapr/pkg/runtime/subscription/postman/http"
+	"github.com/dapr/dapr/pkg/runtime/subscription/postman/streaming"
 	"github.com/dapr/kit/logger"
 )

@@ -492,24 +496,43 @@ func (s *Subscriber) initProgramaticSubscriptions(ctx context.Context) error {
 }

 func (s *Subscriber) startSubscription(pubsub *rtpubsub.PubsubItem, comp *compstore.NamedSubscription, isStreamer bool) (*subscription.Subscription, error) {
+	// TODO: @joshvanl
+	var postman postman.Interface
 	var streamer rtpubsub.AdapterStreamer
 	if isStreamer {
 		streamer = s.adapterStreamer
+		postman = streaming.New(streaming.Options{
+			Tracing: s.tracingSpec,
+			Channel: s.adapterStreamer,
+		})
+	} else {
+		if s.isHTTP {
+			postman = http.New(http.Options{
+				Channels: s.channels,
+				Tracing:  s.tracingSpec,
+				Adapter:  s.adapter,
+			})
+		} else {
+			postman = postmangrpc.New(postmangrpc.Options{
+				Channel: s.grpc,
+				Tracing: s.tracingSpec,
+				Adapter: s.adapter,
+			})
+		}
 	}
 	return subscription.New(subscription.Options{
 		AppID:           s.appID,
 		Namespace:       s.namespace,
 		PubSubName:      comp.PubsubName,
 		Topic:           comp.Topic,
-		IsHTTP:          s.isHTTP,
 		PubSub:          pubsub,
 		Resiliency:      s.resiliency,
 		TraceSpec:       s.tracingSpec,
 		Route:           comp.Subscription,
-		Channels:        s.channels,
 		GRPC:            s.grpc,
 		Adapter:         s.adapter,
 		AdapterStreamer: streamer,
 		ConnectionID:    comp.ConnectionID,
+		Postman:         postman,
 	})
 }
```
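With the sink injected as an option, the subscription no longer decides where messages go, which is why `IsHTTP` and `Channels` are dropped from the options above. One payoff is testability: a fake sink can stand in for a real app channel. A minimal sketch, assuming the `Interface` shape sketched earlier; every name here except `subscription.Options.Postman` is hypothetical.

```go
// Hypothetical test double, assuming the Interface sketch above.
type fakePostman struct {
	delivered []*rtpubsub.SubscribedMessage
}

func (f *fakePostman) Deliver(_ context.Context, msg *rtpubsub.SubscribedMessage) error {
	f.delivered = append(f.delivered, msg) // record instead of hitting an app
	return nil
}

func (f *fakePostman) DeliverBulk(context.Context, *postman.DelivererBulkRequest) error {
	return nil
}
```

A test could then pass `Postman: &fakePostman{}` in `subscription.Options` and assert on `delivered` without standing up an HTTP or gRPC app.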

pkg/runtime/pubsub/adapter.go
Lines changed: 1 addition & 1 deletion

```diff
@@ -45,7 +45,7 @@ type Adapter interface {

 type AdapterStreamer interface {
 	Subscribe(rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server, *rtv1pb.SubscribeTopicEventsRequestInitialAlpha1, ConnectionID) error
-	Publish(context.Context, *SubscribedMessage) error
+	Publish(context.Context, *SubscribedMessage) (*rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1, error)
 	StreamerKey(pubsub, topic string) string
 	Close(key string, connectionID ConnectionID)
 }
```

pkg/runtime/pubsub/streamer/streamer.go
Lines changed: 11 additions & 29 deletions

```diff
@@ -146,27 +146,28 @@ func (s *streamer) recvLoop(
 	}
 }

-func (s *streamer) Publish(ctx context.Context, msg *rtpubsub.SubscribedMessage) error {
+// TODO: @joshvanl: move diagnostics.
+func (s *streamer) Publish(ctx context.Context, msg *rtpubsub.SubscribedMessage) (*rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1, error) {
 	s.lock.RLock()
 	key := s.StreamerKey(msg.PubSub, msg.Topic)
 	connection, ok := s.subscribers[key][msg.SubscriberID]
 	s.lock.RUnlock()
 	if !ok {
-		return fmt.Errorf("no streamer subscribed to pubsub %q topic %q", msg.PubSub, msg.Topic)
+		return nil, fmt.Errorf("no streamer subscribed to pubsub %q topic %q", msg.PubSub, msg.Topic)
 	}

 	if connection.closed.Load() {
-		return errors.New("connection is closed")
+		return nil, errors.New("connection is closed")
 	}

 	envelope, span, err := rtpubsub.GRPCEnvelopeFromSubscriptionMessage(ctx, msg, log, s.tracingSpec)
 	if err != nil {
-		return err
+		return nil, err
 	}

 	ch, cleanup := connection.registerPublishResponse(envelope.GetId())
 	if ch == nil {
-		return fmt.Errorf("no client stream expecting publish response for id %s ConnectionID%d", envelope.GetId(), connection.connectionID)
+		return nil, fmt.Errorf("no client stream expecting publish response for id %s ConnectionID%d", envelope.GetId(), connection.connectionID)
 	}
 	defer cleanup()

@@ -191,35 +192,16 @@ func (s *streamer) Publish(ctx context.Context, msg *rtpubsub.SubscribedMessage)
 		err = fmt.Errorf("error returned from app while processing pub/sub event %v: %w", msg.CloudEvent[contribpubsub.IDField], rterrors.NewRetriable(err))
 		log.Debug(err)
 		diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.PubSub, strings.ToLower(string(contribpubsub.Retry)), "", msg.Topic, elapsed)
-		return err
+		return nil, err
 	}

-	var resp *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1
 	select {
 	case <-ctx.Done():
-		return ctx.Err()
+		return nil, ctx.Err()
 	case <-connection.closeCh:
-	case resp = <-ch:
-	}
-
-	switch resp.GetStatus().GetStatus() {
-	case rtv1pb.TopicEventResponse_SUCCESS: //nolint:nosnakecase
-		// on uninitialized status, this is the case it defaults to as an uninitialized status defaults to 0 which is
-		// success from protobuf definition
-		diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.PubSub, strings.ToLower(string(contribpubsub.Success)), "", msg.Topic, elapsed)
-		return nil
-	case rtv1pb.TopicEventResponse_RETRY: //nolint:nosnakecase
-		diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.PubSub, strings.ToLower(string(contribpubsub.Retry)), "", msg.Topic, elapsed)
-		// TODO: add retry error info
-		return fmt.Errorf("RETRY status returned from app while processing pub/sub event %v: %w", msg.CloudEvent[contribpubsub.IDField], rterrors.NewRetriable(nil))
-	case rtv1pb.TopicEventResponse_DROP: //nolint:nosnakecase
-		log.Warnf("DROP status returned from app while processing pub/sub event %v", msg.CloudEvent[contribpubsub.IDField])
-		diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.PubSub, strings.ToLower(string(contribpubsub.Drop)), "", msg.Topic, elapsed)
-		return rtpubsub.ErrMessageDropped
-	default:
-		// Consider unknown status field as error and retry
-		diag.DefaultComponentMonitoring.PubsubIngressEvent(ctx, msg.PubSub, strings.ToLower(string(contribpubsub.Retry)), "", msg.Topic, elapsed)
-		return fmt.Errorf("unknown status returned from app while processing pub/sub event %v, status: %v, err: %w", msg.CloudEvent[contribpubsub.IDField], resp.GetStatus(), rterrors.NewRetriable(nil))
+		return nil, errors.New("stream closed")
+	case resp := <-ch:
+		return resp, nil
 	}
 }
```
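The SUCCESS/RETRY/DROP switch deleted here does not vanish: `Publish` now hands the raw `SubscribeTopicEventsRequestProcessedAlpha1` back to its caller, so status interpretation presumably moves into the streaming Postman's delivery path, which is not shown on this page. A sketch of that presumed new home follows; the receiver, method name, and channel field are assumptions, while the case bodies mirror the deleted code minus the diagnostics that the TODO above says are moving.

```go
// Presumed streaming sink: names are assumptions, logic mirrors the deleted switch.
func (s *streaming) Deliver(ctx context.Context, msg *rtpubsub.SubscribedMessage) error {
	// Transport only: an error here means the stream itself failed.
	resp, err := s.channel.Publish(ctx, msg)
	if err != nil {
		return err
	}
	switch resp.GetStatus().GetStatus() {
	case rtv1pb.TopicEventResponse_SUCCESS: //nolint:nosnakecase
		// An uninitialized status is 0, which the protobuf defines as SUCCESS.
		return nil
	case rtv1pb.TopicEventResponse_RETRY: //nolint:nosnakecase
		return fmt.Errorf("RETRY status returned from app while processing pub/sub event %v: %w",
			msg.CloudEvent[contribpubsub.IDField], rterrors.NewRetriable(nil))
	case rtv1pb.TopicEventResponse_DROP: //nolint:nosnakecase
		log.Warnf("DROP status returned from app while processing pub/sub event %v", msg.CloudEvent[contribpubsub.IDField])
		return rtpubsub.ErrMessageDropped
	default:
		// Treat an unknown status as retriable, as the old code did.
		return fmt.Errorf("unknown status returned from app while processing pub/sub event %v, status: %v, err: %w",
			msg.CloudEvent[contribpubsub.IDField], resp.GetStatus(), rterrors.NewRetriable(nil))
	}
}
```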

pkg/runtime/subscription/bulkresiliency.go
Lines changed: 33 additions & 34 deletions

```diff
@@ -19,60 +19,59 @@ import (

 	contribpubsub "github.com/dapr/components-contrib/pubsub"
 	"github.com/dapr/dapr/pkg/resiliency"
+	"github.com/dapr/dapr/pkg/runtime/subscription/postman"
+	"github.com/dapr/dapr/pkg/runtime/subscription/todo"
 	"github.com/dapr/dapr/utils"
 )

-type bulkSubscribeResiliencyRes struct {
-	entries  []contribpubsub.BulkSubscribeResponseEntry
-	envelope map[string]interface{}
-}
-
 // applyBulkSubscribeResiliency applies resiliency support to bulk subscribe. It tries to filter
 // out the messages that have been successfully processed and only retries the ones that have failed
-func (s *Subscription) applyBulkSubscribeResiliency(ctx context.Context, bulkSubCallData *bulkSubscribeCallData,
-	psm bulkSubscribedMessage, deadLetterTopic string, path string, policyDef *resiliency.PolicyDefinition,
+func (s *Subscription) applyBulkSubscribeResiliency(ctx context.Context, bulkSubCallData *todo.BulkSubscribeCallData,
+	psm todo.BulkSubscribedMessage, deadLetterTopic string, path string, policyDef *resiliency.PolicyDefinition,
 	rawPayload bool, envelope map[string]interface{},
 ) (*[]contribpubsub.BulkSubscribeResponseEntry, error) {
 	bscData := *bulkSubCallData
 	policyRunner := resiliency.NewRunnerWithOptions(
-		ctx, policyDef, resiliency.RunnerOpts[*bulkSubscribeResiliencyRes]{
-			Accumulator: func(bsrr *bulkSubscribeResiliencyRes) {
-				for _, v := range bsrr.entries {
+		ctx, policyDef, resiliency.RunnerOpts[*todo.BulkSubscribeResiliencyRes]{
+			Accumulator: func(bsrr *todo.BulkSubscribeResiliencyRes) {
+				for _, v := range bsrr.Entries {
 					// add to main bulkResponses
-					if index, ok := (*bscData.entryIdIndexMap)[v.EntryId]; ok {
-						(*bscData.bulkResponses)[index].EntryId = v.EntryId
-						(*bscData.bulkResponses)[index].Error = v.Error
+					if index, ok := (*bscData.EntryIdIndexMap)[v.EntryId]; ok {
+						(*bscData.BulkResponses)[index].EntryId = v.EntryId
+						(*bscData.BulkResponses)[index].Error = v.Error
 					}
 				}
-				filteredPubSubMsgs := utils.Filter(psm.pubSubMessages, func(ps message) bool {
-					if index, ok := (*bscData.entryIdIndexMap)[ps.entry.EntryId]; ok {
-						return (*bscData.bulkResponses)[index].Error != nil
+				filteredPubSubMsgs := utils.Filter(psm.PubSubMessages, func(ps todo.Message) bool {
+					if index, ok := (*bscData.EntryIdIndexMap)[ps.Entry.EntryId]; ok {
+						return (*bscData.BulkResponses)[index].Error != nil
 					}
 					return false
 				})
-				psm.pubSubMessages = filteredPubSubMsgs
-				psm.length = len(filteredPubSubMsgs)
+				psm.PubSubMessages = filteredPubSubMsgs
+				psm.Length = len(filteredPubSubMsgs)
 			},
 		})
-	_, err := policyRunner(func(ctx context.Context) (*bulkSubscribeResiliencyRes, error) {
-		var pErr error
-		bsrr := &bulkSubscribeResiliencyRes{
-			entries:  make([]contribpubsub.BulkSubscribeResponseEntry, 0, len(psm.pubSubMessages)),
-			envelope: maps.Clone(envelope),
+	_, err := policyRunner(func(ctx context.Context) (*todo.BulkSubscribeResiliencyRes, error) {
+		bsrr := &todo.BulkSubscribeResiliencyRes{
+			Entries:  make([]contribpubsub.BulkSubscribeResponseEntry, 0, len(psm.PubSubMessages)),
+			Envelope: maps.Clone(envelope),
 		}
-		if s.isHTTP {
-			pErr = s.publishBulkMessageHTTP(ctx, &bscData, &psm, bsrr, deadLetterTopic)
-		} else {
-			pErr = s.publishBulkMessageGRPC(ctx, &bscData, &psm, &bsrr.entries, rawPayload, deadLetterTopic)
-		}
-		return bsrr, pErr
+		err := s.postman.DeliverBulk(ctx, &postman.DelivererBulkRequest{
+			BulkSubCallData:      &bscData,
+			BulkSubMsg:           &psm,
+			BulkSubResiliencyRes: bsrr,
+			BulkResponses:        &bsrr.Entries,
+			RawPayload:           rawPayload,
+			DeadLetterTopic:      deadLetterTopic,
+		})
+		return bsrr, err
 	})
 	// setting error if any entry has not been yet touched - only use case that seems possible is of timeout
-	for eId, ind := range *bscData.entryIdIndexMap { //nolint:stylecheck
-		if (*bscData.bulkResponses)[ind].EntryId == "" {
-			(*bscData.bulkResponses)[ind].EntryId = eId
-			(*bscData.bulkResponses)[ind].Error = err
+	for eId, ind := range *bscData.EntryIdIndexMap { //nolint:stylecheck
+		if (*bscData.BulkResponses)[ind].EntryId == "" {
+			(*bscData.BulkResponses)[ind].EntryId = eId
+			(*bscData.BulkResponses)[ind].Error = err
 		}
 	}
-	return bscData.bulkResponses, err
+	return bscData.BulkResponses, err
 }
```
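The new `todo` package is not shown on this page beyond its use sites. From the field accesses in the hunk above, its exported types plausibly look as follows; only the listed fields are grounded in this diff, any others are unknown, and the `Entry` element type is a guess based on the components-contrib bulk subscribe API.

```go
// Sketch of pkg/runtime/subscription/todo, reconstructed from use sites above.
package todo

import contribpubsub "github.com/dapr/components-contrib/pubsub"

// BulkSubscribeResiliencyRes is the exported replacement for the deleted
// bulkSubscribeResiliencyRes struct; both fields appear in the hunk above.
type BulkSubscribeResiliencyRes struct {
	Entries  []contribpubsub.BulkSubscribeResponseEntry
	Envelope map[string]interface{}
}

// BulkSubscribeCallData: only the two fields touched above are listed.
type BulkSubscribeCallData struct {
	BulkResponses   *[]contribpubsub.BulkSubscribeResponseEntry
	EntryIdIndexMap *map[string]int
	// ... further fields not visible in this diff
}

// BulkSubscribedMessage and Message, likewise reconstructed from use sites.
type BulkSubscribedMessage struct {
	PubSubMessages []Message
	Length         int
	// ... further fields not visible in this diff
}

type Message struct {
	Entry *contribpubsub.BulkMessageEntry // assumed element type; .EntryId is used above
	// ... further fields not visible in this diff
}
```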
