feat: retry connections on network errors. Fixes #15011 #15006
base: main
Conversation
Signed-off-by: isubasinghe <[email protected]>
Not sure how to test this unfortunately; any recommendations here? Maybe spin up test containers, spin down Postgres, and then spin it up again? EDIT: I have a testing solution: write a test logger that collects logs, spin down Postgres and verify the logged errors, then spin Postgres back up and verify the logs that indicate recovery.
Pull Request Overview
This PR adds automatic retry and reconnection capabilities for database connections on network errors to address issue #15011. The implementation introduces a SessionProxy wrapper around upperdb sessions that detects network errors and automatically attempts to reconnect and retry operations.
Key Changes:

- Introduced `SessionProxy` with automatic reconnection on network errors
- Refactored all database operations to use `SessionProxy` instead of raw `db.Session`
- Added configurable retry parameters (`DBReconnectConfig`) with exponential backoff
- Updated sync manager, workflow archive, and persistence layers to use the new proxy pattern
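To make the proxy pattern described above concrete, here is a minimal sketch of how a caller would wrap a query in `SessionProxy.With`. The query, table name, and function name are illustrative only; they are not taken from this PR's diff.

```go
package example

import (
	"context"

	"github.com/upper/db/v4"

	"github.com/argoproj/argo-workflows/v3/util/sqldb"
)

// countArchivedWorkflows is an illustrative caller: it wraps one query in
// SessionProxy.With, so a transient network error triggers the proxy's
// reconnect-and-retry path instead of bubbling straight up to the caller.
func countArchivedWorkflows(ctx context.Context, sp *sqldb.SessionProxy) (int, error) {
	var count int
	err := sp.With(ctx, func(s db.Session) error {
		// s is the underlying upper/db session; on a network error the proxy
		// reconnects and invokes this closure again with a fresh session.
		row, err := s.SQL().QueryRow("SELECT COUNT(*) FROM argo_archived_workflows")
		if err != nil {
			return err
		}
		return row.Scan(&count)
	})
	return count, err
}
```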
Reviewed Changes
Copilot reviewed 35 out of 35 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `util/sqldb/session.go` | Core SessionProxy implementation with retry/reconnection logic |
| `util/sqldb/session_test.go` | Tests for SessionProxy reconnection behavior |
| `util/sync/db/queries.go` | Refactored all query methods to use SessionProxy.With() pattern |
| `util/sync/db/config.go` | Updated DBInfo to use SessionProxy instead of Session |
| `workflow/sync/sync_manager.go` | Updated to create and use SessionProxy, simplified transaction handling |
| `workflow/sync/transaction.go` | Changed to hold SessionProxy instead of raw Session |
| `workflow/sync/database_semaphore.go` | Updated all database calls to use SessionProxy |
| `workflow/sync/database_mutex.go` | Updated to use SessionProxy |
| `persist/sqldb/workflow_archive.go` | Wrapped all database operations with SessionProxy.With() |
| `persist/sqldb/offload_node_status_repo.go` | Updated to use SessionProxy pattern |
| `workflow/controller/controller.go` | Updated to create SessionProxy for persistence |
| `workflow/controller/config.go` | Updated configuration to use SessionProxy |
| `server/apiserver/argoserver.go` | Updated to use SessionProxy |
| `config/config.go` | Added DBReconnectConfig for retry configuration |
| `docs/workflow-controller-configmap.md` | Documented new DBReconnectConfig options |
| Multiple test files | Updated all test instantiations to pass ensureDBConnection parameter |
```go
func (sp *SessionProxy) TxWith(ctx context.Context, fn func(*SessionProxy) error, opts *sql.TxOptions) error {
	return sp.With(ctx, func(s db.Session) error {
		return s.TxContext(ctx, func(sess db.Session) error {
			newSp := SessionProxy{sp.kubectlConfig, sp.namespace, sp.dbConfig, sp.username, sp.password, sess, sync.RWMutex{}, sp.closed, sp.maxRetries, sp.baseDelay, sp.maxDelay, sp.retryMultiple, true}
```
**Copilot AI** commented on Nov 20, 2025:
In TxWith, line 147 creates a new SessionProxy with insideTransaction set to true in the struct literal (position 12), but the parent's sp.insideTransaction value is used in position 11. This is the same issue as in the Tx() method. The value at position 12 should be true, but the struct initialization is confusing. Consider using named fields in the struct literal for clarity and correctness.
```go
	sp.mu.RLock()
	if sp.closed {
		sp.mu.RUnlock()
		logger.Warn(ctx, "session proxy is closed")
		return fmt.Errorf("session proxy is closed")
	}
	sp.mu.RUnlock()

	sp.mu.RLock()
	sess := sp.sess
	sp.mu.RUnlock()

	if sess == nil {
		return fmt.Errorf("no active session")
	}

	err := fn(sess)
	if err == nil {
		return nil
	}

	// If it's not a network error or inside a tx do not retry
	if !sp.isNetworkError(err) || sp.insideTransaction {
		return err
	}

	if reconnectErr := sp.Reconnect(ctx); reconnectErr != nil {
		return fmt.Errorf("operation failed and reconnection failed: %w", reconnectErr)
	}

	sp.mu.RLock()
	sess = sp.sess
	sp.mu.RUnlock()

	if sess == nil {
		return fmt.Errorf("no active session after reconnection")
	}

	if retryErr := fn(sess); retryErr != nil {
		return fmt.Errorf("operation failed after reconnection: %w", retryErr)
	}

	return nil
```
**Copilot AI** commented on Nov 20, 2025:
[nitpick] The retry logic in With() only retries once after reconnection. If the second attempt also fails due to a network error, it will return an error without attempting to reconnect again. Consider implementing a retry loop that respects maxRetries for consistency with the reconnection logic in Reconnect().
Suggested change: replace the single-retry body shown above with a bounded retry loop that respects `maxRetries`:

```go
	var lastErr error
	for attempt := 0; attempt <= sp.maxRetries; attempt++ {
		sp.mu.RLock()
		if sp.closed {
			sp.mu.RUnlock()
			logger.Warn(ctx, "session proxy is closed")
			return fmt.Errorf("session proxy is closed")
		}
		sess := sp.sess
		sp.mu.RUnlock()
		if sess == nil {
			return fmt.Errorf("no active session")
		}
		err := fn(sess)
		if err == nil {
			return nil
		}
		// If it's not a network error or inside a tx do not retry
		if !sp.isNetworkError(err) || sp.insideTransaction {
			return err
		}
		lastErr = err
		logger.WithField("attempt_number", attempt).Warn(ctx, fmt.Sprintf("operation failed with network error: %v", err))
		// If this is the last attempt, break
		if attempt == sp.maxRetries {
			break
		}
		if reconnectErr := sp.Reconnect(ctx); reconnectErr != nil {
			return fmt.Errorf("operation failed and reconnection failed: %w", reconnectErr)
		}
		// Exponential backoff before next retry, similar to Reconnect
		delay := time.Duration(float64(sp.baseDelay) * float64(attempt+1) * sp.retryMultiple)
		delay = min(delay, sp.maxDelay)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
	}
	return fmt.Errorf("operation failed after %d retries, last error: %w", sp.maxRetries, lastErr)
```
```go
	s := SessionProxy{sp.kubectlConfig, sp.namespace, sp.dbConfig, sp.username, sp.password, sp.Session(), sync.RWMutex{}, sp.closed, sp.maxRetries, sp.baseDelay, sp.maxDelay, sp.retryMultiple, sp.insideTransaction}
	s.insideTransaction = true
	return &s
}
```
**Copilot AI** commented on Nov 20, 2025:
The Tx() method creates a new SessionProxy with insideTransaction = true, but it doesn't properly copy the field. The statement s.insideTransaction at the end should copy the value from the parent, but then immediately sets it to true on the next line. However, there's a bug: the field at position 11 in the struct literal is sp.insideTransaction, which is the parent's value (likely false), not the new struct's field. This means the transaction flag won't be properly set.
The corrected line 138 should be:

```go
s := SessionProxy{sp.kubectlConfig, sp.namespace, sp.dbConfig, sp.username, sp.password, sp.Session(), sync.RWMutex{}, sp.closed, sp.maxRetries, sp.baseDelay, sp.maxDelay, sp.retryMultiple, false}
```

Suggested change:

```diff
-	s := SessionProxy{sp.kubectlConfig, sp.namespace, sp.dbConfig, sp.username, sp.password, sp.Session(), sync.RWMutex{}, sp.closed, sp.maxRetries, sp.baseDelay, sp.maxDelay, sp.retryMultiple, sp.insideTransaction}
-	s.insideTransaction = true
-	return &s
-}
+	s := SessionProxy{sp.kubectlConfig, sp.namespace, sp.dbConfig, sp.username, sp.password, sp.Session(), sync.RWMutex{}, sp.closed, sp.maxRetries, sp.baseDelay, sp.maxDelay, sp.retryMultiple, true}
+	return &s
+}
```
@coderabbitai full review

✅ Actions performed: Full review triggered.
> **Note:** Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

**Walkthrough**

This pull request implements database query resilience by introducing a SessionProxy abstraction that automatically reconnects and retries failed queries on network errors. The feature includes configurable retry parameters (max retries, backoff delays, multiplier), network error detection, and integration across persistence, sync, and controller layers.

**Changes**
**Sequence Diagram(s)**

```mermaid
sequenceDiagram
    participant Client as Application<br/>(Query)
    participant SP as SessionProxy
    participant DB as Database
    participant Net as Network
    rect rgb(220, 240, 250)
        Note over Client,Net: Normal Query Flow
        Client->>SP: With(ctx, func(session))
        SP->>SP: Check if in transaction
        SP->>DB: Execute query via session
        DB-->>SP: Result
        SP-->>Client: Success
    end
    rect rgb(250, 240, 220)
        Note over Client,Net: Network Error & Reconnect Flow
        Client->>SP: With(ctx, func(session))
        SP->>DB: Execute query via session
        DB-xNet: Network error occurs
        Net-->>SP: Error (timeout/connection lost)
        SP->>SP: isNetworkError(err) → true
        SP->>SP: Reconnect(ctx)<br/>with exponential backoff
        SP->>SP: Attempt 1: wait baseDelay ms
        SP->>Net: New connection attempt
        alt Connection succeeds
            Net-->>DB: Connected
            SP->>DB: Retry query
            DB-->>SP: Result
            SP-->>Client: Success (after reconnect)
        else Max retries exhausted
            SP->>SP: Close old session
            SP-->>Client: Error (retry limit reached)
        end
    end
```
**Estimated code review effort:** 🎯 4 (Complex) | ⏱️ ~60 minutes. Areas requiring extra attention:
**Pre-merge checks and finishing touches:** ❌ Failed checks (2 warnings) · ✅ Passed checks (3 passed)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
workflow/controller/controller.go (1)

**130-135: Handle lock-manager creation failures to avoid nil `syncManager` panics**

`createSynchronizationManager` logs the error from `sync.NewLockManager` but leaves `wfc.syncManager` potentially nil. Later calls such as `wfc.initManagers(ctx)` (which does `wfc.syncManager.Initialize(...)`) and the background `wfc.syncManager.CheckWorkflowExistence` goroutine assume a non-nil manager and will panic if the DB connection fails during creation.

It would be safer to treat this as a fatal startup error and stop `Run` early rather than continuing with a nil `syncManager`. One way to do this is to return an error from `createSynchronizationManager` and bail out in `Run`:

```diff
-func (wfc *WorkflowController) createSynchronizationManager(ctx context.Context) {
+func (wfc *WorkflowController) createSynchronizationManager(ctx context.Context) error {
 @@
-	var err error
-	wfc.syncManager, err = sync.NewLockManager(ctx, wfc.kubeclientset, wfc.namespace, wfc.Config.Synchronization, getSyncLimit, nextWorkflow, isWFDeleted, true)
-
-	if err != nil {
-		logging.RequireLoggerFromContext(ctx).WithError(err).Error(ctx, "Failed to create lock manager")
-	}
+	var err error
+	wfc.syncManager, err = sync.NewLockManager(ctx, wfc.kubeclientset, wfc.namespace, wfc.Config.Synchronization, getSyncLimit, nextWorkflow, isWFDeleted, true)
+	if err != nil {
+		logging.RequireLoggerFromContext(ctx).WithError(err).WithFatal().Error(ctx, "Failed to create lock manager")
+		return err
+	}
+	return nil
 }
```

and in `Run`:

```diff
-	// Create Synchronization Manager
-	wfc.createSynchronizationManager(ctx)
+	// Create Synchronization Manager
+	if err := wfc.createSynchronizationManager(ctx); err != nil {
+		return
+	}
```

Also applies to: 401-444
test/e2e/fixtures/persistence.go (1)

**31-54: Potential resource leak: `sessionProxy` is not closed.**

The code creates both a raw `session` (line 31) and a `sessionProxy` (lines 41-45). The `session` is stored and closed in `Close()`, but `sessionProxy` is only passed to the repos and never explicitly closed. If `SessionProxy` manages its own DB connection internally, this could leak connections.

Additionally, creating both may result in maintaining two separate database connections when only one is needed.

Consider either:

- Close the `sessionProxy` in the `Close()` method, or
- Remove the raw `session` creation if `sessionProxy` can provide the underlying session for cleanup

```bash
#!/bin/bash
# Check if SessionProxy has a Close method or manages connection lifecycle
ast-grep --pattern $'type SessionProxy struct {
  $$$
}'
rg -n "func.*SessionProxy.*Close|sessionProxy.*Close" util/sqldb/
```
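As a hedged sketch of the first option, the fixture could release the proxy alongside the raw session in `Close()`. The struct shape and field names below are illustrative, and the existence of a `SessionProxy.Close()` method is exactly the assumption the script above checks for.

```go
package fixtures

import (
	"github.com/upper/db/v4"

	"github.com/argoproj/argo-workflows/v3/util/sqldb"
)

// Persistence mirrors the shape the comment describes: it holds both the raw
// session and the proxy. Field names here are illustrative only.
type Persistence struct {
	session      db.Session
	sessionProxy *sqldb.SessionProxy
}

// Close releases both handles so the proxy's connection is not leaked.
// Assumes SessionProxy exposes a Close method.
func (p *Persistence) Close() error {
	if p.sessionProxy != nil {
		p.sessionProxy.Close()
	}
	return p.session.Close()
}
```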
🧹 Nitpick comments (13)
docs/workflow-controller-configmap.md (1)

**238-250: Consider documenting default values and fix capitalization inconsistency.**

The `DBReconnectConfig` documentation is clear, but consider these improvements:

1. Default values: users would benefit from knowing the default values for these fields (e.g., "MaxRetries defaults to 5"). Based on the code snippet in `util/sqldb/session.go`, the defaults appear to be: maxRetries=5, baseDelay=100ms, maxDelay=30s, retryMultiple=2.0.
2. Capitalization: line 247 uses lowercase `retryMultiple` when referring to the field name `RetryMultiple`.

Apply this diff to document defaults and fix capitalization:

```diff
 ## DBReconnectConfig

 DBReconnectConfig contains database reconnect settings

 ### Fields

 | Field Name | Field Type | Description |
 |--------------------|------------|-------------|
-| `MaxRetries`       | `int`      | MaxRetries defines how many connection attempts should be made before we give up |
-| `BaseDelaySeconds` | `int`      | BaseDelaySeconds delays retries by this amount multiplied by the retryMultiple, capped to `maxDelaySeconds` |
-| `MaxDelaySeconds`  | `int`      | MaxDelaySeconds the absolute upper limit to wait before retrying |
-| `RetryMultiple`    | `float64`  | RetryMultiple is the growth factor for `baseDelaySeconds` |
+| `MaxRetries`       | `int`      | MaxRetries defines how many connection attempts should be made before we give up (default: 5) |
+| `BaseDelaySeconds` | `int`      | BaseDelaySeconds delays retries by this amount multiplied by the `RetryMultiple`, capped to `MaxDelaySeconds` (default: 0.1 seconds) |
+| `MaxDelaySeconds`  | `int`      | MaxDelaySeconds the absolute upper limit to wait before retrying (default: 30 seconds) |
+| `RetryMultiple`    | `float64`  | RetryMultiple is the growth factor for `BaseDelaySeconds` (default: 2.0) |
```
37-44: Consider logging session proxy creation errors.The session proxy creation error is silently ignored (line 42), causing the DATABASE sync provider to not be registered. While graceful degradation to CONFIGMAP-only sync is reasonable, users might not realize their database sync configuration failed.
Consider logging the error to help with troubleshooting:
if syncConfig != nil && syncConfig.EnableAPI { sessionProxy, err := sqldb.NewSessionProxy(ctx, sqldb.SessionProxyConfig{ KubectlConfig: kubectlConfig, Namespace: namespace, DBConfig: syncConfig.DBConfig, }) if err == nil { server.providers[syncpkg.SyncConfigType_DATABASE] = &dbSyncProvider{db: syncdb.NewSyncQueries(sessionProxy, syncdb.DBConfigFromConfig(syncConfig))} + } else { + log := logging.RequireLoggerFromContext(ctx) + log.WithError(err).Warn(ctx, "Failed to create database session proxy for sync, DATABASE sync provider will not be available") } }workflow/controller/config.go (1)
**36-77: Persistence now uses `SessionProxy`; consider behavior on DB config changes**

The new wiring that creates a single `wfc.sessionProxy` (used for offload repo, archive, and migrations) is consistent with the SessionProxy abstraction and gives persistence the same reconnect/retry behavior as the rest of the system.

However, because `updateConfig` only creates the proxy when `wfc.sessionProxy == nil` and `sqldb.NewSessionProxy` takes a copy of `persistence.DBConfig`, subsequent changes to persistence DB settings in the controller ConfigMap won't affect an already running controller: it will keep using the original connection parameters until restart.

If supporting hot changes to persistence DB/reconnect settings is important, consider:

- detecting relevant changes to `persistence.DBConfig`, and
- recreating the SessionProxy (closing the old one) when they occur, so that `initDB` and later operations run against the updated configuration.

Also applies to: 93-107
util/sqldb/session_test.go (2)

**21-26: Fixed port may cause test flakiness in parallel runs.**

Using a hardcoded port (`15432`) can cause conflicts when tests run in parallel or when the port isn't released quickly after container termination. Consider using dynamic port allocation with a mechanism to pass the new port to the `SessionProxy` after reconnection, or document that this test must run serially.

**100-124: Test assertions inside goroutine may not fail the test correctly.**

When `assert.NoError` or `assert.True` fail inside a goroutine after the main test function has returned, the failure may not be properly reported. The test does wait via `<-doneChan`, but if the assertions fail, `t.FailNow()` won't work correctly from a goroutine.

Consider capturing errors and asserting in the main goroutine:

```diff
 	doneChan := make(chan struct{})
+	var goroutineErr error
+	var sawError bool
 	go func() {
-		hasSeenErr := false
 		outerErr := sessionProxy.With(ctx, func(s db.Session) error {
 			innerErr := s.Ping()
 			if innerErr != nil {
-				hasSeenErr = true
+				sawError = true
 			}
 			return innerErr
 		})
-		assert.NoError(t, outerErr)
-		assert.True(t, hasSeenErr)
+		goroutineErr = outerErr
 		doneChan <- struct{}{}
 	}()
 	newDBConfig, cancel, err := setupPostgresContainer(ctx, t)
 	require.NoError(t, err)
 	assert.Equal(t, cfg.PostgreSQL.Host, newDBConfig.PostgreSQL.Host)
 	assert.Equal(t, cfg.PostgreSQL.Port, newDBConfig.PostgreSQL.Port)
 	<-doneChan
+	require.NoError(t, goroutineErr)
+	assert.True(t, sawError)
 	cancel()
```
290-357: Consider extracting duplicated name filter logic.The name filter logic (lines 311-325 and similar blocks in
countWorkflowsOptimizedandHasMoreWorkflows) is duplicated across multiple methods. Consider extracting to a helper function for maintainability.func applyNameFilter(selector db.Selector, options sutils.ListOptions) db.Selector { if options.Name == "" { return selector } nameFilter := options.NameFilter if nameFilter == "" { nameFilter = "Exact" } switch nameFilter { case "Exact": return selector.And(nameEqual(options.Name)) case "Contains": return selector.And(nameContainsClause(options.Name)) case "Prefix": return selector.And(namePrefixClause(options.Name)) default: return selector } }util/sqldb/session.go (3)
137-141: Redundant assignment in Tx() method.Line 138 initializes
s.insideTransactionwithsp.insideTransaction(position 12 in struct literal), then line 139 immediately overwrites it withtrue. This is confusing but functionally correct. Consider using named fields for clarity.func (sp *SessionProxy) Tx() *SessionProxy { - s := SessionProxy{sp.kubectlConfig, sp.namespace, sp.dbConfig, sp.username, sp.password, sp.Session(), sync.RWMutex{}, sp.closed, sp.maxRetries, sp.baseDelay, sp.maxDelay, sp.retryMultiple, sp.insideTransaction} - s.insideTransaction = true - return &s + return &SessionProxy{ + kubectlConfig: sp.kubectlConfig, + namespace: sp.namespace, + dbConfig: sp.dbConfig, + username: sp.username, + password: sp.password, + sess: sp.Session(), + closed: sp.closed, + maxRetries: sp.maxRetries, + baseDelay: sp.baseDelay, + maxDelay: sp.maxDelay, + retryMultiple: sp.retryMultiple, + insideTransaction: true, + } }
143-151: Consider using named fields in TxWith for consistency.Similar to
Tx(), using named fields would improve readability and reduce maintenance risk when fields are added/reordered.func (sp *SessionProxy) TxWith(ctx context.Context, fn func(*SessionProxy) error, opts *sql.TxOptions) error { return sp.With(ctx, func(s db.Session) error { return s.TxContext(ctx, func(sess db.Session) error { - newSp := SessionProxy{sp.kubectlConfig, sp.namespace, sp.dbConfig, sp.username, sp.password, sess, sync.RWMutex{}, sp.closed, sp.maxRetries, sp.baseDelay, sp.maxDelay, sp.retryMultiple, true} + newSp := SessionProxy{ + kubectlConfig: sp.kubectlConfig, + namespace: sp.namespace, + dbConfig: sp.dbConfig, + username: sp.username, + password: sp.password, + sess: sess, + closed: sp.closed, + maxRetries: sp.maxRetries, + baseDelay: sp.baseDelay, + maxDelay: sp.maxDelay, + retryMultiple: sp.retryMultiple, + insideTransaction: true, + } return fn(&newSp) }, opts) }) }
236-281: With() only retries once after reconnection.If the second attempt also fails with a network error, no further retries are attempted. This may be intentional to avoid indefinite retry loops, but consider documenting this behavior or aligning with
maxRetriesconfiguration.The current single-retry approach may be intentional for simplicity and to avoid long blocking. If this is the desired behavior, consider adding a doc comment clarifying that
With()performs at most one retry after reconnection, whileReconnect()itself usesmaxRetriesfor the connection attempt.-// With executes with a db Session +// With executes fn with a db.Session. On network errors, it attempts one reconnection +// (which itself may retry up to maxRetries times) and retries the operation once. +// Operations inside transactions (insideTransaction=true) are not retried. func (sp *SessionProxy) With(ctx context.Context, fn func(db.Session) error) error {util/sync/db/queries.go (4)
**63-86: Consider consolidating duplicate method pairs in the interface.**

The interface contains duplicate method pairs with identical signatures:

- `GetPendingInQueue` (line 72) vs `GetPendingInQueueWithSession` (line 83)
- `UpdateStateToHeld` (line 70) vs `UpdateStateToHeldWithSession` (line 84)
- `InsertHeldState` (line 71) vs `InsertHeldStateWithSession` (line 85)

Both variants accept `*sqldb.SessionProxy` and have the same parameters. Consider removing the `WithSession` suffix variants and keeping only one set of methods, as the distinction is no longer meaningful after the migration.

**263-277: Duplicate implementations: `GetPendingInQueue` and `GetPendingInQueueWithSession`.**

These two methods have identical signatures and implementations. Since both accept `*sqldb.SessionProxy` as a parameter, one should be removed. The `WithSession` variant at lines 364-377 could simply delegate to `GetPendingInQueue` or be removed entirely.

```diff
-func (q *syncQueries) GetPendingInQueueWithSession(ctx context.Context, session *sqldb.SessionProxy, semaphoreName, holderKey, controllerName string) ([]StateRecord, error) {
-	var pending []StateRecord
-	err := session.With(ctx, func(s db.Session) error {
-		err := s.SQL().
-			Select(StateKeyField).
-			From(q.config.StateTable).
-			Where(db.Cond{StateNameField: semaphoreName}).
-			And(db.Cond{StateKeyField: holderKey}).
-			And(db.Cond{StateControllerField: controllerName}).
-			And(db.Cond{StateHeldField: false}).
-			All(&pending)
-		return err
-	})
-	return pending, err
+func (q *syncQueries) GetPendingInQueueWithSession(ctx context.Context, session *sqldb.SessionProxy, semaphoreName, holderKey, controllerName string) ([]StateRecord, error) {
+	return q.GetPendingInQueue(ctx, session, semaphoreName, holderKey, controllerName)
 }
```

Also applies to: 364-377

**243-254: Duplicate implementations: `UpdateStateToHeld` and `UpdateStateToHeldWithSession`.**

These methods are identical in signature and implementation. Consider having `UpdateStateToHeldWithSession` delegate to `UpdateStateToHeld`:

```diff
-func (q *syncQueries) UpdateStateToHeldWithSession(ctx context.Context, session *sqldb.SessionProxy, semaphoreName, holderKey, controllerName string) error {
-	return session.With(ctx, func(s db.Session) error {
-		_, err := s.SQL().Update(q.config.StateTable).
-			Set(StateHeldField, true).
-			Where(db.Cond{StateNameField: semaphoreName}).
-			And(db.Cond{StateKeyField: holderKey}).
-			And(db.Cond{StateControllerField: controllerName}).
-			And(db.Cond{StateHeldField: false}).
-			Exec()
-		return err
-	})
+func (q *syncQueries) UpdateStateToHeldWithSession(ctx context.Context, session *sqldb.SessionProxy, semaphoreName, holderKey, controllerName string) error {
+	return q.UpdateStateToHeld(ctx, session, semaphoreName, holderKey, controllerName)
 }
```

Also applies to: 380-391

**256-261: Duplicate implementations: `InsertHeldState` and `InsertHeldStateWithSession`.**

Same duplication pattern as the other methods. Consider delegating:

```diff
-func (q *syncQueries) InsertHeldStateWithSession(ctx context.Context, session *sqldb.SessionProxy, record *StateRecord) error {
-	return session.With(ctx, func(s db.Session) error {
-		_, err := s.Collection(q.config.StateTable).Insert(record)
-		return err
-	})
+func (q *syncQueries) InsertHeldStateWithSession(ctx context.Context, session *sqldb.SessionProxy, record *StateRecord) error {
+	return q.InsertHeldState(ctx, session, record)
 }
```

Also applies to: 393-398
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (35)
- `.features/pending/feat-refresh-connection.md` (1 hunks)
- `config/config.go` (1 hunks)
- `docs/workflow-controller-configmap.md` (3 hunks)
- `go.mod` (1 hunks)
- `hack/db/main.go` (3 hunks)
- `persist/sqldb/archived_workflow_labels.go` (2 hunks)
- `persist/sqldb/migrate.go` (1 hunks)
- `persist/sqldb/offload_node_status_repo.go` (8 hunks)
- `persist/sqldb/workflow_archive.go` (9 hunks)
- `server/apiserver/argoserver.go` (2 hunks)
- `server/sync/sync_server.go` (2 hunks)
- `test/e2e/fixtures/persistence.go` (1 hunks)
- `util/sqldb/migrate.go` (1 hunks)
- `util/sqldb/session.go` (1 hunks)
- `util/sqldb/session_test.go` (1 hunks)
- `util/sync/db/config.go` (2 hunks)
- `util/sync/db/migrate.go` (1 hunks)
- `util/sync/db/mocks/SyncQueries.go` (29 hunks)
- `util/sync/db/queries.go` (3 hunks)
- `workflow/controller/config.go` (3 hunks)
- `workflow/controller/controller.go` (3 hunks)
- `workflow/controller/operator_concurrency_test.go` (11 hunks)
- `workflow/controller/operator_test.go` (1 hunks)
- `workflow/sync/database_helper_test.go` (1 hunks)
- `workflow/sync/database_mutex.go` (1 hunks)
- `workflow/sync/database_mutex_test.go` (1 hunks)
- `workflow/sync/database_semaphore.go` (10 hunks)
- `workflow/sync/database_semaphore_test.go` (12 hunks)
- `workflow/sync/multiple_test.go` (6 hunks)
- `workflow/sync/mutex_test.go` (4 hunks)
- `workflow/sync/semaphore_test.go` (7 hunks)
- `workflow/sync/sync_manager.go` (9 hunks)
- `workflow/sync/sync_manager_multiple_test.go` (1 hunks)
- `workflow/sync/sync_manager_test.go` (23 hunks)
- `workflow/sync/transaction.go` (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (22)
- `workflow/sync/sync_manager_test.go` (1) → `workflow/sync/sync_manager.go`: `NewLockManager` (49-59)
- `util/sync/db/config.go` (1) → `util/sqldb/session.go`: `SessionProxy` (22-42)
- `workflow/sync/database_mutex_test.go` (1) → `util/sqldb/session.go`: `SessionProxy` (22-42)
- `workflow/sync/database_semaphore_test.go` (1) → `util/sqldb/session.go`: `SessionProxy` (22-42)
- `workflow/controller/operator_concurrency_test.go` (1) → `workflow/sync/sync_manager.go`: `NewLockManager` (49-59)
- `workflow/controller/operator_test.go` (2) → `workflow/sync/sync_manager.go`: `NewLockManager` (49-59); `pkg/apis/workflow/v1alpha1/workflow_types.go`: `Synchronization` (1765-1770), `Mutex` (1797-1804)
- `workflow/sync/mutex_test.go` (2) → `workflow/sync/sync_manager.go`: `NewLockManager` (49-59); `workflow/sync/sync_manager_test.go`: `WorkflowExistenceFunc` (330-332)
- `workflow/sync/transaction.go` (1) → `util/sqldb/session.go`: `SessionProxy` (22-42)
- `workflow/sync/multiple_test.go` (2) → `workflow/sync/sync_manager.go`: `NewLockManager` (49-59); `workflow/sync/sync_manager_test.go`: `WorkflowExistenceFunc` (330-332)
- `util/sync/db/migrate.go` (2) → `util/sqldb/session.go`: `SessionProxy` (22-42); `util/sqldb/migrate.go`: `Migrate` (25-97), `Change` (12-14)
- `workflow/sync/sync_manager_multiple_test.go` (2) → `util/sqldb/session.go`: `SessionProxy` (22-42); `workflow/sync/sync_manager_test.go`: `WorkflowExistenceFunc` (330-332)
- `hack/db/main.go` (2) → `util/sqldb/session.go`: `NewSessionProxyFromSession` (122-133); `persist/sqldb/workflow_archive.go`: `NewWorkflowArchive` (100-102)
- `workflow/sync/database_helper_test.go` (2) → `util/sqldb/session.go`: `NewSessionProxy` (73-119), `SessionProxyConfig` (45-54), `SessionProxy` (22-42); `util/sync/db/config.go`: `DBInfo` (26-29), `DBConfigFromConfig` (77-91)
- `workflow/sync/semaphore_test.go` (1) → `util/sqldb/session.go`: `SessionProxy` (22-42)
- `persist/sqldb/workflow_archive.go` (5) → `util/sqldb/session.go`: `SessionProxy` (22-42); `util/sqldb/db_type.go`: `DBTypeFor` (19-27), `MySQL` (14-14), `Postgres` (15-15); `server/utils/list_options.go`: `ListOptions` (15-24); `persist/sqldb/selector.go`: `BuildArchivedWorkflowSelector` (12-54); `workflow/common/common.go`: `LabelKeyWorkflowArchivingStatus` (89-89)
- `util/sync/db/mocks/SyncQueries.go` (2) → `util/sync/db/queries.go`: `SyncQueries` (57-86), `StateRecord` (18-25); `util/sqldb/session.go`: `SessionProxy` (22-42)
- `workflow/sync/sync_manager.go` (3) → `util/sqldb/session.go`: `SessionProxy` (22-42), `NewSessionProxy` (73-119), `SessionProxyConfig` (45-54); `util/sync/db/config.go`: `DBConfigFromConfig` (77-91); `util/sync/db/queries.go`: `NewSyncQueries` (97-102)
- `util/sync/db/queries.go` (1) → `util/sqldb/session.go`: `SessionProxy` (22-42)
- `workflow/sync/database_mutex.go` (2) → `util/sync/db/queries.go`: `NewSyncQueries` (97-102); `util/sqldb/session.go`: `SessionProxy` (22-42)
- `workflow/controller/controller.go` (2) → `util/sqldb/session.go`: `SessionProxy` (22-42); `workflow/sync/sync_manager.go`: `NewLockManager` (49-59)
- `workflow/controller/config.go` (6) → `util/sqldb/session.go`: `NewSessionProxy` (73-119), `SessionProxyConfig` (45-54); `config/config.go`: `DBConfig` (239-248), `ConnectionPool` (323-330); `util/sqldb/sqldb.go`: `ConfigureDBSession` (117-124); `persist/sqldb/offload_node_status_repo.go`: `NewOffloadNodeStatusRepo` (35-41); `persist/sqldb/workflow_archive.go`: `NewWorkflowArchive` (100-102); `persist/sqldb/migrate.go`: `Migrate` (16-231)
- `workflow/sync/database_semaphore.go` (3) → `util/sync/db/queries.go`: `NewSyncQueries` (97-102), `StateRecord` (18-25); `util/sync/db/mocks/SyncQueries.go`: `NewSyncQueries` (18-28); `util/sqldb/session.go`: `SessionProxy` (22-42)
🪛 GitHub Actions: CI
util/sqldb/session_test.go
[error] 89-89: TestSessionReconnect failed: generic container: create container: ensure default network: network create: Error response from daemon: could not find plugin bridge in v1 plugin registry: plugin not found
🔇 Additional comments (68)
go.mod (2)
1-212: Scope mismatch: Missing implementation code in review.The PR objective describes implementing a SessionProxy abstraction with retry parameters, network error detection, and integration across multiple layers. However, the provided file only shows
go.moddependency changes. The actual implementation code (SessionProxy, retry logic, network error handling) is not included in this review.For a comprehensive review of the retry resilience feature, please ensure all modified/added implementation files are included (e.g., persistence layer changes, SessionProxy definition, retry configuration, error handling logic).
128-129: Clarify why Docker dependencies are promoted to direct dependencies.The review focuses on database connection resilience via SessionProxy, but the shown changes promote
github.com/docker/dockerandgithub.com/docker/go-connectionsfrom indirect to direct dependencies. Since testcontainers-go is already listed as a direct dependency, verify whether docker packages are used directly in the codebase or if they are only needed transitively through testcontainers. If the latter, these should remain indirect dependencies.util/sqldb/migrate.go (1)
117-121: LGTM!The comment clarification accurately describes that the change is applied directly with the transaction session (
tx), which ensures proper transactional semantics during migration.workflow/sync/sync_manager_multiple_test.go (1)
69-80: LGTM!The test setup correctly migrates to the
SessionProxypattern:
- Uses
SessionProxy.Session()to access the underlying session for SQL operations- Passes
SessionProxytocreateLockManageras expected by the new signature- Assertions properly verify that sessions are initialized through the proxy
The changes are consistent with the broader SessionProxy migration.
workflow/sync/database_mutex.go (1)
19-19: LGTM!The change correctly passes
info.SessionProxytoNewSyncQueries, matching the updated function signature that now expects a*sqldb.SessionProxyparameter.config/config.go (1)
250-260: Review validation logic or document expected behavior for DBReconnectConfig edge cases.The configuration struct lacks explicit handling for edge cases:
- What happens if
MaxRetriesis 0 or negative?- What if
MaxDelaySeconds < BaseDelaySeconds?- What are sensible defaults when this config is nil or partially set?
If validation or defaulting is handled elsewhere in
SessionProxyinitialization, this comment can be disregarded. Otherwise, consider adding a validation method or documenting expected behavior.workflow/sync/transaction.go (1)
7-9: [rewritten review comment]
[classification tag]hack/db/main.go (2)
33-35: LGTM!The error handling after
createDBSessionis correct and ensures failures are properly propagated.
79-80: Verify transaction semantics for SessionProxy.Tx() in the fake data generation loop.The
sessionProxyis created viaSessionProxy.Tx()before the loop and reused for multipleArchiveWorkflowcalls. Please confirm:
- Does
Tx()initiate an actual database transaction that must be explicitly committed?- If so, should the entire batch be committed after the loop completes, or should each iteration have separate transaction handling?
- Are there defer/cleanup mechanisms that automatically handle rollback on errors?
This is important for data integrity in bulk operations.
.features/pending/feat-refresh-connection.md (1)
1-7: LGTM!The feature documentation clearly describes the reconnection and retry behavior for database queries on network failures.
docs/workflow-controller-configmap.md (2)
185-185: LGTM!The
DBReconnectConfigfield addition toPersistConfigis well-documented with a clear description.
332-349: LGTM!The
DBReconnectConfigfield addition toSyncConfigis consistent with thePersistConfigaddition and properly documented.workflow/sync/database_mutex_test.go (1)
26-26: LGTM!The migration from direct session usage to
SessionProxy.Tx()aligns with the broader SessionProxy adoption across the codebase.server/apiserver/argoserver.go (2)
241-245: LGTM!The migration from
sqldb.CreateDBSessiontosqldb.NewSessionProxyproperly configures the session proxy with the necessary Kubernetes configuration, namespace, and database settings.
255-261: LGTM!The updates to
NewOffloadNodeStatusRepoandNewWorkflowArchivecorrectly pass the session proxy instead of a direct session, aligning with the new constructor signatures.persist/sqldb/archived_workflow_labels.go (2)
21-26: LGTM!The refactor to use
sessionProxy.Withproperly scopes the SQL query to the provided context, enabling automatic retry and reconnection logic while preserving the original query behavior.
42-48: LGTM!Consistent with the previous change, this refactor properly wraps the SQL query with
sessionProxy.Withfor context-aware session management.persist/sqldb/migrate.go (1)
15-17: Verify session vs proxy usage in migration function.The new
Migratefunction accepts a directdb.Sessionrather than a*SessionProxy. While this might be intentional for migration scenarios (where retry logic could interfere with schema changes), please confirm:
- Is it safe to run migrations without retry/reconnection logic?
- Should migrations be wrapped in a transaction at a higher level?
- Are migrations idempotent enough that manual retry is acceptable?
Based on learnings from the PR context, other components like
util/sync/db/migrate.gohave been updated to use SessionProxy. Consider checking if this migration should follow the same pattern for consistency.util/sync/db/config.go (1)
26-30: SessionProxy integration inDBInfoandMigratelooks consistentSwitching
DBInfoto holdSessionProxyand delegating migration viamigrate(ctx, d.SessionProxy, &d.Config)cleanly aligns with the new proxy abstraction while preserving the “disable DB locks on migration failure” behavior.Also applies to: 57-75
workflow/sync/database_semaphore_test.go (1)
44-279: Tests correctly pivot toSessionProxyand transactionsessionProxyUpdating the tests to route all DB calls through
info.SessionProxy.Session()and to constructtransaction{sessionProxy: info.SessionProxy}keeps them aligned with the production SessionProxy abstraction and ensures semaphore/mutex behavior is validated against the new retry‑capable session layer.Also applies to: 299-369
workflow/sync/sync_manager_test.go (1)
355-1851: UpdatedNewLockManagertest wiring and SessionProxy assertions look soundAdapting all tests to the new
NewLockManager(..., ensureDBConnection bool) (*Manager, error)signature (withrequire.NoErrorchecks) and usingensureDBConnection=falsewhere DB connectivity isn’t under test keeps the suite focused and avoids spurious failures. The directcreateLockManager(..., info.SessionProxy, ...)usage and assertion thatsyncManager.dbInfo.SessionProxy.Session()is non‑nil appropriately exercise the SessionProxy‑backed DB path.util/sync/db/migrate.go (1)
13-47: SessionProxy-awaremigratehelper is correctly wiredUpdating
migrateto accept a*sqldb.SessionProxyand delegate viasessionProxy.Session()intosqldb.Migratecleanly aligns schema setup with the new proxy layer while keeping the migration SQL and ordering unchanged.workflow/controller/operator_concurrency_test.go (1)
163-627: Controller tests correctly adapt toNewLockManager’s new signatureInitializing
controller.syncManagerviasync.NewLockManager(..., false)with explicitrequire.NoErrorchecks in all the operator‑concurrency tests is the right adjustment: it exercises the updated API while keeping these tests focused on ConfigMap‑based synchronization rather than DB connectivity.Also applies to: 835-839, 910-919, 981-985
workflow/sync/database_helper_test.go (1)
44-83: Test DB helper’s switch toSessionProxyis well-structuredCreating a
SessionProxyincreateTestDBSession, storing it insyncdb.DBInfo, and using it for controller‑health inserts (with properCloseand container termination in the defer) cleanly mirrors the production path while keeping tests self‑contained.workflow/sync/mutex_test.go (4)
118-121: LGTM! Properly updated to handle the new API signature.The constructor call correctly passes
falseforensureDBConnection(appropriate for unit tests without DB) and properly handles the new error return value withrequire.NoError.
132-136: Consistent update pattern.Same appropriate pattern:
ensureDBConnection: falseand error assertion.
215-219: Consistent update pattern.
408-412: Consistent update pattern.workflow/sync/multiple_test.go (6)
46-49: LGTM! Properly updated to handle the new API signature.
128-131: Consistent update pattern.
211-214: Consistent update pattern.
331-334: Consistent update pattern.
410-413: Consistent update pattern.
423-426: Consistent update pattern.util/sqldb/session_test.go (1)
86-127: Race condition exists between container lifecycle operations and port binding.Container cleanup at line 100 and new container startup at line 120 may overlap if the first container hasn't fully released port 15432 before the replacement attempts to bind. A delay or port-availability verification is needed between these operations.
The concern about assertions inside goroutines is invalid—the test properly waits for goroutine completion via
doneChan(line 124) before returning, ensuring assertions execute before cleanup.test/e2e/fixtures/persistence.go (1)
41-45: VerifySessionProxyConfigfield names.The field is named
KubectlConfigbut typically Kubernetes client interfaces useKubeClientor similar naming. Confirm this matches the struct definition insqldb.SessionProxyConfig.workflow/controller/operator_test.go (2)
8662-8665: NewLockManager initialization and error handling look correctThe explicit
errvariable, handling of the new(manager, error)return, and passingensureDBConnection=falsefor this mutex-only test are all appropriate and idiomatic.
8668-8668: Updated TryAcquire usage is aligned with the new signatureDiscarding the non-error return values and only asserting
err == nilis fine for this test, which just needs to ensure the preemptive lock acquisition doesn’t fail.persist/sqldb/offload_node_status_repo.go (6)
35-41: LGTM! Constructor properly migrated to SessionProxy.The constructor signature and initialization correctly use
*sqldb.SessionProxyinstead ofdb.Session, aligning with the PR's objective to enable automatic reconnection on network errors.
93-107: LGTM! Save method correctly uses sessionProxy.With pattern.The closure properly captures
recordandlogCtx, and error handling for duplicate keys is preserved within the proxy-wrapped block.
128-145: LGTM! Get method properly migrated.The query result is correctly populated via the closure parameter, and the unmarshaling happens after the proxy block completes.
150-173: LGTM! List method correctly structured.The slice is populated within the closure, and post-processing (JSON unmarshaling) happens outside the proxy block as expected.
178-195: LGTM! ListOldOffloads properly migrated.Consistent pattern with other methods.
206-226: LGTM! Delete method correctly captures rowsAffected.The
rowsAffectedvariable is declared outside the closure and properly assigned within, allowing the logging to occur after the proxy block.workflow/sync/semaphore_test.go (3)
19-27: LGTM! Factory type correctly updated.The semaphore factory signature now returns
*sqldb.SessionProxy, and the in-memory implementation correctly returnsnilfor the proxy since it doesn't use a database.
30-44: LGTM! Database semaphore setup uses Session() for direct SQL.Using
info.SessionProxy.Session().SQL()for test setup (inserting sync_limit) is appropriate since test setup doesn't need retry logic.
100-126: LGTM! Test correctly uses updated transaction struct.The
transaction{sessionProxy: sessionProxy}initialization aligns with the new transaction implementation that uses SessionProxy.persist/sqldb/workflow_archive.go (3)
100-102: LGTM! Constructor properly migrated to SessionProxy.The
dbTypeis correctly derived fromsessionProxy.Session()for database-specific query handling.
115-166: LGTM! ArchiveWorkflow correctly uses TxWith for atomicity.The transactional wrapper ensures delete-insert-label operations are atomic. The nested
sp.Session()calls withinTxWithproperly use the transaction-scoped session.
550-624: LGTM! GetWorkflow correctly handles result via closure.The
resultvariable is declared outside and assigned within the closure. Thedb.ErrNoMoreRowscheck correctly returnsnil, nilfor not-found cases.workflow/sync/sync_manager.go (4)
48-59: LGTM! NewLockManager correctly handles conditional DB connection.The
ensureDBConnectionflag allows callers to control whether a database session should be created, and errors are properly propagated.
69-92: LGTM! createLockManager properly initializes with SessionProxy.The
DBInfo.SessionProxyfield is correctly set, and nil checks are used before starting background tasks that require database access.
324-336: LGTM! TryAcquire correctly uses TxWith with serializable isolation.The transaction block properly uses serializable isolation for lock acquisition, ensuring consistency. The transaction-scoped proxy is passed to
tryAcquireImpl.
244-249: LGTM! Initialize correctly creates transactions for lock acquisition.The
transaction{sessionProxy: sm.dbInfo.SessionProxy}pattern is consistent with the updated transaction struct.Also applies to: 269-272
util/sqldb/session.go (3)
56-70: LGTM! Validation correctly rejects negative parameters.The validation prevents misconfiguration with negative retry parameters.
181-233: LGTM! Comprehensive network error detection.The
isNetworkErrormethod covers common patterns including string matching,net.Errorinterface, and specific driver errors (driver.ErrBadConn,sql.ErrConnDone).
153-179: Potential race:sp.closedset beforesp.sessin connect().Line 175 sets
sp.closed = falsebefore line 177 setssp.sess = sess. If another goroutine checkssp.closedviaWith()between these lines, it might proceed with a nil session. This is safe only ifconnect()is always called with proper synchronization—verify thatconnect()is only called fromNewSessionProxy(before the proxy is returned) andReconnect()(which holds the mutex).workflow/sync/database_semaphore.go (5)
9-11: Import update aligns with SessionProxy migration.The import change from
github.com/upper/db/v4togithub.com/argoproj/argo-workflows/v3/util/sqldbcorrectly reflects the transition to the new SessionProxy abstraction for database operations.
38-41: Correct initialization with SessionProxy.The
SyncQueriesconstruction properly usesinfo.SessionProxyfor the new session proxy-based API.
90-114: Consistent SessionProxy propagation through state query methods.The pattern of accepting
sessionProxyas a parameter in internal methods while usings.info.SessionProxyin public methods allows for transaction-aware queries during acquire operations while maintaining a simple API for standalone calls.
161-169: Queue ordering query correctly uses SessionProxy.The
queueOrderedmethod properly accepts and uses thesessionProxyparameter for transactional consistency when called fromcheckAcquire.
232-377: Transaction-aware session handling in checkAcquire and acquire.The
checkAcquireandacquiremethods correctly usetx.sessionProxyfor all database operations, ensuring transactional consistency during semaphore acquisition. This allows the SessionProxy's retry logic to work within the transaction context.util/sync/db/mocks/SyncQueries.go (1)
1-14: Auto-generated mock file correctly updated.This file is auto-generated by mockery and properly reflects the interface changes to use
*sqldb.SessionProxy. The updated method signatures are consistent with theSyncQueriesinterface definition.util/sync/db/queries.go (5)
7-10: Import structure correctly updated.The import section properly adds the
sqldbpackage while retaininggithub.com/upper/db/v4for thedb.Sessiontype used within theWith()callback.
96-102: Constructor correctly initializes SessionProxy.The
NewSyncQueriesconstructor properly accepts and stores theSessionProxyfor use in query operations.
105-160: Limit operations correctly use SessionProxy.With() pattern.All limit operations properly wrap database interactions in
q.sessionProxy.With(), enabling automatic retry on network errors. The error handling inUpdateSemaphoreLimitcorrectly propagatesdb.ErrNoMoreRowswhen no rows are affected.
163-183: State query methods correctly implement session parameter pattern.
GetCurrentStateandGetCurrentHoldersaccept a session parameter for transaction support, whileGetCurrentPendinguses the internalsessionProxyfor standalone queries. This design supports both transactional and non-transactional use cases.
279-361: Lock and controller operations correctly wrapped with SessionProxy.All lock operations (
GetExistingLocks,InsertLock,DeleteLock,ExpireInactiveLocks) and controller operations (InsertControllerHealth,UpdateControllerTimestamp) properly useq.sessionProxy.With()for automatic retry support on network errors.
```go
func (sp *SessionProxy) Reconnect(ctx context.Context) error {
	logger := logging.RequireLoggerFromContext(ctx)
	sp.mu.Lock()
	defer sp.mu.Unlock()

	var err error

	for attempt := 0; attempt <= sp.maxRetries; attempt++ {
		// Perform the reconnection attempt
		// Close the bad connection if it exists
		if sp.sess != nil {
			sp.sess.Close()
			sp.closed = true
		}

		err = sp.connect(ctx)
		if err == nil {
			logger.WithField("attempt_number", attempt).Info(ctx, "connected to database")
			return nil
		}
		logger.WithField("attempt_number", attempt).Warn(ctx, "failed to connect to database")

		// If this is the last attempt, don't wait
		if attempt == sp.maxRetries || !sp.isNetworkError(err) {
			break
		}

		// Calculate delay for next retry with exponential backoff
		delay := time.Duration(float64(sp.baseDelay) * float64(attempt+1) * sp.retryMultiple)
		delay = min(delay, sp.maxDelay)

		// Wait before retrying with context cancellation support
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
	}

	return fmt.Errorf("reconnection failed after %d retries, last error: %w", sp.maxRetries, err)
}
```
Reconnect() holds mutex for entire retry duration.
Reconnect() acquires the write lock at line 286 and holds it through all retry attempts with backoff delays. This could cause significant contention if other goroutines are waiting to use the session. Consider releasing the lock during backoff waits.
```diff
 func (sp *SessionProxy) Reconnect(ctx context.Context) error {
 	logger := logging.RequireLoggerFromContext(ctx)
-	sp.mu.Lock()
-	defer sp.mu.Unlock()
 	var err error
 	for attempt := 0; attempt <= sp.maxRetries; attempt++ {
+		sp.mu.Lock()
 		// Close the bad connection if it exists
 		if sp.sess != nil {
 			sp.sess.Close()
 			sp.closed = true
 		}
 		err = sp.connect(ctx)
 		if err == nil {
 			logger.WithField("attempt_number", attempt).Info(ctx, "connected to database")
+			sp.mu.Unlock()
 			return nil
 		}
+		sp.mu.Unlock()
 		logger.WithField("attempt_number", attempt).Warn(ctx, "failed to connect to database")
 		// If this is the last attempt, don't wait
 		if attempt == sp.maxRetries || !sp.isNetworkError(err) {
 			break
 		}
 		// Calculate delay for next retry with exponential backoff
 		delay := time.Duration(float64(sp.baseDelay) * float64(attempt+1) * sp.retryMultiple)
 		delay = min(delay, sp.maxDelay)
 		// Wait before retrying with context cancellation support
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		case <-time.After(delay):
 		}
 	}
 	return fmt.Errorf("reconnection failed after %d retries, last error: %w", sp.maxRetries, err)
 }
```
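For intuition about the backoff used in `Reconnect()` (and in the suggested `With()` retry loop earlier), the delay formula is baseDelay × (attempt + 1) × retryMultiple, capped at maxDelay; note that, as written, this grows linearly with the attempt number rather than geometrically. A tiny standalone sketch of the resulting schedule, using the defaults this review infers (100 ms base, 2.0 multiple, 30 s cap; treat these values as assumptions, not confirmed configuration):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	baseDelay := 100 * time.Millisecond // assumed default
	maxDelay := 30 * time.Second        // assumed default
	retryMultiple := 2.0                // assumed default

	// Mirrors: delay := time.Duration(float64(baseDelay) * float64(attempt+1) * retryMultiple)
	for attempt := 0; attempt < 5; attempt++ {
		delay := time.Duration(float64(baseDelay) * float64(attempt+1) * retryMultiple)
		if delay > maxDelay {
			delay = maxDelay
		}
		fmt.Printf("attempt %d: wait %v\n", attempt, delay) // 200ms, 400ms, 600ms, 800ms, 1s
	}
}
```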
@coderabbitai generate docstrings

✅ Actions performed: Initiated docstring generation; will generate only if new commits exist.

**Note:** Docstrings generation - SUCCESS

Docstrings generation was requested by @Joibel in #15006 (comment). The following files were modified:

- `hack/db/main.go`
- `persist/sqldb/migrate.go`
- `persist/sqldb/offload_node_status_repo.go`
- `persist/sqldb/workflow_archive.go`
- `server/sync/sync_server.go`
- `test/e2e/fixtures/persistence.go`
- `util/sqldb/migrate.go`
- `util/sqldb/session.go`
- `util/sync/db/config.go`
- `util/sync/db/migrate.go`
- `workflow/sync/database_mutex.go`
- `workflow/sync/database_semaphore.go`
- `workflow/sync/sync_manager.go`
Fixes #15011
Motivation
Modifications
Verification
Documentation
Summary by CodeRabbit
Release Notes
New Features
Documentation