Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func NewClient(ctx context.Context, project, instance string, opts ...option.Cli

// NewClientWithConfig creates a new client with the given config.
func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) {
elaspedTimer := time.Now()
metricsProvider := config.MetricsProvider
if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" {
// Do not emit metrics when emulator is being used
Expand Down Expand Up @@ -199,6 +200,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
}
return btransport.NewBigtableConn(grpcConn), nil
},
elaspedTimer,
// options
btransport.WithInstanceName(fullInstanceName),
btransport.WithAppProfile(config.AppProfile),
Expand Down
31 changes: 30 additions & 1 deletion bigtable/internal/transport/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
gtransport "google.golang.org/api/transport/grpc"
"google.golang.org/grpc/credentials/alts"
Expand Down Expand Up @@ -299,7 +300,7 @@ func (p *BigtableChannelPool) getConns() []*connEntry {
// NewBigtableChannelPool creates a pool of connPoolSize and takes the dial func()
// NewBigtableChannelPool primes the new connection in a non-blocking goroutine to warm it up.
// We keep it consistent with the current channelpool behavior which is lazily initialized.
func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btopt.LoadBalancingStrategy, dial func() (*BigtableConn, error), opts ...BigtableChannelPoolOption) (*BigtableChannelPool, error) {
func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btopt.LoadBalancingStrategy, dial func() (*BigtableConn, error), startupTimer time.Time, opts ...BigtableChannelPoolOption) (*BigtableChannelPool, error) {
if connPoolSize <= 0 {
return nil, fmt.Errorf("bigtable_connpool: connPoolSize must be positive")
}
Expand Down Expand Up @@ -384,9 +385,37 @@ func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btop
btopt.Debugf(pool.logger, "bigtable_connpool: failed to create metrics reporter: %v\n", err)
}
pool.startMonitors()

// record the client startup time
// TODO: currently Prime() is non-blocking, we will make Prime() blocking and infer the transport type here.
transportType := "unknown"
pool.recordClientStartUp(startupTimer, transportType)

return pool, nil
}

func (p *BigtableChannelPool) recordClientStartUp(elapsedTimer time.Time, transportType string) {
if p.meterProvider != nil {
meter := p.meterProvider.Meter(clientMeterName)
// Define buckets for startup latency (in milliseconds)
bucketBounds := []float64{0, 10, 50, 100, 300, 500, 1000, 2000, 5000, 10000, 20000}
clientStartupTime, err := meter.Float64Histogram(
"startup_time",
metric.WithDescription("Total time for completion of logic of NewClientWithConfig"),
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(bucketBounds...),
)

if err == nil {
elapsed := float64(time.Since(elapsedTimer).Milliseconds())
clientStartupTime.Record(p.poolCtx, elapsed, metric.WithAttributes(
attribute.String("transport_type", transportType),
attribute.String("status", "OK"),
))
}
}
}

func (p *BigtableChannelPool) startMonitors() {
for _, m := range p.monitors {
btopt.Debugf(p.logger, "bigtable_connpool: Starting monitor %T\n", m)
Expand Down
111 changes: 90 additions & 21 deletions bigtable/internal/transport/connpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
"testing"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"google.golang.org/grpc/codes"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -129,7 +132,7 @@ func TestNewBigtableChannelPoolEdgeCases(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
pool, err := NewBigtableChannelPool(ctx, tc.size, btopt.RoundRobin, tc.dial, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, tc.size, btopt.RoundRobin, tc.dial, time.Now(), poolOpts()...)
if tc.wantErr {
if err == nil {
t.Errorf("NewBigtableChannelPool(%d) succeeded, want error containing %q", tc.size, tc.errMatch)
Expand Down Expand Up @@ -304,7 +307,7 @@ func TestPoolInvoke(t *testing.T) {
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -376,7 +379,7 @@ func TestPoolNewStream(t *testing.T) {
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -440,7 +443,7 @@ func TestPoolNewStream(t *testing.T) {
fake := &fakeService{}
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -491,7 +494,7 @@ func TestNewBigtableChannelPool(t *testing.T) {
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("NewBigtableChannelPool failed: %v", err)
}
Expand Down Expand Up @@ -538,7 +541,7 @@ func TestNewBigtableChannelPool(t *testing.T) {
return dialBigtableserver(addr)
}

_, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
_, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
if err == nil {
t.Errorf("NewBigtableChannelPool should have failed due to dial error")
}
Expand Down Expand Up @@ -644,7 +647,7 @@ func TestCachingStreamDecrement(t *testing.T) {
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -748,7 +751,7 @@ func TestMultipleStreamsSingleConn(t *testing.T) {
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -823,7 +826,7 @@ func TestPoolClose(t *testing.T) {
fake := &fakeService{}
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.LeastInFlight, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand All @@ -846,7 +849,7 @@ func TestGracefulDraining(t *testing.T) {
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

t.Run("DrainingOnReplaceConnection", func(t *testing.T) {
pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -914,7 +917,7 @@ func TestGracefulDraining(t *testing.T) {
})

t.Run("SelectionSkipsDrainingConns", func(t *testing.T) {
pool, err := NewBigtableChannelPool(ctx, 3, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, 3, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -951,7 +954,7 @@ func TestGracefulDraining(t *testing.T) {
maxDrainingTimeout = 100 * time.Millisecond
defer func() { maxDrainingTimeout = originalTimeout }()

pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -1010,7 +1013,7 @@ func TestReplaceConnection(t *testing.T) {
mu.Unlock()
atomic.StoreInt32(&dialCount, 0)

pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -1044,7 +1047,7 @@ func TestReplaceConnection(t *testing.T) {
mu.Unlock()
atomic.StoreInt32(&dialCount, 0)

pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -1073,7 +1076,7 @@ func TestReplaceConnection(t *testing.T) {
mu.Unlock()
atomic.StoreInt32(&dialCount, 0)

poolCancelled, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...)
poolCancelled, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create poolCancelled: %v", err)
}
Expand Down Expand Up @@ -1103,7 +1106,7 @@ func TestReplaceConnection(t *testing.T) {
fake.setPingErr(pingErr)
atomic.StoreInt32(&dialCount, 0)

pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, 2, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -1223,7 +1226,7 @@ func TestAddConnections(t *testing.T) {
innerCtx, cancel := context.WithCancel(ctx)
defer cancel()

pool, err := NewBigtableChannelPool(innerCtx, tc.initialSize, btopt.RoundRobin, baseDialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(innerCtx, tc.initialSize, btopt.RoundRobin, baseDialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -1326,7 +1329,7 @@ func TestRemoveConnections(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand All @@ -1348,7 +1351,7 @@ func TestRemoveConnections(t *testing.T) {

t.Run("VerifyOldestIsRemoved", func(t *testing.T) {
poolSize := 5
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -1399,7 +1402,7 @@ func TestConnPoolStatisticsVisitor(t *testing.T) {
addr := setupTestServer(t, fake)
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }

pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -1473,6 +1476,72 @@ func TestConnPoolStatisticsVisitor(t *testing.T) {
}
}

func TestRecordClientStartUp(t *testing.T) {
ctx := context.Background()
reader := metric.NewManualReader()
provider := metric.NewMeterProvider(metric.WithReader(reader))

pool := &BigtableChannelPool{
poolCtx: ctx,
meterProvider: provider,
}

startTime := time.Now().Add(-500 * time.Millisecond)
transportType := "directpath"

pool.recordClientStartUp(startTime, transportType)

rm := metricdata.ResourceMetrics{}
if err := reader.Collect(ctx, &rm); err != nil {
t.Fatalf("Failed to collect metrics: %v", err)
}

if len(rm.ScopeMetrics) == 0 {
t.Fatalf("No scope metrics found")
}
sm := rm.ScopeMetrics[0]
if sm.Scope.Name != clientMeterName {
t.Errorf("Scope name got %q, want %q", sm.Scope.Name, clientMeterName)
}

if len(sm.Metrics) == 0 {
t.Fatalf("No metrics found")
}
m := sm.Metrics[0]

if m.Name != "startup_time" {
t.Errorf("Metric name got %q, want %q", m.Name, "startup_time")
}
if m.Unit != "ms" {
t.Errorf("Metric unit got %q, want %q", m.Unit, "ms")
}

hist, ok := m.Data.(metricdata.Histogram[float64])
if !ok {
t.Fatalf("Metric data is not a Histogram: %T", m.Data)
}

if len(hist.DataPoints) != 1 {
t.Fatalf("Expected 1 data point, got %d", len(hist.DataPoints))
}
dp := hist.DataPoints[0]
expectedAttrs := attribute.NewSet(
attribute.String("transport_type", transportType),
attribute.String("status", "OK"),
)
if !dp.Attributes.Equals(&expectedAttrs) {
t.Errorf("Attributes got %v, want %v", dp.Attributes, expectedAttrs)
}
// Check count on bucket
if dp.Count != 1 {
t.Errorf("Data point count got %d, want 1", dp.Count)
}
// sanity check to see if it is postive.
if dp.Sum <= 0 {
t.Errorf("Expected positive sum, got %f", dp.Sum)
}
}

// --- Benchmarks ---

func createBenchmarkFake() *fakeService {
Expand All @@ -1487,7 +1556,7 @@ func setupBenchmarkPool(b *testing.B, strategy btopt.LoadBalancingStrategy, pool
}

ctx := context.Background()
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), poolOpts()...)
if err != nil {
b.Fatalf("Failed to create pool: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions bigtable/internal/transport/dynamic_scale_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestDynamicChannelScaling(t *testing.T) {
tc.configOpt(&config)
}

pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand All @@ -153,7 +153,7 @@ func TestDynamicChannelScaling(t *testing.T) {
config.MinScalingInterval = 5 * time.Minute
initialSize := 3

pool, err := NewBigtableChannelPool(ctx, initialSize, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, initialSize, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestDynamicChannelScaling(t *testing.T) {
t.Run("EmptyPoolNoAction", func(t *testing.T) {
config := baseConfig

pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, poolOpts()...)
pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now(), poolOpts()...)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion bigtable/internal/transport/metrics_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestMetricsExporting(t *testing.T) {

poolSize := 2
strategy := btopt.RoundRobin
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, WithMeterProvider(provider), WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig()))
pool, err := NewBigtableChannelPool(ctx, poolSize, strategy, dialFunc, time.Now(), WithMeterProvider(provider), WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig()))
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
Expand Down
Loading