diff --git a/.gitignore b/.gitignore index 38ba09cf8c..2894b68769 100644 --- a/.gitignore +++ b/.gitignore @@ -53,4 +53,5 @@ convoy-ce # used to generate test stubs .cursor -mise.local.toml \ No newline at end of file +mise.local.toml +api/ui/* \ No newline at end of file diff --git a/api/ui/build/index.html b/api/ui/build/index.html index cd62229e9c..8d2c7375fc 100644 --- a/api/ui/build/index.html +++ b/api/ui/build/index.html @@ -5,7 +5,7 @@ - + @@ -383,6 +383,6 @@ - + \ No newline at end of file diff --git a/cmd/hooks/hooks.go b/cmd/hooks/hooks.go index 66e1bf1d59..48e004a198 100644 --- a/cmd/hooks/hooks.go +++ b/cmd/hooks/hooks.go @@ -716,11 +716,23 @@ func buildCliConfiguration(cmd *cobra.Command) (*config.Configuration, error) { return nil, errors.New("metrics-prometheus-sample-time must be non-zero") } + queryTimeout, err := cmd.Flags().GetUint64("metrics-prometheus-query-timeout") + if err != nil { + return nil, err + } + + materializedViewRefreshInterval, err := cmd.Flags().GetUint64("metrics-prometheus-materialized-view-refresh-interval") + if err != nil { + return nil, err + } + c.Metrics = config.MetricsConfiguration{ IsEnabled: true, Backend: config.MetricsBackend(metricsBackend), Prometheus: config.PrometheusMetricsConfiguration{ - SampleTime: sampleTime, + SampleTime: sampleTime, + QueryTimeout: queryTimeout, + MaterializedViewRefreshInterval: materializedViewRefreshInterval, }, } } diff --git a/cmd/main.go b/cmd/main.go index e8118ca4fb..0ee754427d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -71,6 +71,8 @@ func main() { var dataDogAgentUrl string var metricsBackend string var prometheusMetricsSampleTime uint64 + var prometheusMetricsQueryTimeout uint64 + var prometheusMetricsMaterializedViewRefreshInterval uint64 var retentionPolicy string var retentionPolicyEnabled bool @@ -143,6 +145,8 @@ func main() { // metrics c.Flags().StringVar(&metricsBackend, "metrics-backend", "prometheus", "Metrics backend e.g. prometheus. 
('prometheus' feature flag required") c.Flags().Uint64Var(&prometheusMetricsSampleTime, "metrics-prometheus-sample-time", 5, "Prometheus metrics sample time") + c.Flags().Uint64Var(&prometheusMetricsQueryTimeout, "metrics-prometheus-query-timeout", 30, "Prometheus metrics query timeout in seconds") + c.Flags().Uint64Var(&prometheusMetricsMaterializedViewRefreshInterval, "metrics-prometheus-materialized-view-refresh-interval", 2, "Materialized view refresh interval in minutes") c.Flags().StringVar(&retentionPolicy, "retention-policy", "", "Retention Policy Duration") c.Flags().BoolVar(&retentionPolicyEnabled, "retention-policy-enabled", false, "Retention Policy Enabled") diff --git a/cmd/server/server.go b/cmd/server/server.go index 3f000b3be6..5ae2153ac7 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -175,6 +175,22 @@ func StartConvoyServer(a *cli.App) error { s.RegisterTask("30 * * * *", convoy.ScheduleQueue, convoy.MonitorTwitterSources) s.RegisterTask("0 * * * *", convoy.ScheduleQueue, convoy.TokenizeSearch) + // Refresh metrics materialized views + if cfg.Metrics.IsEnabled && cfg.Metrics.Backend == config.PrometheusMetricsProvider { + refreshInterval := cfg.Metrics.Prometheus.MaterializedViewRefreshInterval + if refreshInterval < 1 { + refreshInterval = 1 + } + if refreshInterval > 60 { + refreshInterval = 60 + } + + cronSpec := fmt.Sprintf("*/%d * * * *", refreshInterval) + s.RegisterTask(cronSpec, convoy.ScheduleQueue, convoy.RefreshMetricsMaterializedViews) + + lo.Infof("Registered metrics materialized view refresh every %d min", refreshInterval) + } + // ensures that project data is backed up about 2 hours before they are deleted if a.Licenser.RetentionPolicy() { // runs at 10pm diff --git a/cmd/worker/worker.go b/cmd/worker/worker.go index 452c9518dd..380d4b8235 100644 --- a/cmd/worker/worker.go +++ b/cmd/worker/worker.go @@ -419,6 +419,8 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration) erro 
consumer.RegisterHandlers(convoy.MetaEventProcessor, task.ProcessMetaEvent(projectRepo, metaEventRepo, dispatcher, a.TracerBackend), nil) consumer.RegisterHandlers(convoy.DeleteArchivedTasksProcessor, task.DeleteArchivedTasks(a.Queue, rd), nil) + consumer.RegisterHandlers(convoy.RefreshMetricsMaterializedViews, task.RefreshMetricsMaterializedViews(a.DB, rd), nil) + consumer.RegisterHandlers(convoy.BatchRetryProcessor, task.ProcessBatchRetry(batchRetryRepo, eventDeliveryRepo, a.Queue, lo), nil) err = metrics.RegisterQueueMetrics(a.Queue, a.DB, circuitBreakerManager) diff --git a/config/config.go b/config/config.go index 264ced6bbc..264a7167d5 100644 --- a/config/config.go +++ b/config/config.go @@ -112,7 +112,9 @@ var DefaultConfiguration = Configuration{ IsEnabled: false, Backend: PrometheusMetricsProvider, Prometheus: PrometheusMetricsConfiguration{ - SampleTime: 5, + SampleTime: 5, + QueryTimeout: 30, + MaterializedViewRefreshInterval: 2, // Default: refresh every 2 minutes }, }, Dispatcher: DispatcherConfiguration{ @@ -376,7 +378,9 @@ type MetricsConfiguration struct { } type PrometheusMetricsConfiguration struct { - SampleTime uint64 `json:"sample_time" envconfig:"CONVOY_METRICS_SAMPLE_TIME"` + SampleTime uint64 `json:"sample_time" envconfig:"CONVOY_METRICS_SAMPLE_TIME"` + QueryTimeout uint64 `json:"query_timeout" envconfig:"CONVOY_METRICS_QUERY_TIMEOUT"` // Timeout in seconds for metrics collection queries + MaterializedViewRefreshInterval uint64 `json:"materialized_view_refresh_interval" envconfig:"CONVOY_METRICS_MATERIALIZED_VIEW_REFRESH_INTERVAL"` } const ( diff --git a/config/config_test.go b/config/config_test.go index caf5c84e38..d5f51a4281 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -179,7 +179,9 @@ func TestLoadConfig(t *testing.T) { IsEnabled: false, Backend: "prometheus", Prometheus: PrometheusMetricsConfiguration{ - SampleTime: 5, + SampleTime: 5, + QueryTimeout: 30, + MaterializedViewRefreshInterval: 2, }, }, Dispatcher: 
DispatcherConfiguration{ @@ -276,7 +278,9 @@ func TestLoadConfig(t *testing.T) { IsEnabled: false, Backend: "prometheus", Prometheus: PrometheusMetricsConfiguration{ - SampleTime: 5, + SampleTime: 5, + QueryTimeout: 30, + MaterializedViewRefreshInterval: 2, }, }, Dispatcher: DispatcherConfiguration{ @@ -372,7 +376,9 @@ func TestLoadConfig(t *testing.T) { IsEnabled: false, Backend: "prometheus", Prometheus: PrometheusMetricsConfiguration{ - SampleTime: 5, + SampleTime: 5, + QueryTimeout: 30, + MaterializedViewRefreshInterval: 2, }, }, Dispatcher: DispatcherConfiguration{ diff --git a/convoy.json.example b/convoy.json.example index 737386253f..4f8ae37ed9 100644 --- a/convoy.json.example +++ b/convoy.json.example @@ -100,7 +100,9 @@ "metrics": { "metrics_backend": "prometheus", "prometheus_metrics": { - "sample_time": 10 + "sample_time": 5, + "query_timeout": 30, + "materialized_view_refresh_interval": 2 } } } diff --git a/database/postgres/event.go b/database/postgres/event.go index 79fbdea8cd..fde0ceb6dc 100644 --- a/database/postgres/event.go +++ b/database/postgres/event.go @@ -828,6 +828,12 @@ BEGIN is_duplicate_event, acknowledged_at, status, metadata FROM convoy.events; + -- Drop materialized views that depend on events table + DROP MATERIALIZED VIEW IF EXISTS convoy.event_queue_metrics_mv; + DROP MATERIALIZED VIEW IF EXISTS convoy.event_delivery_queue_metrics_mv; + DROP MATERIALIZED VIEW IF EXISTS convoy.event_queue_backlog_metrics_mv; + DROP MATERIALIZED VIEW IF EXISTS convoy.event_endpoint_backlog_metrics_mv; + -- Manage table renaming ALTER TABLE convoy.event_deliveries DROP CONSTRAINT IF EXISTS event_deliveries_event_id_fkey; ALTER TABLE convoy.events RENAME TO events_old; @@ -850,6 +856,119 @@ BEGIN BEFORE INSERT ON convoy.event_deliveries FOR EACH ROW EXECUTE FUNCTION convoy.enforce_event_fk(); + RAISE NOTICE 'Recreating materialized views...'; + -- Recreate materialized views that were dropped + CREATE MATERIALIZED VIEW IF NOT EXISTS 
convoy.event_queue_metrics_mv AS + SELECT DISTINCT + project_id, + COALESCE(source_id, 'http') as source_id, + COUNT(*) as total + FROM convoy.events + GROUP BY project_id, source_id; + + CREATE UNIQUE INDEX IF NOT EXISTS idx_event_queue_metrics_mv_unique + ON convoy.event_queue_metrics_mv(project_id, source_id); + + CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_delivery_queue_metrics_mv AS + SELECT DISTINCT + ed.project_id, + COALESCE(p.name, '') as project_name, + ed.endpoint_id, + ed.status, + COALESCE(ed.event_type, '') as event_type, + COALESCE(e.source_id, 'http') as source_id, + COALESCE(p.organisation_id, '') as organisation_id, + COALESCE(o.name, '') as organisation_name, + COUNT(*) as total + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON ed.event_id = e.id + LEFT JOIN convoy.projects p ON ed.project_id = p.id + LEFT JOIN convoy.organisations o ON p.organisation_id = o.id + WHERE ed.deleted_at IS NULL + GROUP BY ed.project_id, p.name, ed.endpoint_id, ed.status, ed.event_type, e.source_id, p.organisation_id, o.name; + + CREATE UNIQUE INDEX IF NOT EXISTS idx_event_delivery_queue_metrics_mv_unique + ON convoy.event_delivery_queue_metrics_mv(project_id, endpoint_id, status, event_type, source_id, organisation_id); + + CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_queue_backlog_metrics_mv AS + WITH a1 AS ( + SELECT ed.project_id, + COALESCE(e.source_id, 'http') AS source_id, + EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Processing' + GROUP BY ed.project_id, e.source_id + ORDER BY age_seconds DESC, ed.project_id, e.source_id + LIMIT 1000 + ) + SELECT project_id, source_id, age_seconds + FROM ( + SELECT * FROM a1 + UNION ALL + SELECT ed.project_id, + COALESCE(e.source_id, 'http'), + 0 AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Success' 
+ AND NOT EXISTS ( + SELECT 1 FROM a1 + WHERE a1.project_id = ed.project_id + AND a1.source_id = COALESCE(e.source_id, 'http') + ) + GROUP BY ed.project_id, e.source_id + ) AS combined + ORDER BY project_id, source_id + LIMIT 1000; + + CREATE UNIQUE INDEX IF NOT EXISTS idx_event_queue_backlog_metrics_mv_unique + ON convoy.event_queue_backlog_metrics_mv(project_id, source_id); + + CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_endpoint_backlog_metrics_mv AS + WITH a1 AS ( + SELECT ed.project_id, + COALESCE(e.source_id, 'http') AS source_id, + ed.endpoint_id, + EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Processing' + GROUP BY ed.project_id, e.source_id, ed.endpoint_id + ORDER BY age_seconds DESC, ed.project_id, e.source_id, ed.endpoint_id + LIMIT 1000 + ) + SELECT project_id, source_id, endpoint_id, age_seconds + FROM ( + SELECT * FROM a1 + UNION ALL + SELECT ed.project_id, + COALESCE(e.source_id, 'http'), + ed.endpoint_id, + 0 AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Success' + AND NOT EXISTS ( + SELECT 1 FROM a1 + WHERE a1.project_id = ed.project_id + AND a1.source_id = COALESCE(e.source_id, 'http') + AND a1.endpoint_id = ed.endpoint_id + ) + GROUP BY ed.project_id, e.source_id, ed.endpoint_id + ) AS combined + ORDER BY project_id, source_id, endpoint_id + LIMIT 1000; + + CREATE UNIQUE INDEX IF NOT EXISTS idx_event_endpoint_backlog_metrics_mv_unique + ON convoy.event_endpoint_backlog_metrics_mv(project_id, source_id, endpoint_id); + + -- Refresh materialized views + REFRESH MATERIALIZED VIEW convoy.event_queue_metrics_mv; + REFRESH MATERIALIZED VIEW convoy.event_delivery_queue_metrics_mv; + REFRESH MATERIALIZED VIEW convoy.event_queue_backlog_metrics_mv; + REFRESH MATERIALIZED VIEW convoy.event_endpoint_backlog_metrics_mv; + RAISE NOTICE 'Migration 
complete!'; END; $$ LANGUAGE plpgsql; @@ -893,6 +1012,13 @@ begin RAISE NOTICE 'Migrating data...'; insert into convoy.events_new select * from convoy.events; + + -- Drop materialized views that depend on events table + DROP MATERIALIZED VIEW IF EXISTS convoy.event_queue_metrics_mv; + DROP MATERIALIZED VIEW IF EXISTS convoy.event_delivery_queue_metrics_mv; + DROP MATERIALIZED VIEW IF EXISTS convoy.event_queue_backlog_metrics_mv; + DROP MATERIALIZED VIEW IF EXISTS convoy.event_endpoint_backlog_metrics_mv; + ALTER TABLE convoy.event_deliveries DROP CONSTRAINT if exists event_deliveries_event_id_fkey; ALTER TABLE convoy.event_deliveries ADD CONSTRAINT event_deliveries_event_id_fkey @@ -911,6 +1037,120 @@ begin CREATE INDEX idx_events_source_id ON convoy.events (source_id); CREATE INDEX idx_idempotency_key_key ON convoy.events (idempotency_key); CREATE INDEX idx_project_id_on_not_deleted ON convoy.events (project_id) WHERE deleted_at IS NULL; + + RAISE NOTICE 'Recreating materialized views...'; + -- Recreate materialized views that were dropped + CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_queue_metrics_mv AS + SELECT DISTINCT + project_id, + COALESCE(source_id, 'http') as source_id, + COUNT(*) as total + FROM convoy.events + GROUP BY project_id, source_id; + + CREATE UNIQUE INDEX IF NOT EXISTS idx_event_queue_metrics_mv_unique + ON convoy.event_queue_metrics_mv(project_id, source_id); + + CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_delivery_queue_metrics_mv AS + SELECT DISTINCT + ed.project_id, + COALESCE(p.name, '') as project_name, + ed.endpoint_id, + ed.status, + COALESCE(ed.event_type, '') as event_type, + COALESCE(e.source_id, 'http') as source_id, + COALESCE(p.organisation_id, '') as organisation_id, + COALESCE(o.name, '') as organisation_name, + COUNT(*) as total + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON ed.event_id = e.id + LEFT JOIN convoy.projects p ON ed.project_id = p.id + LEFT JOIN convoy.organisations o ON 
p.organisation_id = o.id + WHERE ed.deleted_at IS NULL + GROUP BY ed.project_id, p.name, ed.endpoint_id, ed.status, ed.event_type, e.source_id, p.organisation_id, o.name; + + CREATE UNIQUE INDEX IF NOT EXISTS idx_event_delivery_queue_metrics_mv_unique + ON convoy.event_delivery_queue_metrics_mv(project_id, endpoint_id, status, event_type, source_id, organisation_id); + + CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_queue_backlog_metrics_mv AS + WITH a1 AS ( + SELECT ed.project_id, + COALESCE(e.source_id, 'http') AS source_id, + EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Processing' + GROUP BY ed.project_id, e.source_id + ORDER BY age_seconds DESC, ed.project_id, e.source_id + LIMIT 1000 + ) + SELECT project_id, source_id, age_seconds + FROM ( + SELECT * FROM a1 + UNION ALL + SELECT ed.project_id, + COALESCE(e.source_id, 'http'), + 0 AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Success' + AND NOT EXISTS ( + SELECT 1 FROM a1 + WHERE a1.project_id = ed.project_id + AND a1.source_id = COALESCE(e.source_id, 'http') + ) + GROUP BY ed.project_id, e.source_id + ) AS combined + ORDER BY project_id, source_id + LIMIT 1000; + + CREATE UNIQUE INDEX IF NOT EXISTS idx_event_queue_backlog_metrics_mv_unique + ON convoy.event_queue_backlog_metrics_mv(project_id, source_id); + + CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_endpoint_backlog_metrics_mv AS + WITH a1 AS ( + SELECT ed.project_id, + COALESCE(e.source_id, 'http') AS source_id, + ed.endpoint_id, + EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Processing' + GROUP BY ed.project_id, e.source_id, ed.endpoint_id + ORDER BY age_seconds DESC, ed.project_id, e.source_id, ed.endpoint_id + LIMIT 1000 + 
) + SELECT project_id, source_id, endpoint_id, age_seconds + FROM ( + SELECT * FROM a1 + UNION ALL + SELECT ed.project_id, + COALESCE(e.source_id, 'http'), + ed.endpoint_id, + 0 AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Success' + AND NOT EXISTS ( + SELECT 1 FROM a1 + WHERE a1.project_id = ed.project_id + AND a1.source_id = COALESCE(e.source_id, 'http') + AND a1.endpoint_id = ed.endpoint_id + ) + GROUP BY ed.project_id, e.source_id, ed.endpoint_id + ) AS combined + ORDER BY project_id, source_id, endpoint_id + LIMIT 1000; + + CREATE UNIQUE INDEX IF NOT EXISTS idx_event_endpoint_backlog_metrics_mv_unique + ON convoy.event_endpoint_backlog_metrics_mv(project_id, source_id, endpoint_id); + + -- Refresh materialized views + REFRESH MATERIALIZED VIEW convoy.event_queue_metrics_mv; + REFRESH MATERIALIZED VIEW convoy.event_delivery_queue_metrics_mv; + REFRESH MATERIALIZED VIEW convoy.event_queue_backlog_metrics_mv; + REFRESH MATERIALIZED VIEW convoy.event_endpoint_backlog_metrics_mv; + RAISE NOTICE 'Successfully un-partitioned events table...'; end $$ language plpgsql; select convoy.un_partition_events_table() diff --git a/database/postgres/event_delivery.go b/database/postgres/event_delivery.go index 24eb277a39..f9efcafa2c 100644 --- a/database/postgres/event_delivery.go +++ b/database/postgres/event_delivery.go @@ -1199,6 +1199,11 @@ BEGIN latency_seconds, COALESCE(delivery_mode, 'at_least_once')::convoy.delivery_mode FROM convoy.event_deliveries; + -- Drop materialized views that depend on event_deliveries table + DROP MATERIALIZED VIEW IF EXISTS convoy.event_delivery_queue_metrics_mv; + DROP MATERIALIZED VIEW IF EXISTS convoy.event_queue_backlog_metrics_mv; + DROP MATERIALIZED VIEW IF EXISTS convoy.event_endpoint_backlog_metrics_mv; + -- Manage table renaming ALTER TABLE convoy.delivery_attempts DROP CONSTRAINT IF EXISTS delivery_attempts_event_delivery_id_fkey; ALTER TABLE 
convoy.event_deliveries RENAME TO event_deliveries_old; @@ -1224,6 +1229,107 @@ BEGIN BEFORE INSERT ON convoy.delivery_attempts FOR EACH ROW EXECUTE FUNCTION enforce_event_delivery_fk(); + RAISE NOTICE 'Recreating materialized views...'; + -- Recreate materialized views that were dropped + CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_delivery_queue_metrics_mv AS + SELECT DISTINCT + ed.project_id, + COALESCE(p.name, '') as project_name, + ed.endpoint_id, + ed.status, + COALESCE(ed.event_type, '') as event_type, + COALESCE(e.source_id, 'http') as source_id, + COALESCE(p.organisation_id, '') as organisation_id, + COALESCE(o.name, '') as organisation_name, + COUNT(*) as total + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON ed.event_id = e.id + LEFT JOIN convoy.projects p ON ed.project_id = p.id + LEFT JOIN convoy.organisations o ON p.organisation_id = o.id + WHERE ed.deleted_at IS NULL + GROUP BY ed.project_id, p.name, ed.endpoint_id, ed.status, ed.event_type, e.source_id, p.organisation_id, o.name; + + CREATE UNIQUE INDEX IF NOT EXISTS idx_event_delivery_queue_metrics_mv_unique + ON convoy.event_delivery_queue_metrics_mv(project_id, endpoint_id, status, event_type, source_id, organisation_id); + + CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_queue_backlog_metrics_mv AS + WITH a1 AS ( + SELECT ed.project_id, + COALESCE(e.source_id, 'http') AS source_id, + EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Processing' + GROUP BY ed.project_id, e.source_id + ORDER BY age_seconds DESC, ed.project_id, e.source_id + LIMIT 1000 + ) + SELECT project_id, source_id, age_seconds + FROM ( + SELECT * FROM a1 + UNION ALL + SELECT ed.project_id, + COALESCE(e.source_id, 'http'), + 0 AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Success' + AND NOT EXISTS ( + SELECT 
1 FROM a1 + WHERE a1.project_id = ed.project_id + AND a1.source_id = COALESCE(e.source_id, 'http') + ) + GROUP BY ed.project_id, e.source_id + ) AS combined + ORDER BY project_id, source_id + LIMIT 1000; + + CREATE UNIQUE INDEX IF NOT EXISTS idx_event_queue_backlog_metrics_mv_unique + ON convoy.event_queue_backlog_metrics_mv(project_id, source_id); + + CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_endpoint_backlog_metrics_mv AS + WITH a1 AS ( + SELECT ed.project_id, + COALESCE(e.source_id, 'http') AS source_id, + ed.endpoint_id, + EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Processing' + GROUP BY ed.project_id, e.source_id, ed.endpoint_id + ORDER BY age_seconds DESC, ed.project_id, e.source_id, ed.endpoint_id + LIMIT 1000 + ) + SELECT project_id, source_id, endpoint_id, age_seconds + FROM ( + SELECT * FROM a1 + UNION ALL + SELECT ed.project_id, + COALESCE(e.source_id, 'http'), + ed.endpoint_id, + 0 AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Success' + AND NOT EXISTS ( + SELECT 1 FROM a1 + WHERE a1.project_id = ed.project_id + AND a1.source_id = COALESCE(e.source_id, 'http') + AND a1.endpoint_id = ed.endpoint_id + ) + GROUP BY ed.project_id, e.source_id, ed.endpoint_id + ) AS combined + ORDER BY project_id, source_id, endpoint_id + LIMIT 1000; + + CREATE UNIQUE INDEX IF NOT EXISTS idx_event_endpoint_backlog_metrics_mv_unique + ON convoy.event_endpoint_backlog_metrics_mv(project_id, source_id, endpoint_id); + + -- Refresh materialized views + REFRESH MATERIALIZED VIEW convoy.event_delivery_queue_metrics_mv; + REFRESH MATERIALIZED VIEW convoy.event_queue_backlog_metrics_mv; + REFRESH MATERIALIZED VIEW convoy.event_endpoint_backlog_metrics_mv; + RAISE NOTICE 'Migration complete!'; END; $$ LANGUAGE plpgsql; @@ -1276,6 +1382,11 @@ begin latency_seconds, 
COALESCE(delivery_mode, 'at_least_once')::convoy.delivery_mode FROM convoy.event_deliveries; + -- Drop materialized views that depend on event_deliveries table + DROP MATERIALIZED VIEW IF EXISTS convoy.event_delivery_queue_metrics_mv; + DROP MATERIALIZED VIEW IF EXISTS convoy.event_queue_backlog_metrics_mv; + DROP MATERIALIZED VIEW IF EXISTS convoy.event_endpoint_backlog_metrics_mv; + ALTER TABLE convoy.delivery_attempts DROP CONSTRAINT if exists delivery_attempts_event_delivery_id_fkey; ALTER TABLE convoy.delivery_attempts ADD CONSTRAINT delivery_attempts_event_delivery_id_fkey @@ -1299,6 +1410,107 @@ begin create index idx_event_deliveries_status on convoy.event_deliveries (status); create index idx_event_deliveries_status_key on convoy.event_deliveries (status); + RAISE NOTICE 'Recreating materialized views...'; + -- Recreate materialized views that were dropped + CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_delivery_queue_metrics_mv AS + SELECT DISTINCT + ed.project_id, + COALESCE(p.name, '') as project_name, + ed.endpoint_id, + ed.status, + COALESCE(ed.event_type, '') as event_type, + COALESCE(e.source_id, 'http') as source_id, + COALESCE(p.organisation_id, '') as organisation_id, + COALESCE(o.name, '') as organisation_name, + COUNT(*) as total + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON ed.event_id = e.id + LEFT JOIN convoy.projects p ON ed.project_id = p.id + LEFT JOIN convoy.organisations o ON p.organisation_id = o.id + WHERE ed.deleted_at IS NULL + GROUP BY ed.project_id, p.name, ed.endpoint_id, ed.status, ed.event_type, e.source_id, p.organisation_id, o.name; + + CREATE UNIQUE INDEX IF NOT EXISTS idx_event_delivery_queue_metrics_mv_unique + ON convoy.event_delivery_queue_metrics_mv(project_id, endpoint_id, status, event_type, source_id, organisation_id); + + CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_queue_backlog_metrics_mv AS + WITH a1 AS ( + SELECT ed.project_id, + COALESCE(e.source_id, 'http') AS source_id, + 
EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Processing' + GROUP BY ed.project_id, e.source_id + ORDER BY age_seconds DESC, ed.project_id, e.source_id + LIMIT 1000 + ) + SELECT project_id, source_id, age_seconds + FROM ( + SELECT * FROM a1 + UNION ALL + SELECT ed.project_id, + COALESCE(e.source_id, 'http'), + 0 AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Success' + AND NOT EXISTS ( + SELECT 1 FROM a1 + WHERE a1.project_id = ed.project_id + AND a1.source_id = COALESCE(e.source_id, 'http') + ) + GROUP BY ed.project_id, e.source_id + ) AS combined + ORDER BY project_id, source_id + LIMIT 1000; + + CREATE UNIQUE INDEX IF NOT EXISTS idx_event_queue_backlog_metrics_mv_unique + ON convoy.event_queue_backlog_metrics_mv(project_id, source_id); + + CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_endpoint_backlog_metrics_mv AS + WITH a1 AS ( + SELECT ed.project_id, + COALESCE(e.source_id, 'http') AS source_id, + ed.endpoint_id, + EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Processing' + GROUP BY ed.project_id, e.source_id, ed.endpoint_id + ORDER BY age_seconds DESC, ed.project_id, e.source_id, ed.endpoint_id + LIMIT 1000 + ) + SELECT project_id, source_id, endpoint_id, age_seconds + FROM ( + SELECT * FROM a1 + UNION ALL + SELECT ed.project_id, + COALESCE(e.source_id, 'http'), + ed.endpoint_id, + 0 AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Success' + AND NOT EXISTS ( + SELECT 1 FROM a1 + WHERE a1.project_id = ed.project_id + AND a1.source_id = COALESCE(e.source_id, 'http') + AND a1.endpoint_id = ed.endpoint_id + ) + GROUP BY ed.project_id, e.source_id, ed.endpoint_id + ) AS 
combined + ORDER BY project_id, source_id, endpoint_id + LIMIT 1000; + + CREATE UNIQUE INDEX IF NOT EXISTS idx_event_endpoint_backlog_metrics_mv_unique + ON convoy.event_endpoint_backlog_metrics_mv(project_id, source_id, endpoint_id); + + -- Refresh materialized views + REFRESH MATERIALIZED VIEW convoy.event_delivery_queue_metrics_mv; + REFRESH MATERIALIZED VIEW convoy.event_queue_backlog_metrics_mv; + REFRESH MATERIALIZED VIEW convoy.event_endpoint_backlog_metrics_mv; + RAISE NOTICE 'Successfully un-partitioned events table...'; end $$ language plpgsql; select convoy.un_partition_event_deliveries_table() diff --git a/database/postgres/postgres.go b/database/postgres/postgres.go index 5d5e34ae23..dde9a21d57 100644 --- a/database/postgres/postgres.go +++ b/database/postgres/postgres.go @@ -85,9 +85,19 @@ func parseDBConfig(dbConfig config.DatabaseConfiguration, src ...string) (*Postg return nil, fmt.Errorf("failed to create %sconnection pool: %w", src, err) } - if dbConfig.SetMaxOpenConnections > 0 { - pgxCfg.MaxConns = int32(dbConfig.SetMaxOpenConnections) + // Set MaxConns - use configured value or default to 100 if not set + // This prevents unlimited connections when SetMaxOpenConnections is 0 + maxConns := dbConfig.SetMaxOpenConnections + if maxConns <= 0 { + maxConns = 100 + log.Warnf("[%s]: SetMaxOpenConnections not set or 0, using default: %d. 
Set CONVOY_DB_MAX_OPEN_CONN to override.", pkgName, maxConns) } + pgxCfg.MaxConns = int32(maxConns) + + if dbConfig.SetMaxIdleConnections > 0 { + pgxCfg.MaxConnIdleTime = time.Minute * 5 + } + pgxCfg.MaxConnLifetime = time.Second * time.Duration(dbConfig.SetConnMaxLifetime) pgxCfg.ConnConfig.Tracer = otelpgx.NewTracer(otelpgx.WithTrimSQLInSpanName()) @@ -100,6 +110,18 @@ func parseDBConfig(dbConfig config.DatabaseConfiguration, src ...string) (*Postg sqlDB := stdlib.OpenDBFromPool(pool) db := sqlx.NewDb(sqlDB, "pgx") + maxOpenConns := maxConns + maxIdleConns := dbConfig.SetMaxIdleConnections + if maxIdleConns <= 0 { + maxIdleConns = maxOpenConns / 4 + if maxIdleConns < 2 { + maxIdleConns = 2 + } + } + db.SetMaxOpenConns(maxOpenConns) + db.SetMaxIdleConns(maxIdleConns) + db.SetConnMaxLifetime(time.Second * time.Duration(dbConfig.SetConnMaxLifetime)) + return &Postgres{dbx: db, pool: pool, conn: pool}, nil } diff --git a/database/postgres/postgres_collector.go b/database/postgres/postgres_collector.go index 5a3bdf20e0..f296fa2b17 100644 --- a/database/postgres/postgres_collector.go +++ b/database/postgres/postgres_collector.go @@ -141,9 +141,8 @@ func (p *Postgres) Collect(ch chan<- prometheus.Metric) { // Return empty metrics to prevent blocking the endpoint metrics = &Metrics{} } - } else { - cachedMetrics = metrics } + cachedMetrics = metrics } // Use unique keys per metric type to prevent collisions @@ -244,12 +243,16 @@ func (p *Postgres) Collect(ch chan<- prometheus.Metric) { // collectMetrics gathers essential metrics from the DB func (p *Postgres) collectMetrics() (*Metrics, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(metricsConfig.Prometheus.SampleTime)*time.Second) + queryTimeout := time.Duration(metricsConfig.Prometheus.QueryTimeout) * time.Second + if queryTimeout == 0 { + queryTimeout = 30 * time.Second + } + ctx, cancel := context.WithTimeout(context.Background(), queryTimeout) defer cancel() metrics := &Metrics{} - 
queryEventQueueMetrics := "select distinct project_id, coalesce(source_id, 'http') as source_id, count(*) as total from convoy.events group by project_id, source_id" + queryEventQueueMetrics := "SELECT project_id, source_id, total FROM convoy.event_queue_metrics_mv" rows, err := p.GetDB().QueryxContext(ctx, queryEventQueueMetrics) if err != nil { return nil, fmt.Errorf("failed to query event queue metrics: %w", err) @@ -266,27 +269,7 @@ func (p *Postgres) collectMetrics() (*Metrics, error) { } metrics.EventQueueMetrics = eventQueueMetrics - backlogQM := `WITH a1 AS ( - SELECT ed.project_id, - COALESCE(e.source_id, 'http') AS source_id, - EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds - FROM convoy.event_deliveries ed - LEFT JOIN convoy.events e ON e.id = ed.event_id - WHERE ed.status = 'Processing' - GROUP BY ed.project_id, e.source_id - LIMIT 1000 -- samples - ) - SELECT * FROM a1 - UNION ALL - SELECT ed.project_id, - COALESCE(e.source_id, 'http'), - 0 AS age_seconds - FROM convoy.event_deliveries ed - LEFT JOIN convoy.events e ON e.id = ed.event_id - LEFT JOIN a1 ON e.source_id = a1.source_id - WHERE ed.status = 'Success' AND a1.source_id IS NULL - GROUP BY ed.project_id, e.source_id - LIMIT 1000; -- samples` + backlogQM := "SELECT project_id, source_id, age_seconds FROM convoy.event_queue_backlog_metrics_mv" rows1, err := p.GetDB().QueryxContext(ctx, backlogQM) if err != nil { return nil, fmt.Errorf("failed to query backlog metrics: %w", err) @@ -303,22 +286,17 @@ func (p *Postgres) collectMetrics() (*Metrics, error) { } metrics.EventQueueBacklogMetrics = eventQueueBacklogMetrics - queryDeliveryQ := `SELECT DISTINCT - ed.project_id, - COALESCE(p.name, '') as project_name, - ed.endpoint_id, - ed.status, - COALESCE(ed.event_type, '') as event_type, - COALESCE(e.source_id, 'http') as source_id, - COALESCE(p.organisation_id, '') as organisation_id, - COALESCE(o.name, '') as organisation_name, - COUNT(*) as total - FROM convoy.event_deliveries ed - 
LEFT JOIN convoy.events e ON ed.event_id = e.id - LEFT JOIN convoy.projects p ON ed.project_id = p.id - LEFT JOIN convoy.organisations o ON p.organisation_id = o.id - WHERE ed.deleted_at IS NULL - GROUP BY ed.project_id, p.name, ed.endpoint_id, ed.status, ed.event_type, e.source_id, p.organisation_id, o.name` + queryDeliveryQ := `SELECT + project_id, + project_name, + endpoint_id, + status, + event_type, + source_id, + organisation_id, + organisation_name, + total + FROM convoy.event_delivery_queue_metrics_mv` rows2, err := p.GetDB().QueryxContext(ctx, queryDeliveryQ) if err != nil { return nil, fmt.Errorf("failed to query delivery queue metrics: %w", err) @@ -335,29 +313,7 @@ func (p *Postgres) collectMetrics() (*Metrics, error) { } metrics.EventDeliveryQueueMetrics = eventDeliveryQueueMetrics - backlogEQM := `WITH a1 AS ( - SELECT ed.project_id, - COALESCE(e.source_id, 'http') AS source_id, - ed.endpoint_id, - EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds - FROM convoy.event_deliveries ed - LEFT JOIN convoy.events e ON e.id = ed.event_id - WHERE ed.status = 'Processing' - GROUP BY ed.project_id, e.source_id, ed.endpoint_id - LIMIT 1000 -- samples - ) - SELECT * FROM a1 - UNION ALL - SELECT ed.project_id, - COALESCE(e.source_id, 'http'), - ed.endpoint_id, - 0 AS age_seconds - FROM convoy.event_deliveries ed - LEFT JOIN convoy.events e ON e.id = ed.event_id - LEFT JOIN a1 ON ed.endpoint_id = a1.endpoint_id - WHERE ed.status = 'Success' AND a1.endpoint_id IS NULL - GROUP BY ed.project_id, e.source_id, ed.endpoint_id - LIMIT 1000; -- samples` + backlogEQM := "SELECT project_id, source_id, endpoint_id, age_seconds FROM convoy.event_endpoint_backlog_metrics_mv" rows3, err := p.GetDB().QueryxContext(ctx, backlogEQM) if err != nil { return nil, fmt.Errorf("failed to query endpoint backlog metrics: %w", err) diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index f31fb1da02..06f1dd5aa9 100644 --- a/docker-compose.dev.yml +++ 
b/docker-compose.dev.yml @@ -44,7 +44,7 @@ services: build: context: ./ dockerfile: Dockerfile.dev - entrypoint: ["./cmd", "agent", "--config", "convoy.json"] + entrypoint: ["./cmd", "agent"] volumes: - ./convoy.json:/convoy.json restart: on-failure diff --git a/go.mod b/go.mod index ecd43f0260..0c229578c5 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,6 @@ require ( github.com/go-redis/redis_rate/v10 v10.0.1 github.com/go-redsync/redsync/v4 v4.13.0 github.com/golang-jwt/jwt/v5 v5.2.2 - github.com/gorilla/websocket v1.5.3 github.com/grafana/pyroscope-go v1.2.2 github.com/hashicorp/vault/api v1.21.0 github.com/hibiken/asynq v0.24.1 @@ -129,6 +128,7 @@ require ( github.com/google/go-querystring v1.1.0 // indirect github.com/google/pprof v0.0.0-20250422154841-e1f9c1950416 // indirect github.com/google/s2a-go v0.1.9 // indirect + github.com/gorilla/websocket v1.5.3 // indirect github.com/grafana/pyroscope-go/godeltaprof v0.1.8 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect diff --git a/mise.toml b/mise.toml index f78224b7c3..661578751b 100644 --- a/mise.toml +++ b/mise.toml @@ -8,7 +8,7 @@ go = "1.25.3" golangci-lint = "2.4.0" nodejs = "24.10.0" -postgres = "15.1" +# postgres = "15.1" "asdf:jonathanmorley/asdf-pre-commit" = "latest" "aqua:pvolok/mprocs" = "0.7.2" @@ -17,6 +17,7 @@ postgres = "15.1" "go:github.com/swaggo/swag/cmd/swag" = "latest" "go:go.uber.org/mock/mockgen" = "latest" "go:github.com/sqlc-dev/sqlc/cmd/sqlc" = "latest" +"go:github.com/daixiang0/gci" = "latest" # ℹ️ The settings below are sensible defaults for local development. When # deploying to production, you will want to configure these more carefully. 
diff --git a/sql/1767658336.sql b/sql/1767658336.sql new file mode 100644 index 0000000000..c70bbd709f --- /dev/null +++ b/sql/1767658336.sql @@ -0,0 +1,156 @@ +-- +migrate Up + +-- Create indexes on event_deliveries to optimize materialized view queries +CREATE INDEX IF NOT EXISTS idx_event_deliveries_status_processing_project_source_created +ON convoy.event_deliveries (status, project_id, event_id, created_at) +WHERE status = 'Processing'; + +CREATE INDEX IF NOT EXISTS idx_event_deliveries_status_processing_project_source_endpoint_created +ON convoy.event_deliveries (status, project_id, endpoint_id, event_id, created_at) +WHERE status = 'Processing'; + +CREATE INDEX IF NOT EXISTS idx_event_deliveries_status_success_project_source +ON convoy.event_deliveries (status, project_id, event_id) +WHERE status = 'Success'; + +-- Index for Success status with endpoint_id +CREATE INDEX IF NOT EXISTS idx_event_deliveries_status_success_project_source_endpoint +ON convoy.event_deliveries (status, project_id, endpoint_id, event_id) +WHERE status = 'Success'; + +-- Materialized view for event queue metrics +CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_queue_metrics_mv AS +SELECT DISTINCT + project_id, + COALESCE(source_id, 'http') as source_id, + COUNT(*) as total +FROM convoy.events +GROUP BY project_id, source_id; + +-- Create unique index for concurrent refresh and fast lookups +CREATE UNIQUE INDEX IF NOT EXISTS idx_event_queue_metrics_mv_unique +ON convoy.event_queue_metrics_mv(project_id, source_id); + +-- Materialized view for event delivery queue metrics +CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_delivery_queue_metrics_mv AS +SELECT DISTINCT + ed.project_id, + COALESCE(p.name, '') as project_name, + ed.endpoint_id, + ed.status, + COALESCE(ed.event_type, '') as event_type, + COALESCE(e.source_id, 'http') as source_id, + COALESCE(p.organisation_id, '') as organisation_id, + COALESCE(o.name, '') as organisation_name, + COUNT(*) as total +FROM 
convoy.event_deliveries ed +LEFT JOIN convoy.events e ON ed.event_id = e.id +LEFT JOIN convoy.projects p ON ed.project_id = p.id +LEFT JOIN convoy.organisations o ON p.organisation_id = o.id +WHERE ed.deleted_at IS NULL +GROUP BY ed.project_id, p.name, ed.endpoint_id, ed.status, ed.event_type, e.source_id, p.organisation_id, o.name; + +-- Create unique index for concurrent refresh +CREATE UNIQUE INDEX IF NOT EXISTS idx_event_delivery_queue_metrics_mv_unique +ON convoy.event_delivery_queue_metrics_mv(project_id, endpoint_id, status, event_type, source_id, organisation_id); + +-- Materialized view for event queue backlog metrics +CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_queue_backlog_metrics_mv AS +WITH a1 AS ( + SELECT ed.project_id, + COALESCE(e.source_id, 'http') AS source_id, + EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Processing' + GROUP BY ed.project_id, e.source_id + ORDER BY age_seconds DESC, ed.project_id, e.source_id + LIMIT 1000 -- samples +) +SELECT project_id, source_id, age_seconds +FROM ( + SELECT * FROM a1 + UNION ALL + SELECT ed.project_id, + COALESCE(e.source_id, 'http'), + 0 AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Success' + AND NOT EXISTS ( + SELECT 1 FROM a1 + WHERE a1.project_id = ed.project_id + AND a1.source_id = COALESCE(e.source_id, 'http') + ) + GROUP BY ed.project_id, e.source_id +) AS combined +ORDER BY project_id, source_id +LIMIT 1000; -- samples + +-- Create unique index for concurrent refresh +CREATE UNIQUE INDEX IF NOT EXISTS idx_event_queue_backlog_metrics_mv_unique +ON convoy.event_queue_backlog_metrics_mv(project_id, source_id); + +-- Materialized view for endpoint backlog metrics +CREATE MATERIALIZED VIEW IF NOT EXISTS convoy.event_endpoint_backlog_metrics_mv AS +WITH a1 AS ( + SELECT ed.project_id, + 
COALESCE(e.source_id, 'http') AS source_id, + ed.endpoint_id, + EXTRACT(EPOCH FROM (NOW() - MIN(ed.created_at))) AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Processing' + GROUP BY ed.project_id, e.source_id, ed.endpoint_id + ORDER BY age_seconds DESC, ed.project_id, e.source_id, ed.endpoint_id + LIMIT 1000 -- samples +) +SELECT project_id, source_id, endpoint_id, age_seconds +FROM ( + SELECT * FROM a1 + UNION ALL + SELECT ed.project_id, + COALESCE(e.source_id, 'http'), + ed.endpoint_id, + 0 AS age_seconds + FROM convoy.event_deliveries ed + LEFT JOIN convoy.events e ON e.id = ed.event_id + WHERE ed.status = 'Success' + AND NOT EXISTS ( + SELECT 1 FROM a1 + WHERE a1.project_id = ed.project_id + AND a1.source_id = COALESCE(e.source_id, 'http') + AND a1.endpoint_id = ed.endpoint_id + ) + GROUP BY ed.project_id, e.source_id, ed.endpoint_id +) AS combined +ORDER BY project_id, source_id, endpoint_id +LIMIT 1000; -- samples + +-- Create unique index for concurrent refresh +CREATE UNIQUE INDEX IF NOT EXISTS idx_event_endpoint_backlog_metrics_mv_unique +ON convoy.event_endpoint_backlog_metrics_mv(project_id, source_id, endpoint_id); + +-- Initial refresh of all materialized views +REFRESH MATERIALIZED VIEW convoy.event_queue_metrics_mv; +REFRESH MATERIALIZED VIEW convoy.event_delivery_queue_metrics_mv; +REFRESH MATERIALIZED VIEW convoy.event_queue_backlog_metrics_mv; +REFRESH MATERIALIZED VIEW convoy.event_endpoint_backlog_metrics_mv; + +-- +migrate Down + +DROP INDEX IF EXISTS convoy.idx_event_endpoint_backlog_metrics_mv_unique; +DROP INDEX IF EXISTS convoy.idx_event_queue_backlog_metrics_mv_unique; +DROP INDEX IF EXISTS convoy.idx_event_delivery_queue_metrics_mv_unique; +DROP INDEX IF EXISTS convoy.idx_event_queue_metrics_mv_unique; + +DROP MATERIALIZED VIEW IF EXISTS convoy.event_endpoint_backlog_metrics_mv; +DROP MATERIALIZED VIEW IF EXISTS convoy.event_queue_backlog_metrics_mv; +DROP 
MATERIALIZED VIEW IF EXISTS convoy.event_delivery_queue_metrics_mv; +DROP MATERIALIZED VIEW IF EXISTS convoy.event_queue_metrics_mv; + +-- Drop indexes created for materialized view optimization +DROP INDEX IF EXISTS convoy.idx_event_deliveries_status_processing_project_source_created; +DROP INDEX IF EXISTS convoy.idx_event_deliveries_status_processing_project_source_endpoint_created; +DROP INDEX IF EXISTS convoy.idx_event_deliveries_status_success_project_source; +DROP INDEX IF EXISTS convoy.idx_event_deliveries_status_success_project_source_endpoint; diff --git a/type.go b/type.go index 9e5d9ebc30..7a91ec2d56 100644 --- a/type.go +++ b/type.go @@ -107,6 +107,7 @@ const ( DeleteArchivedTasksProcessor TaskName = "DeleteArchivedTasksProcessor" MatchEventSubscriptionsProcessor TaskName = "MatchEventSubscriptionsProcessor" BatchRetryProcessor TaskName = "BatchRetryProcessor" + RefreshMetricsMaterializedViews TaskName = "RefreshMetricsMaterializedViews" TokenCacheKey CacheKey = "tokens" ) diff --git a/worker/task/refresh_metrics_mv.go b/worker/task/refresh_metrics_mv.go new file mode 100644 index 0000000000..0d6d791502 --- /dev/null +++ b/worker/task/refresh_metrics_mv.go @@ -0,0 +1,81 @@ +package task + +import ( + "context" + "fmt" + "time" + + "github.com/go-redsync/redsync/v4" + "github.com/go-redsync/redsync/v4/redis/goredis/v9" + "github.com/hibiken/asynq" + + "github.com/frain-dev/convoy/database" + "github.com/frain-dev/convoy/internal/pkg/rdb" + "github.com/frain-dev/convoy/pkg/log" +) + +func RefreshMetricsMaterializedViews(db database.Database, rd *rdb.Redis) func(context.Context, *asynq.Task) error { + pool := goredis.NewPool(rd.Client()) + rs := redsync.New(pool) + + return func(ctx context.Context, t *asynq.Task) error { + const mutexName = "convoy:refresh_metrics_mv:mutex" + mutex := rs.NewMutex(mutexName, redsync.WithExpiry(25*time.Minute), redsync.WithTries(1)) + + lockCtx, cancel := context.WithTimeout(ctx, time.Second*2) + defer cancel() + + err := 
mutex.LockContext(lockCtx) + if err != nil { + return fmt.Errorf("failed to obtain lock: %w", err) + } + + defer func() { + unlockCtx, unlockCancel := context.WithTimeout(context.Background(), time.Second*2) + defer unlockCancel() + + ok, err := mutex.UnlockContext(unlockCtx) + if !ok || err != nil { + log.FromContext(ctx).WithError(err).Error("failed to release lock") + } + }() + + start := time.Now() + queries := []struct { + name string + sql string + }{ + { + name: "event_queue_metrics_mv", + sql: "REFRESH MATERIALIZED VIEW CONCURRENTLY convoy.event_queue_metrics_mv", + }, + { + name: "event_delivery_queue_metrics_mv", + sql: "REFRESH MATERIALIZED VIEW CONCURRENTLY convoy.event_delivery_queue_metrics_mv", + }, + { + name: "event_queue_backlog_metrics_mv", + sql: "REFRESH MATERIALIZED VIEW CONCURRENTLY convoy.event_queue_backlog_metrics_mv", + }, + { + name: "event_endpoint_backlog_metrics_mv", + sql: "REFRESH MATERIALIZED VIEW CONCURRENTLY convoy.event_endpoint_backlog_metrics_mv", + }, + } + + for _, q := range queries { + refreshCtx, refreshCancel := context.WithTimeout(ctx, 5*time.Minute) + _, err := db.GetDB().ExecContext(refreshCtx, q.sql) + refreshCancel() + if err != nil { + log.FromContext(ctx).WithError(err).Errorf("failed to refresh materialized view: %s", q.name) + // Continue with other views even if one fails + continue + } + log.FromContext(ctx).Infof("refreshed materialized view: %s", q.name) + } + + log.FromContext(ctx).Infof("refreshed all metrics materialized views in %v", time.Since(start)) + return nil + } +}