Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .features/pending/feat-refresh-connection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Component: General
Issues: 15011
Description: Reconnect and retry queries
Author: [Isitha Subasinghe](https://github.com/isubasinghe)

Queries against the database are now retried when the failure was caused by a network connection issue.
The database connection is re-established before the query is retried.
14 changes: 14 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,20 @@ type DBConfig struct {
MySQL *MySQLConfig `json:"mysql,omitempty"`
// Pooled connection settings for all types of database connections
ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"`
// DBReconnectConfig holds the configuration options for database retries and reconnections
DBReconnectConfig *DBReconnectConfig `json:"reconnectionConfig,omitempty"`
}

// DBReconnectConfig contains database reconnect settings: how many attempts
// are made and how the delay between retries grows. Each field is
// (de)serialized under the JSON key given in its struct tag.
// NOTE(review): the retry logic that consumes these values is not in this
// file; confirm how zero/unset values are treated.
type DBReconnectConfig struct {
// MaxRetries defines how many connection attempts should be made before we give up.
MaxRetries int `json:"maxRetries"`
// BaseDelaySeconds is the base delay, in seconds, between retries. The delay is
// multiplied by `retryMultiple` and capped at `maxDelaySeconds`.
BaseDelaySeconds int `json:"baseDelaySeconds"`
// MaxDelaySeconds is the absolute upper limit, in seconds, to wait before retrying.
MaxDelaySeconds int `json:"maxDelaySeconds"`
// RetryMultiple is the growth factor applied to `baseDelaySeconds`.
RetryMultiple float64 `json:"retryMultiple"`
}

// PersistConfig contains workflow persistence configuration
Expand Down
47 changes: 31 additions & 16 deletions docs/workflow-controller-configmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ PersistConfig contains workflow persistence configuration
| `PostgreSQL` | [`PostgreSQLConfig`](#postgresqlconfig) | PostgreSQL configuration for PostgreSQL database, don't use MySQL at the same time |
| `MySQL` | [`MySQLConfig`](#mysqlconfig) | MySQL configuration for MySQL database, don't use PostgreSQL at the same time |
| `ConnectionPool` | [`ConnectionPool`](#connectionpool) | Pooled connection settings for all types of database connections |
| `DBReconnectConfig` | [`DBReconnectConfig`](#dbreconnectconfig) | DBReconnectConfig are configuration options for database retries and reconnections |
| `NodeStatusOffload` | `bool` | NodeStatusOffload saves node status only to the persistence DB to avoid the 1MB limit in etcd |
| `Archive` | `bool` | Archive completed Workflows to persistence so you can access them after they're removed from Kubernetes |
| `ArchiveLabelSelector` | [`metav1.LabelSelector`](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#labelselector-v1-meta) | ArchiveLabelSelector holds LabelSelector to determine which Workflows to archive |
Expand Down Expand Up @@ -234,6 +235,19 @@ ConnectionPool contains database connection pool settings
| `MaxOpenConns` | `int` | MaxOpenConns sets the maximum number of open connections to the database |
| `ConnMaxLifetime` | `TTL` (time.Duration forces you to specify in millis, and does not support days see https://stackoverflow.com/questions/48050945/how-to-unmarshal-json-into-durations (underlying type: time.Duration)) | ConnMaxLifetime sets the maximum amount of time a connection may be reused |

## DBReconnectConfig

DBReconnectConfig contains database reconnect settings

### Fields

| Field Name | Field Type | Description |
|--------------------|------------|-------------------------------------------------------------------------------------------------------------|
| `MaxRetries` | `int` | MaxRetries defines how many connection attempts should be made before we give up |
| `BaseDelaySeconds` | `int` | BaseDelaySeconds delays retries by this amount multiplied by the retryMultiple, capped to `maxDelaySeconds` |
| `MaxDelaySeconds` | `int` | MaxDelaySeconds the absolute upper limit to wait before retrying |
| `RetryMultiple` | `float64` | RetryMultiple is the growth factor for `baseDelaySeconds` |

## PodSpecLogStrategy

PodSpecLogStrategy contains the configuration for logging the pod spec in controller log for debugging purpose
Expand Down Expand Up @@ -315,22 +329,23 @@ SyncConfig contains synchronization configuration for database locks (semaphores

### Fields

| Field Name | Field Type | Description |
|------------------------------|-----------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `PostgreSQL` | [`PostgreSQLConfig`](#postgresqlconfig) | PostgreSQL configuration for PostgreSQL database, don't use MySQL at the same time |
| `MySQL` | [`MySQLConfig`](#mysqlconfig) | MySQL configuration for MySQL database, don't use PostgreSQL at the same time |
| `ConnectionPool` | [`ConnectionPool`](#connectionpool) | Pooled connection settings for all types of database connections |
| `EnableAPI` | `bool` | EnableAPI enables the database synchronization API |
| `ControllerName` | `string` | ControllerName sets a unique name for this controller instance |
| `SkipMigration` | `bool` | SkipMigration skips database migration if needed |
| `LimitTableName` | `string` | LimitTableName customizes the table name for semaphore limits, if not set, the default value is "sync_limit" |
| `StateTableName` | `string` | StateTableName customizes the table name for current lock state, if not set, the default value is "sync_state" |
| `ControllerTableName` | `string` | ControllerTableName customizes the table name for controller heartbeats, if not set, the default value is "sync_controller" |
| `LockTableName` | `string` | LockTableName customizes the table name for lock coordination data, if not set, the default value is "sync_lock" |
| `PollSeconds` | `int` | PollSeconds specifies how often to check for lock changes, if not set, the default value is 5 seconds |
| `HeartbeatSeconds` | `int` | HeartbeatSeconds specifies how often to update controller heartbeat, if not set, the default value is 60 seconds |
| `InactiveControllerSeconds` | `int` | InactiveControllerSeconds specifies when to consider a controller dead, if not set, the default value is 300 seconds |
| `SemaphoreLimitCacheSeconds` | `int64` | SemaphoreLimitCacheSeconds specifies the duration in seconds before the workflow controller will re-fetch the limit for a semaphore from its associated data source. Defaults to 0 seconds (re-fetch every time the semaphore is checked). |
| Field Name | Field Type | Description |
|------------------------------|-------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `PostgreSQL` | [`PostgreSQLConfig`](#postgresqlconfig) | PostgreSQL configuration for PostgreSQL database, don't use MySQL at the same time |
| `MySQL` | [`MySQLConfig`](#mysqlconfig) | MySQL configuration for MySQL database, don't use PostgreSQL at the same time |
| `ConnectionPool` | [`ConnectionPool`](#connectionpool) | Pooled connection settings for all types of database connections |
| `DBReconnectConfig` | [`DBReconnectConfig`](#dbreconnectconfig) | DBReconnectConfig are configuration options for database retries and reconnections |
| `EnableAPI` | `bool` | EnableAPI enables the database synchronization API |
| `ControllerName` | `string` | ControllerName sets a unique name for this controller instance |
| `SkipMigration` | `bool` | SkipMigration skips database migration if needed |
| `LimitTableName` | `string` | LimitTableName customizes the table name for semaphore limits, if not set, the default value is "sync_limit" |
| `StateTableName` | `string` | StateTableName customizes the table name for current lock state, if not set, the default value is "sync_state" |
| `ControllerTableName` | `string` | ControllerTableName customizes the table name for controller heartbeats, if not set, the default value is "sync_controller" |
| `LockTableName` | `string` | LockTableName customizes the table name for lock coordination data, if not set, the default value is "sync_lock" |
| `PollSeconds` | `int` | PollSeconds specifies how often to check for lock changes, if not set, the default value is 5 seconds |
| `HeartbeatSeconds` | `int` | HeartbeatSeconds specifies how often to update controller heartbeat, if not set, the default value is 60 seconds |
| `InactiveControllerSeconds` | `int` | InactiveControllerSeconds specifies when to consider a controller dead, if not set, the default value is 300 seconds |
| `SemaphoreLimitCacheSeconds` | `int64` | SemaphoreLimitCacheSeconds specifies the duration in seconds before the workflow controller will re-fetch the limit for a semaphore from its associated data source. Defaults to 0 seconds (re-fetch every time the semaphore is checked). |

## ArtifactDriver

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ require (
github.com/cpuguy83/dockercfg v0.3.2 // indirect
github.com/cyphar/filepath-securejoin v0.4.1 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/docker v28.1.1+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/docker v28.1.1+incompatible
github.com/docker/go-connections v0.5.0
github.com/docker/go-units v0.5.0 // indirect
github.com/ebitengine/purego v0.8.2 // indirect
github.com/envoyproxy/go-control-plane/envoy v1.32.4 // indirect
Expand Down
7 changes: 6 additions & 1 deletion hack/db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/argoproj/argo-workflows/v3/persist/sqldb"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util/instanceid"
utilsqldb "github.com/argoproj/argo-workflows/v3/util/sqldb"
)

var session db.Session
Expand All @@ -29,6 +30,9 @@ func main() {
}
rootCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) (err error) {
session, err = createDBSession(dsn)
if err != nil {
return err
}
return
}
rootCmd.PersistentFlags().StringVarP(&dsn, "dsn", "d", "postgres://postgres@localhost:5432/postgres", "DSN connection string. For MySQL, use 'mysql:password@tcp/argo'.")
Expand Down Expand Up @@ -72,7 +76,8 @@ func NewFakeDataCommand() *cobra.Command {
for i := 0; i < rows; i++ {
wf := randomizeWorkflow(wfTmpl, namespaces)
cluster := clusters[rand.Intn(len(clusters))]
wfArchive := sqldb.NewWorkflowArchive(session, cluster, "", instanceIDService)
sessionProxy := utilsqldb.NewSessionProxyFromSession(session, nil, "", "").Tx()
wfArchive := sqldb.NewWorkflowArchive(sessionProxy, cluster, "", instanceIDService)
if err := wfArchive.ArchiveWorkflow(ctx, wf); err != nil {
return err
}
Expand Down
23 changes: 13 additions & 10 deletions persist/sqldb/archived_workflow_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
// SELECT DISTINCT name FROM argo_archived_workflows_labels
func (r *workflowArchive) ListWorkflowsLabelKeys(ctx context.Context) (*wfv1.LabelKeys, error) {
var archivedWfLabels []archivedWorkflowLabelRecord

err := r.session.SQL().
Select(db.Raw("DISTINCT name")).
From(archiveLabelsTableName).
All(&archivedWfLabels)
err := r.sessionProxy.With(ctx, func(s db.Session) error {
return s.SQL().
Select(db.Raw("DISTINCT name")).
From(archiveLabelsTableName).
All(&archivedWfLabels)
})
if err != nil {
return nil, err
}
Expand All @@ -38,11 +39,13 @@ func (r *workflowArchive) ListWorkflowsLabelKeys(ctx context.Context) (*wfv1.Lab
// SELECT DISTINCT value FROM argo_archived_workflows_labels WHERE name=labelkey
func (r *workflowArchive) ListWorkflowsLabelValues(ctx context.Context, key string) (*wfv1.LabelValues, error) {
var archivedWfLabels []archivedWorkflowLabelRecord
err := r.session.SQL().
Select(db.Raw("DISTINCT value")).
From(archiveLabelsTableName).
Where(db.Cond{"name": key}).
All(&archivedWfLabels)
err := r.sessionProxy.With(ctx, func(s db.Session) error {
return s.SQL().
Select(db.Raw("DISTINCT value")).
From(archiveLabelsTableName).
Where(db.Cond{"name": key}).
All(&archivedWfLabels)
})
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions persist/sqldb/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
versionTable = "schema_history"
)

// Migrate runs the migrations
func Migrate(ctx context.Context, session db.Session, clusterName, tableName string) (err error) {
dbType := sqldb.DBTypeFor(session)
return sqldb.Migrate(ctx, session, versionTable, []sqldb.Change{
Expand Down
Loading
Loading