Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func NewClient(ctx context.Context, project, instance string, opts ...option.Cli

// NewClientWithConfig creates a new client with the given config.
func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) {
clientCreationTimestamp := time.Now()
metricsProvider := config.MetricsProvider
if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" {
// Do not emit metrics when emulator is being used
Expand Down Expand Up @@ -199,6 +200,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
}
return btransport.NewBigtableConn(grpcConn), nil
},
clientCreationTimestamp,
// options
btransport.WithInstanceName(fullInstanceName),
btransport.WithAppProfile(config.AppProfile),
Expand Down
2 changes: 2 additions & 0 deletions bigtable/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module cloud.google.com/go/bigtable

go 1.24.0

toolchain go1.25.5

require (
cloud.google.com/go v0.123.0
cloud.google.com/go/iam v1.5.3
Expand Down
33 changes: 32 additions & 1 deletion bigtable/internal/transport/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
gtransport "google.golang.org/api/transport/grpc"
"google.golang.org/grpc/credentials/alts"
Expand Down Expand Up @@ -299,7 +300,7 @@ func (p *BigtableChannelPool) getConns() []*connEntry {
// NewBigtableChannelPool creates a pool of connPoolSize and takes the dial func()
// NewBigtableChannelPool primes the new connection in a non-blocking goroutine to warm it up.
// We keep it consistent with the current channelpool behavior which is lazily initialized.
func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btopt.LoadBalancingStrategy, dial func() (*BigtableConn, error), opts ...BigtableChannelPoolOption) (*BigtableChannelPool, error) {
func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btopt.LoadBalancingStrategy, dial func() (*BigtableConn, error), clientCreationTimestamp time.Time, opts ...BigtableChannelPoolOption) (*BigtableChannelPool, error) {
if connPoolSize <= 0 {
return nil, fmt.Errorf("bigtable_connpool: connPoolSize must be positive")
}
Expand Down Expand Up @@ -384,9 +385,39 @@ func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btop
btopt.Debugf(pool.logger, "bigtable_connpool: failed to create metrics reporter: %v\n", err)
}
pool.startMonitors()

// record the client startup time
// TODO: currently Prime() is non-blocking, we will make Prime() blocking and infer the transport type here.
transportType := "unknown"
pool.recordClientStartUp(clientCreationTimestamp, transportType)

return pool, nil
}

func (p *BigtableChannelPool) recordClientStartUp(clientCreationTimestamp time.Time, transportType string) {
if p.meterProvider == nil {
return
}

meter := p.meterProvider.Meter(clientMeterName)
// Define buckets for startup latency (in milliseconds)
bucketBounds := []float64{0, 10, 50, 100, 300, 500, 1000, 2000, 5000, 10000, 20000}
clientStartupTime, err := meter.Float64Histogram(
"startup_time",
metric.WithDescription("Total time for completion of logic of NewClientWithConfig"),
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(bucketBounds...),
)

if err == nil {
elapsedTime := float64(time.Since(clientCreationTimestamp).Milliseconds())
clientStartupTime.Record(p.poolCtx, elapsedTime, metric.WithAttributes(
attribute.String("transport_type", transportType),
attribute.String("status", "OK"),
))
}
}

func (p *BigtableChannelPool) startMonitors() {
for _, m := range p.monitors {
btopt.Debugf(p.logger, "bigtable_connpool: Starting monitor %T\n", m)
Expand Down
104 changes: 104 additions & 0 deletions bigtable/internal/transport/connpool_synctest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build synctest

package internal

import (
"context"
"testing"
"testing/synctest"
"time"

btopt "cloud.google.com/go/bigtable/internal/option"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func TestRecordClientStartUp(t *testing.T) {
fake := &fakeService{}
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

synctest.Test(t, func(t *testing.T) {
ctx := context.Background()
reader := metric.NewManualReader()
provider := metric.NewMeterProvider(metric.WithReader(reader))

poolSize := 1
startTime := time.Now()
sleepTimer := 500
time.Sleep(time.Duration(sleepTimer) * time.Millisecond)

channelPoolOptions := append(poolOpts(), WithMeterProvider(provider))
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, startTime, channelPoolOptions...)

if err != nil {
t.Fatalf("NewBigtableChannelPool failed: %v", err)
}

defer pool.Close()

// Collect metrics
rm := metricdata.ResourceMetrics{}
if err := reader.Collect(ctx, &rm); err != nil {
t.Fatalf("Failed to collect metrics: %v", err)
}

if len(rm.ScopeMetrics) == 0 {
t.Fatalf("No scope metrics found")
}
sm := rm.ScopeMetrics[0]
if sm.Scope.Name != clientMeterName {
t.Errorf("Scope name got %q, want %q", sm.Scope.Name, clientMeterName)
}

if len(sm.Metrics) == 0 {
t.Fatalf("No metrics found")
}
m := sm.Metrics[0]

if m.Name != "startup_time" {
t.Errorf("Metric name got %q, want %q", m.Name, "startup_time")
}
if m.Unit != "ms" {
t.Errorf("Metric unit got %q, want %q", m.Unit, "ms")
}

hist, ok := m.Data.(metricdata.Histogram[float64])
if !ok {
t.Fatalf("Metric data is not a Histogram: %T", m.Data)
}

if len(hist.DataPoints) != 1 {
t.Fatalf("Expected 1 data point, got %d", len(hist.DataPoints))
}
dp := hist.DataPoints[0]
expectedAttrs := attribute.NewSet(
attribute.String("transport_type", "unknown"),
attribute.String("status", "OK"),
)
if !dp.Attributes.Equals(&expectedAttrs) {
t.Errorf("Attributes got %v, want %v", dp.Attributes, expectedAttrs)
}
if dp.Count != 1 {
t.Errorf("Data point count got %d, want 1", dp.Count)
}
if dp.Sum != float64(sleepTimer) {
t.Errorf("Expected %f, got %f", float64(sleepTimer), dp.Sum)
}
})
}
42 changes: 21 additions & 21 deletions bigtable/internal/transport/connpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestNewBigtableChannelPoolEdgeCases(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
pool, err := NewBigtableChannelPool(ctx, tc.size, btopt.RoundRobin, tc.dial, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, tc.size, btopt.RoundRobin, tc.dial, time.Now(), poolOpts()...)
if tc.wantErr {
if err == nil {
t.Errorf("NewBigtableChannelPool(%d) succeeded, want error containing %q", tc.size, tc.errMatch)
Expand Down Expand Up @@ -304,7 +304,7 @@ func TestPoolInvoke(t *testing.T) {
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -376,7 +376,7 @@ func TestPoolNewStream(t *testing.T) {
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -440,7 +440,7 @@ func TestPoolNewStream(t *testing.T) {
fake := &fakeService{}
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -491,7 +491,7 @@ func TestNewBigtableChannelPool(t *testing.T) {
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("NewBigtableChannelPool failed: %v", err)
}
Expand Down Expand Up @@ -538,7 +538,7 @@ func TestNewBigtableChannelPool(t *testing.T) {
return dialBigtableserver(addr)
}

_, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
_, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
if err == nil {
t.Errorf("NewBigtableChannelPool should have failed due to dial error")
}
Expand Down Expand Up @@ -644,7 +644,7 @@ func TestCachingStreamDecrement(t *testing.T) {
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -748,7 +748,7 @@ func TestMultipleStreamsSingleConn(t *testing.T) {
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -823,7 +823,7 @@ func TestPoolClose(t *testing.T) {
fake := &fakeService{}
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand All @@ -846,7 +846,7 @@ func TestGracefulDraining(t *testing.T) {
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

t.Run("DrainingOnReplaceConnection", func(t *testing.T) {
pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -914,7 +914,7 @@ func TestGracefulDraining(t *testing.T) {
})

t.Run("SelectionSkipsDrainingConns", func(t *testing.T) {
pool, err := NewBigtableChannelPool(ctx, 3, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, 3, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -951,7 +951,7 @@ func TestGracefulDraining(t *testing.T) {
maxDrainingTimeout = 100 * time.Millisecond
defer func() { maxDrainingTimeout = originalTimeout }()

pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -1010,7 +1010,7 @@ func TestReplaceConnection(t *testing.T) {
mu.Unlock()
atomic.StoreInt32(&dialCount, 0)

pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -1044,7 +1044,7 @@ func TestReplaceConnection(t *testing.T) {
mu.Unlock()
atomic.StoreInt32(&dialCount, 0)

pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -1073,7 +1073,7 @@ func TestReplaceConnection(t *testing.T) {
mu.Unlock()
atomic.StoreInt32(&dialCount, 0)

poolCancelled, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...)
poolCancelled, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create poolCancelled: %v", err)
}
Expand Down Expand Up @@ -1103,7 +1103,7 @@ func TestReplaceConnection(t *testing.T) {
fake.setPingErr(pingErr)
atomic.StoreInt32(&dialCount, 0)

pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -1223,7 +1223,7 @@ func TestAddConnections(t *testing.T) {
innerCtx, cancel := context.WithCancel(ctx)
defer cancel()

pool, err := NewBigtableChannelPool(innerCtx, tc.initialSize, btopt.RoundRobin, baseDialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(innerCtx, tc.initialSize, btopt.RoundRobin, baseDialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -1326,7 +1326,7 @@ func TestRemoveConnections(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand All @@ -1348,7 +1348,7 @@ func TestRemoveConnections(t *testing.T) {

t.Run("VerifyOldestIsRemoved", func(t *testing.T) {
poolSize := 5
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -1399,7 +1399,7 @@ func TestConnPoolStatisticsVisitor(t *testing.T) {
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -1487,7 +1487,7 @@ func setupBenchmarkPool(b *testing.B, strategy btopt.LoadBalancingStrategy, pool
}

ctx := context.Background()
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...)
if err != nil {
b.Fatalf("Failed to create pool: %v", err)
}
Expand Down
Loading
Loading