Skip to content

Commit 0c5a724

Browse files
Umang01-hasharyanmehrotra
authored andcommitted
Run subscriptions when app starts rather than app start (#346)
1 parent 65505ba commit 0c5a724

File tree

7 files changed

+200
-40
lines changed

7 files changed

+200
-40
lines changed

pkg/gofr/container/container.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Container struct {
3030

3131
Services map[string]service.HTTP
3232
metricsManager metrics.Manager
33-
pubsub pubsub.Client
33+
PubSub pubsub.Client
3434

3535
Redis *redis.Redis
3636
SQL *sql.DB
@@ -82,15 +82,15 @@ func (c *Container) Create(conf config.Config) {
8282
partition, _ := strconv.Atoi(conf.GetOrDefault("PARTITION_SIZE", "0"))
8383
offSet, _ := strconv.Atoi(conf.GetOrDefault("PUBSUB_OFFSET", "-1"))
8484

85-
c.pubsub = kafka.New(kafka.Config{
85+
c.PubSub = kafka.New(kafka.Config{
8686
Broker: conf.Get("PUBSUB_BROKER"),
8787
Partition: partition,
8888
ConsumerGroupID: conf.Get("CONSUMER_ID"),
8989
OffSet: offSet,
9090
}, c.Logger, c.metricsManager)
9191
}
9292
case "GOOGLE":
93-
c.pubsub = google.New(google.Config{
93+
c.PubSub = google.New(google.Config{
9494
ProjectID: conf.Get("GOOGLE_PROJECT_ID"),
9595
SubscriptionName: conf.Get("GOOGLE_SUBSCRIPTION_NAME"),
9696
}, c.Logger, c.metricsManager)
@@ -146,9 +146,9 @@ func (c *Container) GetAppVersion() string {
146146
}
147147

148148
func (c *Container) GetPublisher() pubsub.Publisher {
149-
return c.pubsub
149+
return c.PubSub
150150
}
151151

152152
func (c *Container) GetSubscriber() pubsub.Subscriber {
153-
return c.pubsub
153+
return c.PubSub
154154
}

pkg/gofr/container/container_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func Test_newContainerPubSubIntializationFail(t *testing.T) {
5858
for _, tc := range testCases {
5959
c := NewContainer(testutil.NewMockConfig(tc.configs))
6060

61-
assert.Nil(t, c.pubsub)
61+
assert.Nil(t, c.PubSub)
6262
}
6363
}
6464

@@ -112,7 +112,7 @@ func TestContainer_GetAppVersion(t *testing.T) {
112112
func TestContainer_GetPublisher(t *testing.T) {
113113
publisher := &mockPubSub{}
114114

115-
c := &Container{pubsub: publisher}
115+
c := &Container{PubSub: publisher}
116116

117117
out := c.GetPublisher()
118118

@@ -122,7 +122,7 @@ func TestContainer_GetPublisher(t *testing.T) {
122122
func TestContainer_GetSubscriber(t *testing.T) {
123123
subscriber := &mockPubSub{}
124124

125-
c := &Container{pubsub: subscriber}
125+
c := &Container{PubSub: subscriber}
126126

127127
out := c.GetSubscriber()
128128

pkg/gofr/gofr.go

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package gofr
22

33
import (
4-
"context"
54
"fmt"
65

76
"net/http"
@@ -39,9 +38,10 @@ type App struct {
3938
// container is unexported because this is an internal implementation and applications are provided access to it via Context
4039
container *container.Container
4140

42-
grpcRegistered bool
43-
httpRegistered bool
44-
subscriberRegistered bool
41+
grpcRegistered bool
42+
httpRegistered bool
43+
44+
subscriptionManager SubscriptionManager
4545
}
4646

4747
// RegisterService adds a grpc service to the gofr application.
@@ -83,6 +83,8 @@ func New() *App {
8383

8484
app.grpcServer = newGRPCServer(app.container, port)
8585

86+
app.subscriptionManager = newSubscriptionManager(app.container)
87+
8688
return app
8789
}
8890

@@ -148,7 +150,12 @@ func (a *App) Run() {
148150
}
149151

150152
// If subscriber is registered, block main go routine to wait for subscriber to receive messages
151-
if a.subscriberRegistered {
153+
if a.subscriptionManager.subscriptions != nil {
154+
// Start subscribers concurrently using go-routines
155+
for topic, handler := range a.subscriptionManager.subscriptions {
156+
go a.subscriptionManager.startSubscriber(topic, handler)
157+
}
158+
152159
wg.Add(1)
153160
}
154161

@@ -264,31 +271,5 @@ func (a *App) Subscribe(topic string, handler SubscribeFunc) {
264271
return
265272
}
266273

267-
a.subscriberRegistered = true
268-
269-
// continuously subscribe in an infinite loop
270-
go func() {
271-
for {
272-
msg, err := a.container.GetSubscriber().Subscribe(context.Background(), topic)
273-
if msg == nil {
274-
continue
275-
}
276-
277-
if err != nil {
278-
a.container.Logger.Errorf("error while reading from Kafka, err: %v", err.Error())
279-
280-
continue
281-
}
282-
283-
// create a gofr context with message as request
284-
ctx := newContext(nil, msg, a.container)
285-
286-
err = handler(ctx)
287-
288-
// commit the message if the subscription function does not return error
289-
if err == nil {
290-
msg.Commit()
291-
}
292-
}
293-
}()
274+
a.subscriptionManager.subscriptions[topic] = handler
294275
}

pkg/gofr/logging/level_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package logging
22

33
import (
4+
"os"
45
"testing"
56

67
"github.com/stretchr/testify/assert"
@@ -93,3 +94,16 @@ func TestGetLevelFromString(t *testing.T) {
9394
assert.Equal(t, tc.expected, actual, "TEST[%d], Failed.\n%s", i, tc.desc)
9495
}
9596
}
97+
98+
func Test_changeLevel(t *testing.T) {
99+
l := logger{
100+
level: INFO,
101+
normalOut: os.Stdout,
102+
errorOut: os.Stderr,
103+
isTerminal: false,
104+
}
105+
106+
l.changeLevel(ERROR)
107+
108+
assert.Equal(t, ERROR, l.level, "Test_changeLevel failed! expected level to be error ")
109+
}

pkg/gofr/subscriber.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,46 @@
11
package gofr
22

3+
import (
4+
"context"
5+
6+
"gofr.dev/pkg/gofr/container"
7+
)
8+
39
type SubscribeFunc func(c *Context) error
10+
11+
type SubscriptionManager struct {
12+
container *container.Container
13+
subscriptions map[string]SubscribeFunc
14+
}
15+
16+
func newSubscriptionManager(c *container.Container) SubscriptionManager {
17+
return SubscriptionManager{
18+
container: c,
19+
subscriptions: make(map[string]SubscribeFunc),
20+
}
21+
}
22+
23+
func (s *SubscriptionManager) startSubscriber(topic string, handler SubscribeFunc) {
24+
// continuously subscribe in an infinite loop
25+
for {
26+
msg, err := s.container.GetSubscriber().Subscribe(context.Background(), topic)
27+
if msg == nil {
28+
continue
29+
}
30+
31+
if err != nil {
32+
s.container.Logger.Errorf("error while reading from Kafka, err: %v", err.Error())
33+
continue
34+
}
35+
36+
ctx := newContext(nil, msg, s.container)
37+
err = handler(ctx)
38+
39+
// commit the message if the subscription function does not return error
40+
if err == nil {
41+
msg.Commit()
42+
} else {
43+
s.container.Logger.Errorf("error in handler for topic %s: %v", topic, err)
44+
}
45+
}
46+
}

pkg/gofr/subscriber_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package gofr
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"strings"
8+
"testing"
9+
"time"
10+
11+
"gofr.dev/pkg/gofr/container"
12+
"gofr.dev/pkg/gofr/datasource/pubsub"
13+
"gofr.dev/pkg/gofr/logging"
14+
"gofr.dev/pkg/gofr/testutil"
15+
)
16+
17+
var errHandler = errors.New("error in subscribing")
18+
19+
func handleError(err string) error {
20+
return fmt.Errorf("%w: %s", errHandler, err)
21+
}
22+
23+
var errSubscription = errors.New("subscription error")
24+
25+
func subscriptionError(err string) error {
26+
return fmt.Errorf("%w: %s", errSubscription, err)
27+
}
28+
29+
type mockSubscriber struct {
30+
}
31+
32+
func (s mockSubscriber) Publish(_ context.Context, _ string, _ []byte) error {
33+
return nil
34+
}
35+
36+
type Message struct {
37+
Topic string
38+
Value []byte
39+
MetaData interface{}
40+
}
41+
42+
func (mockSubscriber) Subscribe(_ context.Context, topic string) (*pubsub.Message, error) {
43+
if topic == "test-topic" {
44+
return &pubsub.Message{
45+
Topic: "test-topic",
46+
Value: []byte(`{"data":{"productId":"123","price":"599"}}`),
47+
}, nil
48+
}
49+
50+
return &pubsub.Message{
51+
Topic: "test-topic",
52+
Value: []byte(`{"data":{"productId":"123","price":"599"}}`),
53+
}, subscriptionError("subscription error")
54+
}
55+
56+
func TestSubscriptionManager_HandlerError(t *testing.T) {
57+
done := make(chan struct{})
58+
59+
testLogs := testutil.StderrOutputForFunc(func() {
60+
mockContainer := container.Container{
61+
Logger: logging.NewLogger(logging.ERROR),
62+
PubSub: mockSubscriber{},
63+
}
64+
subscriptionManager := newSubscriptionManager(&mockContainer)
65+
66+
// Run the subscriber in a goroutine
67+
go func() {
68+
subscriptionManager.startSubscriber("test-topic",
69+
func(c *Context) error {
70+
return handleError("error in test-topic")
71+
})
72+
}()
73+
74+
// this sleep is added to wait for StderrOutputForFunc to collect the logs inside the testLogs
75+
time.Sleep(1 * time.Millisecond)
76+
})
77+
78+
// signal the test to end
79+
close(done)
80+
81+
if !strings.Contains(testLogs, "error in handler for topic test-topic: error in subscribing: error in test-topic") {
82+
t.Error("TestSubscriptionManager_HandlerError Failed! Missing log message about handler error")
83+
}
84+
}
85+
86+
func TestSubscriptionManager_SubscribeError(t *testing.T) {
87+
done := make(chan struct{})
88+
89+
testLogs := testutil.StderrOutputForFunc(func() {
90+
mockContainer := container.Container{
91+
Logger: logging.NewLogger(logging.ERROR),
92+
PubSub: mockSubscriber{},
93+
}
94+
subscriptionManager := newSubscriptionManager(&mockContainer)
95+
96+
// Run the subscriber in a goroutine
97+
go func() {
98+
subscriptionManager.startSubscriber("abc",
99+
func(c *Context) error {
100+
return handleError("error in abc")
101+
})
102+
}()
103+
104+
// this sleep is added to wait for StderrOutputForFunc to collect the logs inside the testLogs
105+
time.Sleep(1 * time.Millisecond)
106+
})
107+
108+
// signal the test to end
109+
close(done)
110+
111+
if !strings.Contains(testLogs, "error while reading from Kafka, err: ") {
112+
t.Error("TestSubscriptionManager_SubscribeError Failed! Missing log message about subscription error")
113+
}
114+
}

pkg/gofr/testutil/mock_logger.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ type MockLogger struct {
2020
out io.Writer
2121
errOut io.Writer
2222
}
23+
type (
24+
Level int
25+
)
26+
27+
//nolint:unused //changeLevel is present to satisfy the mocking of logger inside testutil.
28+
func (m *MockLogger) changeLevel(level Level) {
29+
m.level = int(level)
30+
}
2331

2432
func NewMockLogger(level int) *MockLogger {
2533
return &MockLogger{

0 commit comments

Comments
 (0)