Skip to content

Commit 9819daf

Browse files
committed
add PG Materialized View for prometheus aggregator queries
1 parent d2c2926 commit 9819daf

File tree

14 files changed

+280
-26
lines changed

14 files changed

+280
-26
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,5 @@ convoy-ce
5353
# used to generate test stubs
5454
.cursor
5555

56-
mise.local.toml
56+
mise.local.toml
57+
api/ui/*

api/ui/build/index.html

Lines changed: 2 additions & 2 deletions
Large diffs are not rendered by default.

cmd/hooks/hooks.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,11 +716,17 @@ func buildCliConfiguration(cmd *cobra.Command) (*config.Configuration, error) {
716716
return nil, errors.New("metrics-prometheus-sample-time must be non-zero")
717717
}
718718

719+
queryTimeout, err := cmd.Flags().GetUint64("metrics-prometheus-query-timeout")
720+
if err != nil {
721+
return nil, err
722+
}
723+
719724
c.Metrics = config.MetricsConfiguration{
720725
IsEnabled: true,
721726
Backend: config.MetricsBackend(metricsBackend),
722727
Prometheus: config.PrometheusMetricsConfiguration{
723-
SampleTime: sampleTime,
728+
SampleTime: sampleTime,
729+
QueryTimeout: queryTimeout,
724730
},
725731
}
726732
}

cmd/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func main() {
7171
var dataDogAgentUrl string
7272
var metricsBackend string
7373
var prometheusMetricsSampleTime uint64
74+
var prometheusMetricsQueryTimeout uint64
7475

7576
var retentionPolicy string
7677
var retentionPolicyEnabled bool
@@ -143,6 +144,7 @@ func main() {
143144
// metrics
144145
c.Flags().StringVar(&metricsBackend, "metrics-backend", "prometheus", "Metrics backend e.g. prometheus. ('prometheus' feature flag required")
145146
c.Flags().Uint64Var(&prometheusMetricsSampleTime, "metrics-prometheus-sample-time", 5, "Prometheus metrics sample time")
147+
c.Flags().Uint64Var(&prometheusMetricsQueryTimeout, "metrics-prometheus-query-timeout", 30, "Prometheus metrics query timeout in seconds")
146148

147149
c.Flags().StringVar(&retentionPolicy, "retention-policy", "", "Retention Policy Duration")
148150
c.Flags().BoolVar(&retentionPolicyEnabled, "retention-policy-enabled", false, "Retention Policy Enabled")

cmd/server/server.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,18 @@ func StartConvoyServer(a *cli.App) error {
175175
s.RegisterTask("30 * * * *", convoy.ScheduleQueue, convoy.MonitorTwitterSources)
176176
s.RegisterTask("0 * * * *", convoy.ScheduleQueue, convoy.TokenizeSearch)
177177

178+
// Refresh metrics materialized views based on SampleTime
179+
if cfg.Metrics.IsEnabled && cfg.Metrics.Backend == config.PrometheusMetricsProvider {
180+
st := cfg.Metrics.Prometheus.SampleTime
181+
182+
interval := max(1, min(60, st/120))
183+
184+
cronSpec := fmt.Sprintf("*/%d * * * *", interval)
185+
s.RegisterTask(cronSpec, convoy.ScheduleQueue, convoy.RefreshMetricsMaterializedViews)
186+
187+
lo.Infof("Registered metrics refresh every %d min (SampleTime: %d s)", interval, st)
188+
}
189+
178190
// ensures that project data is backed up about 2 hours before they are deleted
179191
if a.Licenser.RetentionPolicy() {
180192
// runs at 10pm

cmd/worker/worker.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,8 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration) erro
419419
consumer.RegisterHandlers(convoy.MetaEventProcessor, task.ProcessMetaEvent(projectRepo, metaEventRepo, dispatcher, a.TracerBackend), nil)
420420
consumer.RegisterHandlers(convoy.DeleteArchivedTasksProcessor, task.DeleteArchivedTasks(a.Queue, rd), nil)
421421

422+
consumer.RegisterHandlers(convoy.RefreshMetricsMaterializedViews, task.RefreshMetricsMaterializedViews(a.DB, rd), nil)
423+
422424
consumer.RegisterHandlers(convoy.BatchRetryProcessor, task.ProcessBatchRetry(batchRetryRepo, eventDeliveryRepo, a.Queue, lo), nil)
423425

424426
err = metrics.RegisterQueueMetrics(a.Queue, a.DB, circuitBreakerManager)

config/config.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ var DefaultConfiguration = Configuration{
112112
IsEnabled: false,
113113
Backend: PrometheusMetricsProvider,
114114
Prometheus: PrometheusMetricsConfiguration{
115-
SampleTime: 5,
115+
SampleTime: 5,
116+
QueryTimeout: 30,
116117
},
117118
},
118119
Dispatcher: DispatcherConfiguration{
@@ -376,7 +377,8 @@ type MetricsConfiguration struct {
376377
}
377378

378379
type PrometheusMetricsConfiguration struct {
379-
SampleTime uint64 `json:"sample_time" envconfig:"CONVOY_METRICS_SAMPLE_TIME"`
380+
SampleTime uint64 `json:"sample_time" envconfig:"CONVOY_METRICS_SAMPLE_TIME"`
381+
QueryTimeout uint64 `json:"query_timeout" envconfig:"CONVOY_METRICS_QUERY_TIMEOUT"` // Timeout in seconds for metrics collection queries
380382
}
381383

382384
const (

convoy.json.example

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@
100100
"metrics": {
101101
"metrics_backend": "prometheus",
102102
"prometheus_metrics": {
103-
"sample_time": 10
103+
"sample_time": 5,
104+
"query_timeout": 30
104105
}
105106
}
106107
}

database/postgres/postgres_collector.go

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,8 @@ func (p *Postgres) Collect(ch chan<- prometheus.Metric) {
141141
// Return empty metrics to prevent blocking the endpoint
142142
metrics = &Metrics{}
143143
}
144-
} else {
145-
cachedMetrics = metrics
146144
}
145+
cachedMetrics = metrics
147146
}
148147

149148
// Use unique keys per metric type to prevent collisions
@@ -244,15 +243,24 @@ func (p *Postgres) Collect(ch chan<- prometheus.Metric) {
244243

245244
// collectMetrics gathers essential metrics from the DB
246245
func (p *Postgres) collectMetrics() (*Metrics, error) {
247-
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(metricsConfig.Prometheus.SampleTime)*time.Second)
246+
queryTimeout := time.Duration(metricsConfig.Prometheus.QueryTimeout) * time.Second
247+
if queryTimeout == 0 {
248+
queryTimeout = 30 * time.Second
249+
}
250+
ctx, cancel := context.WithTimeout(context.Background(), queryTimeout)
248251
defer cancel()
249252

250253
metrics := &Metrics{}
251254

252-
queryEventQueueMetrics := "select distinct project_id, coalesce(source_id, 'http') as source_id, count(*) as total from convoy.events group by project_id, source_id"
255+
queryEventQueueMetrics := "SELECT project_id, source_id, total FROM convoy.event_queue_metrics_mv"
253256
rows, err := p.GetDB().QueryxContext(ctx, queryEventQueueMetrics)
254257
if err != nil {
255-
return nil, fmt.Errorf("failed to query event queue metrics: %w", err)
258+
log.Warnf("materialized view query failed, falling back to direct query: %v", err)
259+
queryEventQueueMetrics = "SELECT DISTINCT project_id, COALESCE(source_id, 'http') as source_id, COUNT(*) as total FROM convoy.events GROUP BY project_id, source_id"
260+
rows, err = p.GetDB().QueryxContext(ctx, queryEventQueueMetrics)
261+
if err != nil {
262+
return nil, fmt.Errorf("failed to query event queue metrics: %w", err)
263+
}
256264
}
257265
defer closeWithError(rows)
258266
eventQueueMetrics := make([]EventQueueMetrics, 0)
@@ -266,7 +274,11 @@ func (p *Postgres) collectMetrics() (*Metrics, error) {
266274
}
267275
metrics.EventQueueMetrics = eventQueueMetrics
268276

269-
backlogQM := `WITH a1 AS (
277+
backlogQM := "SELECT project_id, source_id, age_seconds FROM convoy.event_queue_backlog_metrics_mv"
278+
rows1, err := p.GetDB().QueryxContext(ctx, backlogQM)
279+
if err != nil {
280+
log.Warnf("materialized view query failed, falling back to direct query: %v", err)
281+
backlogQM = `WITH a1 AS (
270282
SELECT ed.project_id,
271283
COALESCE(e.source_id, 'http') AS source_id,
272284
EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds
@@ -287,9 +299,10 @@ func (p *Postgres) collectMetrics() (*Metrics, error) {
287299
WHERE ed.status = 'Success' AND a1.source_id IS NULL
288300
GROUP BY ed.project_id, e.source_id
289301
LIMIT 1000; -- samples`
290-
rows1, err := p.GetDB().QueryxContext(ctx, backlogQM)
291-
if err != nil {
292-
return nil, fmt.Errorf("failed to query backlog metrics: %w", err)
302+
rows1, err = p.GetDB().QueryxContext(ctx, backlogQM)
303+
if err != nil {
304+
return nil, fmt.Errorf("failed to query backlog metrics: %w", err)
305+
}
293306
}
294307
defer closeWithError(rows1)
295308
eventQueueBacklogMetrics := make([]EventQueueBacklogMetrics, 0)
@@ -303,7 +316,21 @@ func (p *Postgres) collectMetrics() (*Metrics, error) {
303316
}
304317
metrics.EventQueueBacklogMetrics = eventQueueBacklogMetrics
305318

306-
queryDeliveryQ := `SELECT DISTINCT
319+
queryDeliveryQ := `SELECT
320+
project_id,
321+
project_name,
322+
endpoint_id,
323+
status,
324+
event_type,
325+
source_id,
326+
organisation_id,
327+
organisation_name,
328+
total
329+
FROM convoy.event_delivery_queue_metrics_mv`
330+
rows2, err := p.GetDB().QueryxContext(ctx, queryDeliveryQ)
331+
if err != nil {
332+
log.Warnf("materialized view query failed, falling back to direct query: %v", err)
333+
queryDeliveryQ = `SELECT DISTINCT
307334
ed.project_id,
308335
COALESCE(p.name, '') as project_name,
309336
ed.endpoint_id,
@@ -319,9 +346,10 @@ func (p *Postgres) collectMetrics() (*Metrics, error) {
319346
LEFT JOIN convoy.organisations o ON p.organisation_id = o.id
320347
WHERE ed.deleted_at IS NULL
321348
GROUP BY ed.project_id, p.name, ed.endpoint_id, ed.status, ed.event_type, e.source_id, p.organisation_id, o.name`
322-
rows2, err := p.GetDB().QueryxContext(ctx, queryDeliveryQ)
323-
if err != nil {
324-
return nil, fmt.Errorf("failed to query delivery queue metrics: %w", err)
349+
rows2, err = p.GetDB().QueryxContext(ctx, queryDeliveryQ)
350+
if err != nil {
351+
return nil, fmt.Errorf("failed to query delivery queue metrics: %w", err)
352+
}
325353
}
326354
defer closeWithError(rows2)
327355
eventDeliveryQueueMetrics := make([]EventDeliveryQueueMetrics, 0)
@@ -335,7 +363,11 @@ func (p *Postgres) collectMetrics() (*Metrics, error) {
335363
}
336364
metrics.EventDeliveryQueueMetrics = eventDeliveryQueueMetrics
337365

338-
backlogEQM := `WITH a1 AS (
366+
backlogEQM := "SELECT project_id, source_id, endpoint_id, age_seconds FROM convoy.event_endpoint_backlog_metrics_mv"
367+
rows3, err := p.GetDB().QueryxContext(ctx, backlogEQM)
368+
if err != nil {
369+
log.Warnf("materialized view query failed, falling back to direct query: %v", err)
370+
backlogEQM = `WITH a1 AS (
339371
SELECT ed.project_id,
340372
COALESCE(e.source_id, 'http') AS source_id,
341373
ed.endpoint_id,
@@ -358,9 +390,10 @@ func (p *Postgres) collectMetrics() (*Metrics, error) {
358390
WHERE ed.status = 'Success' AND a1.endpoint_id IS NULL
359391
GROUP BY ed.project_id, e.source_id, ed.endpoint_id
360392
LIMIT 1000; -- samples`
361-
rows3, err := p.GetDB().QueryxContext(ctx, backlogEQM)
362-
if err != nil {
363-
return nil, fmt.Errorf("failed to query endpoint backlog metrics: %w", err)
393+
rows3, err = p.GetDB().QueryxContext(ctx, backlogEQM)
394+
if err != nil {
395+
return nil, fmt.Errorf("failed to query endpoint backlog metrics: %w", err)
396+
}
364397
}
365398
defer closeWithError(rows3)
366399
eventQueueEndpointBacklogMetrics := make([]EventQueueEndpointBacklogMetrics, 0)

docker-compose.dev.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ services:
4444
build:
4545
context: ./
4646
dockerfile: Dockerfile.dev
47-
entrypoint: ["./cmd", "agent", "--config", "convoy.json"]
47+
entrypoint: ["./cmd", "agent"]
4848
volumes:
4949
- ./convoy.json:/convoy.json
5050
restart: on-failure

0 commit comments

Comments
 (0)