Skip to content

Commit 3df973f

Browse files
emit metric for source peer variant (#3587)
Co-authored-by: Copilot <[email protected]>
1 parent 0cfe145 commit 3df973f

File tree

8 files changed

+135
-0
lines changed

8 files changed

+135
-0
lines changed

flow/activities/flowable.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1626,6 +1626,21 @@ func (a *FlowableActivity) GetFlowMetadata(
16261626
Type: peerTypes[input.DestinationName],
16271627
}
16281628
}
1629+
1630+
// Detect source database variant
1631+
if input.SourceName != "" {
1632+
if srcConn, err := connectors.GetByNameAs[connectors.DatabaseVariantConnector](ctx, nil, a.CatalogPool, input.SourceName); err == nil {
1633+
if variant, variantErr := srcConn.GetDatabaseVariant(ctx); variantErr == nil {
1634+
sourcePeer.Variant = variant
1635+
} else {
1636+
logger.Warn("failed to get source database variant", slog.Any("error", variantErr))
1637+
}
1638+
connectors.CloseConnector(ctx, srcConn)
1639+
} else {
1640+
logger.Warn("failed to get source connector to detect database variant", slog.Any("error", err))
1641+
}
1642+
}
1643+
16291644
logger.Debug("loaded peer types for flow", slog.String("flowName", input.FlowName),
16301645
slog.String("sourceName", input.SourceName), slog.String("destinationName", input.DestinationName),
16311646
slog.Any("peerTypes", peerTypes))

flow/connectors/core.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,12 @@ type GetLogRetentionConnector interface {
311311
GetLogRetentionHours(ctx context.Context) (float64, error)
312312
}
313313

314+
type DatabaseVariantConnector interface {
315+
Connector
316+
317+
GetDatabaseVariant(ctx context.Context) (protos.DatabaseVariant, error)
318+
}
319+
314320
func LoadPeerType(ctx context.Context, catalogPool shared.CatalogPool, peerName string) (protos.DBType, error) {
315321
row := catalogPool.QueryRow(ctx, "SELECT type FROM peers WHERE name = $1", peerName)
316322
var dbtype protos.DBType
@@ -639,4 +645,7 @@ var (
639645

640646
_ GetLogRetentionConnector = &connmysql.MySqlConnector{}
641647
_ GetLogRetentionConnector = &connmongo.MongoConnector{}
648+
649+
_ DatabaseVariantConnector = &connpostgres.PostgresConnector{}
650+
_ DatabaseVariantConnector = &connmysql.MySqlConnector{}
642651
)

flow/connectors/mysql/mysql.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"iter"
99
"log/slog"
1010
"net"
11+
"strings"
1112
"sync/atomic"
1213
"time"
1314

@@ -476,3 +477,36 @@ func (c *MySqlConnector) StatActivity(
476477
StatData: statInfoRows,
477478
}, nil
478479
}
480+
481+
func (c *MySqlConnector) GetDatabaseVariant(ctx context.Context) (protos.DatabaseVariant, error) {
482+
query := `SHOW VARIABLES WHERE Variable_name IN ('aurora_version', 'cloudsql_iam_authentication', 'azure_server_name', 'basedir')`
483+
484+
rs, err := c.Execute(ctx, query)
485+
if err != nil {
486+
c.logger.Error("failed to execute SHOW VARIABLES for getting database variant", slog.Any("error", err))
487+
return protos.DatabaseVariant_VARIANT_UNKNOWN, err
488+
}
489+
490+
var basedirValue string
491+
for _, row := range rs.Values {
492+
varName := string(row[0].AsString())
493+
varValue := string(row[1].AsString())
494+
switch varName {
495+
case "aurora_version":
496+
return protos.DatabaseVariant_AWS_AURORA, nil
497+
case "cloudsql_iam_authentication":
498+
return protos.DatabaseVariant_GOOGLE_CLOUD_SQL, nil
499+
case "azure_server_name":
500+
return protos.DatabaseVariant_AZURE_DATABASE, nil
501+
case "basedir":
502+
basedirValue = varValue
503+
}
504+
}
505+
506+
// true for RDS and Aurora, but we check for Aurora via aurora_version above
507+
if strings.Contains(basedirValue, "/rdsdbbin/") {
508+
return protos.DatabaseVariant_AWS_RDS, nil
509+
}
510+
511+
return protos.DatabaseVariant_VARIANT_UNKNOWN, nil
512+
}

flow/connectors/postgres/postgres.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1725,3 +1725,62 @@ func (c *PostgresConnector) GetVersion(ctx context.Context) (string, error) {
17251725
c.logger.Info("[postgres] version", slog.String("version", version))
17261726
return version, nil
17271727
}
1728+
1729+
func (c *PostgresConnector) GetDatabaseVariant(ctx context.Context) (protos.DatabaseVariant, error) {
1730+
// First check for Aurora by trying to call aurora_version()
1731+
var auroraVersion string
1732+
err := c.conn.QueryRow(ctx, "SELECT aurora_version()").Scan(&auroraVersion)
1733+
if err == nil && auroraVersion != "" {
1734+
return protos.DatabaseVariant_AWS_AURORA, nil
1735+
}
1736+
1737+
// If aurora_version() fails, it's not Aurora - continue checking other variants
1738+
settingsQuery := `
1739+
SELECT name, setting
1740+
FROM pg_settings
1741+
WHERE name IN (
1742+
'rds.extensions',
1743+
'cloudsql.logical_decoding',
1744+
'azure.extensions',
1745+
'neon.endpoint_id',
1746+
'extwlist.pscale_allowed_extensions',
1747+
'supautils.privileged_extensions'
1748+
) AND setting IS NOT NULL AND setting != ''`
1749+
1750+
rows, err := c.conn.Query(ctx, settingsQuery)
1751+
if err != nil {
1752+
c.logger.Error("failed to query pg_settings for determining variant", slog.Any("error", err))
1753+
return protos.DatabaseVariant_VARIANT_UNKNOWN, err
1754+
}
1755+
defer rows.Close()
1756+
1757+
for rows.Next() {
1758+
var name, setting string
1759+
if err := rows.Scan(&name, &setting); err != nil {
1760+
c.logger.Warn("failed to scan from pg_settings", slog.Any("error", err))
1761+
continue
1762+
}
1763+
1764+
switch name {
1765+
case "rds.extensions":
1766+
return protos.DatabaseVariant_AWS_RDS, nil
1767+
case "cloudsql.logical_decoding":
1768+
return protos.DatabaseVariant_GOOGLE_CLOUD_SQL, nil
1769+
case "azure.extensions":
1770+
return protos.DatabaseVariant_AZURE_DATABASE, nil
1771+
case "neon.endpoint_id":
1772+
return protos.DatabaseVariant_NEON, nil
1773+
case "extwlist.pscale_allowed_extensions":
1774+
return protos.DatabaseVariant_PLANETSCALE, nil
1775+
case "supautils.privileged_extensions":
1776+
return protos.DatabaseVariant_SUPABASE, nil
1777+
}
1778+
}
1779+
1780+
if err := rows.Err(); err != nil {
1781+
c.logger.Error("error iterating pg_settings rows", slog.Any("error", err))
1782+
return protos.DatabaseVariant_VARIANT_UNKNOWN, err
1783+
}
1784+
1785+
return protos.DatabaseVariant_VARIANT_UNKNOWN, nil
1786+
}

flow/otel_metrics/attributes.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const (
1515
WorkflowTypeKey = "workflowType"
1616
BatchIdKey = "batchId"
1717
SourcePeerType = "sourcePeerType"
18+
SourcePeerVariant = "sourcePeerVariant"
1819
DestinationPeerType = "destinationPeerType"
1920
SourcePeerName = "sourcePeerName"
2021
DestinationPeerName = "destinationPeerName"

flow/otel_metrics/observables.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func buildContextualAttributes(ctx context.Context) metric.MeasurementOption {
110110
if flowMetadata.Source != nil {
111111
attributes = append(attributes,
112112
attribute.Stringer(SourcePeerType, flowMetadata.Source.Type),
113+
attribute.Stringer(SourcePeerVariant, flowMetadata.Source.Variant),
113114
attribute.String(SourcePeerName, flowMetadata.Source.Name))
114115
}
115116
if flowMetadata.Destination != nil {

protos/flow.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,7 @@ message MaintenanceMirrors {
541541
message PeerContextMetadata {
542542
string name = 1;
543543
peerdb_peers.DBType type = 2;
544+
peerdb_peers.DatabaseVariant variant = 3;
544545
}
545546

546547
message FlowContextMetadataInput {

protos/peers.proto

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,21 @@ enum DBType {
274274
DBTYPE_UNKNOWN = -1;
275275
}
276276

277+
enum DatabaseVariant {
278+
// either vanilla or something not listed here
279+
VARIANT_UNKNOWN = 0;
280+
AWS_RDS = 2;
281+
AWS_AURORA = 3;
282+
GOOGLE_CLOUD_SQL = 4;
283+
AZURE_DATABASE = 5;
284+
// only Postgres
285+
NEON = 6;
286+
// has Vitess, but only Postgres for now
287+
PLANETSCALE = 7;
288+
// only Postgres
289+
SUPABASE = 8;
290+
}
291+
277292
message Peer {
278293
string name = 1;
279294
DBType type = 2;

0 commit comments

Comments
 (0)