Skip to content

Commit 344b21d

Browse files
authored
fix(monitoring): prevent catalog pool deadlock in AddCDCBatchTablesForFlow (#3557)
Move PeerDBMetricsRecordAggregatesEnabled check before transaction start to avoid nested catalog pool connection acquisition. Previously, the function would: 1. Acquire connection for transaction (pool.Begin) 2. Call PeerDBMetricsRecordAggregatesEnabled which internally acquires another connection via `dynLookup > GetCatalogConnectionPoolFromEnv` With concurrent calls and pool size of 3, all connections would be held by transactions while each waited for a 4th connection to check the dynamic config, causing deadlock with idle transactions stuck in BEGIN. Fixes deadlock observed with 3 connections in "idle in transaction" state blocking all subsequent catalog operations.
1 parent 5e0a0d7 commit 344b21d

File tree

1 file changed

+6
-4
lines changed

1 file changed

+6
-4
lines changed

flow/connectors/utils/monitoring/monitoring.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ func AddCDCBatchTablesForFlow(
123123
tableNameRowsMapping map[string]*model.RecordTypeCounts,
124124
otelManager *otel_metrics.OtelManager,
125125
) error {
126+
var recordAggregateMetrics bool
127+
if enabled, err := internal.PeerDBMetricsRecordAggregatesEnabled(ctx, nil); err == nil && enabled {
128+
recordAggregateMetrics = true
129+
}
130+
126131
insertBatchTablesTx, err := pool.Begin(ctx)
127132
if err != nil {
128133
return fmt.Errorf("error while beginning transaction for inserting statistics: %w", err)
@@ -132,10 +137,7 @@ func AddCDCBatchTablesForFlow(
132137
op string
133138
count int32
134139
}
135-
var recordAggregateMetrics bool
136-
if enabled, err := internal.PeerDBMetricsRecordAggregatesEnabled(ctx, nil); err == nil && enabled {
137-
recordAggregateMetrics = true
138-
}
140+
139141
var syncedTablesCount int64
140142
tableNameOperations := make(map[string][3]opWithValue, len(tableNameRowsMapping))
141143
for destinationTableName, rowCounts := range tableNameRowsMapping {

0 commit comments

Comments
 (0)