@@ -2,13 +2,17 @@ package multiplex
22
33import (
44 "context"
5+ "errors"
56 "fmt"
67 "sync"
8+ "time"
79
810 "github.com/saiya/dsps/server/domain"
911 "github.com/saiya/dsps/server/logger"
1012)
1113
14+ const parallelFetchEarlyReturnWindow = 300 * time .Millisecond
15+
1216func (s * storageMultiplexer ) PublishMessages (ctx context.Context , msgs []domain.Message ) error {
1317 _ , err := s .parallelAtLeastOneSuccess (ctx , "PublishMessages" , func (ctx context.Context , _ domain.StorageID , child domain.Storage ) (interface {}, error ) {
1418 if child := child .AsPubSubStorage (); child != nil {
@@ -25,20 +29,41 @@ func (s *storageMultiplexer) FetchMessages(ctx context.Context, sl domain.Subscr
2529 moreMessages bool
2630 ackHandle domain.AckHandle
2731 }
28- results , err := s .parallelAtLeastOneSuccess (ctx , "FetchMessages" , func (ctx context.Context , _ domain.StorageID , child domain.Storage ) (interface {}, error ) {
32+ parallelCtx , parallelCtxCancel := context .WithCancel (ctx )
33+ defer parallelCtxCancel ()
34+ subscriptionMissingCh := make (chan domain.StorageID , len (s .children ))
35+ results , err := s .parallelAtLeastOneSuccess (parallelCtx , "FetchMessages" , func (ctx context.Context , storageID domain.StorageID , child domain.Storage ) (interface {}, error ) {
2936 if child := child .AsPubSubStorage (); child != nil {
3037 msgs , moreMsgs , ackHandle , err := child .FetchMessages (ctx , sl , max , waituntil )
3138 if err != nil {
39+ if errors .Is (err , domain .ErrSubscriptionNotFound ) || errors .Is (err , domain .ErrInvalidChannel ) {
40+ subscriptionMissingCh <- storageID
41+ }
3242 return nil , err
3343 }
44+ if len (msgs ) > 0 {
45+ // If one or more storage returns messages, multiplexer should immediately return them even if other storages still polling.
46+ time .AfterFunc (parallelFetchEarlyReturnWindow , parallelCtxCancel )
47+ }
3448 return fetchResult {msgs : msgs , moreMessages : moreMsgs , ackHandle : ackHandle }, nil
3549 }
3650 return nil , errMultiplexSkipped
3751 })
52+ close (subscriptionMissingCh )
3853 if err != nil {
3954 return nil , false , domain.AckHandle {}, err
4055 }
4156
57+ for id := range subscriptionMissingCh {
58+ // Subscriber missing on this storage.
59+ // This situation could occur if the storage had been temporary unavailable when subscriber created.
60+ // So that automatically create subscriber to receive future messages.
61+ logger .Of (ctx ).Debugf (`Auto-creating (recovering) subscriber %v on storage '%s' because fetch succeeded in the multiplexer but this storage reported the subscriber does not exist.` , sl , id )
62+ if err := s .children [id ].AsPubSubStorage ().NewSubscriber (ctx , sl ); err != nil {
63+ logger .Of (ctx ).WarnError (fmt .Sprintf ("Failed to auto-create (recover) subscriber %v on storage '%s': %%w" , sl , id ), err )
64+ }
65+ }
66+
4267 // Note that this merge logic honors message ordering as possible.
4368 // Only exception is that storages returns messages by different ordering, possible cause of such case is that client retry to publish messages.
4469 // If client retried publish, no need to guarantee ordering of the messages sent concurrently with the retry.
0 commit comments