Skip to content

Commit 7f1b36a

Browse files
authored
feat(bigtable): add metrics exporter for outstandingrpcs and perconnectionerror count (googleapis#13510)
Lift and shift of sushanb#3:

```
curl -L -O https://github.com/sushanb/google-cloud-go/commit/b9d20984c514b4632f7c613e4d6ab58b1776a939.patch
git apply b9d2098.patch
```
1 parent 65a3c4e commit 7f1b36a

File tree

8 files changed

+542
-9
lines changed

8 files changed

+542
-9
lines changed

bigtable/bigtable.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
203203
btransport.WithInstanceName(fullInstanceName),
204204
btransport.WithAppProfile(config.AppProfile),
205205
btransport.WithFeatureFlagsMetadata(ffMD),
206+
btransport.WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig()),
207+
btransport.WithMeterProvider(metricsTracerFactory.otelMeterProvider),
206208
)
207209

208210
if err != nil {

bigtable/internal/option/option.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,3 +252,18 @@ func DefaultDynamicChannelPoolConfig() DynamicChannelPoolConfig {
252252
MaxRemoveConns: 2, // Only Cap for removals
253253
}
254254
}
255+
256+
// MetricsReporterConfig holds the parameters that control periodic metrics
// reporting.
type MetricsReporterConfig struct {
	// Enabled turns periodic reporting on or off.
	Enabled bool
	// ReportingInterval is the period between two consecutive reports.
	ReportingInterval time.Duration
}
262+
263+
// DefaultMetricsReporterConfig with defaults used.
264+
func DefaultMetricsReporterConfig() MetricsReporterConfig {
265+
return MetricsReporterConfig{
266+
Enabled: true,
267+
ReportingInterval: 1 * time.Minute,
268+
}
269+
}

bigtable/internal/transport/connpool.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,20 @@ const requestParamsHeader = "x-goog-request-params"
5050
// BigtableChannelPoolOption options for configurable
5151
type BigtableChannelPoolOption func(*BigtableChannelPool)
5252

53+
// connPoolStatsSupplier is a callback that produces a snapshot of the
// connection pool statistics, one connPoolStats element per connection.
type connPoolStatsSupplier func() []connPoolStats

// connPoolStats is a point-in-time snapshot of the statistics of a single
// connection.
type connPoolStats struct {
	// OutstandingUnaryLoad is the unary load observed on the connection.
	OutstandingUnaryLoad int32
	// OutstandingStreamingLoad is the streaming load observed on the connection.
	OutstandingStreamingLoad int32
	// ErrorCount is the error count read from the connection.
	ErrorCount int64
	// IsALTSUsed reports whether the connection uses ALTS.
	IsALTSUsed bool
	// LBPolicy is the name of the pool's load balancing strategy.
	LBPolicy string
}
64+
65+
// Compile-time assertion that *MetricsReporter implements Monitor.
var _ Monitor = (*MetricsReporter)(nil)
66+
5367
// WithAppProfile provides the appProfile
5468
func WithAppProfile(appProfile string) BigtableChannelPoolOption {
5569
return func(p *BigtableChannelPool) {
@@ -131,6 +145,28 @@ func (bc *BigtableConn) Prime(ctx context.Context, fullInstanceName, appProfileI
131145
return nil
132146
}
133147

148+
// connPoolStatsSupplier returns a snapshot of the current connection pool statistics.
149+
func (p *BigtableChannelPool) connPoolStatsSupplier() []connPoolStats {
150+
conns := p.getConns()
151+
if len(conns) == 0 {
152+
return nil
153+
}
154+
155+
stats := make([]connPoolStats, len(conns))
156+
lbPolicy := p.strategy.String()
157+
158+
for i, entry := range conns {
159+
stats[i] = connPoolStats{
160+
OutstandingUnaryLoad: entry.unaryLoad.Load(),
161+
OutstandingStreamingLoad: entry.streamingLoad.Load(),
162+
ErrorCount: entry.errorCount.Swap(0),
163+
IsALTSUsed: entry.isALTSUsed(),
164+
LBPolicy: lbPolicy,
165+
}
166+
}
167+
return stats
168+
}
169+
134170
// NewBigtableConn creates a wrapped grpc Client Conn
135171
func NewBigtableConn(conn *grpc.ClientConn) *BigtableConn {
136172
bc := &BigtableConn{
@@ -239,6 +275,16 @@ type BigtableChannelPool struct {
239275
instanceName string
240276
featureFlagsMD metadata.MD
241277
meterProvider metric.MeterProvider
278+
// configs
279+
metricsConfig btopt.MetricsReporterConfig
280+
281+
// background monitors
282+
monitors []Monitor
283+
}
284+
285+
// WithMetricsReporterConfig attaches the relevant config for exporting the metrics
286+
func WithMetricsReporterConfig(config btopt.MetricsReporterConfig) BigtableChannelPoolOption {
287+
return func(p *BigtableChannelPool) { p.metricsConfig = config }
242288
}
243289

244290
// getConns safely loads the current slice of connections.
@@ -327,9 +373,27 @@ func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btop
327373
}
328374

329375
pool.conns.Store(&initialConns)
376+
377+
btopt.Debugf(pool.logger, "bigtable_connpool: using load balancing strategy: %s\n", strategy)
378+
379+
metricsReporter, err := NewMetricsReporter(pool.metricsConfig, pool.connPoolStatsSupplier, pool.logger, pool.meterProvider)
380+
if err == nil {
381+
// ignore
382+
pool.monitors = append(pool.monitors, metricsReporter)
383+
} else {
384+
btopt.Debugf(pool.logger, "bigtable_connpool: failed to create metrics reporter: %v\n", err)
385+
}
386+
pool.startMonitors()
330387
return pool, nil
331388
}
332389

390+
func (p *BigtableChannelPool) startMonitors() {
391+
for _, m := range p.monitors {
392+
btopt.Debugf(p.logger, "bigtable_connpool: Starting monitor %T\n", m)
393+
m.Start(p.poolCtx)
394+
}
395+
}
396+
333397
// Num returns the number of connections in the pool.
334398
func (p *BigtableChannelPool) Num() int {
335399
return len(p.getConns())
@@ -338,6 +402,10 @@ func (p *BigtableChannelPool) Num() int {
338402
// Close closes all connections in the pool.
339403
func (p *BigtableChannelPool) Close() error {
340404
p.poolCancel() // Cancel the context for background tasks
405+
// Stop all monitors.
406+
for _, m := range p.monitors {
407+
m.Stop()
408+
}
341409
conns := p.getConns()
342410
var errs multiError
343411

bigtable/internal/transport/connpool_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"io"
2222
"net/url"
23+
"reflect"
2324
"sort"
2425
"strings"
2526
"sync"
@@ -1391,6 +1392,87 @@ func TestRemoveConnections(t *testing.T) {
13911392
})
13921393
}
13931394

1395+
func TestConnPoolStatisticsVisitor(t *testing.T) {
1396+
ctx := context.Background()
1397+
poolSize := 3
1398+
fake := &fakeService{}
1399+
addr := setupTestServer(t, fake)
1400+
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
1401+
1402+
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...)
1403+
if err != nil {
1404+
t.Fatalf("Failed to create pool: %v", err)
1405+
}
1406+
defer pool.Close()
1407+
1408+
// Wait for connections to be established
1409+
time.Sleep(100 * time.Millisecond)
1410+
1411+
conns := pool.getConns()
1412+
if len(conns) != poolSize {
1413+
t.Fatalf("Pool size mismatch: got %d, want %d", len(conns), poolSize)
1414+
}
1415+
1416+
testData := []struct {
1417+
unary int32
1418+
streaming int32
1419+
errors int64
1420+
isALTS bool
1421+
}{
1422+
{unary: 5, streaming: 2, errors: 10, isALTS: false},
1423+
{unary: 0, streaming: 1, errors: 0, isALTS: true},
1424+
{unary: 3, streaming: 0, errors: 5, isALTS: false},
1425+
}
1426+
1427+
if len(testData) != poolSize {
1428+
t.Fatalf("Test data size: pool size mismatch: got %d, want %d", len(testData), poolSize)
1429+
}
1430+
1431+
for i, data := range testData {
1432+
if i < len(conns) {
1433+
conns[i].unaryLoad.Store(data.unary)
1434+
conns[i].streamingLoad.Store(data.streaming)
1435+
conns[i].errorCount.Store(data.errors)
1436+
conns[i].conn.isALTSConn.Store(data.isALTS)
1437+
}
1438+
}
1439+
1440+
// Get the snapshot
1441+
stats := pool.connPoolStatsSupplier()
1442+
1443+
if len(stats) != poolSize {
1444+
t.Errorf("Snapshot size mismatch: got %d, want %d", len(stats), poolSize)
1445+
}
1446+
1447+
expectedStats := make([]connPoolStats, poolSize)
1448+
for i, data := range testData {
1449+
expectedStats[i] = connPoolStats{
1450+
OutstandingUnaryLoad: data.unary,
1451+
OutstandingStreamingLoad: data.streaming,
1452+
ErrorCount: data.errors,
1453+
IsALTSUsed: data.isALTS,
1454+
LBPolicy: btopt.RoundRobin.String(),
1455+
}
1456+
}
1457+
1458+
sort.Slice(stats, func(i, j int) bool { return stats[i].OutstandingUnaryLoad < stats[j].OutstandingUnaryLoad })
1459+
sort.Slice(expectedStats, func(i, j int) bool {
1460+
return expectedStats[i].OutstandingUnaryLoad < expectedStats[j].OutstandingUnaryLoad
1461+
})
1462+
1463+
if !reflect.DeepEqual(stats, expectedStats) {
1464+
t.Errorf("Snapshot data mismatch:\ngot: %v\nwant: %v", stats, expectedStats)
1465+
}
1466+
1467+
// Verify error counts are reset
1468+
connsAfter := pool.getConns()
1469+
for i, entry := range connsAfter {
1470+
if entry.errorCount.Load() != 0 {
1471+
t.Errorf("entry[%d].errorCount was not reset: got %d, want 0", i, entry.errorCount.Load())
1472+
}
1473+
}
1474+
}
1475+
13941476
// --- Benchmarks ---
13951477

13961478
func createBenchmarkFake() *fakeService {
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package internal
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"log"
21+
"sync"
22+
"time"
23+
24+
btopt "cloud.google.com/go/bigtable/internal/option"
25+
"go.opentelemetry.io/otel/attribute"
26+
"go.opentelemetry.io/otel/metric"
27+
)
28+
29+
const (
	// outstandingRPCsMetricName names the histogram of outstanding RPCs per
	// connection.
	outstandingRPCsMetricName = "connection_pool/outstanding_rpcs"
	// perConnErrorCountMetricName names the histogram of per-connection error
	// counts.
	perConnErrorCountMetricName = "per_connection_error_count"
	// clientMeterName is the name of the meter the histograms are created on.
	clientMeterName = "bigtable.googleapis.com/internal/client/"
)
37+
38+
// MetricsReporter periodically collects and reports metrics for the connection pool.
39+
type MetricsReporter struct {
40+
config btopt.MetricsReporterConfig
41+
connPoolStatsSupplier connPoolStatsSupplier
42+
logger *log.Logger
43+
ticker *time.Ticker
44+
done chan struct{}
45+
stopOnce sync.Once
46+
47+
// OpenTelemetry metric instruments
48+
meterProvider metric.MeterProvider
49+
outstandingRPCsHistogram metric.Float64Histogram
50+
perConnectionErrorCountHistogram metric.Float64Histogram
51+
}
52+
53+
// NewMetricsReporter starts a monitor to export periodic metrics
54+
func NewMetricsReporter(config btopt.MetricsReporterConfig, connPoolStatsSupplier connPoolStatsSupplier, logger *log.Logger, mp metric.MeterProvider) (*MetricsReporter, error) {
55+
mr := &MetricsReporter{
56+
config: config,
57+
connPoolStatsSupplier: connPoolStatsSupplier,
58+
logger: logger,
59+
done: make(chan struct{}),
60+
meterProvider: mp,
61+
}
62+
63+
if !config.Enabled {
64+
// Stats Disabled, return error during ctor
65+
return nil, fmt.Errorf("bigtable_connpool: MetricsReporterConfig.Enabled is false")
66+
}
67+
68+
if mp == nil {
69+
// State: Stats Enabled but MeterProvider is nil, return error during ctor
70+
return nil, fmt.Errorf("bigtable_connpool: MetricsReporterConfig.Enabled is true, but MeterProvider is nil")
71+
}
72+
73+
// create meter
74+
meter := mp.Meter(clientMeterName)
75+
var err error
76+
77+
// fail if meter cannot be created.
78+
mr.outstandingRPCsHistogram, err = meter.Float64Histogram(outstandingRPCsMetricName, metric.WithDescription("Distribution of outstanding RPCs per connection."), metric.WithUnit("1"))
79+
if err != nil {
80+
return nil, fmt.Errorf("bigtable_connpool: failed to create %s histogram: %w", outstandingRPCsMetricName, err)
81+
}
82+
83+
mr.perConnectionErrorCountHistogram, err = meter.Float64Histogram(perConnErrorCountMetricName, metric.WithDescription("Distribution of errors per connection per minute."), metric.WithUnit("1"))
84+
if err != nil {
85+
return nil, fmt.Errorf("bigtable_connpool: failed to create %s histogram: %w", perConnErrorCountMetricName, err)
86+
}
87+
88+
return mr, nil
89+
}
90+
91+
// Start starts the reporter.
92+
func (mr *MetricsReporter) Start(ctx context.Context) {
93+
// config.Enabled should ensure that all relevant thing (meterProvider and meter should exists)
94+
if !mr.config.Enabled {
95+
return
96+
}
97+
98+
mr.ticker = time.NewTicker(mr.config.ReportingInterval)
99+
go func() {
100+
defer mr.ticker.Stop()
101+
for {
102+
select {
103+
case <-mr.ticker.C:
104+
mr.snapshotAndRecordMetrics(ctx)
105+
case <-mr.done:
106+
return
107+
case <-ctx.Done():
108+
return
109+
}
110+
}
111+
}()
112+
}
113+
114+
// Stop stops the reporter gracefully
115+
func (mr *MetricsReporter) Stop() {
116+
mr.stopOnce.Do(func() {
117+
if mr.config.Enabled {
118+
close(mr.done)
119+
}
120+
})
121+
}
122+
123+
// snapshotAndRecordMetrics collects and records metrics for the current state of the connection pool.
124+
func (mr *MetricsReporter) snapshotAndRecordMetrics(ctx context.Context) {
125+
stats := mr.connPoolStatsSupplier()
126+
if len(stats) == 0 {
127+
return
128+
}
129+
130+
for _, stat := range stats {
131+
// Record per-connection error count for the interval
132+
if mr.perConnectionErrorCountHistogram != nil {
133+
mr.perConnectionErrorCountHistogram.Record(ctx, float64(stat.ErrorCount))
134+
}
135+
transportType := "cloudpath"
136+
if stat.IsALTSUsed {
137+
transportType = "directpath"
138+
}
139+
140+
// Common attributes for this connection
141+
baseAttrs := []attribute.KeyValue{
142+
attribute.String("transport_type", transportType),
143+
attribute.String("lb_policy", stat.LBPolicy),
144+
}
145+
146+
if mr.outstandingRPCsHistogram != nil {
147+
// Record distribution sample for unary load
148+
unaryAttrs := attribute.NewSet(append(baseAttrs, attribute.Bool("streaming", false))...)
149+
mr.outstandingRPCsHistogram.Record(ctx, float64(stat.OutstandingUnaryLoad), metric.WithAttributeSet(unaryAttrs))
150+
151+
// Record distribution sample for streaming load
152+
streamingAttrs := attribute.NewSet(append(baseAttrs, attribute.Bool("streaming", true))...)
153+
mr.outstandingRPCsHistogram.Record(ctx, float64(stat.OutstandingStreamingLoad), metric.WithAttributeSet(streamingAttrs))
154+
}
155+
156+
}
157+
}

0 commit comments

Comments
 (0)