From 01646277172c7b22287e58c1a4344c4a54d6b1a6 Mon Sep 17 00:00:00 2001
From: Sushan Bhattarai
Date: Mon, 29 Dec 2025 16:48:27 -0500
Subject: [PATCH 1/7] feat(bigtable): add client startup time metrics

---
 bigtable/bigtable.go                          |   2 +
 bigtable/internal/transport/connpool.go       |  31 ++++-
 bigtable/internal/transport/connpool_test.go  | 115 ++++++++++++++----
 .../transport/dynamic_scale_monitor_test.go   |   6 +-
 .../transport/metrics_reporter_test.go        |   2 +-
 5 files changed, 130 insertions(+), 26 deletions(-)

diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go
index a762905d13f7..0f5cbb344c7f 100644
--- a/bigtable/bigtable.go
+++ b/bigtable/bigtable.go
@@ -119,6 +119,7 @@ func NewClient(ctx context.Context, project, instance string, opts ...option.Cli
 
 // NewClientWithConfig creates a new client with the given config.
 func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) {
+	elapsedTimer := time.Now()
 	metricsProvider := config.MetricsProvider
 	if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" {
 		// Do not emit metrics when emulator is being used
@@ -199,6 +200,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
 			}
 			return btransport.NewBigtableConn(grpcConn), nil
 		},
+		elapsedTimer,
 		// options
 		btransport.WithInstanceName(fullInstanceName),
 		btransport.WithAppProfile(config.AppProfile),
diff --git a/bigtable/internal/transport/connpool.go b/bigtable/internal/transport/connpool.go
index 3cf141745404..238ae91a12ab 100644
--- a/bigtable/internal/transport/connpool.go
+++ b/bigtable/internal/transport/connpool.go
@@ -30,6 +30,7 @@ import (
 	"time"
 
 	btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
+	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
 	gtransport "google.golang.org/api/transport/grpc"
 	"google.golang.org/grpc/credentials/alts"
@@ -299,7 +300,7 @@ func (p *BigtableChannelPool) getConns() []*connEntry {
 // NewBigtableChannelPool creates a pool of connPoolSize and takes the dial func()
 // NewBigtableChannelPool primes the new connection in a non-blocking goroutine to warm it up.
 // We keep it consistent with the current channelpool behavior which is lazily initialized.
-func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btopt.LoadBalancingStrategy, dial func() (*BigtableConn, error), opts ...BigtableChannelPoolOption) (*BigtableChannelPool, error) {
+func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btopt.LoadBalancingStrategy, dial func() (*BigtableConn, error), startupTimer time.Time, opts ...BigtableChannelPoolOption) (*BigtableChannelPool, error) {
 	if connPoolSize <= 0 {
 		return nil, fmt.Errorf("bigtable_connpool: connPoolSize must be positive")
 	}
@@ -384,9 +385,37 @@ func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btop
 		btopt.Debugf(pool.logger, "bigtable_connpool: failed to create metrics reporter: %v\n", err)
 	}
 	pool.startMonitors()
+
+	// record the client startup time
+	// TODO: currently Prime() is non-blocking, we will make Prime() blocking and infer the transport type here.
+ transportType := "unknown" + pool.recordClientStartUp(startupTimer, transportType) + return pool, nil } +func (p *BigtableChannelPool) recordClientStartUp(elapsedTimer time.Time, transportType string) { + if p.meterProvider != nil { + meter := p.meterProvider.Meter(clientMeterName) + // Define buckets for startup latency (in milliseconds) + bucketBounds := []float64{0, 10, 50, 100, 300, 500, 1000, 2000, 5000, 10000, 20000} + clientStartupTime, err := meter.Float64Histogram( + "startup_time", + metric.WithDescription("Total time for completion of logic of NewClientWithConfig"), + metric.WithUnit("ms"), + metric.WithExplicitBucketBoundaries(bucketBounds...), + ) + + if err == nil { + elapsed := float64(time.Since(elapsedTimer).Milliseconds()) + clientStartupTime.Record(p.poolCtx, elapsed, metric.WithAttributes( + attribute.String("transport_type", transportType), + attribute.String("status", "OK"), + )) + } + } +} + func (p *BigtableChannelPool) startMonitors() { for _, m := range p.monitors { btopt.Debugf(p.logger, "bigtable_connpool: Starting monitor %T\n", m) diff --git a/bigtable/internal/transport/connpool_test.go b/bigtable/internal/transport/connpool_test.go index dc1b7221bd6f..8598b90f0be0 100644 --- a/bigtable/internal/transport/connpool_test.go +++ b/bigtable/internal/transport/connpool_test.go @@ -28,6 +28,9 @@ import ( "testing" "time" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "google.golang.org/grpc/codes" testpb "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/metadata" @@ -129,7 +132,7 @@ func TestNewBigtableChannelPoolEdgeCases(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - pool, err := NewBigtableChannelPool(ctx, tc.size, btopt.RoundRobin, tc.dial, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, tc.size, btopt.RoundRobin, tc.dial, time.Now(), poolOpts()...) if tc.wantErr { if err == nil { t.Errorf("NewBigtableChannelPool(%d) succeeded, want error containing %q", tc.size, tc.errMatch) @@ -304,7 +307,7 @@ func TestPoolInvoke(t *testing.T) { addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -376,7 +379,7 @@ func TestPoolNewStream(t *testing.T) { addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -440,7 +443,7 @@ func TestPoolNewStream(t *testing.T) { fake := &fakeService{} addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) 
if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -491,7 +494,7 @@ func TestNewBigtableChannelPool(t *testing.T) { addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("NewBigtableChannelPool failed: %v", err) } @@ -538,7 +541,7 @@ func TestNewBigtableChannelPool(t *testing.T) { return dialBigtableserver(addr) } - _, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...) + _, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...) if err == nil { t.Errorf("NewBigtableChannelPool should have failed due to dial error") } @@ -644,7 +647,7 @@ func TestCachingStreamDecrement(t *testing.T) { addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -748,7 +751,7 @@ func TestMultipleStreamsSingleConn(t *testing.T) { addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -823,7 +826,7 @@ func TestPoolClose(t *testing.T) { fake := &fakeService{} addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -846,7 +849,7 @@ func TestGracefulDraining(t *testing.T) { dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } t.Run("DrainingOnReplaceConnection", func(t *testing.T) { - pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -914,7 +917,7 @@ func TestGracefulDraining(t *testing.T) { }) t.Run("SelectionSkipsDrainingConns", func(t *testing.T) { - pool, err := NewBigtableChannelPool(ctx, 3, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, 3, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -951,7 +954,7 @@ func TestGracefulDraining(t *testing.T) { maxDrainingTimeout = 100 * time.Millisecond defer func() { maxDrainingTimeout = originalTimeout }() - pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) 
if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1010,7 +1013,7 @@ func TestReplaceConnection(t *testing.T) { mu.Unlock() atomic.StoreInt32(&dialCount, 0) - pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1044,7 +1047,7 @@ func TestReplaceConnection(t *testing.T) { mu.Unlock() atomic.StoreInt32(&dialCount, 0) - pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1073,7 +1076,7 @@ func TestReplaceConnection(t *testing.T) { mu.Unlock() atomic.StoreInt32(&dialCount, 0) - poolCancelled, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...) + poolCancelled, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create poolCancelled: %v", err) } @@ -1103,7 +1106,7 @@ func TestReplaceConnection(t *testing.T) { fake.setPingErr(pingErr) atomic.StoreInt32(&dialCount, 0) - pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1223,7 +1226,7 @@ func TestAddConnections(t *testing.T) { innerCtx, cancel := context.WithCancel(ctx) defer cancel() - pool, err := NewBigtableChannelPool(innerCtx, tc.initialSize, btopt.RoundRobin, baseDialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(innerCtx, tc.initialSize, btopt.RoundRobin, baseDialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1326,7 +1329,7 @@ func TestRemoveConnections(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1348,7 +1351,7 @@ func TestRemoveConnections(t *testing.T) { t.Run("VerifyOldestIsRemoved", func(t *testing.T) { poolSize := 5 - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1399,7 +1402,7 @@ func TestConnPoolStatisticsVisitor(t *testing.T) { addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) 
if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1473,6 +1476,76 @@ func TestConnPoolStatisticsVisitor(t *testing.T) { } } +func TestRecordClientStartUp(t *testing.T) { + ctx := context.Background() + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + + pool := &BigtableChannelPool{ + poolCtx: ctx, + meterProvider: provider, + } + + startTime := time.Now().Add(-500 * time.Millisecond) + transportType := "directpath" + + pool.recordClientStartUp(startTime, transportType) + + rm := metricdata.ResourceMetrics{} + if err := reader.Collect(ctx, &rm); err != nil { + t.Fatalf("Failed to collect metrics: %v", err) + } + + if len(rm.ScopeMetrics) == 0 { + t.Fatalf("No scope metrics found") + } + sm := rm.ScopeMetrics[0] + if sm.Scope.Name != clientMeterName { + t.Errorf("Scope name got %q, want %q", sm.Scope.Name, clientMeterName) + } + + if len(sm.Metrics) == 0 { + t.Fatalf("No metrics found") + } + m := sm.Metrics[0] + + if m.Name != "startup_time" { + t.Errorf("Metric name got %q, want %q", m.Name, "startup_time") + } + if m.Unit != "ms" { + t.Errorf("Metric unit got %q, want %q", m.Unit, "ms") + } + + hist, ok := m.Data.(metricdata.Histogram[float64]) + if !ok { + t.Fatalf("Metric data is not a Histogram: %T", m.Data) + } + + if len(hist.DataPoints) != 1 { + t.Fatalf("Expected 1 data point, got %d", len(hist.DataPoints)) + } + dp := hist.DataPoints[0] + + // Check attributes + expectedAttrs := attribute.NewSet( + attribute.String("transport_type", transportType), + attribute.String("status", "OK"), + ) + if !dp.Attributes.Equals(&expectedAttrs) { + t.Errorf("Attributes got %v, want %v", dp.Attributes, expectedAttrs) + } + + // Check value range + if dp.Count != 1 { + t.Errorf("Data point count got %d, want 1", dp.Count) + } + // The exact duration depends on execution speed, so check if it's positive. + // The test setup has a 500ms difference. + if dp.Sum <= 0 { + t.Errorf("Expected positive sum, got %f", dp.Sum) + } +} + // --- Benchmarks --- func createBenchmarkFake() *fakeService { @@ -1487,7 +1560,7 @@ func setupBenchmarkPool(b *testing.B, strategy btopt.LoadBalancingStrategy, pool } ctx := context.Background() - pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...) if err != nil { b.Fatalf("Failed to create pool: %v", err) } diff --git a/bigtable/internal/transport/dynamic_scale_monitor_test.go b/bigtable/internal/transport/dynamic_scale_monitor_test.go index e1623642a959..93a1a343c053 100644 --- a/bigtable/internal/transport/dynamic_scale_monitor_test.go +++ b/bigtable/internal/transport/dynamic_scale_monitor_test.go @@ -127,7 +127,7 @@ func TestDynamicChannelScaling(t *testing.T) { tc.configOpt(&config) } - pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -153,7 +153,7 @@ func TestDynamicChannelScaling(t *testing.T) { config.MinScalingInterval = 5 * time.Minute initialSize := 3 - pool, err := NewBigtableChannelPool(ctx, initialSize, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, initialSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) 
 		if err != nil {
 			t.Fatalf("Failed to create pool: %v", err)
 		}
@@ -188,7 +188,7 @@ func TestDynamicChannelScaling(t *testing.T) {
 
 	t.Run("EmptyPoolNoAction", func(t *testing.T) {
 		config := baseConfig
-		pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, poolOpts()...)
+		pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
 		if err != nil {
 			t.Fatalf("Failed to create pool: %v", err)
 		}
diff --git a/bigtable/internal/transport/metrics_reporter_test.go b/bigtable/internal/transport/metrics_reporter_test.go
index 5dcf65d3fc02..52364f479e56 100644
--- a/bigtable/internal/transport/metrics_reporter_test.go
+++ b/bigtable/internal/transport/metrics_reporter_test.go
@@ -50,7 +50,7 @@ func TestMetricsExporting(t *testing.T) {
 	poolSize := 2
 	strategy := btopt.RoundRobin
 
-	pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, WithMeterProvider(provider), WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig()))
+	pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), WithMeterProvider(provider), WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig()))
 	if err != nil {
 		t.Fatalf("Failed to create pool: %v", err)
 	}

From a55e869d18d55f762e9af178db3b0d4cd149ece9 Mon Sep 17 00:00:00 2001
From: Sushan Bhattarai
Date: Mon, 29 Dec 2025 16:50:15 -0500
Subject: [PATCH 2/7] fix

---
 bigtable/internal/transport/connpool_test.go | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/bigtable/internal/transport/connpool_test.go b/bigtable/internal/transport/connpool_test.go
index 8598b90f0be0..ddfd53668ec5 100644
--- a/bigtable/internal/transport/connpool_test.go
+++ b/bigtable/internal/transport/connpool_test.go
@@ -1525,8 +1525,6 @@ func TestRecordClientStartUp(t *testing.T) {
 		t.Fatalf("Expected 1 data point, got %d", len(hist.DataPoints))
 	}
 	dp := hist.DataPoints[0]
-
-	// Check attributes
 	expectedAttrs := attribute.NewSet(
 		attribute.String("transport_type", transportType),
 		attribute.String("status", "OK"),
@@ -1534,13 +1532,11 @@ func TestRecordClientStartUp(t *testing.T) {
 	if !dp.Attributes.Equals(&expectedAttrs) {
 		t.Errorf("Attributes got %v, want %v", dp.Attributes, expectedAttrs)
 	}
-
-	// Check value range
+	// Check count on bucket
 	if dp.Count != 1 {
 		t.Errorf("Data point count got %d, want 1", dp.Count)
 	}
-	// The exact duration depends on execution speed, so check if it's positive.
-	// The test setup has a 500ms difference.
+	// sanity check to see if it is positive.
if dp.Sum <= 0 { t.Errorf("Expected positive sum, got %f", dp.Sum) } From 679f1e18a9b2f0bf122da23b3dd07456fa38bbf1 Mon Sep 17 00:00:00 2001 From: Sushan Bhattarai Date: Tue, 30 Dec 2025 15:32:59 -0500 Subject: [PATCH 3/7] address review --- bigtable/internal/transport/connpool.go | 38 ++++++++++---------- bigtable/internal/transport/connpool_test.go | 26 ++++++++------ 2 files changed, 36 insertions(+), 28 deletions(-) diff --git a/bigtable/internal/transport/connpool.go b/bigtable/internal/transport/connpool.go index 238ae91a12ab..b2ecc5ffc31b 100644 --- a/bigtable/internal/transport/connpool.go +++ b/bigtable/internal/transport/connpool.go @@ -395,24 +395,26 @@ func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btop } func (p *BigtableChannelPool) recordClientStartUp(elapsedTimer time.Time, transportType string) { - if p.meterProvider != nil { - meter := p.meterProvider.Meter(clientMeterName) - // Define buckets for startup latency (in milliseconds) - bucketBounds := []float64{0, 10, 50, 100, 300, 500, 1000, 2000, 5000, 10000, 20000} - clientStartupTime, err := meter.Float64Histogram( - "startup_time", - metric.WithDescription("Total time for completion of logic of NewClientWithConfig"), - metric.WithUnit("ms"), - metric.WithExplicitBucketBoundaries(bucketBounds...), - ) - - if err == nil { - elapsed := float64(time.Since(elapsedTimer).Milliseconds()) - clientStartupTime.Record(p.poolCtx, elapsed, metric.WithAttributes( - attribute.String("transport_type", transportType), - attribute.String("status", "OK"), - )) - } + if p.meterProvider == nil { + return + } + + meter := p.meterProvider.Meter(clientMeterName) + // Define buckets for startup latency (in milliseconds) + bucketBounds := []float64{0, 10, 50, 100, 300, 500, 1000, 2000, 5000, 10000, 20000} + clientStartupTime, err := meter.Float64Histogram( + "startup_time", + metric.WithDescription("Total time for completion of logic of NewClientWithConfig"), + metric.WithUnit("ms"), + metric.WithExplicitBucketBoundaries(bucketBounds...), + ) + + if err == nil { + elapsed := float64(time.Since(elapsedTimer).Milliseconds()) + clientStartupTime.Record(p.poolCtx, elapsed, metric.WithAttributes( + attribute.String("transport_type", transportType), + attribute.String("status", "OK"), + )) } } diff --git a/bigtable/internal/transport/connpool_test.go b/bigtable/internal/transport/connpool_test.go index ddfd53668ec5..cd40b29376a9 100644 --- a/bigtable/internal/transport/connpool_test.go +++ b/bigtable/internal/transport/connpool_test.go @@ -1481,16 +1481,22 @@ func TestRecordClientStartUp(t *testing.T) { reader := metric.NewManualReader() provider := metric.NewMeterProvider(metric.WithReader(reader)) - pool := &BigtableChannelPool{ - poolCtx: ctx, - meterProvider: provider, - } + fake := &fakeService{} + addr := setupTestServer(t, fake) + dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } + poolSize := 1 startTime := time.Now().Add(-500 * time.Millisecond) - transportType := "directpath" + channelPoolOptions := append(poolOpts(), WithMeterProvider(provider)) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, startTime, channelPoolOptions...) 
 
-	pool.recordClientStartUp(startTime, transportType)
+	if err != nil {
+		t.Fatalf("NewBigtableChannelPool failed: %v", err)
+	}
+
+	defer pool.Close()
 
+	// Collect metrics
 	rm := metricdata.ResourceMetrics{}
 	if err := reader.Collect(ctx, &rm); err != nil {
 		t.Fatalf("Failed to collect metrics: %v", err)
@@ -1526,7 +1532,7 @@ func TestRecordClientStartUp(t *testing.T) {
 	}
 	dp := hist.DataPoints[0]
 	expectedAttrs := attribute.NewSet(
-		attribute.String("transport_type", transportType),
+		attribute.String("transport_type", "unknown"),
 		attribute.String("status", "OK"),
 	)
 	if !dp.Attributes.Equals(&expectedAttrs) {
@@ -1536,9 +1542,9 @@ func TestRecordClientStartUp(t *testing.T) {
 	if dp.Count != 1 {
 		t.Errorf("Data point count got %d, want 1", dp.Count)
 	}
-	// sanity check to see if it is positive.
-	if dp.Sum <= 0 {
-		t.Errorf("Expected positive sum, got %f", dp.Sum)
+	// sanity check to see if it is greater than 500.
+	if dp.Sum < 500 {
+		t.Errorf("Expected positive sum > 500, got %f", dp.Sum)
 	}
 }

From b80219700735e3da8a3577bf562e98782a7a1f55 Mon Sep 17 00:00:00 2001
From: Sushan Bhattarai
Date: Tue, 30 Dec 2025 17:03:10 -0500
Subject: [PATCH 4/7] add synctest

---
 bigtable/bigtable.go                         |   4 +-
 bigtable/go.mod                              |   1 +
 bigtable/internal/transport/connpool.go      |  10 +-
 bigtable/internal/transport/connpool_test.go | 119 ++++++++++---------
 4 files changed, 69 insertions(+), 65 deletions(-)

diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go
index 0f5cbb344c7f..f4c848d2fc60 100644
--- a/bigtable/bigtable.go
+++ b/bigtable/bigtable.go
@@ -119,7 +119,7 @@ func NewClient(ctx context.Context, project, instance string, opts ...option.Cli
 
 // NewClientWithConfig creates a new client with the given config.
 func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) {
-	elapsedTimer := time.Now()
+	clientCreationTimestamp := time.Now()
 	metricsProvider := config.MetricsProvider
 	if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" {
 		// Do not emit metrics when emulator is being used
@@ -200,7 +200,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
 		}
 		return btransport.NewBigtableConn(grpcConn), nil
 	},
-	elapsedTimer,
+	clientCreationTimestamp,
 	// options
 	btransport.WithInstanceName(fullInstanceName),
 	btransport.WithAppProfile(config.AppProfile),
diff --git a/bigtable/go.mod b/bigtable/go.mod
index 4955604b74d4..740c93523927 100644
--- a/bigtable/go.mod
+++ b/bigtable/go.mod
@@ -1,6 +1,7 @@
 module cloud.google.com/go/bigtable
 
 go 1.24.0
+toolchain go1.25.5
 
 require (
 	cloud.google.com/go v0.123.0
diff --git a/bigtable/internal/transport/connpool.go b/bigtable/internal/transport/connpool.go
index b2ecc5ffc31b..d6715b51038f 100644
--- a/bigtable/internal/transport/connpool.go
+++ b/bigtable/internal/transport/connpool.go
@@ -300,7 +300,7 @@ func (p *BigtableChannelPool) getConns() []*connEntry {
 // NewBigtableChannelPool creates a pool of connPoolSize and takes the dial func()
 // NewBigtableChannelPool primes the new connection in a non-blocking goroutine to warm it up.
 // We keep it consistent with the current channelpool behavior which is lazily initialized.
-func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btopt.LoadBalancingStrategy, dial func() (*BigtableConn, error), startupTimer time.Time, opts ...BigtableChannelPoolOption) (*BigtableChannelPool, error) { +func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btopt.LoadBalancingStrategy, dial func() (*BigtableConn, error), clientCreationTimestamp time.Time, opts ...BigtableChannelPoolOption) (*BigtableChannelPool, error) { if connPoolSize <= 0 { return nil, fmt.Errorf("bigtable_connpool: connPoolSize must be positive") } @@ -389,12 +389,12 @@ func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btop // record the client startup time // TODO: currently Prime() is non-blocking, we will make Prime() blocking and infer the transport type here. transportType := "unknown" - pool.recordClientStartUp(startupTimer, transportType) + pool.recordClientStartUp(clientCreationTimestamp, transportType) return pool, nil } -func (p *BigtableChannelPool) recordClientStartUp(elapsedTimer time.Time, transportType string) { +func (p *BigtableChannelPool) recordClientStartUp(clientCreationTimestamp time.Time, transportType string) { if p.meterProvider == nil { return } @@ -410,8 +410,8 @@ func (p *BigtableChannelPool) recordClientStartUp(elapsedTimer time.Time, transp ) if err == nil { - elapsed := float64(time.Since(elapsedTimer).Milliseconds()) - clientStartupTime.Record(p.poolCtx, elapsed, metric.WithAttributes( + elapsedTime := float64(time.Since(clientCreationTimestamp).Milliseconds()) + clientStartupTime.Record(p.poolCtx, elapsedTime, metric.WithAttributes( attribute.String("transport_type", transportType), attribute.String("status", "OK"), )) diff --git a/bigtable/internal/transport/connpool_test.go b/bigtable/internal/transport/connpool_test.go index cd40b29376a9..448f2df8d18b 100644 --- a/bigtable/internal/transport/connpool_test.go +++ b/bigtable/internal/transport/connpool_test.go @@ -26,6 +26,7 @@ import ( "sync" "sync/atomic" "testing" + "testing/synctest" "time" "go.opentelemetry.io/otel/attribute" @@ -1475,77 +1476,79 @@ func TestConnPoolStatisticsVisitor(t *testing.T) { } } } - func TestRecordClientStartUp(t *testing.T) { - ctx := context.Background() - reader := metric.NewManualReader() - provider := metric.NewMeterProvider(metric.WithReader(reader)) - fake := &fakeService{} addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - poolSize := 1 - startTime := time.Now().Add(-500 * time.Millisecond) - channelPoolOptions := append(poolOpts(), WithMeterProvider(provider)) - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, startTime, channelPoolOptions...) + synctest.Test(t, func(t *testing.T) { + ctx := context.Background() + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) - if err != nil { - t.Fatalf("NewBigtableChannelPool failed: %v", err) - } + poolSize := 1 + startTime := time.Now() + sleepTimer := 500 + time.Sleep(time.Duration(sleepTimer) * time.Millisecond) - defer pool.Close() + channelPoolOptions := append(poolOpts(), WithMeterProvider(provider)) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, startTime, channelPoolOptions...) 
- // Collect metrics - rm := metricdata.ResourceMetrics{} - if err := reader.Collect(ctx, &rm); err != nil { - t.Fatalf("Failed to collect metrics: %v", err) - } + if err != nil { + t.Fatalf("NewBigtableChannelPool failed: %v", err) + } - if len(rm.ScopeMetrics) == 0 { - t.Fatalf("No scope metrics found") - } - sm := rm.ScopeMetrics[0] - if sm.Scope.Name != clientMeterName { - t.Errorf("Scope name got %q, want %q", sm.Scope.Name, clientMeterName) - } + defer pool.Close() - if len(sm.Metrics) == 0 { - t.Fatalf("No metrics found") - } - m := sm.Metrics[0] + // Collect metrics + rm := metricdata.ResourceMetrics{} + if err := reader.Collect(ctx, &rm); err != nil { + t.Fatalf("Failed to collect metrics: %v", err) + } - if m.Name != "startup_time" { - t.Errorf("Metric name got %q, want %q", m.Name, "startup_time") - } - if m.Unit != "ms" { - t.Errorf("Metric unit got %q, want %q", m.Unit, "ms") - } + if len(rm.ScopeMetrics) == 0 { + t.Fatalf("No scope metrics found") + } + sm := rm.ScopeMetrics[0] + if sm.Scope.Name != clientMeterName { + t.Errorf("Scope name got %q, want %q", sm.Scope.Name, clientMeterName) + } - hist, ok := m.Data.(metricdata.Histogram[float64]) - if !ok { - t.Fatalf("Metric data is not a Histogram: %T", m.Data) - } + if len(sm.Metrics) == 0 { + t.Fatalf("No metrics found") + } + m := sm.Metrics[0] - if len(hist.DataPoints) != 1 { - t.Fatalf("Expected 1 data point, got %d", len(hist.DataPoints)) - } - dp := hist.DataPoints[0] - expectedAttrs := attribute.NewSet( - attribute.String("transport_type", "unknown"), - attribute.String("status", "OK"), - ) - if !dp.Attributes.Equals(&expectedAttrs) { - t.Errorf("Attributes got %v, want %v", dp.Attributes, expectedAttrs) - } - // Check count on bucket - if dp.Count != 1 { - t.Errorf("Data point count got %d, want 1", dp.Count) - } - // sanity check to see if it is greater than 500. 
- if dp.Sum < 500 { - t.Errorf("Expected positive sum > 500, got %f", dp.Sum) - } + if m.Name != "startup_time" { + t.Errorf("Metric name got %q, want %q", m.Name, "startup_time") + } + if m.Unit != "ms" { + t.Errorf("Metric unit got %q, want %q", m.Unit, "ms") + } + + hist, ok := m.Data.(metricdata.Histogram[float64]) + if !ok { + t.Fatalf("Metric data is not a Histogram: %T", m.Data) + } + + if len(hist.DataPoints) != 1 { + t.Fatalf("Expected 1 data point, got %d", len(hist.DataPoints)) + } + dp := hist.DataPoints[0] + expectedAttrs := attribute.NewSet( + attribute.String("transport_type", "unknown"), + attribute.String("status", "OK"), + ) + if !dp.Attributes.Equals(&expectedAttrs) { + t.Errorf("Attributes got %v, want %v", dp.Attributes, expectedAttrs) + } + if dp.Count != 1 { + t.Errorf("Data point count got %d, want 1", dp.Count) + } + if dp.Sum != float64(sleepTimer) { + t.Errorf("Expected %f, got %f", float64(sleepTimer), dp.Sum) + } + }) } // --- Benchmarks --- From 4c546bfaadd51979c8e0101b7119de6505c09caf Mon Sep 17 00:00:00 2001 From: Sushan Bhattarai Date: Tue, 30 Dec 2025 17:08:52 -0500 Subject: [PATCH 5/7] add synctest --- go.mod | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go.mod b/go.mod index 53ac90f3c9a2..4dc047306bd7 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module cloud.google.com/go go 1.24.0 +toolchain go1.25.5 + require ( cloud.google.com/go/storage v1.57.2 github.com/google/go-cmp v0.7.0 From ff811983262b87b0d1f6d7032564193bff9216cc Mon Sep 17 00:00:00 2001 From: Sushan Bhattarai Date: Tue, 30 Dec 2025 17:10:35 -0500 Subject: [PATCH 6/7] fix --- bigtable/go.mod | 1 + go.mod | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/bigtable/go.mod b/bigtable/go.mod index 740c93523927..d97b11c10b35 100644 --- a/bigtable/go.mod +++ b/bigtable/go.mod @@ -1,6 +1,7 @@ module cloud.google.com/go/bigtable go 1.24.0 + toolchain go1.25.5 require ( diff --git a/go.mod b/go.mod index 4dc047306bd7..53ac90f3c9a2 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module cloud.google.com/go go 1.24.0 -toolchain go1.25.5 - require ( cloud.google.com/go/storage v1.57.2 github.com/google/go-cmp v0.7.0 From e599b6267460ba221112860ec459b61f574a48c4 Mon Sep 17 00:00:00 2001 From: Sushan Bhattarai Date: Tue, 30 Dec 2025 18:00:51 -0500 Subject: [PATCH 7/7] make kokoro happy --- .../transport/connpool_synctest_test.go | 104 ++++++++++++++++++ bigtable/internal/transport/connpool_test.go | 78 ------------- 2 files changed, 104 insertions(+), 78 deletions(-) create mode 100644 bigtable/internal/transport/connpool_synctest_test.go diff --git a/bigtable/internal/transport/connpool_synctest_test.go b/bigtable/internal/transport/connpool_synctest_test.go new file mode 100644 index 000000000000..0e83ac9c7baa --- /dev/null +++ b/bigtable/internal/transport/connpool_synctest_test.go @@ -0,0 +1,104 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+//go:build synctest
+
+package transport
+
+import (
+	"context"
+	"testing"
+	"testing/synctest"
+	"time"
+
+	btopt "cloud.google.com/go/bigtable/internal/option"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+func TestRecordClientStartUp(t *testing.T) {
+	fake := &fakeService{}
+	addr := setupTestServer(t, fake)
+	dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
+
+	synctest.Test(t, func(t *testing.T) {
+		ctx := context.Background()
+		reader := metric.NewManualReader()
+		provider := metric.NewMeterProvider(metric.WithReader(reader))
+
+		poolSize := 1
+		startTime := time.Now()
+		sleepTimer := 500
+		time.Sleep(time.Duration(sleepTimer) *
time.Millisecond) - - channelPoolOptions := append(poolOpts(), WithMeterProvider(provider)) - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, startTime, channelPoolOptions...) - - if err != nil { - t.Fatalf("NewBigtableChannelPool failed: %v", err) - } - - defer pool.Close() - - // Collect metrics - rm := metricdata.ResourceMetrics{} - if err := reader.Collect(ctx, &rm); err != nil { - t.Fatalf("Failed to collect metrics: %v", err) - } - - if len(rm.ScopeMetrics) == 0 { - t.Fatalf("No scope metrics found") - } - sm := rm.ScopeMetrics[0] - if sm.Scope.Name != clientMeterName { - t.Errorf("Scope name got %q, want %q", sm.Scope.Name, clientMeterName) - } - - if len(sm.Metrics) == 0 { - t.Fatalf("No metrics found") - } - m := sm.Metrics[0] - - if m.Name != "startup_time" { - t.Errorf("Metric name got %q, want %q", m.Name, "startup_time") - } - if m.Unit != "ms" { - t.Errorf("Metric unit got %q, want %q", m.Unit, "ms") - } - - hist, ok := m.Data.(metricdata.Histogram[float64]) - if !ok { - t.Fatalf("Metric data is not a Histogram: %T", m.Data) - } - - if len(hist.DataPoints) != 1 { - t.Fatalf("Expected 1 data point, got %d", len(hist.DataPoints)) - } - dp := hist.DataPoints[0] - expectedAttrs := attribute.NewSet( - attribute.String("transport_type", "unknown"), - attribute.String("status", "OK"), - ) - if !dp.Attributes.Equals(&expectedAttrs) { - t.Errorf("Attributes got %v, want %v", dp.Attributes, expectedAttrs) - } - if dp.Count != 1 { - t.Errorf("Data point count got %d, want 1", dp.Count) - } - if dp.Sum != float64(sleepTimer) { - t.Errorf("Expected %f, got %f", float64(sleepTimer), dp.Sum) - } - }) -} // --- Benchmarks ---
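
Note on the metric plumbing in this series: startup_time is a plain OpenTelemetry Float64Histogram, so the record-and-collect pattern the tests rely on can be exercised on its own. The sketch below is a minimal, self-contained illustration and is not part of the patch; the meter name "example.com/startup" is a placeholder for the unexported clientMeterName constant in connpool.go, while the instrument name, unit, bucket bounds, and attributes are copied from recordClientStartUp.

package main

import (
	"context"
	"fmt"
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	ctx := context.Background()

	// A ManualReader lets us pull metrics on demand instead of exporting them.
	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	// Placeholder scope name; the pool uses the unexported clientMeterName.
	meter := provider.Meter("example.com/startup")

	hist, err := meter.Float64Histogram(
		"startup_time",
		metric.WithDescription("Total time for completion of logic of NewClientWithConfig"),
		metric.WithUnit("ms"),
		metric.WithExplicitBucketBoundaries(0, 10, 50, 100, 300, 500, 1000, 2000, 5000, 10000, 20000),
	)
	if err != nil {
		panic(err)
	}

	start := time.Now()
	// ... client construction work would happen here ...
	hist.Record(ctx, float64(time.Since(start).Milliseconds()), metric.WithAttributes(
		attribute.String("transport_type", "unknown"),
		attribute.String("status", "OK"),
	))

	// Drain the recorded data point, mirroring what the tests assert on.
	var rm metricdata.ResourceMetrics
	if err := reader.Collect(ctx, &rm); err != nil {
		panic(err)
	}
	h := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[float64])
	dp := h.DataPoints[0]
	fmt.Printf("startup_time: count=%d sum=%.0f ms\n", dp.Count, dp.Sum)
}

Because the ManualReader has no exporter, Collect drains the current aggregation synchronously, which is why the tests can assert an exact count and, under synctest's fake clock, an exact sum of 500 ms.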