Skip to content

Commit c51457d

Browse files
authored
Add metrics for PubSub and update docs (#349)
1 parent f9d6b75 commit c51457d

File tree

10 files changed

+233
-27
lines changed

10 files changed

+233
-27
lines changed

docs/quick-start/observability/page.md

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,27 @@ GoFr by default publishes metrics automatically to port: _2121_ on _/metrics_ en
7373
---
7474
* app_sql_stats
7575
* histogram
76-
* Observes the response time for SQL queries
76+
* Response time of SQL queries in microseconds
7777
---
7878
* app_redis_stats
7979
* histogram
80-
* Observes the response time for Redis commands
80+
* Response time of Redis commands in microseconds
81+
---
82+
* app_pubsub_publish_total_count
83+
* counter
84+
* Number of total publish operations
85+
---
86+
* app_pubsub_publish_success_count
87+
* counter
88+
* Number of successful publish operations
89+
---
90+
* app_pubsub_subscribe_total_count
91+
* counter
92+
* Number of total subscribe operations
93+
---
94+
* app_pubsub_subscribe_success_count
95+
* counter
96+
* Number of successful subscribe operations
8197

8298
{% /table %}
8399

pkg/gofr/container/container.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,13 @@ func (c *Container) Create(conf config.Config) {
8787
Partition: partition,
8888
ConsumerGroupID: conf.Get("CONSUMER_ID"),
8989
OffSet: offSet,
90-
}, c.Logger)
90+
}, c.Logger, c.metricsManager)
9191
}
9292
case "GOOGLE":
9393
c.pubsub = google.New(google.Config{
9494
ProjectID: conf.Get("GOOGLE_PROJECT_ID"),
9595
SubscriptionName: conf.Get("GOOGLE_SUBSCRIPTION_NAME"),
96-
}, c.Logger)
96+
}, c.Logger, c.metricsManager)
9797
}
9898
}
9999

@@ -129,6 +129,12 @@ func (c *Container) registerFrameworkMetrics() {
129129
c.Metrics().NewHistogram("app_sql_stats", "Response time of SQL queries in microseconds.", sqlBuckets...)
130130
c.Metrics().NewGauge("app_sql_open_connections", "Number of open SQL connections.")
131131
c.Metrics().NewGauge("app_sql_inUse_connections", "Number of inUse SQL connections.")
132+
133+
// pubsub metrics
134+
c.Metrics().NewCounter("app_pubsub_publish_total_count", "Number of total publish operations.")
135+
c.Metrics().NewCounter("app_pubsub_publish_success_count", "Number of successful publish operations.")
136+
c.Metrics().NewCounter("app_pubsub_subscribe_total_count", "Number of total subscribe operations.")
137+
c.Metrics().NewCounter("app_pubsub_subscribe_success_count", "Number of successful subscribe operations.")
132138
}
133139

134140
func (c *Container) GetAppName() string {

pkg/gofr/datasource/pubsub/google/google.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@ type Config struct {
2323
type googleClient struct {
2424
Config
2525

26-
client *gcPubSub.Client
27-
logger pubsub.Logger
26+
client *gcPubSub.Client
27+
logger pubsub.Logger
28+
metrics Metrics
2829
}
2930

3031
//nolint:revive // We do not want anyone using the client without initialization steps.
31-
func New(conf Config, logger pubsub.Logger) *googleClient {
32+
func New(conf Config, logger pubsub.Logger, metrics Metrics) *googleClient {
3233
err := validateConfigs(&conf)
3334
if err != nil {
3435
logger.Errorf("google pubsub could not be configured, err : %v", err)
@@ -44,9 +45,10 @@ func New(conf Config, logger pubsub.Logger) *googleClient {
4445
}
4546

4647
return &googleClient{
47-
Config: conf,
48-
client: client,
49-
logger: logger,
48+
Config: conf,
49+
client: client,
50+
logger: logger,
51+
metrics: metrics,
5052
}
5153
}
5254

@@ -63,6 +65,8 @@ func validateConfigs(conf *Config) error {
6365
}
6466

6567
func (g *googleClient) Publish(ctx context.Context, topic string, message []byte) error {
68+
g.metrics.IncrementCounter(ctx, "app_pubsub_publish_total_count", "topic", topic)
69+
6670
t, err := g.getTopic(ctx, topic)
6771
if err != nil {
6872
return err
@@ -80,10 +84,14 @@ func (g *googleClient) Publish(ctx context.Context, topic string, message []byte
8084

8185
g.logger.Debugf("published google message %v on topic %v", string(message), topic)
8286

87+
g.metrics.IncrementCounter(ctx, "app_pubsub_publish_success_count", "topic", topic)
88+
8389
return nil
8490
}
8591

8692
func (g *googleClient) Subscribe(ctx context.Context, topic string) (*pubsub.Message, error) {
93+
g.metrics.IncrementCounter(ctx, "app_pubsub_subscribe_total_count", "topic", topic)
94+
8795
var m = pubsub.NewMessage(ctx)
8896

8997
t, err := g.getTopic(ctx, topic)
@@ -118,6 +126,8 @@ func (g *googleClient) Subscribe(ctx context.Context, topic string) (*pubsub.Mes
118126

119127
g.logger.Debugf("received google message %v on topic %v", string(m.Value), m.Topic)
120128

129+
g.metrics.IncrementCounter(ctx, "app_pubsub_subscribe_success_count", "topic", topic)
130+
121131
return m, nil
122132
}
123133

pkg/gofr/datasource/pubsub/google/google_test.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
gcPubSub "cloud.google.com/go/pubsub"
88
"cloud.google.com/go/pubsub/pstest"
99
"github.com/stretchr/testify/assert"
10+
"go.uber.org/mock/gomock"
1011
"google.golang.org/api/option"
1112
"google.golang.org/grpc"
1213
"google.golang.org/grpc/credentials/insecure"
@@ -35,10 +36,13 @@ func TestGoogleClient_New_Error(t *testing.T) {
3536
g *googleClient
3637
)
3738

39+
ctrl := gomock.NewController(t)
40+
defer ctrl.Finish()
41+
3842
out := testutil.StderrOutputForFunc(func() {
3943
logger := testutil.NewMockLogger(testutil.ERRORLOG)
4044

41-
g = New(Config{}, logger)
45+
g = New(Config{}, logger, NewMockMetrics(ctrl))
4246
})
4347

4448
assert.Nil(t, g)
@@ -49,6 +53,11 @@ func TestGoogleClient_Publish_Success(t *testing.T) {
4953
client := getGoogleClient(t)
5054
defer client.Close()
5155

56+
ctrl := gomock.NewController(t)
57+
defer ctrl.Finish()
58+
59+
mockMetrics := NewMockMetrics(ctrl)
60+
5261
topic := "test-topic"
5362
message := []byte("test message")
5463
expectedLog := "published google message test message on topic test-topic\n"
@@ -61,8 +70,12 @@ func TestGoogleClient_Publish_Success(t *testing.T) {
6170
ProjectID: "test",
6271
SubscriptionName: "sub",
6372
},
73+
metrics: mockMetrics,
6474
}
6575

76+
mockMetrics.EXPECT().IncrementCounter(context.Background(), "app_pubsub_publish_total_count", "topic", topic)
77+
mockMetrics.EXPECT().IncrementCounter(context.Background(), "app_pubsub_publish_success_count", "topic", topic)
78+
6679
err := g.Publish(context.Background(), topic, message)
6780

6881
assert.Nil(t, err)
@@ -72,16 +85,23 @@ func TestGoogleClient_Publish_Success(t *testing.T) {
7285
}
7386

7487
func TestGoogleClient_PublishTopic_Error(t *testing.T) {
88+
ctrl := gomock.NewController(t)
89+
defer ctrl.Finish()
90+
91+
mockMetrics := NewMockMetrics(ctrl)
92+
7593
g := &googleClient{client: getGoogleClient(t), Config: Config{
7694
ProjectID: "test",
7795
SubscriptionName: "sub",
78-
}}
96+
}, metrics: mockMetrics}
7997
defer g.client.Close()
8098

8199
ctx, cancel := context.WithCancel(context.Background())
82100

83101
cancel()
84102

103+
mockMetrics.EXPECT().IncrementCounter(ctx, "app_pubsub_publish_total_count", "topic", "test-topic")
104+
85105
err := g.Publish(ctx, "test-topic", []byte(""))
86106
if assert.Error(t, err) {
87107
assert.Contains(t, err.Error(), "context canceled")
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package google
2+
3+
import "context"
4+
5+
type Metrics interface {
6+
IncrementCounter(ctx context.Context, name string, labels ...string)
7+
}

pkg/gofr/datasource/pubsub/google/mock_metrics.go

Lines changed: 57 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/gofr/datasource/pubsub/kafka/kafka.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@ type kafkaClient struct {
2929
reader map[string]Reader
3030
mu *sync.RWMutex
3131

32-
logger pubsub.Logger
33-
config Config
32+
logger pubsub.Logger
33+
config Config
34+
metrics Metrics
3435
}
3536

3637
//nolint:revive // We do not want anyone using the client without initialization steps.
37-
func New(conf Config, logger pubsub.Logger) *kafkaClient {
38+
func New(conf Config, logger pubsub.Logger, metrics Metrics) *kafkaClient {
3839
err := validateConfigs(conf)
3940
if err != nil {
4041
logger.Errorf("could not initialize kafka, err : %v", err)
@@ -55,12 +56,13 @@ func New(conf Config, logger pubsub.Logger) *kafkaClient {
5556
reader := make(map[string]Reader)
5657

5758
return &kafkaClient{
58-
config: conf,
59-
dialer: dialer,
60-
reader: reader,
61-
logger: logger,
62-
writer: writer,
63-
mu: &sync.RWMutex{},
59+
config: conf,
60+
dialer: dialer,
61+
reader: reader,
62+
logger: logger,
63+
writer: writer,
64+
mu: &sync.RWMutex{},
65+
metrics: metrics,
6466
}
6567
}
6668

@@ -77,6 +79,8 @@ func validateConfigs(conf Config) error {
7779
}
7880

7981
func (k *kafkaClient) Publish(ctx context.Context, topic string, message []byte) error {
82+
k.metrics.IncrementCounter(ctx, "app_pubsub_publish_total_count", "topic", topic)
83+
8084
if k.writer == nil || topic == "" {
8185
return errPublisherNotConfigured
8286
}
@@ -96,10 +100,14 @@ func (k *kafkaClient) Publish(ctx context.Context, topic string, message []byte)
96100

97101
k.logger.Debugf("published kafka message %v on topic %v", string(message), topic)
98102

103+
k.metrics.IncrementCounter(ctx, "app_pubsub_publish_success_count", "topic", topic)
104+
99105
return nil
100106
}
101107

102108
func (k *kafkaClient) Subscribe(ctx context.Context, topic string) (*pubsub.Message, error) {
109+
k.metrics.IncrementCounter(ctx, "app_pubsub_subscribe_total_count", "topic", topic)
110+
103111
var reader Reader
104112
// Lock the reader map to ensure only one subscriber access the reader at a time
105113
k.mu.Lock()
@@ -130,6 +138,8 @@ func (k *kafkaClient) Subscribe(ctx context.Context, topic string) (*pubsub.Mess
130138

131139
k.logger.Debugf("received kafka message %v on topic %v", string(msg.Value), msg.Topic)
132140

141+
k.metrics.IncrementCounter(ctx, "app_pubsub_subscribe_success_count", "topic", topic)
142+
133143
return m, err
134144
}
135145

0 commit comments

Comments
 (0)