Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 94 additions & 24 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
"github.com/google/uuid"
"golang.org/x/net/proxy"
"google.golang.org/api/option"

tel "cloud.google.com/go/cloudsqlconn/internal/tel"
sqladmin "google.golang.org/api/sqladmin/v1beta4"
)

Expand Down Expand Up @@ -71,6 +73,8 @@ var (
//go:embed version.txt
versionString string
userAgent = "cloud-sql-go-connector/" + strings.TrimSpace(versionString)
// dialerID is a unique ID for the dialer process.
dialerID = uuid.New().String()
)

// keyGenerator encapsulates the details of RSA key generation to provide lazy
Expand Down Expand Up @@ -174,7 +178,9 @@ type Dialer struct {

// dialerID uniquely identifies a Dialer. Used for monitoring purposes,
// *only* when a client has configured OpenCensus exporters.
dialerID string
dialerID string
metricsMu sync.Mutex
metricRecorders map[instance.ConnName]tel.MetricRecorder

// dialFunc is the function used to connect to the address on the named
// network. By default, it is golang.org/x/net/proxy#Dial.
Expand All @@ -190,6 +196,18 @@ type Dialer struct {
// metadataExchangeDisabled true when the dialer should never
// send MDX mdx requests.
metadataExchangeDisabled bool

// applicationName is the name of the application using the dialer.
applicationName string

// disableBuiltInMetrics turns the internal metric export into a no-op.
disableBuiltInMetrics bool

// clientOpts are options for all Google Cloud API clients.
clientOpts []option.ClientOption
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need this anymore, and can just create a client directly with some improvements in the underlying exporter.

See GoogleCloudPlatform/alloydb-go-connector#728.


// userAgent is the combined user agent string.
userAgent string
}

var (
Expand All @@ -208,11 +226,12 @@ func (nullLogger) Debugf(_ context.Context, _ string, _ ...interface{}) {}
// RSA keypair is generated will be faster.
func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
cfg := &dialerConfig{
refreshTimeout: cloudsql.RefreshTimeout,
dialFunc: proxy.Dial,
logger: nullLogger{},
useragents: []string{userAgent},
failoverPeriod: cloudsql.FailoverPeriod,
refreshTimeout: cloudsql.RefreshTimeout,
dialFunc: proxy.Dial,
logger: nullLogger{},
useragents: []string{userAgent},
failoverPeriod: cloudsql.FailoverPeriod,
applicationName: "unknown",
}
for _, opt := range opts {
opt(cfg)
Expand Down Expand Up @@ -318,17 +337,49 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
sqladmin: client,
logger: cfg.logger,
defaultDialConfig: dc,
dialerID: uuid.New().String(),
dialerID: dialerID,
iamTokenProvider: cfg.iamLoginTokenProvider,
metricRecorders: map[instance.ConnName]tel.MetricRecorder{},
dialFunc: cfg.dialFunc,
resolver: r,
failoverPeriod: cfg.failoverPeriod,
metadataExchangeDisabled: cfg.metadataExchangeDisabled,
userAgent: strings.Join(cfg.useragents, " "),
applicationName: cfg.applicationName,
}

// print dialer id to terminal for debugging purposes
fmt.Println("Cloud SQL Go Connector Dialer ID:", d.dialerID)

return d, nil
}

// metricRecorder does a lazy initialization of the metric exporter.
func (d *Dialer) metricRecorder(ctx context.Context, inst instance.ConnName) tel.MetricRecorder {
d.metricsMu.Lock()
defer d.metricsMu.Unlock()
if mr, ok := d.metricRecorders[inst]; ok {
return mr
}
cfg := tel.Config{
Enabled: !d.disableBuiltInMetrics,
Version: versionString,
ResourceContainer: inst.Project(),
ResourceID: inst.Name(),
ClientUID: d.dialerID,
ApplicationName: d.applicationName,
Region: inst.Region(),
ClientRegion: "Client-Region-Testing", // TODO: detect client region
ComputePlatform: "Compute-Platform-Testing", // TODO: detect compute platform
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConnectorType: tel.ConnectorTypeValue(d.userAgent),
ConnectorVersion: versionString,
DatabaseEngineType: "DB-Engine-Type-Testing", // TODO: detect database engine type
}
mr := tel.NewMetricRecorder(ctx, d.logger, cfg, d.clientOpts...)
d.metricRecorders[inst] = mr
return mr
}

// Dial returns a net.Conn connected to the specified Cloud SQL instance. The
// icn argument may be the instance's connection name in the format
// "project-name:region:instance-name" or a DNS name that resolves to an
Expand All @@ -339,8 +390,29 @@ func (d *Dialer) Dial(ctx context.Context, icn string, opts ...DialOption) (conn
return nil, ErrDialerClosed
default:
}
cfg := d.defaultDialConfig
for _, opt := range opts {
opt(&cfg)
}

// Resolve the instance connection name to a ConnName struct.
// Note: icn may be a domain name that resolves to an instance connection name.
cn, err := d.resolver.Resolve(ctx, icn)
if err != nil {
return nil, err
}
mr := d.metricRecorder(ctx, cn)

startTime := time.Now()
var endDial trace.EndSpanFunc
attrs := tel.Attributes{
IAMAuthN: cfg.useIAMAuthN,
RefreshType: tel.RefreshAheadType,
IPType: cfg.ipType,
}
if d.lazyRefresh {
attrs.RefreshType = tel.RefreshLazyType
}
ctx, endDial = trace.StartSpan(ctx, "cloud.google.com/go/cloudsqlconn.Dial",
trace.AddInstanceName(icn),
trace.AddDialerID(d.dialerID),
Expand All @@ -349,10 +421,6 @@ func (d *Dialer) Dial(ctx context.Context, icn string, opts ...DialOption) (conn
trace.RecordDialError(context.Background(), icn, d.dialerID, err)
endDial(err)
}()
cn, err := d.resolver.Resolve(ctx, icn)
if err != nil {
return nil, err
}

// Log if resolver changed the instance name input string.
if cn.DomainName() != "" {
Expand All @@ -363,14 +431,10 @@ func (d *Dialer) Dial(ctx context.Context, icn string, opts ...DialOption) (conn
d.logger.Debugf(ctx, "resolved instance connection string %s to %s", icn, cn.String())
}

cfg := d.defaultDialConfig
for _, opt := range opts {
opt(&cfg)
}

var endInfo trace.EndSpanFunc
ctx, endInfo = trace.StartSpan(ctx, "cloud.google.com/go/cloudsqlconn/internal.InstanceInfo")
c, err := d.connectionInfoCache(ctx, cn, &cfg.useIAMAuthN)
c, cacheHit, err := d.connectionInfoCache(ctx, cn, &cfg.useIAMAuthN)
attrs.CacheHit = cacheHit
if err != nil {
endInfo(err)
return nil, err
Expand Down Expand Up @@ -453,10 +517,16 @@ func (d *Dialer) Dial(ctx context.Context, icn string, opts ...DialOption) (conn
n := c.openConnsCount.Add(1)
trace.RecordOpenConnections(ctx, int64(n), d.dialerID, cn.String())
trace.RecordDialLatency(ctx, icn, d.dialerID, latency)
mr.RecordOpenConnection(ctx, attrs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a bug in the AlloyDB implementation around open connections, but wow I can't find it yet. Beware!

Copy link
Contributor Author

@panavenue panavenue Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I think I might know what the potential bug -> The GAUGE metric always have more "open" than "close", because we are sending the metric ONLY on every 60s, and when the application shut down, we never record the "-1" of the open_conn GAUGE metric.

Is this the potential bug? (I was testing on how to go around this, but it's gonna be tough to mitigate this one)

mr.RecordConnectLatencies(ctx, attrs, latency)

closeFunc := func() {
n := c.openConnsCount.Add(^uint64(0)) // c.openConnsCount = c.openConnsCount - 1
trace.RecordOpenConnections(context.Background(), int64(n), d.dialerID, cn.String())
mr.RecordClosedConnection(context.Background(), attrs)
mr.RecordClosedConnectionCount(context.Background(), attrs)
// lot the message to terminal for debugging purposes
fmt.Println("Cloud SQL Go Connector Dialer ID:", d.dialerID, "closed connection to instance:", cn.String())
}
errFunc := func(err error) {
// io.EOF occurs when the server closes the connection. This is safe to
Expand Down Expand Up @@ -553,7 +623,7 @@ func (d *Dialer) EngineVersion(ctx context.Context, icn string) (string, error)
if err != nil {
return "", err
}
c, err := d.connectionInfoCache(ctx, cn, &d.defaultDialConfig.useIAMAuthN)
c, _, err := d.connectionInfoCache(ctx, cn, &d.defaultDialConfig.useIAMAuthN)
if err != nil {
return "", err
}
Expand All @@ -577,7 +647,7 @@ func (d *Dialer) Warmup(ctx context.Context, icn string, opts ...DialOption) err
for _, opt := range opts {
opt(&cfg)
}
c, err := d.connectionInfoCache(ctx, cn, &cfg.useIAMAuthN)
c, _, err := d.connectionInfoCache(ctx, cn, &cfg.useIAMAuthN)
if err != nil {
return err
}
Expand Down Expand Up @@ -724,7 +794,7 @@ func createKey(cn instance.ConnName) cacheKey {
// modify the existing one, or leave it unchanged as needed.
func (d *Dialer) connectionInfoCache(
ctx context.Context, cn instance.ConnName, useIAMAuthN *bool,
) (*monitoredCache, error) {
) (*monitoredCache, bool, error) {
k := createKey(cn)

d.lock.RLock()
Expand All @@ -733,7 +803,7 @@ func (d *Dialer) connectionInfoCache(

if ok && !c.isClosed() {
c.UpdateRefresh(useIAMAuthN)
return c, nil
return c, ok, nil
}

d.lock.Lock()
Expand All @@ -745,7 +815,7 @@ func (d *Dialer) connectionInfoCache(
// c exists and is not closed
if ok && !c.isClosed() {
c.UpdateRefresh(useIAMAuthN)
return c, nil
return c, ok, nil
}

// Create a new instance of monitoredCache
Expand All @@ -756,7 +826,7 @@ func (d *Dialer) connectionInfoCache(
d.logger.Debugf(ctx, "[%v] Connection info added to cache", cn.String())
rsaKey, err := d.keyGenerator.rsaKey()
if err != nil {
return nil, err
return nil, ok, err
}
var cache connectionInfoCache
if d.lazyRefresh {
Expand All @@ -779,7 +849,7 @@ func (d *Dialer) connectionInfoCache(
c = newMonitoredCache(cache, cn, d.failoverPeriod, d.resolver, d.logger)
d.cache[k] = c

return c, nil
return c, ok, nil
}

// newMDXRequest builds a metadata exchange request based on the connection
Expand Down
12 changes: 10 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@ toolchain go1.25.3
require (
cloud.google.com/go/auth v0.17.0
cloud.google.com/go/auth/oauth2adapt v0.2.8
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.54.0
github.com/go-sql-driver/mysql v1.9.3
github.com/google/uuid v1.6.0
github.com/jackc/pgx/v4 v4.18.3
github.com/jackc/pgx/v5 v5.7.6
github.com/microsoft/go-mssqldb v1.9.3
go.opencensus.io v0.24.0
go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/metric v1.38.0
go.opentelemetry.io/otel/sdk v1.38.0
go.opentelemetry.io/otel/sdk/metric v1.38.0
golang.org/x/net v0.46.0
golang.org/x/oauth2 v0.32.0
golang.org/x/time v0.14.0
Expand All @@ -24,7 +29,9 @@ require (

require (
cloud.google.com/go/compute/metadata v0.9.0 // indirect
cloud.google.com/go/monitoring v1.24.2 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.54.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand All @@ -43,12 +50,13 @@ require (
github.com/jackc/pgtype v1.14.4 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.opentelemetry.io/otel/trace v1.38.0 // indirect
golang.org/x/crypto v0.43.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/sys v0.37.0 // indirect
golang.org/x/text v0.30.0 // indirect
google.golang.org/genproto v0.0.0-20250922171735-9219d122eba9 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250922171735-9219d122eba9 // indirect
)
Loading
Loading