diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index a762905d13f7..f4c848d2fc60 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -119,6 +119,7 @@ func NewClient(ctx context.Context, project, instance string, opts ...option.Cli // NewClientWithConfig creates a new client with the given config. func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) { + clientCreationTimestamp := time.Now() metricsProvider := config.MetricsProvider if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" { // Do not emit metrics when emulator is being used @@ -199,6 +200,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C } return btransport.NewBigtableConn(grpcConn), nil }, + clientCreationTimestamp, // options btransport.WithInstanceName(fullInstanceName), btransport.WithAppProfile(config.AppProfile), diff --git a/bigtable/go.mod b/bigtable/go.mod index 4955604b74d4..d97b11c10b35 100644 --- a/bigtable/go.mod +++ b/bigtable/go.mod @@ -2,6 +2,8 @@ module cloud.google.com/go/bigtable go 1.24.0 +toolchain go1.25.5 + require ( cloud.google.com/go v0.123.0 cloud.google.com/go/iam v1.5.3 diff --git a/bigtable/internal/transport/connpool.go b/bigtable/internal/transport/connpool.go index 3cf141745404..d6715b51038f 100644 --- a/bigtable/internal/transport/connpool.go +++ b/bigtable/internal/transport/connpool.go @@ -30,6 +30,7 @@ import ( "time" btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" gtransport "google.golang.org/api/transport/grpc" "google.golang.org/grpc/credentials/alts" @@ -299,7 +300,7 @@ func (p *BigtableChannelPool) getConns() []*connEntry { // NewBigtableChannelPool creates a pool of connPoolSize and takes the dial func() // NewBigtableChannelPool primes the new connection in a non-blocking goroutine to warm it up. 
// We keep it consistent with the current channelpool behavior which is lazily initialized. -func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btopt.LoadBalancingStrategy, dial func() (*BigtableConn, error), opts ...BigtableChannelPoolOption) (*BigtableChannelPool, error) { +func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btopt.LoadBalancingStrategy, dial func() (*BigtableConn, error), clientCreationTimestamp time.Time, opts ...BigtableChannelPoolOption) (*BigtableChannelPool, error) { if connPoolSize <= 0 { return nil, fmt.Errorf("bigtable_connpool: connPoolSize must be positive") } @@ -384,9 +385,39 @@ func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btop btopt.Debugf(pool.logger, "bigtable_connpool: failed to create metrics reporter: %v\n", err) } pool.startMonitors() + + // record the client startup time + // TODO: currently Prime() is non-blocking, we will make Prime() blocking and infer the transport type here. 
+	transportType := "unknown"
+	pool.recordClientStartUp(clientCreationTimestamp, transportType)
+
 	return pool, nil
 }
 
+// recordClientStartUp records how long client construction took on the
+// "startup_time" histogram of the client meter.
+//
+// clientCreationTimestamp is the instant client construction began; the value
+// recorded is the time elapsed since then, in milliseconds. transportType is
+// attached as the "transport_type" attribute, and "status" is always "OK".
+//
+// The call is a no-op when the pool has no meter provider or when
+// clientCreationTimestamp is the zero time. A failure to create the histogram
+// is logged at debug level and otherwise ignored, because metrics are
+// best-effort and must not fail pool creation.
+func (p *BigtableChannelPool) recordClientStartUp(clientCreationTimestamp time.Time, transportType string) {
+	if p.meterProvider == nil {
+		return
+	}
+	// A zero timestamp would yield a saturated, meaningless duration.
+	if clientCreationTimestamp.IsZero() {
+		return
+	}
+	// Sample the clock before creating the instrument so that metric setup is
+	// not counted as startup time.
+	elapsed := time.Since(clientCreationTimestamp)
+
+	meter := p.meterProvider.Meter(clientMeterName)
+	// Define buckets for startup latency (in milliseconds)
+	bucketBounds := []float64{0, 10, 50, 100, 300, 500, 1000, 2000, 5000, 10000, 20000}
+	clientStartupTime, err := meter.Float64Histogram(
+		"startup_time",
+		metric.WithDescription("Total time for completion of logic of NewClientWithConfig"),
+		metric.WithUnit("ms"),
+		metric.WithExplicitBucketBoundaries(bucketBounds...),
+	)
+	if err != nil {
+		btopt.Debugf(p.logger, "bigtable_connpool: failed to create startup_time histogram: %v\n", err)
+		return
+	}
+
+	// Divide as floats so sub-millisecond precision is kept; Milliseconds()
+	// would truncate to a whole number first.
+	elapsedMs := float64(elapsed) / float64(time.Millisecond)
+	clientStartupTime.Record(p.poolCtx, elapsedMs, metric.WithAttributes(
+		attribute.String("transport_type", transportType),
+		attribute.String("status", "OK"),
+	))
+}
+
 func (p *BigtableChannelPool) startMonitors() {
 	for _, m := range p.monitors {
 		btopt.Debugf(p.logger, "bigtable_connpool: Starting monitor %T\n", m)
diff --git a/bigtable/internal/transport/connpool_synctest_test.go b/bigtable/internal/transport/connpool_synctest_test.go
new file mode 100644
index 000000000000..0e83ac9c7baa
--- /dev/null
+++ b/bigtable/internal/transport/connpool_synctest_test.go
@@ -0,0 +1,104 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.25
+
+package internal
+
+import (
+	"context"
+	"testing"
+	"testing/synctest"
+	"time"
+
+	btopt "cloud.google.com/go/bigtable/internal/option"
+	"go.opentelemetry.io/otel/attribute"
+	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+// TestRecordClientStartUp verifies that creating a channel pool records exactly
+// one "startup_time" data point, tagged transport_type="unknown" and
+// status="OK", whose value is the time elapsed since the supplied client
+// creation timestamp.
+//
+// The body runs inside synctest.Test so that time.Sleep advances a fake clock
+// and the expected elapsed time can be compared exactly. synctest.Test is
+// available from Go 1.25, hence the go1.25 build constraint on this file.
+func TestRecordClientStartUp(t *testing.T) {
+	fake := &fakeService{}
+	addr := setupTestServer(t, fake)
+	dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
+
+	synctest.Test(t, func(t *testing.T) {
+		const (
+			poolSize     = 1
+			startupDelay = 500 * time.Millisecond
+		)
+
+		ctx := context.Background()
+		reader := sdkmetric.NewManualReader()
+		provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
+
+		// Simulate work done between client creation and pool construction.
+		startTime := time.Now()
+		time.Sleep(startupDelay)
+
+		channelPoolOptions := append(poolOpts(), WithMeterProvider(provider))
+		pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, startTime, channelPoolOptions...)
+		if err != nil {
+			t.Fatalf("NewBigtableChannelPool failed: %v", err)
+		}
+		defer pool.Close()
+
+		// Collect metrics
+		var rm metricdata.ResourceMetrics
+		if err := reader.Collect(ctx, &rm); err != nil {
+			t.Fatalf("Failed to collect metrics: %v", err)
+		}
+
+		// Look the metric up by scope and name instead of by position, so the
+		// test does not depend on the order in which instruments are exported.
+		var m *metricdata.Metrics
+		for i := range rm.ScopeMetrics {
+			sm := &rm.ScopeMetrics[i]
+			if sm.Scope.Name != clientMeterName {
+				continue
+			}
+			for j := range sm.Metrics {
+				if sm.Metrics[j].Name == "startup_time" {
+					m = &sm.Metrics[j]
+				}
+			}
+		}
+		if m == nil {
+			t.Fatalf("Metric %q not found in scope %q", "startup_time", clientMeterName)
+		}
+		if m.Unit != "ms" {
+			t.Errorf("Metric unit got %q, want %q", m.Unit, "ms")
+		}
+
+		hist, ok := m.Data.(metricdata.Histogram[float64])
+		if !ok {
+			t.Fatalf("Metric data is not a Histogram: %T", m.Data)
+		}
+		if len(hist.DataPoints) != 1 {
+			t.Fatalf("Expected 1 data point, got %d", len(hist.DataPoints))
+		}
+		dp := hist.DataPoints[0]
+
+		expectedAttrs := attribute.NewSet(
+			attribute.String("transport_type", "unknown"),
+			attribute.String("status", "OK"),
+		)
+		if !dp.Attributes.Equals(&expectedAttrs) {
+			t.Errorf("Attributes got %v, want %v", dp.Attributes, expectedAttrs)
+		}
+		if dp.Count != 1 {
+			t.Errorf("Data point count got %d, want 1", dp.Count)
+		}
+		if want := float64(startupDelay.Milliseconds()); dp.Sum != want {
+			t.Errorf("Data point sum got %f, want %f", dp.Sum, want)
+		}
+	})
+}
diff --git a/bigtable/internal/transport/connpool_test.go b/bigtable/internal/transport/connpool_test.go
index dc1b7221bd6f..fcc100033e9f 100644
--- a/bigtable/internal/transport/connpool_test.go
+++ b/bigtable/internal/transport/connpool_test.go
@@ -129,7 +129,7 @@ func TestNewBigtableChannelPoolEdgeCases(t *testing.T) {
 
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
-			pool, err := NewBigtableChannelPool(ctx, tc.size, btopt.RoundRobin, tc.dial, poolOpts()...)
+			pool, err := NewBigtableChannelPool(ctx, tc.size, btopt.RoundRobin, tc.dial, time.Now(), poolOpts()...)
 			if tc.wantErr {
 				if err == nil {
 					t.Errorf("NewBigtableChannelPool(%d) succeeded, want error containing %q", tc.size, tc.errMatch)
@@ -304,7 +304,7 @@ func TestPoolInvoke(t *testing.T) {
 	addr := setupTestServer(t, fake)
 	dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
 
-	pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...)
+	pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...)
 	if err != nil {
 		t.Fatalf("Failed to create pool: %v", err)
 	}
@@ -376,7 +376,7 @@ func TestPoolNewStream(t *testing.T) {
 	addr := setupTestServer(t, fake)
 	dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
 
-	pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...)
+	pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...)
if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -440,7 +440,7 @@ func TestPoolNewStream(t *testing.T) { fake := &fakeService{} addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -491,7 +491,7 @@ func TestNewBigtableChannelPool(t *testing.T) { addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("NewBigtableChannelPool failed: %v", err) } @@ -538,7 +538,7 @@ func TestNewBigtableChannelPool(t *testing.T) { return dialBigtableserver(addr) } - _, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...) + _, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...) if err == nil { t.Errorf("NewBigtableChannelPool should have failed due to dial error") } @@ -644,7 +644,7 @@ func TestCachingStreamDecrement(t *testing.T) { addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...) 
if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -748,7 +748,7 @@ func TestMultipleStreamsSingleConn(t *testing.T) { addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -823,7 +823,7 @@ func TestPoolClose(t *testing.T) { fake := &fakeService{} addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -846,7 +846,7 @@ func TestGracefulDraining(t *testing.T) { dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } t.Run("DrainingOnReplaceConnection", func(t *testing.T) { - pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -914,7 +914,7 @@ func TestGracefulDraining(t *testing.T) { }) t.Run("SelectionSkipsDrainingConns", func(t *testing.T) { - pool, err := NewBigtableChannelPool(ctx, 3, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, 3, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) 
if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -951,7 +951,7 @@ func TestGracefulDraining(t *testing.T) { maxDrainingTimeout = 100 * time.Millisecond defer func() { maxDrainingTimeout = originalTimeout }() - pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1010,7 +1010,7 @@ func TestReplaceConnection(t *testing.T) { mu.Unlock() atomic.StoreInt32(&dialCount, 0) - pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1044,7 +1044,7 @@ func TestReplaceConnection(t *testing.T) { mu.Unlock() atomic.StoreInt32(&dialCount, 0) - pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1073,7 +1073,7 @@ func TestReplaceConnection(t *testing.T) { mu.Unlock() atomic.StoreInt32(&dialCount, 0) - poolCancelled, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...) + poolCancelled, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create poolCancelled: %v", err) } @@ -1103,7 +1103,7 @@ func TestReplaceConnection(t *testing.T) { fake.setPingErr(pingErr) atomic.StoreInt32(&dialCount, 0) - pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) 
if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1223,7 +1223,7 @@ func TestAddConnections(t *testing.T) { innerCtx, cancel := context.WithCancel(ctx) defer cancel() - pool, err := NewBigtableChannelPool(innerCtx, tc.initialSize, btopt.RoundRobin, baseDialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(innerCtx, tc.initialSize, btopt.RoundRobin, baseDialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1326,7 +1326,7 @@ func TestRemoveConnections(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1348,7 +1348,7 @@ func TestRemoveConnections(t *testing.T) { t.Run("VerifyOldestIsRemoved", func(t *testing.T) { poolSize := 5 - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1399,7 +1399,7 @@ func TestConnPoolStatisticsVisitor(t *testing.T) { addr := setupTestServer(t, fake) dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } - pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -1487,7 +1487,7 @@ func setupBenchmarkPool(b *testing.B, strategy btopt.LoadBalancingStrategy, pool } ctx := context.Background() - pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...) 
+ pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...) if err != nil { b.Fatalf("Failed to create pool: %v", err) } diff --git a/bigtable/internal/transport/dynamic_scale_monitor_test.go b/bigtable/internal/transport/dynamic_scale_monitor_test.go index e1623642a959..93a1a343c053 100644 --- a/bigtable/internal/transport/dynamic_scale_monitor_test.go +++ b/bigtable/internal/transport/dynamic_scale_monitor_test.go @@ -127,7 +127,7 @@ func TestDynamicChannelScaling(t *testing.T) { tc.configOpt(&config) } - pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -153,7 +153,7 @@ func TestDynamicChannelScaling(t *testing.T) { config.MinScalingInterval = 5 * time.Minute initialSize := 3 - pool, err := NewBigtableChannelPool(ctx, initialSize, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, initialSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) if err != nil { t.Fatalf("Failed to create pool: %v", err) } @@ -188,7 +188,7 @@ func TestDynamicChannelScaling(t *testing.T) { t.Run("EmptyPoolNoAction", func(t *testing.T) { config := baseConfig - pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, poolOpts()...) + pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...) 
if err != nil { t.Fatalf("Failed to create pool: %v", err) } diff --git a/bigtable/internal/transport/metrics_reporter_test.go b/bigtable/internal/transport/metrics_reporter_test.go index 5dcf65d3fc02..52364f479e56 100644 --- a/bigtable/internal/transport/metrics_reporter_test.go +++ b/bigtable/internal/transport/metrics_reporter_test.go @@ -50,7 +50,7 @@ func TestMetricsExporting(t *testing.T) { poolSize := 2 strategy := btopt.RoundRobin - pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, WithMeterProvider(provider), WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig())) + pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), WithMeterProvider(provider), WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig())) if err != nil { t.Fatalf("Failed to create pool: %v", err) }