diff --git a/Cargo.lock b/Cargo.lock index b8fbf8549f..e52b40c023 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7386,6 +7386,7 @@ dependencies = [ "nats-multiplexer", "nats-multiplexer-client", "permissions", + "postcard", "pretty_assertions_sorted", "rebaser-client", "remain", @@ -8065,6 +8066,7 @@ version = "0.1.0" dependencies = [ "async-trait", "chrono", + "postcard", "postgres-types", "refinery", "remain", @@ -8076,8 +8078,10 @@ dependencies = [ "si-data-pg", "si-events", "si-id", + "si-layer-cache", "strum", "telemetry", + "telemetry-utils", "thiserror 2.0.12", "tokio", "ulid", diff --git a/bin/sdf/src/args.rs b/bin/sdf/src/args.rs index 9243c5d932..2856685103 100644 --- a/bin/sdf/src/args.rs +++ b/bin/sdf/src/args.rs @@ -164,7 +164,7 @@ pub(crate) struct Args { #[arg(long, env = "SI_BACKFILL_CACHE_TYPES")] pub(crate) backfill_cache_types: Option, - /// Key batch size for PostgreSQL queries during backfill + /// Batch size for PostgreSQL queries during backfill (layer cache and func runs) #[arg(long, default_value = "1000", env = "SI_BACKFILL_KEY_BATCH_SIZE")] pub(crate) backfill_key_batch_size: usize, @@ -180,6 +180,14 @@ pub(crate) struct Args { #[arg(long, default_value = "5", env = "SI_BACKFILL_MAX_CONCURRENT_UPLOADS")] pub(crate) backfill_max_concurrent_uploads: usize, + /// Cutoff ID for func runs backfill (optional, start from a specific func run ID) + #[arg(long, env = "SI_BACKFILL_FUNC_RUNS_CUTOFF_ID")] + pub(crate) backfill_func_runs_cutoff_id: Option, + + /// Cutoff ID for func run logs backfill (optional, start from a specific func run log ID) + #[arg(long, env = "SI_BACKFILL_FUNC_RUN_LOGS_CUTOFF_ID")] + pub(crate) backfill_func_run_logs_cutoff_id: Option, + /// Veritech encryption key file location [default: /run/sdf/veritech_encryption.key] #[arg(long)] pub(crate) veritech_encryption_key_path: Option, @@ -502,6 +510,17 @@ fn build_config_map(args: Args, config_map: &mut ConfigMap) -> &ConfigMap { 
i64::try_from(args.backfill_max_concurrent_uploads).unwrap_or(5), ); + if let Some(backfill_func_runs_cutoff_id) = args.backfill_func_runs_cutoff_id { + config_map.set("backfill_func_runs_cutoff_id", backfill_func_runs_cutoff_id); + } + + if let Some(backfill_func_run_logs_cutoff_id) = args.backfill_func_run_logs_cutoff_id { + config_map.set( + "backfill_func_run_logs_cutoff_id", + backfill_func_run_logs_cutoff_id, + ); + } + config_map.set("nats.connection_name", NAME); config_map.set("pg.application_name", NAME); config_map.set("layer_db_config.pg_pool_config.application_name", NAME); diff --git a/bin/sdf/src/main.rs b/bin/sdf/src/main.rs index 92dd8aa11a..0aee14c5b3 100644 --- a/bin/sdf/src/main.rs +++ b/bin/sdf/src/main.rs @@ -9,6 +9,7 @@ use innit_client::InnitClient; use sdf_server::{ BackfillConfig, Config, + FuncRunsBackfiller, LayerCacheBackfiller, Migrator, Server, @@ -156,6 +157,18 @@ async fn async_main() -> Result<()> { telemetry_shutdown, ) .await + } else if config.migration_mode().is_backfill_func_runs() { + backfill_func_runs( + config, + main_tracker, + main_token, + helping_tasks_tracker, + helping_tasks_token, + telemetry_tracker, + telemetry_token, + telemetry_shutdown, + ) + .await } else { run_server( config, @@ -395,3 +408,54 @@ async fn generate_symmetric_key( .await .map_err(Into::into) } + +/// Runs the func runs backfill on the main tracker, then waits for graceful shutdown. +/// +/// # Errors +/// +/// Fails if a configured cutoff ID does not parse or the batch size does not fit in an `i64`. +#[inline] +#[allow(clippy::too_many_arguments)] +async fn backfill_func_runs( + config: Config, + main_tracker: TaskTracker, + main_token: CancellationToken, + helping_tasks_tracker: TaskTracker, + helping_tasks_token: CancellationToken, + telemetry_tracker: TaskTracker, + telemetry_token: CancellationToken, + telemetry_shutdown: TelemetryShutdownGuard, +) -> Result<()> { + // Extract parameters before config is moved; a malformed cutoff ID aborts the backfill + let func_run_cutoff_id = config + .backfill_func_runs_cutoff_id() + .map(|id_str| id_str.parse()) + .transpose()?; + let func_run_log_cutoff_id = config + .backfill_func_run_logs_cutoff_id() + .map(|id_str| id_str.parse()) + .transpose()?; + let 
batch_size = i64::try_from(config.backfill_key_batch_size())?; + + let handle = main_tracker.spawn( + FuncRunsBackfiller::upload_all_func_runs_and_logs_concurrently( + config, + helping_tasks_tracker.clone(), + helping_tasks_token.clone(), + main_token.clone(), + func_run_cutoff_id, + func_run_log_cutoff_id, + batch_size, + ), + ); + + shutdown::graceful_with_handle(handle) + .group(main_tracker, main_token) + .group(helping_tasks_tracker, helping_tasks_token) + .group(telemetry_tracker, telemetry_token) + .telemetry_guard(telemetry_shutdown.into_future()) + .timeout(GRACEFUL_SHUTDOWN_TIMEOUT) + .wait() + .await + .map_err(Into::into) +} diff --git a/dev/config/grafana/provisioning/dashboards/func-runs-migration.json b/dev/config/grafana/provisioning/dashboards/func-runs-migration.json new file mode 100644 index 0000000000..22f05cd407 --- /dev/null +++ b/dev/config/grafana/provisioning/dashboards/func-runs-migration.json @@ -0,0 +1,1092 @@ +{ + "id": null, + "uid": "func-runs-migration", + "title": "Func Runs Migration", + "tags": [ + "funcruns", + "migration", + "layerdb", + "sidb" + ], + "timezone": "browser", + "schemaVersion": 39, + "version": 0, + "refresh": "10s", + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "templating": { + "list": [ + { + "name": "datasource", + "type": "datasource", + "label": "Data Source", + "query": "prometheus", + "refresh": 1, + "hide": 0 + }, + { + "name": "service", + "type": "query", + "label": "Service", + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "query": "label_values(func_runs_backfill_runs_items_processed_total, exported_job)", + "refresh": 2, + "sort": 1, + "multi": true, + "includeAll": true, + "allValue": ".*" + } + ] + }, + "panels": [ + { + "id": 1, + "type": "row", + "title": 
"Backfill Progress", + "gridPos": { + "x": 0, + "y": 0, + "w": 24, + "h": 1 + }, + "collapsed": false + }, + { + "id": 2, + "type": "stat", + "title": "Func Runs - Total Items Processed", + "interval": "1m", + "gridPos": { + "x": 0, + "y": 1, + "w": 6, + "h": 4 + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(func_runs_backfill_runs_items_processed_total{exported_job=~\"${service}\"})", + "refId": "A", + "description": "Total number of func runs processed" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "value": null, + "color": "blue" + } + ] + } + } + }, + "options": { + "graphMode": "area", + "colorMode": "value", + "orientation": "auto", + "textMode": "value_and_name", + "reduceOptions": { + "values": false, + "calcs": [ + "lastNotNull" + ] + } + } + }, + { + "id": 3, + "type": "stat", + "title": "Func Runs - Total Items Uploaded", + "interval": "1m", + "gridPos": { + "x": 6, + "y": 1, + "w": 6, + "h": 4 + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(func_runs_backfill_runs_items_uploaded_total{exported_job=~\"${service}\"})", + "refId": "A", + "description": "Total number of func runs uploaded to si-db" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "value": null, + "color": "green" + } + ] + } + } + }, + "options": { + "graphMode": "area", + "colorMode": "value", + "orientation": "auto", + "textMode": "value_and_name", + "reduceOptions": { + "values": false, + "calcs": [ + "lastNotNull" + ] + } + } + }, + { + "id": 4, + "type": "stat", + "title": "Func Runs - Total 
Items Skipped", + "interval": "1m", + "gridPos": { + "x": 12, + "y": 1, + "w": 6, + "h": 4 + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(func_runs_backfill_runs_items_skipped_total{exported_job=~\"${service}\"})", + "refId": "A", + "description": "Total number of func runs skipped (already in si-db)" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "value": null, + "color": "yellow" + } + ] + } + } + }, + "options": { + "graphMode": "area", + "colorMode": "value", + "orientation": "auto", + "textMode": "value_and_name", + "reduceOptions": { + "values": false, + "calcs": [ + "lastNotNull" + ] + } + } + }, + { + "id": 5, + "type": "gauge", + "title": "Func Runs - Migration Progress", + "interval": "1m", + "gridPos": { + "x": 18, + "y": 1, + "w": 6, + "h": 4 + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(func_runs_backfill_runs_items_uploaded_total{exported_job=~\"${service}\"}) / (sum(func_runs_backfill_runs_items_uploaded_total{exported_job=~\"${service}\"}) + sum(func_runs_backfill_runs_items_skipped_total{exported_job=~\"${service}\"})) * 100", + "refId": "A", + "description": "Percentage of func runs migrated (uploaded / total)" + } + ], + "fieldConfig": { + "defaults": { + "unit": "percent", + "min": 0, + "max": 100, + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "value": null, + "color": "red" + }, + { + "value": 50, + "color": "yellow" + }, + { + "value": 90, + "color": "green" + } + ] + } + } + }, + "options": { + "showThresholdLabels": false, + "showThresholdMarkers": true + } + }, + { + "id": 6, + "type": "stat", + "title": 
"Func Run Logs - Total Items Processed", + "interval": "1m", + "gridPos": { + "x": 0, + "y": 5, + "w": 6, + "h": 4 + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(func_run_logs_backfill_logs_items_processed_total{exported_job=~\"${service}\"})", + "refId": "A", + "description": "Total number of func run logs processed" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "value": null, + "color": "blue" + } + ] + } + } + }, + "options": { + "graphMode": "area", + "colorMode": "value", + "orientation": "auto", + "textMode": "value_and_name", + "reduceOptions": { + "values": false, + "calcs": [ + "lastNotNull" + ] + } + } + }, + { + "id": 7, + "type": "stat", + "title": "Func Run Logs - Total Items Uploaded", + "interval": "1m", + "gridPos": { + "x": 6, + "y": 5, + "w": 6, + "h": 4 + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(func_run_logs_backfill_logs_items_uploaded_total{exported_job=~\"${service}\"})", + "refId": "A", + "description": "Total number of func run logs uploaded to si-db" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "value": null, + "color": "green" + } + ] + } + } + }, + "options": { + "graphMode": "area", + "colorMode": "value", + "orientation": "auto", + "textMode": "value_and_name", + "reduceOptions": { + "values": false, + "calcs": [ + "lastNotNull" + ] + } + } + }, + { + "id": 8, + "type": "stat", + "title": "Func Run Logs - Total Items Skipped", + "interval": "1m", + "gridPos": { + "x": 12, + "y": 5, + "w": 6, + "h": 4 + }, + "datasource": { + "type": 
"prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(func_run_logs_backfill_logs_items_skipped_total{exported_job=~\"${service}\"})", + "refId": "A", + "description": "Total number of func run logs skipped (already in si-db)" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "value": null, + "color": "yellow" + } + ] + } + } + }, + "options": { + "graphMode": "area", + "colorMode": "value", + "orientation": "auto", + "textMode": "value_and_name", + "reduceOptions": { + "values": false, + "calcs": [ + "lastNotNull" + ] + } + } + }, + { + "id": 9, + "type": "gauge", + "title": "Func Run Logs - Migration Progress", + "interval": "1m", + "gridPos": { + "x": 18, + "y": 5, + "w": 6, + "h": 4 + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(func_run_logs_backfill_logs_items_uploaded_total{exported_job=~\"${service}\"}) / (sum(func_run_logs_backfill_logs_items_uploaded_total{exported_job=~\"${service}\"}) + sum(func_run_logs_backfill_logs_items_skipped_total{exported_job=~\"${service}\"})) * 100", + "refId": "A", + "description": "Percentage of func run logs migrated (uploaded / total)" + } + ], + "fieldConfig": { + "defaults": { + "unit": "percent", + "min": 0, + "max": 100, + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "value": null, + "color": "red" + }, + { + "value": 50, + "color": "yellow" + }, + { + "value": 90, + "color": "green" + } + ] + } + } + }, + "options": { + "showThresholdLabels": false, + "showThresholdMarkers": true + } + }, + { + "id": 10, + "type": "row", + "title": "LayerDB Fallback Metrics", + "gridPos": { + "x": 0, + "y": 9, + "w": 24, + "h": 1 + }, + 
"collapsed": false + }, + { + "id": 11, + "type": "stat", + "title": "Total Func Run Fallbacks", + "interval": "1m", + "gridPos": { + "x": 0, + "y": 10, + "w": 6, + "h": 4 + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(func_runs_layerdb_fallback_total{exported_job=~\"${service}\"})", + "refId": "A", + "description": "Total number of func run queries that fell back to layer-db" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "value": null, + "color": "green" + }, + { + "value": 100, + "color": "yellow" + }, + { + "value": 1000, + "color": "red" + } + ] + } + } + }, + "options": { + "graphMode": "area", + "colorMode": "value", + "orientation": "auto", + "textMode": "value_and_name", + "reduceOptions": { + "values": false, + "calcs": [ + "lastNotNull" + ] + } + } + }, + { + "id": 12, + "type": "stat", + "title": "Total Func Run Log Fallbacks", + "interval": "1m", + "gridPos": { + "x": 6, + "y": 10, + "w": 6, + "h": 4 + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(func_run_logs_layerdb_fallback_total{exported_job=~\"${service}\"})", + "refId": "A", + "description": "Total number of func run log queries that fell back to layer-db" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "value": null, + "color": "green" + }, + { + "value": 100, + "color": "yellow" + }, + { + "value": 1000, + "color": "red" + } + ] + } + } + }, + "options": { + "graphMode": "area", + "colorMode": "value", + "orientation": "auto", + "textMode": "value_and_name", + "reduceOptions": { + "values": false, + 
"calcs": [ + "lastNotNull" + ] + } + } + }, + { + "id": 13, + "type": "timeseries", + "title": "Func Run Fallback Rate by Method", + "interval": "1m", + "gridPos": { + "x": 12, + "y": 10, + "w": 12, + "h": 4 + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(rate(func_runs_layerdb_fallback_total{exported_job=~\"${service}\"}[$__interval])) by (method)", + "legendFormat": "{{method}}", + "refId": "A", + "description": "Rate of fallbacks per second by method" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ops", + "color": { + "mode": "palette-classic" + }, + "custom": { + "lineWidth": 2, + "fillOpacity": 10, + "showPoints": "never" + } + } + }, + "options": { + "tooltip": { + "mode": "multi", + "sort": "desc" + }, + "legend": { + "displayMode": "table", + "placement": "bottom", + "calcs": [ + "lastNotNull", + "max", + "mean" + ] + } + } + }, + { + "id": 14, + "type": "timeseries", + "title": "Func Runs - Backfill Processing Rate", + "interval": "1m", + "gridPos": { + "x": 0, + "y": 14, + "w": 12, + "h": 8 + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(rate(func_runs_backfill_runs_items_processed_total{exported_job=~\"${service}\"}[$__interval]))", + "legendFormat": "processed", + "refId": "A", + "description": "Rate of func runs processed per second" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(rate(func_runs_backfill_runs_items_uploaded_total{exported_job=~\"${service}\"}[$__interval]))", + "legendFormat": "uploaded", + "refId": "B", + "description": "Rate of func runs uploaded to si-db per second" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": 
"sum(rate(func_runs_backfill_runs_items_skipped_total{exported_job=~\"${service}\"}[$__interval]))", + "legendFormat": "skipped", + "refId": "C", + "description": "Rate of func runs skipped per second" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ops", + "color": { + "mode": "palette-classic" + }, + "custom": { + "lineWidth": 2, + "fillOpacity": 10, + "showPoints": "never" + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "processed" + }, + "properties": [ + { + "id": "color", + "value": { + "mode": "fixed", + "fixedColor": "blue" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "uploaded" + }, + "properties": [ + { + "id": "color", + "value": { + "mode": "fixed", + "fixedColor": "green" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "skipped" + }, + "properties": [ + { + "id": "color", + "value": { + "mode": "fixed", + "fixedColor": "yellow" + } + } + ] + } + ] + }, + "options": { + "tooltip": { + "mode": "multi", + "sort": "desc" + }, + "legend": { + "displayMode": "table", + "placement": "bottom", + "calcs": [ + "lastNotNull", + "max", + "mean" + ] + } + } + }, + { + "id": 15, + "type": "timeseries", + "title": "Func Run Logs - Backfill Processing Rate", + "interval": "1m", + "gridPos": { + "x": 12, + "y": 14, + "w": 12, + "h": 8 + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(rate(func_run_logs_backfill_logs_items_processed_total{exported_job=~\"${service}\"}[$__interval]))", + "legendFormat": "processed", + "refId": "A", + "description": "Rate of func run logs processed per second" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(rate(func_run_logs_backfill_logs_items_uploaded_total{exported_job=~\"${service}\"}[$__interval]))", + "legendFormat": "uploaded", + "refId": "B", + "description": "Rate of func run 
logs uploaded to si-db per second" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(rate(func_run_logs_backfill_logs_items_skipped_total{exported_job=~\"${service}\"}[$__interval]))", + "legendFormat": "skipped", + "refId": "C", + "description": "Rate of func run logs skipped per second" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ops", + "color": { + "mode": "palette-classic" + }, + "custom": { + "lineWidth": 2, + "fillOpacity": 10, + "showPoints": "never" + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "processed" + }, + "properties": [ + { + "id": "color", + "value": { + "mode": "fixed", + "fixedColor": "blue" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "uploaded" + }, + "properties": [ + { + "id": "color", + "value": { + "mode": "fixed", + "fixedColor": "green" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "skipped" + }, + "properties": [ + { + "id": "color", + "value": { + "mode": "fixed", + "fixedColor": "yellow" + } + } + ] + } + ] + }, + "options": { + "tooltip": { + "mode": "multi", + "sort": "desc" + }, + "legend": { + "displayMode": "table", + "placement": "bottom", + "calcs": [ + "lastNotNull", + "max", + "mean" + ] + } + } + }, + { + "id": 16, + "type": "table", + "title": "Func Run Fallbacks by Method", + "interval": "1m", + "gridPos": { + "x": 0, + "y": 22, + "w": 12, + "h": 8 + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(func_runs_layerdb_fallback_total{exported_job=~\"${service}\"}) by (method)", + "refId": "A", + "description": "Total fallbacks by method" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short" + } + }, + "options": { + "showHeader": true, + "sortBy": [ + { + "displayName": "Value", + "desc": true + } + ] + }, + "transformations": [ + { + "id": "organize", + 
"options": { + "renameByName": { + "method": "Method", + "Value": "Total Fallbacks" + } + } + } + ] + }, + { + "id": 17, + "type": "timeseries", + "title": "Func Run Log Fallback Rate", + "interval": "1m", + "gridPos": { + "x": 12, + "y": 22, + "w": 12, + "h": 8 + }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(rate(func_run_logs_layerdb_fallback_total{exported_job=~\"${service}\"}[$__interval])) by (method)", + "legendFormat": "{{method}}", + "refId": "A", + "description": "Rate of func run log fallbacks per second by method" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ops", + "color": { + "mode": "palette-classic" + }, + "custom": { + "lineWidth": 2, + "fillOpacity": 10, + "showPoints": "never" + } + } + }, + "options": { + "tooltip": { + "mode": "multi", + "sort": "desc" + }, + "legend": { + "displayMode": "table", + "placement": "bottom", + "calcs": [ + "lastNotNull", + "max", + "mean" + ] + } + } + } + ], + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": { + "refresh_intervals": [ + "5s", + "10s", + "30s", + "1m", + "5m", + "15m", + "30m", + "1h", + "2h", + "1d" + ] + } +} diff --git a/lib/dal/src/action.rs b/lib/dal/src/action.rs index 772a7bc853..955ba31a88 100644 --- a/lib/dal/src/action.rs +++ b/lib/dal/src/action.rs @@ -16,6 +16,7 @@ use serde::{ Deserialize, Serialize, }; +use si_db::FuncRunDb; use si_events::{ audit_log::AuditLogKind, ulid::Ulid, @@ -817,12 +818,12 @@ impl Action { ctx: &DalContext, id: ActionId, ) -> ActionResult> { - Ok(ctx - .layer_db() - .func_run() - .get_last_run_for_action_id_opt(ctx.events_tenancy().workspace_pk, id) - .await? - .map(|f| f.id())) + Ok( + FuncRunDb::get_last_run_for_action_id_opt(ctx, ctx.events_tenancy().workspace_pk, id) + .await + .expect("unable to get func run id") + .map(|f| f.id()), + ) } /// This function behaves differently if on head vs. 
in an open change set. diff --git a/lib/dal/src/audit_logging.rs b/lib/dal/src/audit_logging.rs index 86b0f46664..ee63ad89d6 100644 --- a/lib/dal/src/audit_logging.rs +++ b/lib/dal/src/audit_logging.rs @@ -53,7 +53,7 @@ pub enum AuditLoggingError { #[error("shuttle error: {0}")] Shuttle(#[from] ShuttleError), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("transactions error: {0}")] Transactions(#[from] Box), } @@ -81,7 +81,7 @@ pub(crate) async fn publish_pending( let workspace_id = match ctx.workspace_pk() { Ok(workspace_id) => workspace_id, Err(TransactionsError::SiDb(si_db_err)) - if matches!(si_db_err.as_ref(), si_db::Error::NoWorkspace) => + if matches!(si_db_err.as_ref(), si_db::SiDbError::NoWorkspace) => { return Ok(()); } @@ -227,7 +227,7 @@ pub(crate) async fn write( let workspace_id = match ctx.workspace_pk() { Ok(workspace_id) => workspace_id, Err(TransactionsError::SiDb(si_db_err)) - if matches!(si_db_err.as_ref(), si_db::Error::NoWorkspace) => + if matches!(si_db_err.as_ref(), si_db::SiDbError::NoWorkspace) => { return Ok(()); } @@ -267,7 +267,7 @@ pub(crate) async fn write_final_message(ctx: &DalContext) -> Result<()> { let workspace_id = match ctx.workspace_pk() { Ok(workspace_id) => workspace_id, Err(TransactionsError::SiDb(si_db_err)) - if matches!(si_db_err.as_ref(), si_db::Error::NoWorkspace) => + if matches!(si_db_err.as_ref(), si_db::SiDbError::NoWorkspace) => { return Ok(()); } diff --git a/lib/dal/src/change_set.rs b/lib/dal/src/change_set.rs index dcd50e244e..9aac1934c6 100644 --- a/lib/dal/src/change_set.rs +++ b/lib/dal/src/change_set.rs @@ -134,7 +134,7 @@ pub enum ChangeSetError { #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("si db error: {0}")] - SiDb(#[from] Box), + SiDb(#[from] Box), #[error("slow runtime error: {0}")] SlowRuntime(#[from] SlowRuntimeError), #[error("timeout out waiting for dvu after {0}ms")] @@ -161,8 +161,8 @@ impl From for 
ChangeSetError { } } -impl From for ChangeSetError { - fn from(value: si_db::Error) -> Self { +impl From for ChangeSetError { + fn from(value: si_db::SiDbError) -> Self { Box::new(value).into() } } @@ -215,8 +215,8 @@ impl From for ChangeSetApplyError { } } -impl From for ChangeSetApplyError { - fn from(value: si_db::Error) -> Self { +impl From for ChangeSetApplyError { + fn from(value: si_db::SiDbError) -> Self { Box::new(value).into() } } @@ -251,7 +251,7 @@ pub enum ChangeSetApplyError { #[error("change set ({0}) does not have a base change set")] NoBaseChangeSet(ChangeSetId), #[error("si db error: {0}")] - SiDb(#[from] Box), + SiDb(#[from] Box), #[error("transactions error: {0}")] Transactions(#[from] Box), } diff --git a/lib/dal/src/component.rs b/lib/dal/src/component.rs index 913aa4e6fd..3ecd8d44e0 100644 --- a/lib/dal/src/component.rs +++ b/lib/dal/src/component.rs @@ -319,7 +319,7 @@ pub enum ComponentError { #[error("serde_json error: {0}")] Serde(#[from] serde_json::Error), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("split graph error: {0}")] SplitGraph(#[from] SplitGraphError), #[error("transactions error: {0}")] diff --git a/lib/dal/src/context.rs b/lib/dal/src/context.rs index 5d098b56d7..6eedfa580f 100644 --- a/lib/dal/src/context.rs +++ b/lib/dal/src/context.rs @@ -506,6 +506,15 @@ impl SiDbContext for DalContext { fn change_set_id(&self) -> ChangeSetId { self.change_set_id() } + + // TODO get rid of these after we don't need layer db fallbacks + fn func_run_layer_db(&self) -> &si_layer_cache::db::func_run::FuncRunLayerDb { + self.services_context.layer_db.func_run() + } + + fn func_run_log_layer_db(&self) -> &si_layer_cache::db::func_run_log::FuncRunLogLayerDb { + self.services_context.layer_db.func_run_log() + } } impl SiDbTransactions for Transactions { @@ -1743,7 +1752,7 @@ pub enum TransactionsError { #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("si db 
error: {0}")] - SiDb(#[from] Box), + SiDb(#[from] Box), #[error("slow rt error: {0}")] SlowRuntime(#[from] Box), #[error("unable to acquire lock: {0}")] @@ -1800,8 +1809,8 @@ impl From for TransactionsError { } } -impl From for TransactionsError { - fn from(value: si_db::Error) -> Self { +impl From for TransactionsError { + fn from(value: si_db::SiDbError) -> Self { Box::new(value).into() } } diff --git a/lib/dal/src/diagram.rs b/lib/dal/src/diagram.rs index b0cbb808a5..6cec4b9c9f 100644 --- a/lib/dal/src/diagram.rs +++ b/lib/dal/src/diagram.rs @@ -178,7 +178,7 @@ pub enum DiagramError { #[error("serde error: {0}")] Serde(#[from] serde_json::Error), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("socket not found")] SocketNotFound, #[error("Transactions error: {0}")] diff --git a/lib/dal/src/func/debug.rs b/lib/dal/src/func/debug.rs index e6a9b15da0..644fd227c8 100644 --- a/lib/dal/src/func/debug.rs +++ b/lib/dal/src/func/debug.rs @@ -74,7 +74,7 @@ pub enum DebugFuncError { #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("si-db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("strum parse error: {0}")] StrumParse(#[from] strum::ParseError), #[error("transactions error: {0}")] diff --git a/lib/dal/src/func/runner.rs b/lib/dal/src/func/runner.rs index 64573688a3..a3c78ad2d4 100644 --- a/lib/dal/src/func/runner.rs +++ b/lib/dal/src/func/runner.rs @@ -9,6 +9,10 @@ use serde::{ Serialize, }; use serde_json; +use si_db::{ + FuncRunDb, + FuncRunLogDb, +}; use si_events::{ ActionId, ActionResultState, @@ -187,7 +191,7 @@ pub enum FuncRunnerError { #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error( "too many attribute prototype arguments for protoype corresponding to component ({0}) and prop ({1}): {2:?}" )] @@ -391,17 +395,9 @@ 
impl FuncRunner { ); } - let func_run = Arc::new(func_run_inner); + FuncRunDb::upsert(ctx, func_run_inner.clone()).await?; - ctx.layer_db() - .func_run() - .write( - func_run.clone(), - None, - ctx.events_tenancy(), - ctx.events_actor(), - ) - .await?; + let func_run = Arc::new(func_run_inner); Ok(FuncRunner { func_run, @@ -533,17 +529,9 @@ impl FuncRunner { ); } - let func_run = Arc::new(func_run_inner); + FuncRunDb::upsert(ctx, func_run_inner.clone()).await?; - ctx.layer_db() - .func_run() - .write( - func_run.clone(), - None, - ctx.events_tenancy(), - ctx.events_actor(), - ) - .await?; + let func_run = Arc::new(func_run_inner); Ok(FuncRunner { func_run, @@ -703,17 +691,9 @@ impl FuncRunner { ); } - let func_run = Arc::new(func_run_inner); + FuncRunDb::upsert(ctx, func_run_inner.clone()).await?; - ctx.layer_db() - .func_run() - .write( - func_run.clone(), - None, - ctx.events_tenancy(), - ctx.events_actor(), - ) - .await?; + let func_run = Arc::new(func_run_inner); Ok(FuncRunner { func_run, @@ -867,20 +847,12 @@ impl FuncRunner { parent_span.record("si.component.id", component_id.array_to_str(&mut id_buf)); } - let func_run = Arc::new(func_run_inner); - if !func.is_intrinsic() { - ctx.layer_db() - .func_run() - .write( - func_run.clone(), - None, - ctx.events_tenancy(), - ctx.events_actor(), - ) - .await?; + FuncRunDb::upsert(ctx, func_run_inner.clone()).await?; } + let func_run = Arc::new(func_run_inner); + Ok(FuncRunner { func_run, func, @@ -1042,17 +1014,9 @@ impl FuncRunner { ); } - let func_run = Arc::new(func_run_inner); + FuncRunDb::upsert(ctx, func_run_inner.clone()).await?; - ctx.layer_db() - .func_run() - .write( - func_run.clone(), - None, - ctx.events_tenancy(), - ctx.events_actor(), - ) - .await?; + let func_run = Arc::new(func_run_inner); Ok(FuncRunner { func_run, @@ -1219,17 +1183,9 @@ impl FuncRunner { ); } - let func_run = Arc::new(func_run_inner); + FuncRunDb::upsert(ctx, func_run_inner.clone()).await?; - ctx.layer_db() - .func_run() - .write( - 
func_run.clone(), - None, - ctx.events_tenancy(), - ctx.events_actor(), - ) - .await?; + let func_run = Arc::new(func_run_inner); Ok(FuncRunner { func_run, @@ -1425,17 +1381,9 @@ impl FuncRunner { ); } - let func_run = Arc::new(func_run_inner); + FuncRunDb::upsert(ctx, func_run_inner.clone()).await?; - ctx.layer_db() - .func_run() - .write( - func_run.clone(), - None, - ctx.events_tenancy(), - ctx.events_actor(), - ) - .await?; + let func_run = Arc::new(func_run_inner); Ok(FuncRunner { func_run, @@ -1512,18 +1460,12 @@ impl FuncRunner { id: FuncRunId, update_fn: impl FnOnce(&mut FuncRun), ) -> FuncRunnerResult<()> { - let mut func_run = Arc::unwrap_or_clone(ctx.layer_db().func_run().try_read(id).await?); + let mut func_run = FuncRunDb::try_read(ctx, id).await?; update_fn(&mut func_run); - Ok(ctx - .layer_db() - .func_run() - .write( - Arc::new(func_run), - None, - ctx.events_tenancy(), - ctx.events_actor(), - ) - .await?) + + FuncRunDb::upsert(ctx, func_run).await?; + + Ok(()) } // Update the latest func run for the given action in LayerDB, setting tenancy/actor to ctx.events_tenancy()/events_actor()). @@ -1532,22 +1474,18 @@ impl FuncRunner { action_id: ActionId, update_fn: impl FnOnce(&mut FuncRun), ) -> FuncRunnerResult<()> { - let mut func_run = ctx - .layer_db() - .func_run() - .get_last_run_for_action_id(ctx.events_tenancy().workspace_pk, action_id) - .await?; + let mut func_run = FuncRunDb::get_last_run_for_action_id( + ctx, + ctx.events_tenancy().workspace_pk, + action_id, + ) + .await?; + update_fn(&mut func_run); - Ok(ctx - .layer_db() - .func_run() - .write( - Arc::new(func_run), - None, - ctx.events_tenancy(), - ctx.events_actor(), - ) - .await?) 
+ + FuncRunDb::upsert(ctx, func_run.clone()).await?; + + Ok(()) } pub fn id(&self) -> FuncRunId { @@ -1869,16 +1807,7 @@ impl FuncRunnerLogsTask { timestamp: item.timestamp, }); - self.ctx - .layer_db() - .func_run_log() - .write( - Arc::new(func_run_log.clone()), - None, - self.ctx.events_tenancy(), - self.ctx.events_actor(), - ) - .await?; + FuncRunLogDb::upsert(&self.ctx, func_run_log.clone()).await?; WsEvent::func_run_log_updated( &self.ctx, @@ -1899,16 +1828,7 @@ impl FuncRunnerLogsTask { // logs from a function execution, even if this task takes far longer than // the execution time of the function). func_run_log.set_finalized(); - self.ctx - .layer_db() - .func_run_log() - .write( - Arc::new(func_run_log.clone()), - None, - self.ctx.events_tenancy(), - self.ctx.events_actor(), - ) - .await?; + FuncRunLogDb::upsert(&self.ctx, func_run_log.clone()).await?; WsEvent::func_run_log_updated( &self.ctx, diff --git a/lib/dal/src/key_pair.rs b/lib/dal/src/key_pair.rs index 8149e42f8e..1aadf2bee5 100644 --- a/lib/dal/src/key_pair.rs +++ b/lib/dal/src/key_pair.rs @@ -62,7 +62,7 @@ pub enum KeyPairError { #[error("error serializing/deserializing json: {0}")] SerdeJson(#[from] serde_json::Error), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("symmetric crypto error: {0}")] SymmetricCrypto(#[from] SymmetricCryptoError), #[error("transactions error: {0}")] diff --git a/lib/dal/src/lib.rs b/lib/dal/src/lib.rs index ca39cadcc4..c3903c94cd 100644 --- a/lib/dal/src/lib.rs +++ b/lib/dal/src/lib.rs @@ -278,6 +278,7 @@ pub fn generate_name() -> String { )] #[strum(serialize_all = "camelCase")] pub enum MigrationMode { + BackfillFuncRuns, BackfillLayerCache, GarbageCollectSnapshots, Run, @@ -312,6 +313,10 @@ impl MigrationMode { pub fn is_backfill_layer_cache(&self) -> bool { matches!(self, Self::BackfillLayerCache) } + + pub fn is_backfill_func_runs(&self) -> bool { + matches!(self, Self::BackfillFuncRuns) + } } #[cfg(test)] 
diff --git a/lib/dal/src/management/mod.rs b/lib/dal/src/management/mod.rs index 948e258809..784045c904 100644 --- a/lib/dal/src/management/mod.rs +++ b/lib/dal/src/management/mod.rs @@ -136,7 +136,7 @@ pub enum ManagementError { #[error("schema error: {0}")] Schema(#[from] Box), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("transactions error: {0}")] Transactions(#[from] TransactionsError), #[error("ulid decode error: {0}")] diff --git a/lib/dal/src/module.rs b/lib/dal/src/module.rs index 6078cb228e..f2ebf722ac 100644 --- a/lib/dal/src/module.rs +++ b/lib/dal/src/module.rs @@ -102,7 +102,7 @@ pub enum ModuleError { #[error("schema variant error: {0}")] SchemaVariant(#[from] SchemaVariantError), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("too many latest modules for schema: {0} (at least two hashes found: {1} and {2})")] TooManyLatestModulesForSchema(SchemaId, String, String), #[error("transactions error: {0}")] diff --git a/lib/dal/src/pkg.rs b/lib/dal/src/pkg.rs index 9af8c0962f..9ce17cfa91 100644 --- a/lib/dal/src/pkg.rs +++ b/lib/dal/src/pkg.rs @@ -174,7 +174,7 @@ pub enum PkgError { #[error("json serialization error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error( "taking output socket as input for a prop is unsupported for name ({0}) and socket name ({1})" )] diff --git a/lib/dal/src/qualification.rs b/lib/dal/src/qualification.rs index 19bda283a5..862435140a 100644 --- a/lib/dal/src/qualification.rs +++ b/lib/dal/src/qualification.rs @@ -3,6 +3,10 @@ use serde::{ Serialize, }; use si_data_pg::PgError; +use si_db::{ + FuncRunDb, + FuncRunLogDb, +}; use si_frontend_types::ComponentQualificationStats; use si_id::AttributeValueId; use si_layer_cache::LayerDbError; @@ -173,6 +177,8 @@ pub enum QualificationError { Prop(#[from] PropError), #[error("error 
serializing/deserializing json: {0}")] SerdeJson(#[from] serde_json::Error), + #[error("si db error: {0}")] + SiDb(#[from] si_db::SiDbError), #[error("validation resolver error: {0}")] ValidationResolver(#[from] ValidationError), } @@ -223,14 +229,12 @@ impl QualificationView { ctx: &DalContext, attribute_value_id: AttributeValueId, ) -> Result, QualificationError> { - let maybe_qual_run = ctx - .layer_db() - .func_run() - .get_last_qualification_for_attribute_value_id( - ctx.events_tenancy().workspace_pk, - attribute_value_id, - ) - .await?; + let maybe_qual_run = FuncRunDb::get_last_qualification_for_attribute_value_id( + ctx, + ctx.events_tenancy().workspace_pk, + attribute_value_id, + ) + .await?; match maybe_qual_run { Some(qual_run) => { let qualification_entry: QualificationEntry = @@ -256,28 +260,24 @@ impl QualificationView { sub_checks: vec![sub_check], }); - let (output, finalized) = match ctx - .layer_db() - .func_run_log() - .get_for_func_run_id(qual_run.id()) - .await? - { - Some(func_run_logs) => { - let output = func_run_logs - .logs() - .iter() - .map(|l| QualificationOutputStreamView { - stream: l.stream.clone(), - line: l.message.clone(), - level: l.level.clone(), - }) - .collect(); - let finalized = func_run_logs.is_finalized(); - - (output, finalized) - } - None => (Vec::new(), false), - }; + let (output, finalized) = + match FuncRunLogDb::get_for_func_run_id(ctx, qual_run.id()).await? 
{ + Some(func_run_logs) => { + let output = func_run_logs + .logs() + .iter() + .map(|l| QualificationOutputStreamView { + stream: l.stream.clone(), + line: l.message.clone(), + level: l.level.clone(), + }) + .collect(); + let finalized = func_run_logs.is_finalized(); + + (output, finalized) + } + None => (Vec::new(), false), + }; Ok(Some(QualificationView { title: qual_run diff --git a/lib/dal/src/schema/variant/authoring.rs b/lib/dal/src/schema/variant/authoring.rs index b8425a18f2..01d17d08fa 100644 --- a/lib/dal/src/schema/variant/authoring.rs +++ b/lib/dal/src/schema/variant/authoring.rs @@ -151,7 +151,7 @@ pub enum VariantAuthoringError { #[error("json serialization error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("si pkg error: {0}")] SiPkg(#[from] SiPkgError), #[error("spec error: {0}")] diff --git a/lib/dal/src/secret/view.rs b/lib/dal/src/secret/view.rs index 4da38e9f19..3a908591c8 100644 --- a/lib/dal/src/secret/view.rs +++ b/lib/dal/src/secret/view.rs @@ -27,7 +27,7 @@ pub enum SecretViewError { #[error("secret error: {0}")] Secret(#[from] SecretError), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), } #[allow(missing_docs)] diff --git a/lib/dal/src/workspace.rs b/lib/dal/src/workspace.rs index 0553b5e04d..d7cac3cc3d 100644 --- a/lib/dal/src/workspace.rs +++ b/lib/dal/src/workspace.rs @@ -125,7 +125,7 @@ pub enum WorkspaceError { #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("strum parse error: {0}")] StrumParse(#[from] strum::ParseError), #[error("transactions error: {0}")] diff --git a/lib/dal/tests/integration_test/func/authoring/binding/action.rs b/lib/dal/tests/integration_test/func/authoring/binding/action.rs index c94e1b643a..75af85ce1c 100644 --- 
a/lib/dal/tests/integration_test/func/authoring/binding/action.rs +++ b/lib/dal/tests/integration_test/func/authoring/binding/action.rs @@ -27,6 +27,7 @@ use dal_test::{ }, test, }; +use si_db::FuncRunDb; #[test] async fn attach_multiple_action_funcs(ctx: &mut DalContext) { @@ -124,13 +125,14 @@ async fn detach_attach_then_delete_action_func_while_enqueued(ctx: &mut DalConte let _prototype = ActionPrototype::get_by_id(ctx, prototype_id) .await .expect("unable to get prototype"); - let _func_run_id = ctx - .layer_db() - .func_run() - .get_last_run_for_action_id_opt(ctx.events_tenancy().workspace_pk, action.id()) - .await - .expect("unable to get func run id") - .map(|f| f.id()); + let _func_run_id = FuncRunDb::get_last_run_for_action_id_opt( + ctx, + ctx.events_tenancy().workspace_pk, + action.id(), + ) + .await + .expect("unable to get func run id") + .map(|f| f.id()); let _component_id = Action::component_id(ctx, action_id) .await .expect("unable to get component id"); diff --git a/lib/dal/tests/integration_test/func/authoring/test_execute.rs b/lib/dal/tests/integration_test/func/authoring/test_execute.rs index fdbb4ce74c..ddc4c5abca 100644 --- a/lib/dal/tests/integration_test/func/authoring/test_execute.rs +++ b/lib/dal/tests/integration_test/func/authoring/test_execute.rs @@ -1,7 +1,4 @@ -use std::{ - sync::Arc, - time::Duration, -}; +use std::time::Duration; use dal::{ DalContext, @@ -15,6 +12,7 @@ use dal_test::{ }, test, }; +use si_db::FuncRunDb; use si_events::{ FuncRun, FuncRunId, @@ -236,16 +234,13 @@ async fn wait_for_func_run_with_success_state(ctx: &DalContext, func_run_id: Fun let seconds = 15; for _ in 0..(seconds * 10) { - let func_run = ctx - .layer_db() - .func_run() - .read(func_run_id) + let func_run = FuncRunDb::read(ctx, func_run_id) .await .expect("could not read func run") .expect("func run not found"); if func_run.state() == FuncRunState::Success { - return Arc::unwrap_or_clone(func_run); + return func_run; } 
tokio::time::sleep(Duration::from_millis(100)).await; diff --git a/lib/luminork-server/src/api_types/func_run/v1.rs b/lib/luminork-server/src/api_types/func_run/v1.rs index df37e192e6..d0ad018ae8 100644 --- a/lib/luminork-server/src/api_types/func_run/v1.rs +++ b/lib/luminork-server/src/api_types/func_run/v1.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use chrono::{ DateTime, Utc, @@ -14,6 +12,7 @@ use serde::{ Deserialize, Serialize, }; +use si_db::FuncRunLogDb; use si_events::{ ActionKind, ActionResultState, @@ -275,12 +274,8 @@ impl FuncRunViewV1 { } }; - let logs = ctx - .layer_db() - .func_run_log() - .get_for_func_run_id(func_run.id()) + let logs = FuncRunLogDb::get_for_func_run_id(ctx, func_run.id()) .await? - .map(Arc::::unwrap_or_clone) .map(|v| v.into()); Ok(FuncRunViewV1 { diff --git a/lib/luminork-server/src/service/v1/actions/get_actions.rs b/lib/luminork-server/src/service/v1/actions/get_actions.rs index b7de20ba14..a372cefbe5 100644 --- a/lib/luminork-server/src/service/v1/actions/get_actions.rs +++ b/lib/luminork-server/src/service/v1/actions/get_actions.rs @@ -8,6 +8,7 @@ use dal::{ }; use serde::Serialize; use serde_json::json; +use si_db::FuncRunDb; use utoipa::ToSchema; use super::ActionsResult; @@ -50,12 +51,14 @@ pub async fn get_actions( let func_id = ActionPrototype::func_id(ctx, prototype_id).await?; let func = Func::get_by_id(ctx, func_id).await?; let prototype = ActionPrototype::get_by_id(ctx, prototype_id).await?; - let func_run_id = ctx - .layer_db() - .func_run() - .get_last_run_for_action_id_opt(ctx.events_tenancy().workspace_pk, action.id()) - .await? - .map(|f| f.id()); + + let func_run_id = FuncRunDb::get_last_run_for_action_id_opt( + ctx, + ctx.events_tenancy().workspace_pk, + action.id(), + ) + .await? 
+ .map(|f| f.id()); let action = ActionViewV1 { id: action_id, diff --git a/lib/luminork-server/src/service/v1/actions/mod.rs b/lib/luminork-server/src/service/v1/actions/mod.rs index b6bd5b5148..0c8700cefa 100644 --- a/lib/luminork-server/src/service/v1/actions/mod.rs +++ b/lib/luminork-server/src/service/v1/actions/mod.rs @@ -9,6 +9,7 @@ use axum::{ }, }; use serde::Deserialize; +use si_db::SiDbError; use si_id::ActionId; use si_layer_cache::LayerDbError; use thiserror::Error; @@ -36,6 +37,8 @@ pub enum ActionsError { InvalidOnHoldTransition(ActionId), #[error("layer db error: {0}")] LayerDb(#[from] LayerDbError), + #[error("SI db error: {0}")] + SiDb(#[from] SiDbError), #[error("transactions error: {0}")] Transactions(#[from] dal::TransactionsError), #[error("validation error: {0}")] diff --git a/lib/luminork-server/src/service/v1/components/mod.rs b/lib/luminork-server/src/service/v1/components/mod.rs index 03043f9700..9241af43f6 100644 --- a/lib/luminork-server/src/service/v1/components/mod.rs +++ b/lib/luminork-server/src/service/v1/components/mod.rs @@ -181,6 +181,8 @@ pub enum ComponentsError { SecretNotFound(String), #[error("serde_json error: {0}")] Serde(#[from] serde_json::Error), + #[error("si db error: {0}")] + SiDb(#[from] SiDbError), #[error("transactions error: {0}")] Transactions(#[from] dal::TransactionsError), #[error("Ulid Decode Error: {0}")] @@ -305,6 +307,7 @@ use get_component::{ GetComponentV1ResponseActionFunction, GetComponentV1ResponseManagementFunction, }; +use si_db::SiDbError; pub async fn get_component_functions( ctx: &dal::DalContext, diff --git a/lib/luminork-server/src/service/v1/funcs/get_func_run.rs b/lib/luminork-server/src/service/v1/funcs/get_func_run.rs index eef2728f10..eda8f8ea8b 100644 --- a/lib/luminork-server/src/service/v1/funcs/get_func_run.rs +++ b/lib/luminork-server/src/service/v1/funcs/get_func_run.rs @@ -4,6 +4,7 @@ use axum::{ }; use serde::Serialize; use serde_json::json; +use si_db::FuncRunDb; use 
utoipa::ToSchema; use super::{ @@ -121,7 +122,7 @@ pub async fn get_func_run( tracker: PosthogEventTracker, Path(FuncRunV1RequestPath { func_run_id }): Path, ) -> FuncsResult> { - let maybe_func_run = ctx.layer_db().func_run().read(func_run_id).await?; + let maybe_func_run = FuncRunDb::read(ctx, func_run_id).await?; match maybe_func_run { Some(func_run) => { let func_run_view = FuncRunViewV1::assemble(ctx, &func_run).await?; diff --git a/lib/luminork-server/src/service/v1/funcs/mod.rs b/lib/luminork-server/src/service/v1/funcs/mod.rs index 2136774beb..61462f7ea7 100644 --- a/lib/luminork-server/src/service/v1/funcs/mod.rs +++ b/lib/luminork-server/src/service/v1/funcs/mod.rs @@ -49,6 +49,8 @@ pub enum FuncsError { SchemaVariant(#[from] SchemaVariantError), #[error("funcs can only be unlocked for unlocked schema variants")] SchemaVariantMustBeUnlocked, + #[error("si db error: {0}")] + SiDb(#[from] si_db::SiDbError), #[error("transactions error: {0}")] Transactions(#[from] dal::TransactionsError), #[error("validation error: {0}")] diff --git a/lib/luminork-server/src/service/v1/user.rs b/lib/luminork-server/src/service/v1/user.rs index a62eb2e90b..469b5f2e89 100644 --- a/lib/luminork-server/src/service/v1/user.rs +++ b/lib/luminork-server/src/service/v1/user.rs @@ -27,7 +27,7 @@ pub enum UserError { #[error("component error: {0}")] Component(#[from] dal::ComponentError), #[error("db error: {0}")] - Db(#[from] si_db::Error), + Db(#[from] si_db::SiDbError), #[error("edda client error: {0}")] EddaClient(#[from] EddaClientError), #[error("func error: {0}")] diff --git a/lib/luminork-server/src/service/workspace_management.rs b/lib/luminork-server/src/service/workspace_management.rs index 62993abb2b..e43a2bfcf3 100644 --- a/lib/luminork-server/src/service/workspace_management.rs +++ b/lib/luminork-server/src/service/workspace_management.rs @@ -68,7 +68,7 @@ pub enum WorkspaceManagementError { #[error("http error: {0}")] Request(#[from] reqwest::Error), #[error("si-db 
error: {0}")] - SiDb(#[from] Box), + SiDb(#[from] Box), #[error("user not found: {0}")] UserNotFound(String), #[error("validation error: {0}")] @@ -91,8 +91,8 @@ impl From for WorkspaceManagementError { } } -impl From for WorkspaceManagementError { - fn from(value: si_db::Error) -> Self { +impl From for WorkspaceManagementError { + fn from(value: si_db::SiDbError) -> Self { Box::new(value).into() } } diff --git a/lib/pinga-server/src/handlers.rs b/lib/pinga-server/src/handlers.rs index 973671d2f2..053ec1db7e 100644 --- a/lib/pinga-server/src/handlers.rs +++ b/lib/pinga-server/src/handlers.rs @@ -66,7 +66,7 @@ pub enum HandlerError { #[error("job consumer error: {0}")] JobConsumer(#[from] JobConsumerError), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), } type Result = result::Result; diff --git a/lib/sdf-core/src/dal_wrapper.rs b/lib/sdf-core/src/dal_wrapper.rs index c055519ddd..6158423190 100644 --- a/lib/sdf-core/src/dal_wrapper.rs +++ b/lib/sdf-core/src/dal_wrapper.rs @@ -58,7 +58,7 @@ pub enum DalWrapperError { #[error("permissions error: {0}")] Permissions(#[from] permissions::Error), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("spicedb lookup subjects error: {0}")] SpiceDBLookupSubjects(#[source] si_data_spicedb::Error), #[error("transactions error: {0}")] diff --git a/lib/sdf-server/BUCK b/lib/sdf-server/BUCK index 0c6edf866f..a11078893e 100644 --- a/lib/sdf-server/BUCK +++ b/lib/sdf-server/BUCK @@ -55,6 +55,7 @@ rust_library( "//third-party/rust:futures-lite", "//third-party/rust:hyper", "//third-party/rust:jsonptr", + "//third-party/rust:postcard", "//third-party/rust:remain", "//third-party/rust:reqwest", "//third-party/rust:serde", diff --git a/lib/sdf-server/Cargo.toml b/lib/sdf-server/Cargo.toml index 9ba5d44e95..f179dd6d33 100644 --- a/lib/sdf-server/Cargo.toml +++ b/lib/sdf-server/Cargo.toml @@ -34,6 +34,7 @@ module-index-client = { path = 
"../../lib/module-index-client" } nats-multiplexer = { path = "../../lib/nats-multiplexer" } nats-multiplexer-client = { path = "../../lib/nats-multiplexer-client" } permissions = { path = "../../lib/permissions" } +postcard = { workspace = true } rebaser-client = { path = "../../lib/rebaser-client" } remain = { workspace = true } reqwest = { workspace = true } diff --git a/lib/sdf-server/src/config.rs b/lib/sdf-server/src/config.rs index 86f64962c0..73502d460b 100644 --- a/lib/sdf-server/src/config.rs +++ b/lib/sdf-server/src/config.rs @@ -85,7 +85,7 @@ impl ConfigError { type Result = std::result::Result; -#[derive(Debug, Builder, Serialize)] +#[derive(Debug, Builder, Serialize, Clone)] pub struct Config { #[builder(default = "random_instance_id()")] instance_id: String, @@ -160,6 +160,12 @@ pub struct Config { #[builder(default = "default_backfill_max_concurrent_uploads()")] backfill_max_concurrent_uploads: usize, + + #[builder(default)] + backfill_func_runs_cutoff_id: Option, + + #[builder(default)] + backfill_func_run_logs_cutoff_id: Option, } impl StandardConfig for Config { @@ -302,6 +308,14 @@ impl Config { pub fn backfill_max_concurrent_uploads(&self) -> usize { self.backfill_max_concurrent_uploads } + + pub fn backfill_func_runs_cutoff_id(&self) -> Option<&str> { + self.backfill_func_runs_cutoff_id.as_deref() + } + + pub fn backfill_func_run_logs_cutoff_id(&self) -> Option<&str> { + self.backfill_func_run_logs_cutoff_id.as_deref() + } } impl ConfigBuilder { @@ -366,6 +380,10 @@ pub struct ConfigFile { backfill_checkpoint_interval_secs: u64, #[serde(default = "default_backfill_max_concurrent_uploads")] backfill_max_concurrent_uploads: usize, + #[serde(default)] + backfill_func_runs_cutoff_id: Option, + #[serde(default)] + backfill_func_run_logs_cutoff_id: Option, } impl Default for ConfigFile { @@ -396,6 +414,8 @@ impl Default for ConfigFile { backfill_key_batch_size: default_backfill_key_batch_size(), backfill_checkpoint_interval_secs: 
default_backfill_checkpoint_interval_secs(), backfill_max_concurrent_uploads: default_backfill_max_concurrent_uploads(), + backfill_func_runs_cutoff_id: None, + backfill_func_run_logs_cutoff_id: None, } } } @@ -437,6 +457,8 @@ impl TryFrom for Config { backfill_key_batch_size: value.backfill_key_batch_size, backfill_checkpoint_interval_secs: value.backfill_checkpoint_interval_secs, backfill_max_concurrent_uploads: value.backfill_max_concurrent_uploads, + backfill_func_runs_cutoff_id: value.backfill_func_runs_cutoff_id, + backfill_func_run_logs_cutoff_id: value.backfill_func_run_logs_cutoff_id, }) } } diff --git a/lib/sdf-server/src/func_runs_backfill.rs b/lib/sdf-server/src/func_runs_backfill.rs new file mode 100644 index 0000000000..4627ca5af4 --- /dev/null +++ b/lib/sdf-server/src/func_runs_backfill.rs @@ -0,0 +1,10 @@ +mod backfiller; +mod error; + +pub use self::{ + backfiller::FuncRunsBackfiller, + error::{ + FuncRunsBackfillError, + FuncRunsBackfillResult, + }, +}; diff --git a/lib/sdf-server/src/func_runs_backfill/backfiller.rs b/lib/sdf-server/src/func_runs_backfill/backfiller.rs new file mode 100644 index 0000000000..73acbe5f30 --- /dev/null +++ b/lib/sdf-server/src/func_runs_backfill/backfiller.rs @@ -0,0 +1,353 @@ +use std::time::Instant; + +use dal::{ + DalContext, + ServicesContext, +}; +use si_db::{ + FuncRunDb, + FuncRunLogDb, + HistoryActor, +}; +use si_events::authentication_method::AuthenticationMethodV1; +use si_id::{ + FuncRunId, + FuncRunLogId, +}; +use telemetry::prelude::*; +use telemetry_utils::monotonic; +use tokio::join; +use tokio_util::sync::CancellationToken; + +use super::FuncRunsBackfillResult; +use crate::init; + +pub struct FuncRunsBackfiller { + services_context: ServicesContext, +} + +impl FuncRunsBackfiller { + #[instrument(name = "sdf.func_runs_backfiller.new", level = "info", skip_all)] + pub async fn new( + config: crate::Config, + task_tracker: tokio_util::task::TaskTracker, + task_token: CancellationToken, + ) -> 
FuncRunsBackfillResult { + let (services_context, layer_db_graceful_shutdown) = + init::services_context_from_config(&config, task_token).await?; + + task_tracker.spawn(layer_db_graceful_shutdown.into_future()); + + Ok(Self { services_context }) + } + + #[instrument( + name = "sdf.func_runs_backfiller.log_all_func_runs", + level = "info", + skip_all + )] + pub async fn upload_all_func_runs( + &self, + shutdown_token: CancellationToken, + cutoff_id: Option, + batch_size: i64, + ) -> FuncRunsBackfillResult<()> { + info!( + cutoff_id = ?cutoff_id, + batch_size = batch_size, + "starting func runs backfill" + ); + + let func_run_db = self.services_context.layer_db().func_run(); + + let mut cutoff_id = cutoff_id; + let mut last_checkpoint = Instant::now(); + let mut total_processed = 0u64; + let mut total_uploaded = 0u64; + let mut total_skipped = 0u64; + + let ctx = DalContext::builder(self.services_context.clone(), false) + .build_without_workspace( + HistoryActor::SystemInit, + None, + AuthenticationMethodV1::System, + ) + .await?; + + loop { + // Check for shutdown signal + if shutdown_token.is_cancelled() { + info!( + last_cutoff_id = ?cutoff_id, + total_processed = total_processed, + total_uploaded = total_uploaded, + total_skipped = total_skipped, + "backfill shutting down gracefully" + ); + break; + } + + // Fetch next batch + let id_batch = func_run_db.read_batch_of_ids(batch_size, cutoff_id).await?; + + trace!( + cutoff_id = ?cutoff_id, + batch_size = id_batch.len(), + "fetched func run id batch" + ); + + if id_batch.is_empty() { + // No more ids to process + break; + } + + // Update cutoff_id to the last ID in the batch + if let Some(last_id) = id_batch.last() { + cutoff_id = Some(*last_id); + } + + // Find which IDs from the batch are missing in si-db + let missing_ids = FuncRunDb::find_missing_ids(&ctx, &id_batch).await?; + + let batch_skipped = id_batch.len() - missing_ids.len(); + total_skipped += batch_skipped as u64; + total_processed += id_batch.len() 
as u64; + + trace!( + batch_total = id_batch.len(), + batch_missing = missing_ids.len(), + batch_skipped = batch_skipped, + "processed batch, found missing IDs" + ); + + if !missing_ids.is_empty() { + // Read the missing func runs from layer-db + let mut func_runs_to_insert = Vec::with_capacity(missing_ids.len()); + for id in &missing_ids { + let layer_func_run = ctx + .services_context() + .layer_db() + .func_run() + .try_read(*id) + .await?; + func_runs_to_insert.push((*layer_func_run).clone()); + } + + // Batch upsert all missing func runs + FuncRunDb::upsert_batch(&ctx, func_runs_to_insert).await?; + ctx.commit().await?; + + total_uploaded += missing_ids.len() as u64; + } + + // Update telemetry + monotonic!(func_runs.backfill_runs.items_uploaded = missing_ids.len() as u64); + monotonic!(func_runs.backfill_runs.items_skipped = batch_skipped as u64); + monotonic!(func_runs.backfill_runs.items_processed = id_batch.len() as u64); + + // Time-based checkpoint for progress visibility + if last_checkpoint.elapsed() > std::time::Duration::from_secs(10) { + info!( + cutoff_id = ?cutoff_id, + total_processed = total_processed, + total_uploaded = total_uploaded, + total_skipped = total_skipped, + "backfill checkpoint" + ); + last_checkpoint = Instant::now(); + } + } + + // Final log when backfill complete + info!( + total_processed = total_processed, + total_uploaded = total_uploaded, + total_skipped = total_skipped, + "completed func runs backfill" + ); + + Ok(()) + } + + #[instrument( + name = "sdf.func_runs_backfiller.upload_all_func_run_logs", + level = "info", + skip_all + )] + pub async fn upload_all_func_run_logs( + &self, + shutdown_token: CancellationToken, + cutoff_id: Option, + batch_size: i64, + ) -> FuncRunsBackfillResult<()> { + info!( + cutoff_id = ?cutoff_id, + batch_size = batch_size, + "starting func run logs backfill" + ); + + let func_run_log_db = self.services_context.layer_db().func_run_log(); + + let mut cutoff_id = cutoff_id; + let mut 
last_checkpoint = Instant::now(); + let mut total_processed = 0u64; + let mut total_uploaded = 0u64; + let mut total_skipped = 0u64; + + let ctx = DalContext::builder(self.services_context.clone(), false) + .build_without_workspace( + HistoryActor::SystemInit, + None, + AuthenticationMethodV1::System, + ) + .await?; + + loop { + // Check for shutdown signal + if shutdown_token.is_cancelled() { + info!( + last_cutoff_id = ?cutoff_id, + total_processed = total_processed, + total_uploaded = total_uploaded, + total_skipped = total_skipped, + "backfill shutting down gracefully" + ); + break; + } + + // Fetch next batch + let id_batch = func_run_log_db + .read_batch_of_ids(batch_size, cutoff_id) + .await?; + + trace!( + cutoff_id = ?cutoff_id, + batch_size = id_batch.len(), + "fetched func run log id batch" + ); + + if id_batch.is_empty() { + // No more ids to process + break; + } + + // Update cutoff_id to the last ID in the batch + if let Some(last_id) = id_batch.last() { + cutoff_id = Some(*last_id); + } + + // Find which IDs from the batch are missing in si-db + let missing_ids = FuncRunLogDb::find_missing_ids(&ctx, &id_batch).await?; + + let batch_skipped = id_batch.len() - missing_ids.len(); + total_skipped += batch_skipped as u64; + total_processed += id_batch.len() as u64; + + trace!( + batch_total = id_batch.len(), + batch_missing = missing_ids.len(), + batch_skipped = batch_skipped, + "processed batch, found missing IDs" + ); + + if !missing_ids.is_empty() { + // Read the missing func run logs from layer-db + let mut func_run_logs_to_insert = Vec::with_capacity(missing_ids.len()); + for id in &missing_ids { + let layer_func_run_log = ctx + .services_context() + .layer_db() + .func_run_log() + .try_read(*id) + .await?; + func_run_logs_to_insert.push((*layer_func_run_log).clone()); + } + + // Batch upsert all missing func run logs + FuncRunLogDb::upsert_batch(&ctx, func_run_logs_to_insert).await?; + ctx.commit().await?; + + total_uploaded += missing_ids.len() as 
u64; + } + + // Update telemetry + monotonic!(func_run_logs.backfill_logs.items_uploaded = missing_ids.len() as u64); + monotonic!(func_run_logs.backfill_logs.items_skipped = batch_skipped as u64); + monotonic!(func_run_logs.backfill_logs.items_processed = id_batch.len() as u64); + + // Time-based checkpoint for progress visibility + if last_checkpoint.elapsed() > std::time::Duration::from_secs(10) { + info!( + cutoff_id = ?cutoff_id, + total_processed = total_processed, + total_uploaded = total_uploaded, + total_skipped = total_skipped, + "backfill checkpoint" + ); + last_checkpoint = Instant::now(); + } + } + + // Final log when backfill complete + info!( + total_processed = total_processed, + total_uploaded = total_uploaded, + total_skipped = total_skipped, + "completed func run logs backfill" + ); + + Ok(()) + } + + #[instrument( + name = "sdf.func_runs_backfiller.upload_all_func_runs_and_logs", + level = "info", + skip_all + )] + pub async fn upload_all_func_runs_and_logs_concurrently( + config: crate::Config, + task_tracker: tokio_util::task::TaskTracker, + task_token: CancellationToken, + shutdown_token: CancellationToken, + func_run_cutoff_id: Option, + func_run_log_cutoff_id: Option, + batch_size: i64, + ) -> FuncRunsBackfillResult<()> { + // Create one backfiller for each (each with their own services context) + // so they don't bleed transactions between them + let backfiller_runs = + FuncRunsBackfiller::new(config.clone(), task_tracker.clone(), task_token.clone()) + .await?; + + let runs_token = shutdown_token.clone(); + + let backfiller_logs = + FuncRunsBackfiller::new(config, task_tracker.clone(), task_token.clone()).await?; + + let logs_token = shutdown_token.clone(); + + match join!( + tokio::spawn(async move { + backfiller_runs + .upload_all_func_runs(runs_token, func_run_cutoff_id, batch_size) + .await + }), + tokio::spawn(async move { + backfiller_logs + .upload_all_func_run_logs(logs_token, func_run_log_cutoff_id, batch_size) + .await + }), + ) 
{ + (Ok(_), Ok(_)) => Ok(()), + (Ok(_), err @ Err(_)) => err?, + (err @ Err(_), Ok(_)) => err?, + (err @ Err(_), Err(e)) => { + error!( + logs_error = e.to_string(), + "got error for both tasks. exiting on func runs error. logging the func run logs error here." + ); + err? + } + } + } +} diff --git a/lib/sdf-server/src/func_runs_backfill/error.rs b/lib/sdf-server/src/func_runs_backfill/error.rs new file mode 100644 index 0000000000..181edebf41 --- /dev/null +++ b/lib/sdf-server/src/func_runs_backfill/error.rs @@ -0,0 +1,36 @@ +use dal::TransactionsError; +use si_data_pg::{ + PgError, + PgPoolError, +}; +use si_db::SiDbError; +use si_layer_cache::LayerDbError; +use thiserror::Error; +use tokio::task::JoinError; + +use crate::init::InitError; + +pub type FuncRunsBackfillResult = Result; + +#[remain::sorted] +#[derive(Debug, Error)] +pub enum FuncRunsBackfillError { + #[error("dal initialization error: {0}")] + DalInit(#[from] crate::ServerError), + #[error("init error: {0}")] + Init(#[from] InitError), + #[error("tokio join error: {0}")] + Join(#[from] JoinError), + #[error("layer db error: {0}")] + LayerDb(#[from] LayerDbError), + #[error("parse error: {0}")] + Parse(String), + #[error("postgres error: {0}")] + Pg(#[from] PgError), + #[error("postgres pool error: {0}")] + PgPool(#[from] PgPoolError), + #[error("si-db error: {0}")] + SiDb(#[from] SiDbError), + #[error("transactions error: {0}")] + Transactions(#[from] TransactionsError), +} diff --git a/lib/sdf-server/src/lib.rs b/lib/sdf-server/src/lib.rs index 1733007393..d21e9c2e0f 100644 --- a/lib/sdf-server/src/lib.rs +++ b/lib/sdf-server/src/lib.rs @@ -23,6 +23,7 @@ mod app; mod app_state; mod config; mod extract; +mod func_runs_backfill; mod garbage_collection; mod init; pub mod key_generation; @@ -71,6 +72,11 @@ pub use self::{ WorkspacePermissions, WorkspacePermissionsMode, }, + func_runs_backfill::{ + FuncRunsBackfillError, + FuncRunsBackfillResult, + FuncRunsBackfiller, + }, 
garbage_collection::SnapshotGarbageCollector, layer_cache_backfill::{ BackfillConfig, diff --git a/lib/sdf-server/src/migrations.rs b/lib/sdf-server/src/migrations.rs index ae8ec3e1fc..1392abab21 100644 --- a/lib/sdf-server/src/migrations.rs +++ b/lib/sdf-server/src/migrations.rs @@ -45,7 +45,7 @@ pub enum MigratorError { #[error("error while migrating cached modules: {0}")] MigrateCachedModules(#[source] Box), #[error("error while migrating dal database: {0}")] - MigrateDalDatabase(#[source] si_db::Error), + MigrateDalDatabase(#[source] si_db::SiDbError), #[error("error while migrating layer db database: {0}")] MigrateLayerDbDatabase(#[source] si_layer_cache::LayerDbError), #[error("error while migrating snapshots: {0}")] diff --git a/lib/sdf-server/src/service/dev.rs b/lib/sdf-server/src/service/dev.rs index 9eac6ccde0..ec599ada13 100644 --- a/lib/sdf-server/src/service/dev.rs +++ b/lib/sdf-server/src/service/dev.rs @@ -28,7 +28,7 @@ pub enum DevError { #[error("pg error: {0}")] Pg(#[from] si_data_pg::PgError), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("transactions error: {0}")] Transactions(#[from] TransactionsError), #[error("could not publish websocket event: {0}")] diff --git a/lib/sdf-server/src/service/v2/audit_log.rs b/lib/sdf-server/src/service/v2/audit_log.rs index 051e7d3728..8e3d9a12a2 100644 --- a/lib/sdf-server/src/service/v2/audit_log.rs +++ b/lib/sdf-server/src/service/v2/audit_log.rs @@ -23,7 +23,7 @@ pub enum AuditLogError { #[error("dal transactions error: {0}")] DalTransactions(#[from] Box), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), } impl From for AuditLogError { diff --git a/lib/sdf-server/src/service/v2/change_set.rs b/lib/sdf-server/src/service/v2/change_set.rs index 412970534d..1fa62e25d4 100644 --- a/lib/sdf-server/src/service/v2/change_set.rs +++ b/lib/sdf-server/src/service/v2/change_set.rs @@ -116,7 +116,7 @@ pub enum Error { 
#[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("slow runtime error: {0}")] SlowRuntime(#[from] dal::slow_rt::SlowRuntimeError), #[error("spice db error: {0}")] diff --git a/lib/sdf-server/src/service/v2/func.rs b/lib/sdf-server/src/service/v2/func.rs index 2323d1d74b..10733f4037 100644 --- a/lib/sdf-server/src/service/v2/func.rs +++ b/lib/sdf-server/src/service/v2/func.rs @@ -120,6 +120,8 @@ pub enum FuncAPIError { Serde(#[from] serde_json::Error), #[error("cannot set binding on transformation function")] SettingBindingOnTransformationFunction, + #[error("si db error: {0}")] + SiDb(#[from] si_db::SiDbError), #[error("transactions error: {0}")] Transactions(#[from] dal::TransactionsError), #[error("workspace snapshot error: {0}")] diff --git a/lib/sdf-server/src/service/v2/func/get_func_run.rs b/lib/sdf-server/src/service/v2/func/get_func_run.rs index a107ca4b41..cb036a4f04 100644 --- a/lib/sdf-server/src/service/v2/func/get_func_run.rs +++ b/lib/sdf-server/src/service/v2/func/get_func_run.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use axum::{ Json, extract::Path, @@ -17,6 +15,10 @@ use serde::{ Deserialize, Serialize, }; +use si_db::{ + FuncRunDb, + FuncRunLogDb, +}; use si_events::{ ActionId, ActionKind, @@ -242,12 +244,8 @@ pub async fn get_func_run_view(ctx: &DalContext, func_run: &FuncRun) -> FuncAPIR } }; - let logs = ctx - .layer_db() - .func_run_log() - .get_for_func_run_id(func_run.id()) + let logs = FuncRunLogDb::get_for_func_run_id(ctx, func_run.id()) .await? 
- .map(Arc::::unwrap_or_clone) .map(|v| v.into()); Ok(FuncRunView::new( @@ -273,7 +271,7 @@ pub async fn get_func_run( .build(access_builder.build(change_set_id.into())) .await?; - let maybe_func_run = ctx.layer_db().func_run().read(func_run_id).await?; + let maybe_func_run = FuncRunDb::read(&ctx, func_run_id).await?; match maybe_func_run { Some(func_run) => { diff --git a/lib/sdf-server/src/service/v2/func/get_func_run_logs.rs b/lib/sdf-server/src/service/v2/func/get_func_run_logs.rs index e5cdf3a928..950cb96969 100644 --- a/lib/sdf-server/src/service/v2/func/get_func_run_logs.rs +++ b/lib/sdf-server/src/service/v2/func/get_func_run_logs.rs @@ -1,10 +1,9 @@ -use std::sync::Arc; - use axum::{ Json, extract::Path, }; use dal::WorkspacePk; +use si_db::FuncRunLogDb; use si_events::FuncRunId; use super::get_func_run::FuncRunLogView; @@ -41,12 +40,8 @@ pub async fn get_func_run_logs( .await?; // Fetch only the logs for this function run - let logs = ctx - .layer_db() - .func_run_log() - .get_for_func_run_id(func_run_id) + let logs = FuncRunLogDb::get_for_func_run_id(&ctx, func_run_id) .await? 
- .map(Arc::unwrap_or_clone) .map(|v| v.into()); Ok(Json(GetFuncRunLogsResponse { logs })) diff --git a/lib/sdf-server/src/service/v2/func/get_func_run_logs_av.rs b/lib/sdf-server/src/service/v2/func/get_func_run_logs_av.rs index 84510469fd..5970485bb8 100644 --- a/lib/sdf-server/src/service/v2/func/get_func_run_logs_av.rs +++ b/lib/sdf-server/src/service/v2/func/get_func_run_logs_av.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use axum::{ Json, extract::Path, @@ -12,6 +10,10 @@ use serde::{ Deserialize, Serialize, }; +use si_db::{ + FuncRunDb, + FuncRunLogDb, +}; use super::get_func_run::FuncRunLogView; use crate::{ @@ -41,22 +43,17 @@ pub async fn get_func_run_logs_av( .build(access_builder.build(change_set_id.into())) .await?; - let maybe_av_run = ctx - .layer_db() - .func_run() - .get_last_qualification_for_attribute_value_id( - ctx.events_tenancy().workspace_pk, - attribute_value_id, - ) - .await?; + let maybe_av_run = FuncRunDb::get_last_qualification_for_attribute_value_id( + &ctx, + ctx.events_tenancy().workspace_pk, + attribute_value_id, + ) + .await?; + match maybe_av_run { Some(av_run) => { - let logs = ctx - .layer_db() - .func_run_log() - .get_for_func_run_id(av_run.id()) + let logs = FuncRunLogDb::get_for_func_run_id(&ctx, av_run.id()) .await? 
- .map(Arc::unwrap_or_clone) .map(|v| v.into()); Ok(Json(GetFuncRunLogsResponse { logs })) diff --git a/lib/sdf-server/src/service/v2/func/get_func_runs_paginated.rs b/lib/sdf-server/src/service/v2/func/get_func_runs_paginated.rs index d4adc04b2c..ed81640adc 100644 --- a/lib/sdf-server/src/service/v2/func/get_func_runs_paginated.rs +++ b/lib/sdf-server/src/service/v2/func/get_func_runs_paginated.rs @@ -10,6 +10,7 @@ use serde::{ Deserialize, Serialize, }; +use si_db::FuncRunDb; use si_events::{ CasValue, FuncRun, @@ -139,28 +140,24 @@ pub async fn get_func_runs_paginated( // Query the database with pagination parameters let func_runs = if let Some(component_id) = params.component_id { // Component-specific query - ctx.layer_db() - .func_run() - .read_many_for_component_paginated( - workspace_pk, - change_set_id, - component_id, - limit as i64, - params.cursor, - ) - .await? - .unwrap_or_default() + FuncRunDb::read_many_for_component_paginated( + &ctx, + workspace_pk, + change_set_id, + component_id, + limit as i64, + params.cursor, + ) + .await? } else { - ctx.layer_db() - .func_run() - .read_many_for_workspace_paginated( - workspace_pk, - change_set_id, - limit as i64, - params.cursor, - ) - .await? - .unwrap_or_default() + FuncRunDb::read_many_for_workspace_paginated( + &ctx, + workspace_pk, + change_set_id, + limit as i64, + params.cursor, + ) + .await? 
}; // Determine the next cursor (if we have at least `limit` results) diff --git a/lib/sdf-server/src/service/v2/management.rs b/lib/sdf-server/src/service/v2/management.rs index cdbc4d6ca6..ba56c5266c 100644 --- a/lib/sdf-server/src/service/v2/management.rs +++ b/lib/sdf-server/src/service/v2/management.rs @@ -61,6 +61,7 @@ use si_db::{ ManagementFuncExecutionError, ManagementFuncJobState, ManagementState, + SiDbError, }; use si_layer_cache::LayerDbError; use telemetry::prelude::*; @@ -116,6 +117,8 @@ pub enum ManagementApiError { SchemaVariant(#[from] SchemaVariantError), #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), + #[error("si db error: {0}")] + SiDb(#[from] SiDbError), #[error("transactions error: {0}")] Transactions(#[from] TransactionsError), #[error("translating string to ulid: {0} is not a valid ulid")] diff --git a/lib/sdf-server/src/service/v2/management/history.rs b/lib/sdf-server/src/service/v2/management/history.rs index b976871a50..b2a4175916 100644 --- a/lib/sdf-server/src/service/v2/management/history.rs +++ b/lib/sdf-server/src/service/v2/management/history.rs @@ -10,7 +10,10 @@ use serde::{ Deserialize, Serialize, }; -use si_db::Visibility; +use si_db::{ + FuncRunDb, + Visibility, +}; use si_events::{ ActionResultState, ComponentId, @@ -63,18 +66,15 @@ pub async fn history( let ctx = builder.build(request_ctx.build(request.visibility)).await?; let mut result = vec![]; - if let Some(management_history_list) = ctx - .layer_db() - .func_run() - .list_management_history( - ctx.events_tenancy().workspace_pk, - ctx.events_tenancy().change_set_id, - ) - .await? + + for func_run in FuncRunDb::list_management_history( + &ctx, + ctx.events_tenancy().workspace_pk, + ctx.events_tenancy().change_set_id, + ) + .await? 
{ - for func_run in management_history_list { - result.push(ManagementHistoryItem::try_from(func_run)?); - } + result.push(ManagementHistoryItem::try_from(func_run)?); } Ok(Json(result)) diff --git a/lib/sdf-server/src/service/v2/management/latest.rs b/lib/sdf-server/src/service/v2/management/latest.rs index 3e450d93e4..4fcd61cadc 100644 --- a/lib/sdf-server/src/service/v2/management/latest.rs +++ b/lib/sdf-server/src/service/v2/management/latest.rs @@ -13,6 +13,7 @@ use dal::{ }, }; use sdf_extract::change_set::ChangeSetDalContext; +use si_db::FuncRunDb; use super::ManagementApiResult; use crate::service::v2::func::get_func_run::{ @@ -31,22 +32,22 @@ pub async fn latest( ) -> ManagementApiResult>> { let func_id = ManagementPrototype::func_id(ctx, prototype_id).await?; - Ok( - match ctx - .layer_db() - .func_run() - .get_last_management_run_for_func_and_component_id( - workspace_pk, - change_set_id, - component_id, - func_id.into_inner().into(), - ) - .await? - { - Some(func_run) => Json(Some(get_func_run_view(ctx, &func_run).await?)), - None => Json(None), - }, - ) + let maybe_view = if let Some(func_run) = + FuncRunDb::get_last_management_run_for_func_and_component_id( + ctx, + workspace_pk, + change_set_id, + component_id, + func_id.into_inner().into(), + ) + .await? + { + Some(get_func_run_view(ctx, &func_run).await?) + } else { + None + }; + + Ok(Json(maybe_view)) } pub async fn all_latest_for_component( @@ -66,16 +67,14 @@ pub async fn all_latest_for_component( { let func_id = ManagementPrototype::func_id(ctx, mgmt_prototype.id).await?; - let Some(run) = ctx - .layer_db() - .func_run() - .get_last_management_run_for_func_and_component_id( - workspace_pk, - change_set_id, - component_id, - func_id.into_inner().into(), - ) - .await? + let Some(run) = FuncRunDb::get_last_management_run_for_func_and_component_id( + ctx, + workspace_pk, + change_set_id, + component_id, + func_id.into_inner().into(), + ) + .await? 
else { continue; }; diff --git a/lib/sdf-server/src/service/v2/module.rs b/lib/sdf-server/src/service/v2/module.rs index 4d7a9a61f7..637e7f3b3d 100644 --- a/lib/sdf-server/src/service/v2/module.rs +++ b/lib/sdf-server/src/service/v2/module.rs @@ -76,7 +76,7 @@ pub enum ModulesAPIError { #[error("changeset error: {0:?}")] Serde(#[from] serde_json::Error), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("si pkg error: {0}")] SiPkg(#[from] SiPkgError), #[error("transactions error: {0}")] diff --git a/lib/sdf-server/src/service/v2/policy_report.rs b/lib/sdf-server/src/service/v2/policy_report.rs index 300bea65d1..f011641357 100644 --- a/lib/sdf-server/src/service/v2/policy_report.rs +++ b/lib/sdf-server/src/service/v2/policy_report.rs @@ -61,7 +61,7 @@ pub enum PolicyReportError { #[error("dal transactions error: {0}")] DalTransactions(#[from] Box), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("si db policy report error: {0}")] SiDbPolicyReport(#[from] si_db::PolicyReportError), } diff --git a/lib/sdf-server/src/service/v2/workspace.rs b/lib/sdf-server/src/service/v2/workspace.rs index 499a4518af..7d26ef4b87 100644 --- a/lib/sdf-server/src/service/v2/workspace.rs +++ b/lib/sdf-server/src/service/v2/workspace.rs @@ -53,7 +53,7 @@ pub enum WorkspaceAPIError { #[error("cannot install workspace using root tenancy")] RootTenancyInstallAttempt, #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("tokio task join error: {0}")] TokioJoin(#[from] JoinError), #[error("transactions error: {0}")] diff --git a/lib/sdf-v1-routes-module/src/lib.rs b/lib/sdf-v1-routes-module/src/lib.rs index f432ef0fec..099d20e7f4 100644 --- a/lib/sdf-v1-routes-module/src/lib.rs +++ b/lib/sdf-v1-routes-module/src/lib.rs @@ -120,7 +120,7 @@ pub enum ModuleError { #[error("json serialization error: {0}")] SerdeJson(#[from] serde_json::Error), 
#[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("si pkg error: {0}")] SiPkg(#[from] SiPkgError), #[error("transactions error: {0}")] diff --git a/lib/sdf-v1-routes-session/src/lib.rs b/lib/sdf-v1-routes-session/src/lib.rs index 430374f65c..cb1969eaa4 100644 --- a/lib/sdf-v1-routes-session/src/lib.rs +++ b/lib/sdf-v1-routes-session/src/lib.rs @@ -57,7 +57,7 @@ pub enum SessionError { #[error("http error: {0}")] Request(#[from] reqwest::Error), #[error("si db error: {0}")] - SiDb(#[from] si_db::Error), + SiDb(#[from] si_db::SiDbError), #[error("SpiceDb error: {0}")] SpiceDb(#[from] SpiceDbError), #[error("workspace error: {0}")] diff --git a/lib/si-db/BUCK b/lib/si-db/BUCK index 0937bc74db..462788b401 100644 --- a/lib/si-db/BUCK +++ b/lib/si-db/BUCK @@ -11,9 +11,12 @@ rust_library( "//lib/si-data-pg:si-data-pg", "//lib/si-events-rs:si-events", "//lib/si-id:si-id", + "//lib/si-layer-cache:si-layer-cache", "//lib/telemetry-rs:telemetry", + "//lib/telemetry-utils-rs:telemetry-utils", "//third-party/rust:async-trait", "//third-party/rust:chrono", + "//third-party/rust:postcard", "//third-party/rust:postgres-types", "//third-party/rust:refinery", "//third-party/rust:remain", diff --git a/lib/si-db/Cargo.toml b/lib/si-db/Cargo.toml index aa78642806..51e4e91fc1 100644 --- a/lib/si-db/Cargo.toml +++ b/lib/si-db/Cargo.toml @@ -11,6 +11,7 @@ publish.workspace = true [dependencies] async-trait = { workspace = true } chrono = { workspace = true } +postcard = { workspace = true } postgres-types = { workspace = true } refinery = { workspace = true } remain = { workspace = true } @@ -22,8 +23,10 @@ si-data-nats = { path = "../../lib/si-data-nats" } si-data-pg = { path = "../../lib/si-data-pg" } si-events = { path = "../../lib/si-events-rs" } si-id = { path = "../../lib/si-id" } +si-layer-cache = { path = "../../lib/si-layer-cache" } strum = { workspace = true } telemetry = { path = "../../lib/telemetry-rs" } +telemetry-utils = { 
path = "../../lib/telemetry-utils-rs" } thiserror = { workspace = true } tokio = { workspace = true } ulid = { workspace = true } diff --git a/lib/si-db/src/actor_view.rs b/lib/si-db/src/actor_view.rs index 3a5544e554..72c755cfd3 100644 --- a/lib/si-db/src/actor_view.rs +++ b/lib/si-db/src/actor_view.rs @@ -12,7 +12,7 @@ use serde::{ use si_id::UserPk; use crate::{ - Result, + SiDbResult, context::SiDbContext, history_event::HistoryActor, user::User, @@ -54,7 +54,7 @@ impl ActorView { pub async fn from_history_actor( ctx: &impl SiDbContext, history_actor: HistoryActor, - ) -> Result { + ) -> SiDbResult { match history_actor { HistoryActor::User(user_pk) => { let user = User::get_by_pk(ctx, user_pk).await?; diff --git a/lib/si-db/src/context.rs b/lib/si-db/src/context.rs index 234b6484a4..9d67943bec 100644 --- a/lib/si-db/src/context.rs +++ b/lib/si-db/src/context.rs @@ -1,5 +1,9 @@ use async_trait::async_trait; use si_id::ChangeSetId; +use si_layer_cache::db::{ + func_run::FuncRunLayerDb, + func_run_log::FuncRunLogLayerDb, +}; use tokio::sync::MappedMutexGuard; use crate::{ @@ -21,4 +25,7 @@ pub trait SiDbContext { fn tenancy(&self) -> &Tenancy; fn visibility(&self) -> &Visibility; fn change_set_id(&self) -> ChangeSetId; + // TODO get rid of these after we don't need layer db fallbacks + fn func_run_layer_db(&self) -> &FuncRunLayerDb; + fn func_run_log_layer_db(&self) -> &FuncRunLogLayerDb; } diff --git a/lib/si-db/src/func_run.rs b/lib/si-db/src/func_run.rs new file mode 100644 index 0000000000..a94778ac89 --- /dev/null +++ b/lib/si-db/src/func_run.rs @@ -0,0 +1,704 @@ +use std::sync::Arc; + +use si_events::{ + ActionId, + AttributeValueId, + ChangeSetId, + ComponentId, + FuncId, + FuncRun, + FuncRunId, + WorkspacePk, +}; +use telemetry::prelude::*; +use telemetry_utils::monotonic; + +use crate::{ + SiDbContext, + SiDbError, + SiDbResult, + transactions::SiDbTransactions as _, +}; + +pub const DBNAME: &str = "func_runs"; + +const READY_MANY_FOR_WORKSPACE_ID_QUERY: 
&str = "SELECT * FROM func_runs WHERE workspace_id = $1"; + +const GET_LAST_QUALIFICATION_FOR_ATTRIBUTE_VALUE_ID_QUERY: &str = "SELECT value FROM func_runs + WHERE attribute_value_id = $2 AND workspace_id = $1 + ORDER BY updated_at DESC + LIMIT 1"; + +const GET_LAST_ACTION_BY_ACTION_ID_QUERY: &str = "SELECT value FROM func_runs + WHERE function_kind = 'Action' AND workspace_id = $1 AND action_id = $2 + ORDER BY updated_at DESC + LIMIT 1"; + +const LIST_MANAGEMENT_HISTORY_QUERY: &str = "SELECT value FROM func_runs + WHERE function_kind = 'Management' AND workspace_id = $1 AND change_set_id = $2 AND action_id IS NOT NULL + ORDER BY updated_at DESC"; + +const GET_LAST_MANAGEMENT_BY_FUNC_AND_COMPONENT_ID_QUERY: &str = "SELECT value FROM func_runs + WHERE function_kind = 'Management' AND workspace_id = $1 AND change_set_id = $2 AND component_id = $3 AND action_id = $4 + ORDER BY updated_at DESC + LIMIT 1"; + +const PAGINATED_WORKSPACE_QUERY_WITH_CURSOR: &str = "SELECT * FROM func_runs + WHERE workspace_id = $1 + AND change_set_id = $2 + AND ( + created_at < (SELECT created_at FROM func_runs WHERE key = $3) OR + (created_at = (SELECT created_at FROM func_runs WHERE key = $3) AND key < $3) + ) + ORDER BY created_at DESC, key DESC + LIMIT $4"; + +const PAGINATED_WORKSPACE_QUERY_NO_CURSOR: &str = "SELECT * FROM func_runs + WHERE workspace_id = $1 + AND change_set_id = $2 + ORDER BY created_at DESC, key DESC + LIMIT $3"; + +const PAGINATED_COMPONENT_QUERY_WITH_CURSOR: &str = "SELECT * FROM func_runs + WHERE workspace_id = $1 + AND change_set_id = $2 + AND component_id = $3 + AND ( + created_at < (SELECT created_at FROM func_runs WHERE key = $4) OR + (created_at = (SELECT created_at FROM func_runs WHERE key = $4) AND key < $4) + ) + ORDER BY created_at DESC, key DESC + LIMIT $5"; + +const PAGINATED_COMPONENT_QUERY_NO_CURSOR: &str = "SELECT * FROM func_runs + WHERE workspace_id = $1 + AND change_set_id = $2 + AND component_id = $3 + ORDER BY created_at DESC, key DESC + LIMIT 
$4"; + +#[derive(Debug, Clone)] +pub struct FuncRunDb {} + +impl FuncRunDb { + /// Write a new func run to the database. + /// This function can be used to replace the layer-cache write() function. + pub async fn upsert(ctx: &impl SiDbContext, func_run: FuncRun) -> SiDbResult<()> { + let json: serde_json::Value = serde_json::to_value(&func_run)?; + let postcard_bytes = + postcard::to_stdvec(&func_run).map_err(|e| SiDbError::Postcard(e.to_string()))?; + + // Write to si-db + ctx.txns() + .await? + .pg() + .execute( + "INSERT INTO func_runs ( + key, + created_at, + updated_at, + state, + function_kind, + workspace_id, + change_set_id, + actor_id, + component_id, + attribute_value_id, + action_id, + action_originating_change_set_id, + json_value, + value + ) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6, + $7, + $8, + $9, + $10, + $11, + $12, + $13, + $14 + ) ON CONFLICT (key) DO UPDATE SET + updated_at = EXCLUDED.updated_at, + state = EXCLUDED.state, + json_value = EXCLUDED.json_value, + value = EXCLUDED.value", + &[ + &func_run.id().to_string(), + &func_run.created_at(), + &func_run.updated_at(), + &func_run.state().to_string(), + &func_run.function_kind().to_string(), + &func_run.tenancy().workspace_pk.to_string(), + &func_run.tenancy().change_set_id.to_string(), + &func_run.actor().to_string(), + &func_run.component_id().map(|v| v.to_string()), + &func_run.attribute_value_id().map(|v| v.to_string()), + &func_run.action_id().map(|v| v.to_string()), + &func_run + .action_originating_change_set_id() + .map(|v| v.to_string()), + &json, + &postcard_bytes.as_slice(), + ], + ) + .await?; + + // Also write to layer-db for backward compatibility during migration + ctx.func_run_layer_db() + .write( + Arc::new(func_run.clone()), + None, // web_events + *func_run.tenancy(), + *func_run.actor(), + ) + .await + .map_err(|e| SiDbError::LayerDb(e.to_string()))?; + + Ok(()) + } + + // NOTE(victor): This is only used by migration so it does not write to layer-db + /// Write multiple 
func runs to the database in a single INSERT query. + /// This is more efficient than calling upsert multiple times. + pub async fn upsert_batch(ctx: &impl SiDbContext, func_runs: Vec) -> SiDbResult<()> { + if func_runs.is_empty() { + return Ok(()); + } + + // Build the VALUES part of the query dynamically + // Store all the data we need to keep alive for the query + struct RowData { + id: String, + workspace_pk: String, + created_at: chrono::DateTime, + updated_at: chrono::DateTime, + state: String, + function_kind: String, + change_set_id: String, + actor: String, + component_id: Option, + attribute_value_id: Option, + action_id: Option, + action_originating_change_set_id: Option, + json: serde_json::Value, + postcard_bytes: Vec, + } + + let mut values_clauses = Vec::new(); + let mut row_data_vec = Vec::new(); + let mut param_index = 1; + const COL_COUNT: usize = 14; + + for func_run in &func_runs { + let json: serde_json::Value = serde_json::to_value(func_run)?; + let postcard_bytes = + postcard::to_stdvec(func_run).map_err(|e| SiDbError::Postcard(e.to_string()))?; + + // Create placeholders for this row ($1, $2, ... 
$COL_COUNT) + let placeholders: Vec = (param_index..param_index + COL_COUNT) + .map(|i| format!("${i}")) + .collect(); + values_clauses.push(format!("({})", placeholders.join(", "))); + + row_data_vec.push(RowData { + id: func_run.id().to_string(), + workspace_pk: func_run.tenancy().workspace_pk.to_string(), + created_at: func_run.created_at(), + updated_at: func_run.updated_at(), + state: func_run.state().to_string(), + function_kind: func_run.function_kind().to_string(), + change_set_id: func_run.tenancy().change_set_id.to_string(), + actor: func_run.actor().to_string(), + component_id: func_run.component_id().map(|v| v.to_string()), + attribute_value_id: func_run.attribute_value_id().map(|v| v.to_string()), + action_id: func_run.action_id().map(|v| v.to_string()), + action_originating_change_set_id: func_run + .action_originating_change_set_id() + .map(|v| v.to_string()), + json, + postcard_bytes, + }); + + param_index += COL_COUNT; + } + + let query = format!( + "INSERT INTO func_runs ( + key, + created_at, + updated_at, + state, + function_kind, + workspace_id, + change_set_id, + actor_id, + component_id, + attribute_value_id, + action_id, + action_originating_change_set_id, + json_value, + value + ) VALUES {} + ON CONFLICT (key) DO UPDATE SET + updated_at = EXCLUDED.updated_at, + state = EXCLUDED.state, + json_value = EXCLUDED.json_value, + value = EXCLUDED.value", + values_clauses.join(", ") + ); + + // Build the parameter array dynamically + // This looks like extra work, but since the pg library expects refs of everything, + // we had to create the row_data_vec to own the values while we pass them down + let postcard_slices: Vec<&[u8]> = row_data_vec + .iter() + .map(|rd| rd.postcard_bytes.as_slice()) + .collect(); + + let mut params: Vec<&(dyn postgres_types::ToSql + Sync)> = Vec::new(); + for (idx, row_data) in row_data_vec.iter().enumerate() { + params.push(&row_data.id); + params.push(&row_data.created_at); + params.push(&row_data.updated_at); + 
params.push(&row_data.state); + params.push(&row_data.function_kind); + params.push(&row_data.workspace_pk); + params.push(&row_data.change_set_id); + params.push(&row_data.actor); + params.push(&row_data.component_id); + params.push(&row_data.attribute_value_id); + params.push(&row_data.action_id); + params.push(&row_data.action_originating_change_set_id); + params.push(&row_data.json); + params.push(&postcard_slices[idx]); + } + + ctx.txns().await?.pg().execute(&query, ¶ms[..]).await?; + + Ok(()) + } + + // NOTE(victor): This is only used by migration so it does not fallback to layer-db + /// Returns the IDs from the input batch that do NOT exist in the database. + /// This is useful for determining which func runs need to be migrated. + pub async fn find_missing_ids( + ctx: &impl SiDbContext, + ids: &[FuncRunId], + ) -> SiDbResult> { + if ids.is_empty() { + return Ok(vec![]); + } + + // Convert IDs to strings for the query + let id_strings: Vec = ids.iter().map(|id| id.to_string()).collect(); + + // Build a query with ANY to check which IDs exist + let query = format!("SELECT key FROM {DBNAME} WHERE key = ANY($1)"); + + let rows = ctx.txns().await?.pg().query(&query, &[&id_strings]).await?; + + // Collect the IDs that exist in the database + let existing_ids: std::collections::HashSet = + rows.iter().map(|row| row.get::<_, String>("key")).collect(); + + // Return the IDs that don't exist + let missing_ids: Vec = ids + .iter() + .filter(|id| !existing_ids.contains(&id.to_string())) + .copied() + .collect(); + + Ok(missing_ids) + } + + pub async fn get_last_run_for_action_id_opt( + ctx: &impl SiDbContext, + workspace_pk: WorkspacePk, + action_id: ActionId, + ) -> SiDbResult> { + let maybe_row = ctx + .txns() + .await? 
+ .pg() + .query_opt( + GET_LAST_ACTION_BY_ACTION_ID_QUERY, + &[&workspace_pk, &action_id], + ) + .await?; + + if let Some(row) = maybe_row { + let value_bytes: Vec = row.try_get("value")?; + let func_run: FuncRun = postcard::from_bytes(&value_bytes) + .map_err(|e| SiDbError::Postcard(e.to_string()))?; + Ok(Some(func_run)) + } else { + // Fall back to layer-db if not found in si-db + monotonic!( + func_runs.layerdb_fallback_total = 1, + method = "get_last_run_for_action_id_opt" + ); + ctx.func_run_layer_db() + .get_last_run_for_action_id_opt(workspace_pk, action_id) + .await + .map_err(|e| SiDbError::LayerDb(e.to_string())) + } + } + + pub async fn get_last_run_for_action_id( + ctx: &impl SiDbContext, + workspace_pk: WorkspacePk, + action_id: ActionId, + ) -> SiDbResult { + Self::get_last_run_for_action_id_opt(ctx, workspace_pk, action_id) + .await? + .ok_or(SiDbError::ActionIdNotFound(action_id)) + } + + pub async fn list_management_history( + ctx: &impl SiDbContext, + workspace_pk: WorkspacePk, + change_set_id: ChangeSetId, + ) -> SiDbResult> { + let rows = ctx + .txns() + .await? + .pg() + .query( + LIST_MANAGEMENT_HISTORY_QUERY, + &[&workspace_pk, &change_set_id], + ) + .await?; + + let mut func_runs = Vec::with_capacity(rows.len()); + for row in rows { + let value_bytes: Vec = row.try_get("value")?; + let func_run: FuncRun = postcard::from_bytes(&value_bytes) + .map_err(|e| SiDbError::Postcard(e.to_string()))?; + func_runs.push(func_run); + } + + // If si-db returned empty, fall back to layer-db + if func_runs.is_empty() { + monotonic!( + func_runs.layerdb_fallback_total = 1, + method = "list_management_history" + ); + if let Some(layer_func_runs) = ctx + .func_run_layer_db() + .list_management_history(workspace_pk, change_set_id) + .await + .map_err(|e| SiDbError::LayerDb(e.to_string()))? 
+ { + return Ok(layer_func_runs); + } + } + + Ok(func_runs) + } + + pub async fn get_last_management_run_for_func_and_component_id( + ctx: &impl SiDbContext, + workspace_pk: WorkspacePk, + change_set_id: ChangeSetId, + component_id: ComponentId, + func_id: FuncId, + ) -> SiDbResult> { + let maybe_row = ctx + .txns() + .await? + .pg() + .query_opt( + GET_LAST_MANAGEMENT_BY_FUNC_AND_COMPONENT_ID_QUERY, + &[&workspace_pk, &change_set_id, &component_id, &func_id], + ) + .await?; + + if let Some(row) = maybe_row { + let value_bytes: Vec = row.try_get("value")?; + let func_run: FuncRun = postcard::from_bytes(&value_bytes) + .map_err(|e| SiDbError::Postcard(e.to_string()))?; + Ok(Some(func_run)) + } else { + // Fall back to layer-db if not found in si-db + monotonic!( + func_runs.layerdb_fallback_total = 1, + method = "get_last_management_run_for_func_and_component_id" + ); + ctx.func_run_layer_db() + .get_last_management_run_for_func_and_component_id( + workspace_pk, + change_set_id, + component_id, + func_id, + ) + .await + .map_err(|e| SiDbError::LayerDb(e.to_string())) + } + } + + pub async fn get_last_qualification_for_attribute_value_id( + ctx: &impl SiDbContext, + workspace_pk: WorkspacePk, + attribute_value_id: AttributeValueId, + ) -> SiDbResult> { + let maybe_row = ctx + .txns() + .await? 
+ .pg() + .query_opt( + GET_LAST_QUALIFICATION_FOR_ATTRIBUTE_VALUE_ID_QUERY, + &[&workspace_pk, &attribute_value_id], + ) + .await?; + + if let Some(row) = maybe_row { + let value_bytes: Vec = row.try_get("value")?; + let func_run: FuncRun = postcard::from_bytes(&value_bytes) + .map_err(|e| SiDbError::Postcard(e.to_string()))?; + Ok(Some(func_run)) + } else { + // Fall back to layer-db if not found in si-db + monotonic!( + func_runs.layerdb_fallback_total = 1, + method = "get_last_qualification_for_attribute_value_id" + ); + ctx.func_run_layer_db() + .get_last_qualification_for_attribute_value_id(workspace_pk, attribute_value_id) + .await + .map_err(|e| SiDbError::LayerDb(e.to_string())) + } + } + + pub async fn read(ctx: &impl SiDbContext, key: FuncRunId) -> SiDbResult> { + let maybe_row = ctx + .txns() + .await? + .pg() + .query_opt( + &format!("SELECT value FROM {DBNAME} WHERE key = $1"), + &[&key.to_string()], + ) + .await?; + + if let Some(row) = maybe_row { + let value_bytes: Vec = row.try_get("value")?; + let func_run: FuncRun = postcard::from_bytes(&value_bytes) + .map_err(|e| SiDbError::Postcard(e.to_string()))?; + Ok(Some(func_run)) + } else { + // Fall back to layer-db if not found in si-db + monotonic!(func_runs.layerdb_fallback_total = 1, method = "read"); + ctx.func_run_layer_db() + .try_read(key) + .await + .map(|arc_func_run| Some((*arc_func_run).clone())) + .map_err(|e| SiDbError::LayerDb(e.to_string())) + } + } + + pub async fn try_read(ctx: &impl SiDbContext, key: FuncRunId) -> SiDbResult { + Self::read(ctx, key) + .await? + .ok_or(SiDbError::MissingFuncRun(key)) + } + + pub async fn read_many_for_workspace( + ctx: &impl SiDbContext, + workspace_pk: WorkspacePk, + ) -> SiDbResult> { + let rows = ctx + .txns() + .await? 
+ .pg() + .query(READY_MANY_FOR_WORKSPACE_ID_QUERY, &[&workspace_pk]) + .await?; + + let mut func_runs = Vec::with_capacity(rows.len()); + for row in rows { + let value_bytes: Vec = row.try_get("value")?; + let func_run: FuncRun = postcard::from_bytes(&value_bytes) + .map_err(|e| SiDbError::Postcard(e.to_string()))?; + func_runs.push(func_run); + } + + // Fall back to layer-db if si-db returned no results + if func_runs.is_empty() { + monotonic!( + func_runs.layerdb_fallback_total = 1, + method = "read_many_for_workspace" + ); + if let Some(layer_func_runs) = ctx + .func_run_layer_db() + .read_many_for_workspace(workspace_pk) + .await + .map_err(|e| SiDbError::LayerDb(e.to_string()))? + { + return Ok(layer_func_runs + .into_iter() + .map(|arc| (*arc).clone()) + .collect()); + } + } + + Ok(func_runs) + } + + /// Read function runs for a workspace with pagination support. + /// + /// This method uses cursor-based pagination where: + /// - `limit` controls how many items to return per page + /// - `cursor` is the ID of the last item from the previous page + /// - Results are filtered by workspace_id and change_set_id + /// + /// Results are ordered by creation time (newest first). + pub async fn read_many_for_workspace_paginated( + ctx: &impl SiDbContext, + workspace_pk: WorkspacePk, + change_set_id: ChangeSetId, + limit: i64, + cursor: Option, + ) -> SiDbResult> { + let rows = if let Some(cursor_id) = cursor { + ctx.txns() + .await? + .pg() + .query( + PAGINATED_WORKSPACE_QUERY_WITH_CURSOR, + &[ + &workspace_pk, + &change_set_id.to_string(), + &cursor_id.to_string(), + &limit, + ], + ) + .await? + } else { + ctx.txns() + .await? + .pg() + .query( + PAGINATED_WORKSPACE_QUERY_NO_CURSOR, + &[&workspace_pk, &change_set_id.to_string(), &limit], + ) + .await? 
+ }; + + let mut func_runs = Vec::with_capacity(rows.len()); + for row in rows { + let value_bytes: Vec = row.try_get("value")?; + let func_run: FuncRun = postcard::from_bytes(&value_bytes) + .map_err(|e| SiDbError::Postcard(e.to_string()))?; + func_runs.push(func_run); + } + + // Fall back to layer-db if si-db returned no results + if func_runs.is_empty() { + monotonic!( + func_runs.layerdb_fallback_total = 1, + method = "read_many_for_workspace_paginated" + ); + if let Some(layer_func_runs) = ctx + .func_run_layer_db() + .read_many_for_workspace_paginated(workspace_pk, change_set_id, limit, cursor) + .await + .map_err(|e| SiDbError::LayerDb(e.to_string()))? + { + return Ok(layer_func_runs + .into_iter() + .map(|arc| (*arc).clone()) + .collect()); + } + } + + Ok(func_runs) + } + + /// Read function runs for a specific component with pagination support. + /// + /// This method uses cursor-based pagination where: + /// - `limit` controls how many items to return per page + /// - `cursor` is the ID of the last item from the previous page + /// - Results are filtered by workspace_id, change_set_id, and component_id + /// + /// Results are ordered by creation time (newest first). + pub async fn read_many_for_component_paginated( + ctx: &impl SiDbContext, + workspace_pk: WorkspacePk, + change_set_id: ChangeSetId, + component_id: ComponentId, + limit: i64, + cursor: Option, + ) -> SiDbResult> { + let rows = if let Some(cursor_id) = cursor { + ctx.txns() + .await? + .pg() + .query( + PAGINATED_COMPONENT_QUERY_WITH_CURSOR, + &[ + &workspace_pk, + &change_set_id.to_string(), + &component_id.to_string(), + &cursor_id.to_string(), + &limit, + ], + ) + .await? + } else { + ctx.txns() + .await? + .pg() + .query( + PAGINATED_COMPONENT_QUERY_NO_CURSOR, + &[ + &workspace_pk, + &change_set_id.to_string(), + &component_id.to_string(), + &limit, + ], + ) + .await? 
+ }; + + let mut func_runs = Vec::with_capacity(rows.len()); + for row in rows { + let value_bytes: Vec = row.try_get("value")?; + let func_run: FuncRun = postcard::from_bytes(&value_bytes) + .map_err(|e| SiDbError::Postcard(e.to_string()))?; + func_runs.push(func_run); + } + + // Fall back to layer-db if si-db returned no results + if func_runs.is_empty() { + monotonic!( + func_runs.layerdb_fallback_total = 1, + method = "read_many_for_component_paginated" + ); + if let Some(layer_func_runs) = ctx + .func_run_layer_db() + .read_many_for_component_paginated( + workspace_pk, + change_set_id, + component_id, + limit, + cursor, + ) + .await + .map_err(|e| SiDbError::LayerDb(e.to_string()))? + { + return Ok(layer_func_runs + .into_iter() + .map(|arc| (*arc).clone()) + .collect()); + } + } + + Ok(func_runs) + } +} diff --git a/lib/si-db/src/func_run_log.rs b/lib/si-db/src/func_run_log.rs new file mode 100644 index 0000000000..157d973278 --- /dev/null +++ b/lib/si-db/src/func_run_log.rs @@ -0,0 +1,244 @@ +use std::sync::Arc; + +use si_events::{ + FuncRunId, + FuncRunLog, + FuncRunLogId, +}; +use telemetry::prelude::*; +use telemetry_utils::monotonic; + +use crate::{ + SiDbContext, + SiDbError, + SiDbResult, + transactions::SiDbTransactions as _, +}; + +pub const DBNAME: &str = "func_run_logs"; + +#[derive(Debug, Clone)] +pub struct FuncRunLogDb {} + +impl FuncRunLogDb { + /// Write a new func run log to the database. + /// This function writes to both si-db and layer-db for backward compatibility during migration. + pub async fn upsert(ctx: &impl SiDbContext, func_run_log: FuncRunLog) -> SiDbResult<()> { + let postcard_bytes = + postcard::to_stdvec(&func_run_log).map_err(|e| SiDbError::Postcard(e.to_string()))?; + + // Write to si-db + ctx.txns() + .await? 
+ .pg() + .execute( + &format!( + "INSERT INTO {DBNAME} ( + key, + created_at, + updated_at, + workspace_id, + change_set_id, + func_run_id, + value + ) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6, + $7 + ) ON CONFLICT (key) DO UPDATE SET + updated_at = EXCLUDED.updated_at, + value = EXCLUDED.value;" + ), + &[ + &func_run_log.id().to_string(), + &func_run_log.created_at(), + &func_run_log.updated_at(), + &func_run_log.tenancy().workspace_pk.to_string(), + &func_run_log.tenancy().change_set_id.to_string(), + &func_run_log.func_run_id().to_string(), + &postcard_bytes.as_slice(), + ], + ) + .await?; + + // Also write to layer-db for backward compatibility during migration + // Convert HistoryActor to Actor + let actor = match ctx.history_actor() { + crate::history_event::HistoryActor::SystemInit => si_events::Actor::System, + crate::history_event::HistoryActor::User(pk) => si_events::Actor::User(*pk), + }; + + ctx.func_run_log_layer_db() + .write( + Arc::new(func_run_log.clone()), + None, // web_events + func_run_log.tenancy(), + actor, + ) + .await + .map_err(|e| SiDbError::LayerDb(e.to_string()))?; + + Ok(()) + } + + pub async fn get_for_func_run_id( + ctx: &impl SiDbContext, + func_run_id: FuncRunId, + ) -> SiDbResult> { + let maybe_row = ctx + .txns() + .await? 
+ .pg() + .query_opt( + &format!("SELECT value FROM {DBNAME} WHERE func_run_id = $1"), + &[&func_run_id], + ) + .await?; + + if let Some(row) = maybe_row { + let value_bytes: Vec = row.try_get("value")?; + let func_run_log: FuncRunLog = postcard::from_bytes(&value_bytes) + .map_err(|e| SiDbError::Postcard(e.to_string()))?; + Ok(Some(func_run_log)) + } else { + // Fall back to layer-db if not found in si-db + monotonic!( + func_run_logs.layerdb_fallback_total = 1, + method = "get_for_func_run_id" + ); + ctx.func_run_log_layer_db() + .get_for_func_run_id(func_run_id) + .await + .map(|opt_arc| opt_arc.map(|arc| (*arc).clone())) + .map_err(|e| SiDbError::LayerDb(e.to_string())) + } + } + + /// Returns the IDs from the input batch that do NOT exist in the database. + /// This is useful for determining which func run logs need to be migrated. + pub async fn find_missing_ids( + ctx: &impl SiDbContext, + ids: &[FuncRunLogId], + ) -> SiDbResult> { + if ids.is_empty() { + return Ok(vec![]); + } + + // Convert IDs to strings for the query + let id_strings: Vec = ids.iter().map(|id| id.to_string()).collect(); + + // Build a query with ANY to check which IDs exist + let query = format!("SELECT key FROM {DBNAME} WHERE key = ANY($1)"); + + let rows = ctx.txns().await?.pg().query(&query, &[&id_strings]).await?; + + // Collect the IDs that exist in the database + let existing_ids: std::collections::HashSet = + rows.iter().map(|row| row.get::<_, String>("key")).collect(); + + // Return the IDs that don't exist + let missing_ids: Vec = ids + .iter() + .filter(|id| !existing_ids.contains(&id.to_string())) + .copied() + .collect(); + + Ok(missing_ids) + } + + /// Write multiple func run logs to the database in a single INSERT query. + /// This is more efficient than calling upsert multiple times. 
+ pub async fn upsert_batch( + ctx: &impl SiDbContext, + func_run_logs: Vec<FuncRunLog>, + ) -> SiDbResult<()> { + if func_run_logs.is_empty() { + return Ok(()); + } + + // Store all the data we need to keep alive for the query + struct RowData { + id: String, + workspace_pk: String, + created_at: chrono::DateTime<chrono::Utc>, + updated_at: chrono::DateTime<chrono::Utc>, + change_set_id: String, + func_run_id: String, + postcard_bytes: Vec<u8>, + } + + let mut values_clauses = Vec::with_capacity(func_run_logs.len()); + let mut row_data_vec = Vec::with_capacity(func_run_logs.len()); + let mut param_index = 1; + + const COL_COUNT: usize = 7; + + for func_run_log in &func_run_logs { + let postcard_bytes = postcard::to_stdvec(func_run_log) + .map_err(|e| SiDbError::Postcard(e.to_string()))?; + + // Create placeholders for this row ($1, $2, ... $COL_COUNT) + let placeholders: Vec<String> = (param_index..param_index + COL_COUNT) + .map(|i| format!("${i}")) + .collect(); + values_clauses.push(format!("({})", placeholders.join(", "))); + + row_data_vec.push(RowData { + id: func_run_log.id().to_string(), + workspace_pk: func_run_log.tenancy().workspace_pk.to_string(), + created_at: func_run_log.created_at(), + updated_at: func_run_log.updated_at(), + change_set_id: func_run_log.tenancy().change_set_id.to_string(), + func_run_id: func_run_log.func_run_id().to_string(), + postcard_bytes, + }); + + param_index += COL_COUNT; + } + + let query = format!( + "INSERT INTO {} ( + key, + created_at, + updated_at, + workspace_id, + change_set_id, + func_run_id, + value + ) VALUES {} + ON CONFLICT (key) DO UPDATE SET + updated_at = EXCLUDED.updated_at, + value = EXCLUDED.value", + DBNAME, + values_clauses.join(", ") + ); + + // Build the parameter array dynamically (NOTE: Postgres caps a statement at 65535 bind params) + + // This looks like extra work, but since the pg library expects refs of everything, + // we had to create the row_data_vec to own the values while we pass them down + let postcard_slices: Vec<&[u8]> = row_data_vec + .iter() + .map(|rd| rd.postcard_bytes.as_slice()) + .collect(); + + let mut params: Vec<&(dyn 
postgres_types::ToSql + Sync)> = Vec::new(); + for (idx, row_data) in row_data_vec.iter().enumerate() { + params.push(&row_data.id); + params.push(&row_data.created_at); + params.push(&row_data.updated_at); + params.push(&row_data.workspace_pk); + params.push(&row_data.change_set_id); + params.push(&row_data.func_run_id); + params.push(&postcard_slices[idx]); + } + + ctx.txns().await?.pg().execute(&query, &params[..]).await?; + + Ok(()) + } +} diff --git a/lib/si-db/src/history_event.rs b/lib/si-db/src/history_event.rs index 765353a9c1..79f604cb82 100644 --- a/lib/si-db/src/history_event.rs +++ b/lib/si-db/src/history_event.rs @@ -10,7 +10,7 @@ use si_events::Timestamp; use si_id::UserPk; use crate::{ - Result, + SiDbResult, actor_view::ActorView, context::SiDbContext, tenancy::Tenancy, @@ -35,14 +35,14 @@ impl HistoryActor { } } - pub async fn email(&self, ctx: &impl SiDbContext) -> Result { + pub async fn email(&self, ctx: &impl SiDbContext) -> SiDbResult { Ok(match self { HistoryActor::SystemInit => "sally@systeminit.com".to_string(), HistoryActor::User(user_pk) => User::get_by_pk(ctx, *user_pk).await?.email().clone(), }) } - pub async fn email_is_systeminit(&self, ctx: &impl SiDbContext) -> Result { + pub async fn email_is_systeminit(&self, ctx: &impl SiDbContext) -> SiDbResult { let email_as_lowercase = self.email(ctx).await?.to_lowercase(); Ok(Self::is_systeminit_domain(email_as_lowercase)) } @@ -101,7 +101,7 @@ impl HistoryEvent { label: impl AsRef, message: impl AsRef, data: &serde_json::Value, - ) -> Result { + ) -> SiDbResult { let label = label.as_ref(); let message = message.as_ref(); let actor = serde_json::to_value(ctx.history_actor())?; diff --git a/lib/si-db/src/lib.rs b/lib/si-db/src/lib.rs index e45caedf0e..5b7a465ebe 100644 --- a/lib/si-db/src/lib.rs +++ b/lib/si-db/src/lib.rs @@ -5,6 +5,8 @@ use si_id::UserPk; mod actor_view; pub mod change_set; mod context; +mod func_run; +mod func_run_log; mod history_event; pub mod key_pair; mod 
management_func_execution; @@ -20,6 +22,8 @@ pub mod workspace; pub use actor_view::ActorView; pub use context::SiDbContext; +pub use func_run::FuncRunDb; +pub use func_run_log::FuncRunLogDb; pub use history_event::{ HistoryActor, HistoryEvent, @@ -54,7 +58,13 @@ mod embedded { #[remain::sorted] #[derive(thiserror::Error, Debug)] -pub enum Error { +pub enum SiDbError { + #[error("action id not found: {0}")] + ActionIdNotFound(si_events::ActionId), + #[error("layer db error: {0}")] + LayerDb(String), + #[error("missing func run: {0}")] + MissingFuncRun(si_events::FuncRunId), #[error("nats error")] Nats(#[from] si_data_nats::NatsError), #[error("no workspace")] @@ -63,6 +73,8 @@ pub enum Error { Pg(#[from] si_data_pg::PgError), #[error("pg pool error: {0}")] PgPool(#[from] si_data_pg::PgPoolError), + #[error("postcard error: {0}")] + Postcard(String), #[error("error serializing/deserializing json: {0}")] SerdeJson(#[from] serde_json::Error), #[error("si db transactions error: {0}")] @@ -71,4 +83,4 @@ pub enum Error { UserNotFound(UserPk), } -pub type Result = std::result::Result; +pub type SiDbResult = Result; diff --git a/lib/si-db/src/management_func_execution.rs b/lib/si-db/src/management_func_execution.rs index a312c5de53..be27268b50 100644 --- a/lib/si-db/src/management_func_execution.rs +++ b/lib/si-db/src/management_func_execution.rs @@ -46,7 +46,7 @@ pub enum ManagementFuncExecutionError { #[error("pg pool error: {0}")] PgPool(#[from] si_data_pg::PgPoolError), #[error("si db error: {0}")] - SiDb(#[from] crate::Error), + SiDb(#[from] crate::SiDbError), #[error("si db transactions error: {0}")] SiDbTransactions(#[from] crate::transactions::SiDbTransactionsError), #[error("strum parse error: {0}")] diff --git a/lib/si-db/src/migrate.rs b/lib/si-db/src/migrate.rs index 2cea246303..a2dfc9b89a 100644 --- a/lib/si-db/src/migrate.rs +++ b/lib/si-db/src/migrate.rs @@ -23,7 +23,7 @@ use tokio::{ time::Instant, }; -use crate::Result; +use crate::SiDbResult; mod embedded 
{ use refinery::embed_migrations; @@ -32,13 +32,13 @@ mod embedded { } #[instrument(level = "info", skip_all)] -pub async fn migrate_all(pg_pool: &PgPool) -> Result<()> { +pub async fn migrate_all(pg_pool: &PgPool) -> SiDbResult<()> { migrate(pg_pool).await?; Ok(()) } #[instrument(level = "info", skip_all)] -pub async fn migrate_all_with_progress(pg_pool: &PgPool) -> Result<()> { +pub async fn migrate_all_with_progress(pg_pool: &PgPool) -> SiDbResult<()> { let mut interval = time::interval(Duration::from_secs(5)); let instant = Instant::now(); let migrate_all = migrate_all(pg_pool); @@ -63,7 +63,7 @@ pub async fn migrate_all_with_progress(pg_pool: &PgPool) -> Result<()> { } #[instrument(level = "info", skip_all)] -pub async fn migrate(pg: &PgPool) -> Result<()> { +pub async fn migrate(pg: &PgPool) -> SiDbResult<()> { pg.migrate(embedded::migrations::runner()).await?; Ok(()) } @@ -82,6 +82,7 @@ pub async fn migrate(pg: &PgPool) -> Result<()> { )] #[strum(serialize_all = "camelCase")] pub enum MigrationMode { + BackfillFuncRuns, BackfillLayerCache, GarbageCollectSnapshots, Run, @@ -116,6 +117,10 @@ impl MigrationMode { pub fn is_backfill_layer_cache(&self) -> bool { matches!(self, Self::BackfillLayerCache) } + + pub fn is_backfill_func_runs(&self) -> bool { + matches!(self, Self::BackfillFuncRuns) + } } #[cfg(test)] diff --git a/lib/si-db/src/migrations/U4024__func_runs.sql b/lib/si-db/src/migrations/U4024__func_runs.sql new file mode 100644 index 0000000000..d9b78391f5 --- /dev/null +++ b/lib/si-db/src/migrations/U4024__func_runs.sql @@ -0,0 +1,20 @@ +CREATE TABLE func_runs +( + key text NOT NULL PRIMARY KEY, + created_at timestamp with time zone NOT NULL DEFAULT CLOCK_TIMESTAMP(), + updated_at timestamp with time zone NOT NULL DEFAULT CLOCK_TIMESTAMP(), + state text NOT NULL, + function_kind text NOT NULL, + workspace_id text NOT NULL, + change_set_id text NOT NULL, + actor_id text NOT NULL, + component_id text, + attribute_value_id text, + action_id text, + 
action_originating_change_set_id text, + value bytea NOT NULL, + json_value jsonb NOT NULL, + serialization_lib text NOT NULL DEFAULT 'postcard' +); + +CREATE INDEX IF NOT EXISTS by_attribute_value_id ON func_runs (attribute_value_id, updated_at DESC); diff --git a/lib/si-db/src/migrations/U4025__func_run_logs.sql b/lib/si-db/src/migrations/U4025__func_run_logs.sql new file mode 100644 index 0000000000..ab91f7fd30 --- /dev/null +++ b/lib/si-db/src/migrations/U4025__func_run_logs.sql @@ -0,0 +1,15 @@ +CREATE TABLE func_run_logs +( + key text NOT NULL PRIMARY KEY, + created_at timestamp with time zone NOT NULL DEFAULT CLOCK_TIMESTAMP(), + updated_at timestamp with time zone NOT NULL DEFAULT CLOCK_TIMESTAMP(), + + workspace_id text NOT NULL, + change_set_id text NOT NULL, + + func_run_id text NOT NULL UNIQUE, + value bytea NOT NULL, + serialization_lib text NOT NULL DEFAULT 'postcard' +); + +CREATE INDEX IF NOT EXISTS func_run_log_by_func_run_id ON func_run_logs (func_run_id); diff --git a/lib/si-db/src/policy_report.rs b/lib/si-db/src/policy_report.rs index 5f92e87cc3..a93938b0ea 100644 --- a/lib/si-db/src/policy_report.rs +++ b/lib/si-db/src/policy_report.rs @@ -71,7 +71,7 @@ pub enum PolicyReportError { #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("si db error: {0}")] - SiDb(#[from] crate::Error), + SiDb(#[from] crate::SiDbError), #[error("si db transactions error: {0}")] SiDbTransactions(#[from] crate::transactions::SiDbTransactionsError), } diff --git a/lib/si-db/src/tenancy.rs b/lib/si-db/src/tenancy.rs index f52ffd85a0..9ce023143c 100644 --- a/lib/si-db/src/tenancy.rs +++ b/lib/si-db/src/tenancy.rs @@ -7,8 +7,8 @@ use si_id::WorkspacePk; use telemetry::prelude::*; use crate::{ - Error, - Result, + SiDbError, + SiDbResult, }; #[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone, Copy, Hash)] @@ -29,7 +29,7 @@ impl Tenancy { } #[instrument(level = "debug", skip_all)] - pub async fn check(&self, txn: &PgTxn, tenancy: 
&Tenancy) -> Result { + pub async fn check(&self, txn: &PgTxn, tenancy: &Tenancy) -> SiDbResult { let row = txn .query_one( "SELECT in_tenancy_v1($1::jsonb, $2::ident) AS result", @@ -40,8 +40,8 @@ impl Tenancy { Ok(result) } - pub fn workspace_pk(&self) -> Result { - self.workspace_pk.ok_or(Error::NoWorkspace) + pub fn workspace_pk(&self) -> SiDbResult { + self.workspace_pk.ok_or(SiDbError::NoWorkspace) } pub fn workspace_pk_opt(&self) -> Option { diff --git a/lib/si-db/src/user.rs b/lib/si-db/src/user.rs index 54e8300004..679b1f26bf 100644 --- a/lib/si-db/src/user.rs +++ b/lib/si-db/src/user.rs @@ -9,8 +9,8 @@ use si_id::{ }; use crate::{ - Error, - Result, + SiDbError, + SiDbResult, context::SiDbContext, getter, history_event::HistoryEvent, @@ -50,7 +50,7 @@ impl User { name: impl AsRef, email: impl AsRef, picture_url: Option>, - ) -> Result { + ) -> SiDbResult { let name = name.as_ref(); let email = email.as_ref(); @@ -86,7 +86,7 @@ impl User { Ok(object) } - pub async fn get_by_pk_opt(ctx: &impl SiDbContext, pk: UserPk) -> Result> { + pub async fn get_by_pk_opt(ctx: &impl SiDbContext, pk: UserPk) -> SiDbResult> { let row = ctx .txns() .await? @@ -100,17 +100,17 @@ impl User { Ok(None) } } - pub async fn get_by_pk(ctx: &impl SiDbContext, pk: UserPk) -> Result { + pub async fn get_by_pk(ctx: &impl SiDbContext, pk: UserPk) -> SiDbResult { Self::get_by_pk_opt(ctx, pk) .await? - .ok_or(Error::UserNotFound(pk)) + .ok_or(SiDbError::UserNotFound(pk)) } pub async fn associate_workspace( &self, ctx: &impl SiDbContext, workspace_pk: WorkspacePk, - ) -> Result<()> { + ) -> SiDbResult<()> { ctx.txns() .await? .pg() @@ -122,7 +122,7 @@ impl User { Ok(()) } - pub async fn is_first_user(&self, ctx: &impl SiDbContext) -> Result { + pub async fn is_first_user(&self, ctx: &impl SiDbContext) -> SiDbResult { let row = ctx .txns() .await? 
@@ -143,7 +143,7 @@ impl User { ctx: &impl SiDbContext, user_pk: UserPk, workspace_pkg: String, - ) -> Result<()> { + ) -> SiDbResult<()> { ctx.txns() .await? .pg() @@ -158,7 +158,7 @@ impl User { pub async fn list_members_for_workspace( ctx: &impl SiDbContext, workspace_pk: String, - ) -> Result> { + ) -> SiDbResult> { let rows = ctx .txns() .await? @@ -179,7 +179,7 @@ impl User { pub async fn list_member_pks_for_workspace( ctx: &impl SiDbContext, workspace_pk: String, - ) -> Result> { + ) -> SiDbResult> { let rows = ctx .txns() .await? @@ -201,7 +201,7 @@ impl User { ctx: &impl SiDbContext, user_pk: UserPk, workspace_pk: WorkspacePk, - ) -> Result { + ) -> SiDbResult { let row = ctx .txns() .await? @@ -220,7 +220,7 @@ impl User { workspace_pk: WorkspacePk, key: impl AsRef, value: serde_json::Value, - ) -> Result { + ) -> SiDbResult { let formatted_key = Vec::from([key.as_ref()]); let row = ctx diff --git a/lib/si-layer-cache/src/db.rs b/lib/si-layer-cache/src/db.rs index 2738145fd2..7fd82f518e 100644 --- a/lib/si-layer-cache/src/db.rs +++ b/lib/si-layer-cache/src/db.rs @@ -44,8 +44,8 @@ use crate::{ activity_client::ActivityClient, db::{ encrypted_secret::EncryptedSecretDb, - func_run::FuncRunDb, - func_run_log::FuncRunLogDb, + func_run::FuncRunLayerDb, + func_run_log::FuncRunLogLayerDb, }, error::LayerDbResult, hybrid_cache::CacheConfig, @@ -101,8 +101,8 @@ pub struct LayerDb< cas: CasDb, change_batch: ChangeBatchDb, encrypted_secret: EncryptedSecretDb, - func_run: FuncRunDb, - func_run_log: FuncRunLogDb, + func_run: FuncRunLayerDb, + func_run_log: FuncRunLogLayerDb, rebase_batch: RebaseBatchDb, workspace_snapshot: WorkspaceSnapshotDb, split_snapshot_subgraph: SplitSnapshotSubGraphDb, @@ -439,8 +439,8 @@ where let change_batch = ChangeBatchDb::new(change_batch_cache, persister_client.clone()); let encrypted_secret = EncryptedSecretDb::new(encrypted_secret_cache, persister_client.clone()); - let func_run = FuncRunDb::new(func_run_cache, persister_client.clone()); 
- let func_run_log = FuncRunLogDb::new(func_run_log_cache, persister_client.clone()); + let func_run = FuncRunLayerDb::new(func_run_cache, persister_client.clone()); + let func_run_log = FuncRunLogLayerDb::new(func_run_log_cache, persister_client.clone()); let workspace_snapshot = WorkspaceSnapshotDb::new(snapshot_cache, persister_client.clone()); let rebase_batch = RebaseBatchDb::new(rebase_batch_cache, persister_client.clone()); let split_snapshot_subgraph = @@ -502,11 +502,11 @@ where &self.encrypted_secret } - pub fn func_run(&self) -> &FuncRunDb { + pub fn func_run(&self) -> &FuncRunLayerDb { &self.func_run } - pub fn func_run_log(&self) -> &FuncRunLogDb { + pub fn func_run_log(&self) -> &FuncRunLogLayerDb { &self.func_run_log } diff --git a/lib/si-layer-cache/src/db/func_run.rs b/lib/si-layer-cache/src/db/func_run.rs index 4b4d1d2bf6..0ac43d81cd 100644 --- a/lib/si-layer-cache/src/db/func_run.rs +++ b/lib/si-layer-cache/src/db/func_run.rs @@ -16,6 +16,7 @@ use si_events::{ WebEvent, WorkspacePk, }; +use si_id::ulid::Ulid; use telemetry::prelude::*; use super::serialize; @@ -37,12 +38,13 @@ pub const CACHE_NAME: &str = DBNAME; pub const PARTITION_KEY: &str = "workspace_id"; #[derive(Debug, Clone)] -pub struct FuncRunDb { +pub struct FuncRunLayerDb { pub cache: Arc>>, persister_client: PersisterClient, + read_id_batch_query: String, + read_id_batch_no_cutoff_query: String, ready_many_for_workspace_id_query: String, get_last_qualification_for_attribute_value_id: String, - list_action_history: String, get_last_action_by_action_id: String, list_management_history: String, get_last_management_by_func_and_component_id: String, @@ -52,11 +54,21 @@ pub struct FuncRunDb { paginated_component_query_no_cursor: String, } -impl FuncRunDb { +// This func run db will be deprecated in favor of the si_db::FuncRunDb, since +// we're doing away with the pg backend of layerdb and these never fit the model +// anyway. Don't use! 
+impl FuncRunLayerDb { + // NOTE(victor): Won't migrate to si_db::FuncRunDb - layer cache internal func pub fn new(cache: Arc>>, persister_client: PersisterClient) -> Self { Self { cache, persister_client, + read_id_batch_query: format!( + "SELECT key FROM {DBNAME} WHERE key < $1 ORDER BY created_at DESC LIMIT $2" + ), + read_id_batch_no_cutoff_query: format!( + "SELECT key FROM {DBNAME} ORDER BY created_at DESC LIMIT $1" + ), ready_many_for_workspace_id_query: format!( "SELECT * FROM {DBNAME} WHERE workspace_id = $1" ), @@ -66,11 +78,6 @@ impl FuncRunDb { ORDER BY updated_at DESC LIMIT 1", ), - list_action_history: format!( - "SELECT value FROM {DBNAME} - WHERE function_kind = 'Action' AND workspace_id = $1 - ORDER BY updated_at DESC", - ), get_last_action_by_action_id: format!( " SELECT value FROM {DBNAME} @@ -96,7 +103,7 @@ impl FuncRunDb { paginated_workspace_query_with_cursor: format!( r#" SELECT * FROM {DBNAME} - WHERE workspace_id = $1 + WHERE workspace_id = $1 AND change_set_id = $2 AND ( created_at < (SELECT created_at FROM {DBNAME} WHERE key = $3) OR @@ -118,7 +125,7 @@ impl FuncRunDb { paginated_component_query_with_cursor: format!( r#" SELECT * FROM {DBNAME} - WHERE workspace_id = $1 + WHERE workspace_id = $1 AND change_set_id = $2 AND component_id = $3 AND ( @@ -142,30 +149,38 @@ impl FuncRunDb { } } - pub async fn list_action_history( + // NOTE(victor): Migrated to si_db::FuncRunDb as ::new() + pub async fn write( &self, - workspace_id: WorkspacePk, - ) -> LayerDbResult>> { - let maybe_rows = self - .cache - .pg() - .query(&self.list_action_history, &[&workspace_id]) - .await?; - let result = match maybe_rows { - Some(rows) => { - let mut result_rows = Vec::with_capacity(rows.len()); - for row in rows.into_iter() { - let postcard_bytes: Vec = row.get("value"); - let func_run: FuncRun = serialize::from_bytes(&postcard_bytes[..])?; - result_rows.push(func_run); - } - Some(result_rows) - } - None => None, - }; - Ok(result) + value: Arc, + web_events: 
Option>, + tenancy: Tenancy, + actor: Actor, + ) -> LayerDbResult<()> { + let (postcard_value, size_hint) = serialize::to_vec(&value)?; + let cache_key: Arc = value.id().to_string().into(); + let sort_key: Arc = value.tenancy().workspace_pk.to_string().into(); + + self.cache + .insert_or_update(cache_key.clone(), value, size_hint); + + let event = LayeredEvent::new( + LayeredEventKind::FuncRunWrite, + Arc::new(DBNAME.to_string()), + cache_key, + Arc::new(postcard_value), + Arc::new(sort_key.to_string()), + web_events, + tenancy, + actor, + ); + let reader = self.persister_client.write_event(event)?; + let _ = reader.get_status().await; + + Ok(()) } + // NOTE(victor): Migrated to si_db::FuncRunDb #[instrument(level = "debug", skip_all)] pub async fn get_last_run_for_action_id_opt( &self, @@ -190,6 +205,7 @@ impl FuncRunDb { Ok(maybe_func) } + // NOTE(victor): Migrated to si_db::FuncRunDb pub async fn get_last_run_for_action_id( &self, workspace_pk: WorkspacePk, @@ -200,6 +216,7 @@ impl FuncRunDb { .ok_or_else(|| LayerDbError::ActionIdNotFound(action_id)) } + // NOTE(victor): Migrated to si_db::FuncRunDb pub async fn list_management_history( &self, workspace_pk: WorkspacePk, @@ -228,6 +245,7 @@ impl FuncRunDb { Ok(result) } + // NOTE(victor): Migrated to si_db::FuncRunDb pub async fn get_last_management_run_for_func_and_component_id( &self, workspace_pk: WorkspacePk, @@ -252,6 +270,8 @@ impl FuncRunDb { Ok(maybe_func) } + + // NOTE(victor): Migrated to si_db::FuncRunDb pub async fn get_last_qualification_for_attribute_value_id( &self, workspace_id: WorkspacePk, @@ -286,40 +306,12 @@ impl FuncRunDb { Ok(None) } - pub async fn write( - &self, - value: Arc, - web_events: Option>, - tenancy: Tenancy, - actor: Actor, - ) -> LayerDbResult<()> { - let (postcard_value, size_hint) = serialize::to_vec(&value)?; - let cache_key: Arc = value.id().to_string().into(); - let sort_key: Arc = value.tenancy().workspace_pk.to_string().into(); - - self.cache - 
.insert_or_update(cache_key.clone(), value, size_hint); - - let event = LayeredEvent::new( - LayeredEventKind::FuncRunWrite, - Arc::new(DBNAME.to_string()), - cache_key, - Arc::new(postcard_value), - Arc::new(sort_key.to_string()), - web_events, - tenancy, - actor, - ); - let reader = self.persister_client.write_event(event)?; - let _ = reader.get_status().await; - - Ok(()) - } - + // NOTE(victor): Migrated to si_db::FuncRunDb pub async fn read(&self, key: FuncRunId) -> LayerDbResult>> { self.cache.get(key.to_string().into()).await } + // NOTE(victor): Migrated to si_db::FuncRunDb pub async fn try_read(&self, key: FuncRunId) -> LayerDbResult> { self.cache .get(key.to_string().into()) @@ -327,9 +319,7 @@ impl FuncRunDb { .ok_or_else(|| LayerDbError::MissingFuncRun(key)) } - // NOTE(nick): this is just to test that things are working. We probably want some customization - // for where clauses, etc. in the real version. This should be a step closer to how we'll query - // for history though. + // NOTE(victor): Migrated to si_db::FuncRunDb pub async fn read_many_for_workspace( &self, workspace_id: WorkspacePk, @@ -352,6 +342,41 @@ impl FuncRunDb { } } + // NOTE(victor): Created for the data migration only, won't be ported to si_db::FuncRunDb + pub async fn read_batch_of_ids( + &self, + batch_size: i64, + cutoff_id: Option, + ) -> LayerDbResult> { + let maybe_rows = if let Some(cutoff_id) = cutoff_id { + let id_str = cutoff_id.to_string(); + self.cache + .pg() + .query(&self.read_id_batch_query, &[&id_str, &batch_size]) + .await? + } else { + self.cache + .pg() + .query(&self.read_id_batch_no_cutoff_query, &[&batch_size]) + .await? 
+ }; + + let Some(rows) = maybe_rows else { + return Ok(vec![]); + }; + + let mut func_runs = Vec::new(); + for row in rows { + let id_string: String = row.get("key"); + let ulid = Ulid::from_string(&id_string)?; + let func_id = FuncRunId::from(ulid); + + func_runs.push(func_id) + } + Ok(func_runs) + } + + // NOTE(victor): Migrated to si_db::FuncRunDb /// Read function runs for a workspace with pagination support. /// /// This method uses cursor-based pagination where: @@ -407,6 +432,7 @@ impl FuncRunDb { } } + // NOTE(victor): Migrated to si_db::FuncRunDb /// Read function runs for a specific component with pagination support. /// /// This method uses cursor-based pagination where: @@ -469,6 +495,7 @@ impl FuncRunDb { } } + // NOTE(victor): Won't migrate to si_db::FuncRunDb - internal layer cache func pub async fn insert_to_pg( pg: &PgLayer, event_payload: &LayeredEventPayload, diff --git a/lib/si-layer-cache/src/db/func_run_log.rs b/lib/si-layer-cache/src/db/func_run_log.rs index 6cbc52b9ec..561aa35aef 100644 --- a/lib/si-layer-cache/src/db/func_run_log.rs +++ b/lib/si-layer-cache/src/db/func_run_log.rs @@ -4,6 +4,7 @@ use si_events::{ Actor, FuncRunId, FuncRunLog, + FuncRunLogId, Tenancy, WebEvent, }; @@ -24,21 +25,31 @@ pub const CACHE_NAME: &str = DBNAME; pub const PARTITION_KEY: &str = "workspace_id"; #[derive(Debug, Clone)] -pub struct FuncRunLogDb { +pub struct FuncRunLogLayerDb { pub cache: Arc>>, persister_client: PersisterClient, get_for_func_run_id_query: String, + read_id_batch_query: String, + read_id_batch_no_cutoff_query: String, } -impl FuncRunLogDb { +impl FuncRunLogLayerDb { + // NOTE(victor): Won't migrate to si_db::FuncRunLogsDb - layer cache internal func pub fn new(cache: Arc>>, persister_client: PersisterClient) -> Self { Self { cache, persister_client, get_for_func_run_id_query: format!("SELECT value FROM {DBNAME} WHERE func_run_id = $1"), + read_id_batch_query: format!( + "SELECT key FROM {DBNAME} WHERE key < $1 ORDER BY created_at DESC 
LIMIT $2" + ), + read_id_batch_no_cutoff_query: format!( + "SELECT key FROM {DBNAME} ORDER BY created_at DESC LIMIT $1" + ), } } + // NOTE(victor): Migrated to si_db::FuncRunLogsDb as upsert pub async fn write( &self, value: Arc, @@ -72,6 +83,7 @@ impl FuncRunLogDb { Ok(()) } + // NOTE(victor): Migrated to si_db::FuncRunLogsDb pub async fn get_for_func_run_id( &self, func_run_id: FuncRunId, @@ -88,7 +100,8 @@ impl FuncRunLogDb { } } - pub async fn insert_to_pg(&self, func_run_log: Arc) -> LayerDbResult<()> { + // NOTE(victor): Won't migrate to si_db::FuncRunLogsDb - internal layer cache func + async fn insert_to_pg(&self, func_run_log: Arc) -> LayerDbResult<()> { self.cache .pg() .insert_raw( @@ -129,4 +142,55 @@ impl FuncRunLogDb { .await?; Ok(()) } + + // NOTE(victor): Created for the data migration only, won't be ported to si_db::FuncRunLogDb + pub async fn read_batch_of_ids( + &self, + batch_size: i64, + cutoff_id: Option, + ) -> LayerDbResult> { + let maybe_rows = if let Some(cutoff_id) = cutoff_id { + let id_str = cutoff_id.to_string(); + self.cache + .pg() + .query(&self.read_id_batch_query, &[&id_str, &batch_size]) + .await? + } else { + self.cache + .pg() + .query(&self.read_id_batch_no_cutoff_query, &[&batch_size]) + .await? 
+ }; + + let Some(rows) = maybe_rows else { + return Ok(vec![]); + }; + + let mut func_run_log_ids = Vec::new(); + for row in rows { + let id_string: String = row.get("key"); + let ulid = ulid::Ulid::from_string(&id_string)?; + func_run_log_ids.push(FuncRunLogId::from(ulid)); + } + + Ok(func_run_log_ids) + } + + // NOTE(victor): Created for the data migration only + pub async fn try_read(&self, key: FuncRunLogId) -> LayerDbResult> { + let maybe_row = self + .cache + .pg() + .query_opt( + &format!("SELECT value FROM {DBNAME} WHERE key = $1"), + &[&key.to_string()], + ) + .await?; + + let Some(row) = maybe_row else { + return Err(crate::LayerDbError::FuncRunLogNotFound(key)); + }; + + serialize::from_bytes(row.get("value")) + } } diff --git a/lib/si-layer-cache/src/error.rs b/lib/si-layer-cache/src/error.rs index e8c511cf5b..af650612f5 100644 --- a/lib/si-layer-cache/src/error.rs +++ b/lib/si-layer-cache/src/error.rs @@ -19,11 +19,13 @@ use si_data_pg::{ use si_events::{ ActionId, FuncRunId, + FuncRunLogId, content_hash::ContentHashParseError, }; use si_std::CanonicalFileError; use thiserror::Error; use tokio_stream::Elapsed; +use ulid::DecodeError; use crate::{ activities::{ @@ -242,10 +244,14 @@ pub enum LayerDbError { ContentConversion(String), #[error("could not convert to key from string")] CouldNotConvertToKeyFromString(String), + #[error("decoding error: {0}")] + Decode(#[from] DecodeError), #[error("decompression error: {0}")] Decompress(String), #[error("Foyer error: {0}")] Foyer(#[source] Box), + #[error("missing func_run_log when one was expected: {0}")] + FuncRunLogNotFound(FuncRunLogId), #[error("failed to parse content hash from str: {0}")] HashParse(#[from] ContentHashParseError), #[error("incomplete key: {0}")] diff --git a/lib/si-layer-cache/src/persister.rs b/lib/si-layer-cache/src/persister.rs index b1a150a6d3..a30f9cc15e 100644 --- a/lib/si-layer-cache/src/persister.rs +++ b/lib/si-layer-cache/src/persister.rs @@ -40,7 +40,7 @@ use crate::{ 
encrypted_secret, func_run::{ self, - FuncRunDb, + FuncRunLayerDb, }, func_run_log, rebase_batch, @@ -491,7 +491,7 @@ impl PersisterTask { Ok(()) } LayeredEventKind::FuncRunWrite => { - FuncRunDb::insert_to_pg(&pg_layer, &event.payload).await + FuncRunLayerDb::insert_to_pg(&pg_layer, &event.payload).await } } } @@ -1130,7 +1130,7 @@ impl PersistEventTask { // FuncRunLogDb::insert_to_pg(&pg_layer, &event.payload).await? } LayeredEventKind::FuncRunWrite => { - FuncRunDb::insert_to_pg(&pg_layer, &event.payload).await? + FuncRunLayerDb::insert_to_pg(&pg_layer, &event.payload).await? } } Ok(())