Skip to content

Commit 138a721

Browse files
committed
add PG Materlized View for prometheus aggregator queries
1 parent d2c2926 commit 138a721

File tree

17 files changed

+343
-79
lines changed

17 files changed

+343
-79
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,5 @@ convoy-ce
5353
# used to generate test stubs
5454
.cursor
5555

56-
mise.local.toml
56+
mise.local.toml
57+
api/ui/*

api/ui/build/index.html

Lines changed: 2 additions & 2 deletions
Large diffs are not rendered by default.

cmd/hooks/hooks.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,11 +716,23 @@ func buildCliConfiguration(cmd *cobra.Command) (*config.Configuration, error) {
716716
return nil, errors.New("metrics-prometheus-sample-time must be non-zero")
717717
}
718718

719+
queryTimeout, err := cmd.Flags().GetUint64("metrics-prometheus-query-timeout")
720+
if err != nil {
721+
return nil, err
722+
}
723+
724+
materializedViewRefreshInterval, err := cmd.Flags().GetUint64("metrics-prometheus-materialized-view-refresh-interval")
725+
if err != nil {
726+
return nil, err
727+
}
728+
719729
c.Metrics = config.MetricsConfiguration{
720730
IsEnabled: true,
721731
Backend: config.MetricsBackend(metricsBackend),
722732
Prometheus: config.PrometheusMetricsConfiguration{
723-
SampleTime: sampleTime,
733+
SampleTime: sampleTime,
734+
QueryTimeout: queryTimeout,
735+
MaterializedViewRefreshInterval: materializedViewRefreshInterval,
724736
},
725737
}
726738
}

cmd/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ func main() {
7171
var dataDogAgentUrl string
7272
var metricsBackend string
7373
var prometheusMetricsSampleTime uint64
74+
var prometheusMetricsQueryTimeout uint64
75+
var prometheusMetricsMaterializedViewRefreshInterval uint64
7476

7577
var retentionPolicy string
7678
var retentionPolicyEnabled bool
@@ -143,6 +145,8 @@ func main() {
143145
// metrics
144146
c.Flags().StringVar(&metricsBackend, "metrics-backend", "prometheus", "Metrics backend e.g. prometheus. ('prometheus' feature flag required")
145147
c.Flags().Uint64Var(&prometheusMetricsSampleTime, "metrics-prometheus-sample-time", 5, "Prometheus metrics sample time")
148+
c.Flags().Uint64Var(&prometheusMetricsQueryTimeout, "metrics-prometheus-query-timeout", 30, "Prometheus metrics query timeout in seconds")
149+
c.Flags().Uint64Var(&prometheusMetricsMaterializedViewRefreshInterval, "metrics-prometheus-materialized-view-refresh-interval", 2, "Materialized view refresh interval in minutes")
146150

147151
c.Flags().StringVar(&retentionPolicy, "retention-policy", "", "Retention Policy Duration")
148152
c.Flags().BoolVar(&retentionPolicyEnabled, "retention-policy-enabled", false, "Retention Policy Enabled")

cmd/server/server.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,22 @@ func StartConvoyServer(a *cli.App) error {
175175
s.RegisterTask("30 * * * *", convoy.ScheduleQueue, convoy.MonitorTwitterSources)
176176
s.RegisterTask("0 * * * *", convoy.ScheduleQueue, convoy.TokenizeSearch)
177177

178+
// Refresh metrics materialized views
179+
if cfg.Metrics.IsEnabled && cfg.Metrics.Backend == config.PrometheusMetricsProvider {
180+
refreshInterval := cfg.Metrics.Prometheus.MaterializedViewRefreshInterval
181+
if refreshInterval < 1 {
182+
refreshInterval = 1
183+
}
184+
if refreshInterval > 60 {
185+
refreshInterval = 60
186+
}
187+
188+
cronSpec := fmt.Sprintf("*/%d * * * *", refreshInterval)
189+
s.RegisterTask(cronSpec, convoy.ScheduleQueue, convoy.RefreshMetricsMaterializedViews)
190+
191+
lo.Infof("Registered metrics materialized view refresh every %d min", refreshInterval)
192+
}
193+
178194
// ensures that project data is backed up about 2 hours before they are deleted
179195
if a.Licenser.RetentionPolicy() {
180196
// runs at 10pm

cmd/worker/worker.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,8 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration) erro
419419
consumer.RegisterHandlers(convoy.MetaEventProcessor, task.ProcessMetaEvent(projectRepo, metaEventRepo, dispatcher, a.TracerBackend), nil)
420420
consumer.RegisterHandlers(convoy.DeleteArchivedTasksProcessor, task.DeleteArchivedTasks(a.Queue, rd), nil)
421421

422+
consumer.RegisterHandlers(convoy.RefreshMetricsMaterializedViews, task.RefreshMetricsMaterializedViews(a.DB, rd), nil)
423+
422424
consumer.RegisterHandlers(convoy.BatchRetryProcessor, task.ProcessBatchRetry(batchRetryRepo, eventDeliveryRepo, a.Queue, lo), nil)
423425

424426
err = metrics.RegisterQueueMetrics(a.Queue, a.DB, circuitBreakerManager)

config/config.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,9 @@ var DefaultConfiguration = Configuration{
112112
IsEnabled: false,
113113
Backend: PrometheusMetricsProvider,
114114
Prometheus: PrometheusMetricsConfiguration{
115-
SampleTime: 5,
115+
SampleTime: 5,
116+
QueryTimeout: 30,
117+
MaterializedViewRefreshInterval: 2, // Default: refresh every 2 minutes
116118
},
117119
},
118120
Dispatcher: DispatcherConfiguration{
@@ -376,7 +378,9 @@ type MetricsConfiguration struct {
376378
}
377379

378380
type PrometheusMetricsConfiguration struct {
379-
SampleTime uint64 `json:"sample_time" envconfig:"CONVOY_METRICS_SAMPLE_TIME"`
381+
SampleTime uint64 `json:"sample_time" envconfig:"CONVOY_METRICS_SAMPLE_TIME"`
382+
QueryTimeout uint64 `json:"query_timeout" envconfig:"CONVOY_METRICS_QUERY_TIMEOUT"` // Timeout in seconds for metrics collection queries
383+
MaterializedViewRefreshInterval uint64 `json:"materialized_view_refresh_interval" envconfig:"CONVOY_METRICS_MATERIALIZED_VIEW_REFRESH_INTERVAL"`
380384
}
381385

382386
const (

config/config_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,9 @@ func TestLoadConfig(t *testing.T) {
179179
IsEnabled: false,
180180
Backend: "prometheus",
181181
Prometheus: PrometheusMetricsConfiguration{
182-
SampleTime: 5,
182+
SampleTime: 5,
183+
QueryTimeout: 30,
184+
MaterializedViewRefreshInterval: 2,
183185
},
184186
},
185187
Dispatcher: DispatcherConfiguration{
@@ -276,7 +278,9 @@ func TestLoadConfig(t *testing.T) {
276278
IsEnabled: false,
277279
Backend: "prometheus",
278280
Prometheus: PrometheusMetricsConfiguration{
279-
SampleTime: 5,
281+
SampleTime: 5,
282+
QueryTimeout: 30,
283+
MaterializedViewRefreshInterval: 2,
280284
},
281285
},
282286
Dispatcher: DispatcherConfiguration{
@@ -372,7 +376,9 @@ func TestLoadConfig(t *testing.T) {
372376
IsEnabled: false,
373377
Backend: "prometheus",
374378
Prometheus: PrometheusMetricsConfiguration{
375-
SampleTime: 5,
379+
SampleTime: 5,
380+
QueryTimeout: 30,
381+
MaterializedViewRefreshInterval: 2,
376382
},
377383
},
378384
Dispatcher: DispatcherConfiguration{

convoy.json.example

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,9 @@
100100
"metrics": {
101101
"metrics_backend": "prometheus",
102102
"prometheus_metrics": {
103-
"sample_time": 10
103+
"sample_time": 5,
104+
"query_timeout": 30,
105+
"materialized_view_refresh_interval": 2
104106
}
105107
}
106108
}

database/postgres/postgres.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,19 @@ func parseDBConfig(dbConfig config.DatabaseConfiguration, src ...string) (*Postg
8585
return nil, fmt.Errorf("failed to create %sconnection pool: %w", src, err)
8686
}
8787

88-
if dbConfig.SetMaxOpenConnections > 0 {
89-
pgxCfg.MaxConns = int32(dbConfig.SetMaxOpenConnections)
88+
// Set MaxConns - use configured value or default to 100 if not set
89+
// This prevents unlimited connections when SetMaxOpenConnections is 0
90+
maxConns := dbConfig.SetMaxOpenConnections
91+
if maxConns <= 0 {
92+
maxConns = 100
93+
log.Warnf("[%s]: SetMaxOpenConnections not set or 0, using default: %d. Set CONVOY_DB_MAX_OPEN_CONN to override.", pkgName, maxConns)
9094
}
95+
pgxCfg.MaxConns = int32(maxConns)
96+
97+
if dbConfig.SetMaxIdleConnections > 0 {
98+
pgxCfg.MaxConnIdleTime = time.Minute * 5
99+
}
100+
91101
pgxCfg.MaxConnLifetime = time.Second * time.Duration(dbConfig.SetConnMaxLifetime)
92102
pgxCfg.ConnConfig.Tracer = otelpgx.NewTracer(otelpgx.WithTrimSQLInSpanName())
93103

@@ -100,6 +110,18 @@ func parseDBConfig(dbConfig config.DatabaseConfiguration, src ...string) (*Postg
100110
sqlDB := stdlib.OpenDBFromPool(pool)
101111
db := sqlx.NewDb(sqlDB, "pgx")
102112

113+
maxOpenConns := maxConns
114+
maxIdleConns := dbConfig.SetMaxIdleConnections
115+
if maxIdleConns <= 0 {
116+
maxIdleConns = maxOpenConns / 4
117+
if maxIdleConns < 2 {
118+
maxIdleConns = 2
119+
}
120+
}
121+
db.SetMaxOpenConns(maxOpenConns)
122+
db.SetMaxIdleConns(maxIdleConns)
123+
db.SetConnMaxLifetime(time.Second * time.Duration(dbConfig.SetConnMaxLifetime))
124+
103125
return &Postgres{dbx: db, pool: pool, conn: pool}, nil
104126
}
105127

0 commit comments

Comments
 (0)