File tree Expand file tree Collapse file tree 1 file changed +6
-1
lines changed Expand file tree Collapse file tree 1 file changed +6
-1
lines changed Original file line number Diff line number Diff line change @@ -3,6 +3,7 @@ package gofr
3
3
import (
4
4
"context"
5
5
"runtime/debug"
6
+ "time"
6
7
7
8
"gofr.dev/pkg/gofr/container"
8
9
"gofr.dev/pkg/gofr/logging"
@@ -24,15 +25,19 @@ func newSubscriptionManager(c *container.Container) SubscriptionManager {
24
25
25
26
// startSubscriber continuously subscribes to a topic and handles messages using the provided handler.
26
27
func (s * SubscriptionManager ) startSubscriber (ctx context.Context , topic string , handler SubscribeFunc ) error {
28
+ var delay time.Duration
29
+
27
30
for {
28
31
select {
29
32
case <- ctx .Done ():
30
33
s .container .Logger .Infof ("shutting down subscriber for topic %s" , topic )
31
34
return nil
32
- default :
35
+ case <- time . After ( delay ) :
33
36
err := s .handleSubscription (ctx , topic , handler )
34
37
if err != nil {
35
38
s .container .Logger .Errorf ("error in subscription for topic %s: %v" , topic , err )
39
+
40
+ delay = time .Second * 2
36
41
}
37
42
}
38
43
}
You can’t perform that action at this time.
0 commit comments