Skip to content

Commit 08d34a0

Browse files
authored
refactor(agent): channel capacity as histogram (#450)
* refactor(agent): channel capacity as histogram (#429) Channel capacity is a metric of each agent, that is created in each handle so the capacity varies depending on the client/agent being used and we need to differentiate the lows/highs of each in the metric. Adding a client label to the Gauge is not feasible, since it'd explode cardinality. Thus, adding a new metric as an histogram so we maintain compat until deprecating the old gauge one * fix(metrics): tests and usage in nats rpc (#451) * fix(metrics): missing parenthesis on histogram * fix(metrics): report histogram in rpc channel * refactor(metrics): remove depreacted channel capacity gauge * chore(go): update to 1.23
1 parent 7794345 commit 08d34a0

File tree

12 files changed

+28
-28
lines changed

12 files changed

+28
-28
lines changed

.github/workflows/tests.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
- name: Set up Go
2121
uses: actions/setup-go@v5
2222
with:
23-
go-version: '1.22'
23+
go-version: '1.23'
2424
- name: Download dependencies
2525
run: go mod download
2626
unit-test:
@@ -33,7 +33,7 @@ jobs:
3333
- name: Set up Go
3434
uses: actions/setup-go@v5
3535
with:
36-
go-version: '1.22'
36+
go-version: '1.23'
3737
- name: Setup dependencies
3838
env:
3939
GO111MODULE: auto
@@ -54,7 +54,7 @@ jobs:
5454
- name: Set up Go
5555
uses: actions/setup-go@v5
5656
with:
57-
go-version: '1.22'
57+
go-version: '1.23'
5858
- name: Run tests
5959
run: make e2e-test-nats
6060
e2e-test-grpc:
@@ -67,6 +67,6 @@ jobs:
6767
- name: Set up Go
6868
uses: actions/setup-go@v5
6969
with:
70-
go-version: '1.22'
70+
go-version: '1.23'
7171
- name: Run tests
7272
run: make e2e-test-grpc

.readthedocs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ build:
1313
# You can also specify other tool versions:
1414
# nodejs: "20"
1515
# rust: "1.70"
16-
golang: "1.22"
16+
golang: "1.23"
1717

1818
# Build documentation in the "docs/" directory with Sphinx
1919
sphinx:

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/topfreegames/pitaya/v3
22

3-
go 1.22.0
3+
go 1.23
44

55
require (
66
github.com/DataDog/datadog-go v4.8.3+incompatible

go.work

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
go 1.22.0
1+
go 1.23
22

33
toolchain go1.23.4
44

pkg/agent/agent.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -698,8 +698,8 @@ func (a *agentImpl) reportChannelSize() {
698698
logger.Log.Warnf("chSend is at maximum capacity")
699699
}
700700
for _, mr := range a.metricsReporters {
701-
if err := mr.ReportGauge(metrics.ChannelCapacity, map[string]string{"channel": "agent_chsend"}, float64(chSendCapacity)); err != nil {
702-
logger.Log.Warnf("failed to report chSend channel capaacity: %s", err.Error())
701+
if err := mr.ReportHistogram(metrics.ChannelCapacity, map[string]string{"channel": "agent_chsend"}, float64(chSendCapacity)); err != nil {
702+
logger.Log.Warnf("failed to report histogram chSend channel capacity: %s", err.Error())
703703
}
704704
}
705705
}

pkg/agent/agent_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func TestAgentSendSerializeErr(t *testing.T) {
275275
wg.Done()
276276
})
277277
go ag.write()
278-
mockMetricsReporter.EXPECT().ReportGauge(gomock.Any(), gomock.Any(), gomock.Any())
278+
mockMetricsReporter.EXPECT().ReportHistogram(gomock.Any(), gomock.Any(), gomock.Any())
279279
ag.send(expected)
280280
wg.Wait()
281281

@@ -347,7 +347,7 @@ func TestAgentPushStruct(t *testing.T) {
347347
close(ag.chSend)
348348
}
349349

350-
mockMetricsReporter.EXPECT().ReportGauge(metrics.ChannelCapacity, gomock.Any(), float64(10))
350+
mockMetricsReporter.EXPECT().ReportHistogram(metrics.ChannelCapacity, gomock.Any(), float64(10))
351351
err = ag.Push(msg.Route, table.data)
352352
assert.Equal(t, table.err, err)
353353

@@ -409,7 +409,7 @@ func TestAgentPush(t *testing.T) {
409409
close(ag.chSend)
410410
}
411411

412-
mockMetricsReporter.EXPECT().ReportGauge(metrics.ChannelCapacity, gomock.Any(), float64(10))
412+
mockMetricsReporter.EXPECT().ReportHistogram(metrics.ChannelCapacity, gomock.Any(), float64(10))
413413
err = ag.Push(msg.Route, table.data)
414414
assert.Equal(t, table.err, err)
415415

@@ -442,7 +442,7 @@ func TestAgentPushFullChannel(t *testing.T) {
442442
ag := newAgent(mockConn, mockDecoder, mockEncoder, mockSerializer, hbTime, writeTimeout, 0, dieChan, messageEncoder, mockMetricsReporters, sessionPool).(*agentImpl)
443443
assert.NotNil(t, ag)
444444

445-
mockMetricsReporter.EXPECT().ReportGauge(metrics.ChannelCapacity, gomock.Any(), float64(0))
445+
mockMetricsReporter.EXPECT().ReportHistogram(metrics.ChannelCapacity, gomock.Any(), float64(0))
446446

447447
msg := &message.Message{
448448
Route: "route",
@@ -528,7 +528,7 @@ func TestAgentResponseMID(t *testing.T) {
528528
ctx := getCtxWithRequestKeys()
529529
if table.mid != 0 {
530530
mockEncoder.EXPECT().Encode(gomock.Any(), gomock.Any()).Return([]byte("ok!"), nil)
531-
mockMetricsReporter.EXPECT().ReportGauge(metrics.ChannelCapacity, gomock.Any(), float64(10))
531+
mockMetricsReporter.EXPECT().ReportHistogram(metrics.ChannelCapacity, gomock.Any(), float64(10))
532532
}
533533
if table.mid != 0 {
534534
if table.err != nil {
@@ -581,7 +581,7 @@ func TestAgentResponseMIDFullChannel(t *testing.T) {
581581
sessionPool := session.NewSessionPool()
582582
ag := newAgent(mockConn, mockDecoder, mockEncoder, mockSerializer, hbTime, writeTimeout, 0, dieChan, messageEncoder, mockMetricsReporters, sessionPool).(*agentImpl)
583583
assert.NotNil(t, ag)
584-
mockMetricsReporters[0].(*metricsmocks.MockReporter).EXPECT().ReportGauge(metrics.ChannelCapacity, gomock.Any(), float64(0))
584+
mockMetricsReporters[0].(*metricsmocks.MockReporter).EXPECT().ReportHistogram(metrics.ChannelCapacity, gomock.Any(), float64(0))
585585
go func() {
586586
err := ag.ResponseMID(nil, 1, []byte("data"))
587587
assert.NoError(t, err)
@@ -1204,7 +1204,7 @@ func TestNatsRPCServerReportMetrics(t *testing.T) {
12041204

12051205
ag.chSend <- pendingWrite{}
12061206

1207-
mockMetricsReporter.EXPECT().ReportGauge(metrics.ChannelCapacity, gomock.Any(), float64(-1)) // because buffersize is 0 and chan sz is 1
1207+
mockMetricsReporter.EXPECT().ReportHistogram(metrics.ChannelCapacity, gomock.Any(), float64(-1))
12081208
ag.reportChannelSize()
12091209
}
12101210

pkg/cluster/nats_rpc_server.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -408,16 +408,15 @@ func (ns *NatsRPCServer) reportMetrics() {
408408
if subChanCapacity == 0 {
409409
logger.Log.Warn("subChan is at maximum capacity")
410410
}
411-
if err := mr.ReportGauge(metrics.ChannelCapacity, map[string]string{"channel": "rpc_server_subchan"}, float64(subChanCapacity)); err != nil {
411+
if err := mr.ReportHistogram(metrics.ChannelCapacity, map[string]string{"channel": "rpc_server_subchan"}, float64(subChanCapacity)); err != nil {
412412
logger.Log.Warnf("failed to report subChan queue capacity: %s", err.Error())
413413
}
414-
415414
// bindingschan
416415
bindingsChanCapacity := ns.messagesBufferSize - len(ns.bindingsChan)
417416
if bindingsChanCapacity == 0 {
418417
logger.Log.Warn("bindingsChan is at maximum capacity")
419418
}
420-
if err := mr.ReportGauge(metrics.ChannelCapacity, map[string]string{"channel": "rpc_server_bindingschan"}, float64(bindingsChanCapacity)); err != nil {
419+
if err := mr.ReportHistogram(metrics.ChannelCapacity, map[string]string{"channel": "rpc_server_bindingschan"}, float64(bindingsChanCapacity)); err != nil {
421420
logger.Log.Warnf("failed to report bindingsChan capacity: %s", err.Error())
422421
}
423422

@@ -426,7 +425,7 @@ func (ns *NatsRPCServer) reportMetrics() {
426425
if userPushChanCapacity == 0 {
427426
logger.Log.Warn("userPushChan is at maximum capacity")
428427
}
429-
if err := mr.ReportGauge(metrics.ChannelCapacity, map[string]string{"channel": "rpc_server_userpushchan"}, float64(userPushChanCapacity)); err != nil {
428+
if err := mr.ReportHistogram(metrics.ChannelCapacity, map[string]string{"channel": "rpc_server_userpushchan"}, float64(userPushChanCapacity)); err != nil {
430429
logger.Log.Warnf("failed to report userPushCh capacity: %s", err.Error())
431430
}
432431
}

pkg/cluster/nats_rpc_server_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ func TestNatsRPCServerHandleMessages(t *testing.T) {
316316
assert.NoError(t, err)
317317

318318
mockMetricsReporter.EXPECT().ReportGauge(metrics.DroppedMessages, gomock.Any(), float64(0))
319-
mockMetricsReporter.EXPECT().ReportGauge(metrics.ChannelCapacity, gomock.Any(), gomock.Any()).Times(3)
319+
mockMetricsReporter.EXPECT().ReportHistogram(metrics.ChannelCapacity, gomock.Any(), gomock.Any()).Times(3)
320320

321321
conn.Publish(table.topic, b)
322322
r := helpers.ShouldEventuallyReceive(t, rpcServer.unhandledReqCh).(*protos.Request)
@@ -505,6 +505,6 @@ func TestNatsRPCServerReportMetrics(t *testing.T) {
505505
rpcServer.userPushCh <- &protos.Push{}
506506

507507
mockMetricsReporter.EXPECT().ReportGauge(metrics.DroppedMessages, gomock.Any(), float64(rpcServer.dropped))
508-
mockMetricsReporter.EXPECT().ReportGauge(metrics.ChannelCapacity, gomock.Any(), float64(99)).Times(3)
508+
mockMetricsReporter.EXPECT().ReportHistogram(metrics.ChannelCapacity, gomock.Any(), float64(99)).Times(3)
509509
rpcServer.reportMetrics()
510510
}

pkg/metrics/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ var (
77
ConnectedClients = "connected_clients"
88
// CountServers counts the number of servers of different types
99
CountServers = "count_servers"
10-
// ChannelCapacity represents the capacity of a channel (available slots)
10+
// ChannelCapacity represents the capacity of a channel as a histogram (distribution of available slots)
1111
ChannelCapacity = "channel_capacity"
1212
// DroppedMessages reports the number of dropped messages in rpc server (messages that will not be handled)
1313
DroppedMessages = "dropped_messages"

pkg/metrics/prometheus_reporter.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,12 +188,13 @@ func (p *PrometheusReporter) registerMetrics(
188188
append([]string{"type"}, additionalLabelsKeys...),
189189
)
190190

191-
p.gaugeReportersMap[ChannelCapacity] = prometheus.NewGaugeVec(
192-
prometheus.GaugeOpts{
191+
p.histogramReportersMap[ChannelCapacity] = prometheus.NewHistogramVec(
192+
prometheus.HistogramOpts{
193193
Namespace: "pitaya",
194194
Subsystem: "channel",
195195
Name: ChannelCapacity,
196196
Help: "the available capacity of the channel",
197+
Buckets: []float64{0, 1, 10, 50, 100, 250, 500, 750, 1000, 1500, 2000, 3000, 4000, 5000},
197198
ConstLabels: constLabels,
198199
},
199200
append([]string{"channel"}, additionalLabelsKeys...),

0 commit comments

Comments
 (0)