Skip to content

Latest commit

 

History

History
1202 lines (1032 loc) · 33.1 KB

File metadata and controls

1202 lines (1032 loc) · 33.1 KB

PostgreSQL Integration Specification

Overview

This document specifies how the Neural DAG system integrates with PostgreSQL via the pgrx framework, including type definitions, operators, functions, and background workers.

Module Structure

crates/ruvector-postgres/src/dag/
├── mod.rs                    # Module root
├── operators.rs              # SQL function definitions
├── types.rs                  # PostgreSQL type mappings
├── hooks.rs                  # Query execution hooks
├── worker.rs                 # Background worker
├── gucs.rs                   # GUC variable definitions
└── state.rs                  # Per-connection state

GUC Variables

Definition

// crates/ruvector-postgres/src/dag/gucs.rs

use pgrx::prelude::*;
use std::ffi::CStr;

// Global enable/disable
static NEURAL_DAG_ENABLED: GucSetting<bool> = GucSetting::new(true);

// Learning parameters
static DAG_LEARNING_RATE: GucSetting<f64> = GucSetting::new(0.002);
static DAG_PATTERN_CLUSTERS: GucSetting<i32> = GucSetting::new(100);
static DAG_QUALITY_THRESHOLD: GucSetting<f64> = GucSetting::new(0.3);
static DAG_MAX_TRAJECTORIES: GucSetting<i32> = GucSetting::new(10000);

// Attention parameters
static DAG_ATTENTION_TYPE: GucSetting<&'static CStr> =
    GucSetting::new(unsafe { CStr::from_bytes_with_nul_unchecked(b"auto\0") });
static DAG_ATTENTION_EXPLORATION: GucSetting<f64> = GucSetting::new(0.1);

// EWC parameters
static DAG_EWC_LAMBDA: GucSetting<f64> = GucSetting::new(2000.0);
static DAG_EWC_MAX_LAMBDA: GucSetting<f64> = GucSetting::new(15000.0);

// MinCut parameters
static DAG_MINCUT_ENABLED: GucSetting<bool> = GucSetting::new(true);
static DAG_MINCUT_THRESHOLD: GucSetting<f64> = GucSetting::new(0.5);

// Background worker parameters
static DAG_BACKGROUND_INTERVAL_MS: GucSetting<i32> = GucSetting::new(3600000);

pub fn register_gucs() {
    GucRegistry::define_bool_guc(
        "ruvector.neural_dag_enabled",
        "Enable neural DAG optimization",
        "When enabled, queries are optimized using learned patterns",
        &NEURAL_DAG_ENABLED,
        GucContext::Userset,
        GucFlags::default(),
    );

    GucRegistry::define_float_guc(
        "ruvector.dag_learning_rate",
        "Learning rate for DAG optimization",
        "Controls how fast the system adapts to new patterns",
        &DAG_LEARNING_RATE,
        0.0001,
        1.0,
        GucContext::Userset,
        GucFlags::default(),
    );

    GucRegistry::define_int_guc(
        "ruvector.dag_pattern_clusters",
        "Number of pattern clusters",
        "K-means clusters for pattern extraction",
        &DAG_PATTERN_CLUSTERS,
        10,
        1000,
        GucContext::Userset,
        GucFlags::default(),
    );

    GucRegistry::define_float_guc(
        "ruvector.dag_quality_threshold",
        "Minimum quality for learning",
        "Trajectories below this quality are not learned",
        &DAG_QUALITY_THRESHOLD,
        0.0,
        1.0,
        GucContext::Userset,
        GucFlags::default(),
    );

    GucRegistry::define_string_guc(
        "ruvector.dag_attention_type",
        "Default attention type",
        "Options: auto, topological, causal_cone, critical_path, mincut_gated, hierarchical_lorentz, parallel_branch, temporal_btsp",
        &DAG_ATTENTION_TYPE,
        GucContext::Userset,
        GucFlags::default(),
    );

    GucRegistry::define_float_guc(
        "ruvector.dag_ewc_lambda",
        "EWC regularization strength",
        "Higher values prevent forgetting more aggressively",
        &DAG_EWC_LAMBDA,
        100.0,
        50000.0,
        GucContext::Userset,
        GucFlags::default(),
    );

    GucRegistry::define_bool_guc(
        "ruvector.dag_mincut_enabled",
        "Enable MinCut analysis",
        "When enabled, bottleneck operators are identified",
        &DAG_MINCUT_ENABLED,
        GucContext::Userset,
        GucFlags::default(),
    );

    GucRegistry::define_int_guc(
        "ruvector.dag_background_interval_ms",
        "Background learning interval (milliseconds)",
        "How often the background learning cycle runs",
        &DAG_BACKGROUND_INTERVAL_MS,
        60000,     // 1 minute minimum
        86400000,  // 24 hours maximum
        GucContext::Sighup,
        GucFlags::default(),
    );
}

/// Get current configuration from GUCs
pub fn get_config() -> DagSonaConfig {
    DagSonaConfig {
        enabled: NEURAL_DAG_ENABLED.get(),
        learning_rate: DAG_LEARNING_RATE.get() as f32,
        pattern_clusters: DAG_PATTERN_CLUSTERS.get() as usize,
        quality_threshold: DAG_QUALITY_THRESHOLD.get() as f32,
        max_trajectories: DAG_MAX_TRAJECTORIES.get() as usize,
        attention_type: parse_attention_type(DAG_ATTENTION_TYPE.get()),
        attention_exploration: DAG_ATTENTION_EXPLORATION.get() as f32,
        ewc_lambda: DAG_EWC_LAMBDA.get() as f32,
        ewc_max_lambda: DAG_EWC_MAX_LAMBDA.get() as f32,
        mincut_enabled: DAG_MINCUT_ENABLED.get(),
        mincut_threshold: DAG_MINCUT_THRESHOLD.get() as f32,
        background_interval_ms: DAG_BACKGROUND_INTERVAL_MS.get() as u64,
        ..Default::default()
    }
}

State Management

Global State

// crates/ruvector-postgres/src/dag/state.rs

use once_cell::sync::Lazy;
use dashmap::DashMap;
use std::sync::Arc;

/// Global registry of DAG engines per table
static DAG_ENGINES: Lazy<DashMap<String, Arc<DagSonaEngine>>> =
    Lazy::new(|| DashMap::new());

/// Get or create DAG engine for a table
pub fn get_or_create_engine(table_name: &str) -> Arc<DagSonaEngine> {
    DAG_ENGINES.entry(table_name.to_string())
        .or_insert_with(|| {
            let config = gucs::get_config();
            Arc::new(DagSonaEngine::new(table_name, config).unwrap())
        })
        .clone()
}

/// Check if neural DAG is enabled for a table
pub fn is_enabled(table_name: &str) -> bool {
    DAG_ENGINES.contains_key(table_name) && gucs::get_config().enabled
}

/// Remove engine (on DROP TABLE or disable)
pub fn remove_engine(table_name: &str) {
    DAG_ENGINES.remove(table_name);
}

/// Get all engine names (for monitoring)
pub fn list_engines() -> Vec<String> {
    DAG_ENGINES.iter().map(|e| e.key().clone()).collect()
}

Per-Connection State

/// Per-connection state for query tracking
pub struct ConnectionState {
    /// Current query trajectory being built
    current_trajectory: Option<TrajectoryBuilder>,

    /// Query start time
    query_start: Option<Instant>,

    /// Current table being queried
    current_table: Option<String>,
}

thread_local! {
    static CONNECTION_STATE: RefCell<ConnectionState> = RefCell::new(ConnectionState {
        current_trajectory: None,
        query_start: None,
        current_table: None,
    });
}

impl ConnectionState {
    pub fn start_query(&mut self, table_name: &str) {
        self.query_start = Some(Instant::now());
        self.current_table = Some(table_name.to_string());
        self.current_trajectory = Some(TrajectoryBuilder::new());
    }

    pub fn end_query(&mut self) -> Option<DagTrajectory> {
        let builder = self.current_trajectory.take()?;
        let start = self.query_start.take()?;
        let _table = self.current_table.take()?;

        Some(builder.build(start.elapsed()))
    }
}

SQL Function Implementations

Enable/Disable Functions

// crates/ruvector-postgres/src/dag/operators.rs

use pgrx::prelude::*;

/// Enable neural DAG learning for a table
#[pg_extern]
fn ruvector_enable_neural_dag(
    table_name: &str,
    config: Option<pgrx::JsonB>,
) -> bool {
    let base_config = gucs::get_config();

    let config = if let Some(pgrx::JsonB(json)) = config {
        // Merge JSON config with base config
        merge_config(base_config, &json)
    } else {
        base_config
    };

    // Validate table exists
    let table_oid = get_table_oid(table_name);
    if table_oid.is_none() {
        ereport!(
            PgLogLevel::ERROR,
            PgSqlErrorCode::ERRCODE_UNDEFINED_TABLE,
            format!("Table '{}' does not exist", table_name)
        );
        return false;
    }

    // Create engine
    let engine = DagSonaEngine::new(table_name, config)
        .map_err(|e| {
            ereport!(
                PgLogLevel::ERROR,
                PgSqlErrorCode::ERRCODE_INTERNAL_ERROR,
                format!("Failed to create DAG engine: {}", e)
            );
        })
        .unwrap();

    // Register engine
    DAG_ENGINES.insert(table_name.to_string(), Arc::new(engine));

    // Start background worker if not running
    start_background_worker_if_needed();

    true
}

/// Disable neural DAG learning for a table
#[pg_extern]
fn ruvector_disable_neural_dag(table_name: &str) -> bool {
    if DAG_ENGINES.remove(table_name).is_some() {
        true
    } else {
        warning!("Neural DAG was not enabled for table '{}'", table_name);
        false
    }
}

/// Check if neural DAG is enabled
#[pg_extern]
fn ruvector_neural_dag_enabled(table_name: &str) -> bool {
    state::is_enabled(table_name)
}

Pattern Functions

/// Get learned patterns for a table
#[pg_extern]
fn ruvector_dag_patterns(
    table_name: &str,
) -> TableIterator<'static, (
    name!(pattern_id, i64),
    name!(centroid, Vec<f32>),
    name!(attention_type, String),
    name!(confidence, f32),
    name!(sample_count, i32),
    name!(avg_latency_us, f64),
    name!(avg_quality, f64),
)> {
    let engine = match DAG_ENGINES.get(table_name) {
        Some(e) => e.clone(),
        None => {
            warning!("Neural DAG not enabled for table '{}'", table_name);
            return TableIterator::new(vec![].into_iter());
        }
    };

    let patterns: Vec<_> = {
        let bank = engine.dag_reasoning_bank.read();
        bank.patterns.iter()
            .map(|entry| {
                let p = entry.value();
                (
                    p.id as i64,
                    p.centroid.clone(),
                    format!("{:?}", p.optimal_attention),
                    p.confidence,
                    p.sample_count as i32,
                    p.avg_metrics.latency_us,
                    p.avg_metrics.quality,
                )
            })
            .collect()
    };

    TableIterator::new(patterns.into_iter())
}

/// Force pattern extraction
#[pg_extern]
fn ruvector_dag_extract_patterns(
    table_name: &str,
    num_clusters: Option<i32>,
) -> i32 {
    let engine = match DAG_ENGINES.get(table_name) {
        Some(e) => e.clone(),
        None => {
            ereport!(
                PgLogLevel::ERROR,
                PgSqlErrorCode::ERRCODE_INVALID_PARAMETER_VALUE,
                format!("Neural DAG not enabled for table '{}'", table_name)
            );
            return 0;
        }
    };

    // Override cluster count if specified
    if let Some(k) = num_clusters {
        // Temporary config override
    }

    match engine.run_background_cycle() {
        Ok(BackgroundCycleResult::Completed { patterns_extracted, .. }) => {
            patterns_extracted as i32
        }
        Ok(BackgroundCycleResult::Skipped { reason, .. }) => {
            warning!("Pattern extraction skipped: {}", reason);
            0
        }
        Err(e) => {
            ereport!(
                PgLogLevel::ERROR,
                PgSqlErrorCode::ERRCODE_INTERNAL_ERROR,
                format!("Pattern extraction failed: {}", e)
            );
            0
        }
    }
}

/// Consolidate similar patterns
#[pg_extern]
fn ruvector_dag_consolidate_patterns(
    table_name: &str,
    similarity_threshold: Option<f32>,
) -> i32 {
    let engine = match DAG_ENGINES.get(table_name) {
        Some(e) => e.clone(),
        None => {
            warning!("Neural DAG not enabled for table '{}'", table_name);
            return 0;
        }
    };

    let threshold = similarity_threshold.unwrap_or(0.95);

    let before_count = {
        let bank = engine.dag_reasoning_bank.read();
        bank.patterns.len()
    };

    {
        let mut bank = engine.dag_reasoning_bank.write();
        bank.consolidate(threshold);
    }

    let after_count = {
        let bank = engine.dag_reasoning_bank.read();
        bank.patterns.len()
    };

    (before_count - after_count) as i32
}

Attention Functions

/// Compute topological attention
#[pg_extern(immutable, parallel_safe)]
fn ruvector_attention_topological(
    query: Vec<f32>,
    ancestors: Vec<Vec<f32>>,
    config: Option<pgrx::JsonB>,
) -> Vec<f32> {
    let config = parse_attention_config(config);
    let attention = TopologicalAttention::new(config);

    let ctx = build_simple_context(&ancestors);
    let query_node = DagNode::from_embedding(query);

    match attention.forward(&query_node, &ctx, &config) {
        Ok(output) => output.weights,
        Err(e) => {
            warning!("Attention computation failed: {}", e);
            vec![1.0 / ancestors.len() as f32; ancestors.len()]  // Uniform fallback
        }
    }
}

/// Compute causal cone attention
#[pg_extern(immutable, parallel_safe)]
fn ruvector_attention_causal_cone(
    query: Vec<f32>,
    ancestors: Vec<Vec<f32>>,
    depths: Vec<i32>,
    decay_rate: f32,
    max_depth: i32,
) -> Vec<f32> {
    let config = AttentionConfig {
        hidden_dim: query.len(),
        ..Default::default()
    };

    let attention = CausalConeAttention::new(
        config.hidden_dim,
        8,  // num_heads
        decay_rate,
        max_depth as usize,
    );

    let ctx = build_context_with_depths(&ancestors, &depths);
    let query_node = DagNode::from_embedding(query);

    match attention.forward(&query_node, &ctx, &config) {
        Ok(output) => output.weights,
        Err(e) => {
            warning!("Causal cone attention failed: {}", e);
            vec![1.0 / ancestors.len() as f32; ancestors.len()]
        }
    }
}

/// Compute critical path attention
#[pg_extern(immutable, parallel_safe)]
fn ruvector_attention_critical_path(
    query: Vec<f32>,
    ancestors: Vec<Vec<f32>>,
    is_critical: Vec<bool>,
    boost: f32,
) -> Vec<f32> {
    let config = AttentionConfig {
        hidden_dim: query.len(),
        ..Default::default()
    };

    let attention = CriticalPathAttention::new(
        config.hidden_dim,
        8,
        boost,
    );

    let ctx = build_context_with_critical(&ancestors, &is_critical);
    let query_node = DagNode::from_embedding(query);

    match attention.forward(&query_node, &ctx, &config) {
        Ok(output) => output.weights,
        Err(e) => {
            warning!("Critical path attention failed: {}", e);
            vec![1.0 / ancestors.len() as f32; ancestors.len()]
        }
    }
}

/// Compute mincut gated attention
#[pg_extern(immutable, parallel_safe)]
fn ruvector_attention_mincut_gated(
    query: Vec<f32>,
    ancestors: Vec<Vec<f32>>,
    criticalities: Vec<f32>,
    threshold: f32,
) -> Vec<f32> {
    let config = AttentionConfig {
        hidden_dim: query.len(),
        ..Default::default()
    };

    let attention = MinCutGatedAttention::new(
        config.hidden_dim,
        8,
        threshold,
    );

    let ctx = build_context_with_criticalities(&ancestors, &criticalities);
    let query_node = DagNode::from_embedding(query);

    match attention.forward(&query_node, &ctx, &config) {
        Ok(output) => output.weights,
        Err(e) => {
            warning!("MinCut gated attention failed: {}", e);
            vec![1.0 / ancestors.len() as f32; ancestors.len()]
        }
    }
}

/// Compute hierarchical Lorentz attention
#[pg_extern(immutable, parallel_safe)]
fn ruvector_attention_hierarchical_lorentz(
    query: Vec<f32>,
    ancestors: Vec<Vec<f32>>,
    depths: Vec<i32>,
    curvature: f32,
    temperature: f32,
) -> Vec<f32> {
    let config = AttentionConfig {
        hidden_dim: query.len(),
        ..Default::default()
    };

    let attention = HierarchicalLorentzAttention::new(
        config.hidden_dim,
        8,
        curvature,
        temperature,
    );

    let ctx = build_context_with_depths(&ancestors, &depths);
    let query_node = DagNode::from_embedding(query);

    match attention.forward(&query_node, &ctx, &config) {
        Ok(output) => output.weights,
        Err(e) => {
            warning!("Hierarchical Lorentz attention failed: {}", e);
            vec![1.0 / ancestors.len() as f32; ancestors.len()]
        }
    }
}

/// Compute parallel branch attention
#[pg_extern(immutable, parallel_safe)]
fn ruvector_attention_parallel_branch(
    query: Vec<f32>,
    ancestors: Vec<Vec<f32>>,
    parallel_nodes: Vec<Vec<f32>>,
    common_ancestor_mask: Vec<bool>,
    cross_weight: f32,
    common_boost: f32,
) -> Vec<f32> {
    let config = AttentionConfig {
        hidden_dim: query.len(),
        ..Default::default()
    };

    let attention = ParallelBranchAttention::new(
        config.hidden_dim,
        8,
        cross_weight,
        common_boost,
    );

    let ctx = build_context_with_parallel(
        &ancestors,
        &parallel_nodes,
        &common_ancestor_mask,
    );
    let query_node = DagNode::from_embedding(query);

    match attention.forward(&query_node, &ctx, &config) {
        Ok(output) => output.weights,
        Err(e) => {
            warning!("Parallel branch attention failed: {}", e);
            let total = ancestors.len() + parallel_nodes.len();
            vec![1.0 / total as f32; total]
        }
    }
}

/// Compute temporal BTSP attention
#[pg_extern(immutable, parallel_safe)]
fn ruvector_attention_temporal_btsp(
    query: Vec<f32>,
    ancestors: Vec<Vec<f32>>,
    timestamps: Vec<f64>,
    window_ms: f32,
    boost: f32,
) -> Vec<f32> {
    let config = AttentionConfig {
        hidden_dim: query.len(),
        ..Default::default()
    };

    let attention = TemporalBTSPAttention::new(
        config.hidden_dim,
        8,
        window_ms,
        boost,
    );

    let ctx = build_context_with_timestamps(&ancestors, &timestamps);
    let query_node = DagNode::from_embedding(query);

    match attention.forward(&query_node, &ctx, &config) {
        Ok(output) => output.weights,
        Err(e) => {
            warning!("Temporal BTSP attention failed: {}", e);
            vec![1.0 / ancestors.len() as f32; ancestors.len()]
        }
    }
}

/// Compute ensemble attention
#[pg_extern(immutable, parallel_safe)]
fn ruvector_attention_ensemble(
    query: Vec<f32>,
    ancestors: Vec<Vec<f32>>,
    attention_types: Vec<String>,
    weights: Option<Vec<f32>>,
) -> Vec<f32> {
    let config = AttentionConfig {
        hidden_dim: query.len(),
        ..Default::default()
    };

    let types: Vec<DagAttentionType> = attention_types.iter()
        .filter_map(|s| parse_attention_type_str(s))
        .collect();

    let weights = weights.unwrap_or_else(|| {
        vec![1.0 / types.len() as f32; types.len()]
    });

    let attention = EnsembleAttention::new(types, weights);

    let ctx = build_simple_context(&ancestors);
    let query_node = DagNode::from_embedding(query);

    match attention.forward(&query_node, &ctx, &config) {
        Ok(output) => output.weights,
        Err(e) => {
            warning!("Ensemble attention failed: {}", e);
            vec![1.0 / ancestors.len() as f32; ancestors.len()]
        }
    }
}

MinCut Functions

/// Compute mincut criticality for DAG nodes
#[pg_extern]
fn ruvector_dag_mincut_criticality(
    table_name: &str,
) -> TableIterator<'static, (
    name!(node_id, i64),
    name!(criticality, f32),
    name!(is_bottleneck, bool),
)> {
    let engine = match DAG_ENGINES.get(table_name) {
        Some(e) => e.clone(),
        None => {
            warning!("Neural DAG not enabled for table '{}'", table_name);
            return TableIterator::new(vec![].into_iter());
        }
    };

    let mincut = match &engine.mincut_engine {
        Some(m) => m.clone(),
        None => {
            warning!("MinCut not enabled for table '{}'", table_name);
            return TableIterator::new(vec![].into_iter());
        }
    };

    let threshold = gucs::get_config().mincut_threshold;

    // Compute criticalities
    let criticalities = mincut.compute_all_criticalities();

    let results: Vec<_> = criticalities.into_iter()
        .map(|(node_id, crit)| {
            (node_id as i64, crit, crit > threshold)
        })
        .collect();

    TableIterator::new(results.into_iter())
}

/// Analyze DAG bottlenecks
#[pg_extern]
fn ruvector_dag_mincut_analysis(
    table_name: &str,
) -> pgrx::JsonB {
    let engine = match DAG_ENGINES.get(table_name) {
        Some(e) => e.clone(),
        None => {
            return pgrx::JsonB(json!({
                "error": "Neural DAG not enabled"
            }));
        }
    };

    let mincut = match &engine.mincut_engine {
        Some(m) => m.clone(),
        None => {
            return pgrx::JsonB(json!({
                "error": "MinCut not enabled"
            }));
        }
    };

    let global_cut = mincut.query();
    let criticalities = mincut.compute_all_criticalities();

    let threshold = gucs::get_config().mincut_threshold;
    let bottlenecks: Vec<_> = criticalities.iter()
        .filter(|(_, c)| *c > threshold)
        .map(|(id, c)| json!({
            "node_id": *id,
            "criticality": *c,
        }))
        .collect();

    pgrx::JsonB(json!({
        "global_mincut": global_cut,
        "bottleneck_count": bottlenecks.len(),
        "bottlenecks": bottlenecks,
        "threshold": threshold,
    }))
}

Monitoring Functions

/// Get learning statistics
#[pg_extern]
fn ruvector_dag_learning_stats(table_name: &str) -> pgrx::JsonB {
    let engine = match DAG_ENGINES.get(table_name) {
        Some(e) => e.clone(),
        None => {
            return pgrx::JsonB(json!({
                "enabled": false,
                "error": "Neural DAG not enabled"
            }));
        }
    };

    let metrics = engine.metrics.to_json();
    let buffer_stats = engine.dag_trajectory_buffer.stats();
    let pattern_count = {
        let bank = engine.dag_reasoning_bank.read();
        bank.patterns.len()
    };

    pgrx::JsonB(json!({
        "enabled": true,
        "queries_processed": metrics["queries_processed"],
        "pattern_hit_rate": metrics["pattern_hit_rate"],
        "avg_latency_us": metrics["avg_latency_us"],
        "patterns_stored": pattern_count,
        "trajectories_buffered": buffer_stats.current_size,
        "trajectories_dropped": buffer_stats.dropped,
        "background_cycles": metrics["background_cycles"],
        "ewc_tasks": metrics["ewc_tasks"],
    }))
}

/// Get health report
#[pg_extern]
fn ruvector_dag_health_report(table_name: &str) -> pgrx::JsonB {
    let engine = match DAG_ENGINES.get(table_name) {
        Some(e) => e.clone(),
        None => {
            return pgrx::JsonB(json!({
                "healthy": false,
                "error": "Neural DAG not enabled"
            }));
        }
    };

    let metrics = engine.metrics.to_json();
    let buffer_stats = engine.dag_trajectory_buffer.stats();

    // Health checks
    let mut issues = Vec::new();

    // Check drop rate
    let drop_rate = if buffer_stats.total_seen > 0 {
        buffer_stats.dropped as f64 / buffer_stats.total_seen as f64
    } else {
        0.0
    };
    if drop_rate > 0.1 {
        issues.push(format!("High trajectory drop rate: {:.1}%", drop_rate * 100.0));
    }

    // Check pattern hit rate
    let hit_rate = metrics["pattern_hit_rate"].as_f64().unwrap_or(0.0);
    if hit_rate < 0.1 && metrics["queries_processed"].as_u64().unwrap_or(0) > 1000 {
        issues.push(format!("Low pattern hit rate: {:.1}%", hit_rate * 100.0));
    }

    // Check MinCut bottlenecks
    if let Some(ref mincut) = engine.mincut_engine {
        let criticalities = mincut.compute_all_criticalities();
        let severe_bottlenecks = criticalities.values()
            .filter(|&&c| c > 0.8)
            .count();
        if severe_bottlenecks > 0 {
            issues.push(format!("{} severe bottlenecks detected", severe_bottlenecks));
        }
    }

    pgrx::JsonB(json!({
        "healthy": issues.is_empty(),
        "issues": issues,
        "metrics": metrics,
        "buffer": {
            "size": buffer_stats.current_size,
            "capacity": buffer_stats.capacity,
            "drop_rate": drop_rate,
        },
    }))
}

/// Force a learning cycle
#[pg_extern]
fn ruvector_dag_learn(table_name: &str) -> pgrx::JsonB {
    let engine = match DAG_ENGINES.get(table_name) {
        Some(e) => e.clone(),
        None => {
            return pgrx::JsonB(json!({
                "success": false,
                "error": "Neural DAG not enabled"
            }));
        }
    };

    match engine.run_background_cycle() {
        Ok(BackgroundCycleResult::Completed {
            trajectories_processed,
            patterns_extracted,
            duration,
        }) => {
            pgrx::JsonB(json!({
                "success": true,
                "trajectories_processed": trajectories_processed,
                "patterns_extracted": patterns_extracted,
                "duration_ms": duration.as_millis(),
            }))
        }
        Ok(BackgroundCycleResult::Skipped { reason, count }) => {
            pgrx::JsonB(json!({
                "success": false,
                "skipped": true,
                "reason": reason,
                "trajectory_count": count,
            }))
        }
        Err(e) => {
            pgrx::JsonB(json!({
                "success": false,
                "error": e.to_string(),
            }))
        }
    }
}

Background Worker

// crates/ruvector-postgres/src/dag/worker.rs

use pgrx::bgworkers::*;
use std::time::Duration;

#[pg_guard]
pub extern "C" fn dag_background_worker_main(_arg: pg_sys::Datum) {
    BackgroundWorker::attach_signal_handlers(SignalWakeFlags::SIGHUP | SignalWakeFlags::SIGTERM);

    log!(
        "DAG background worker started with interval {}ms",
        gucs::DAG_BACKGROUND_INTERVAL_MS.get()
    );

    while BackgroundWorker::wait_latch(Some(Duration::from_millis(
        gucs::DAG_BACKGROUND_INTERVAL_MS.get() as u64
    ))) {
        if BackgroundWorker::sighup_received() {
            // Reload configuration
            log!("DAG background worker received SIGHUP, reloading config");
        }

        // Run learning cycle for all enabled tables
        for engine_ref in DAG_ENGINES.iter() {
            let table_name = engine_ref.key();
            let engine = engine_ref.value();

            match engine.run_background_cycle() {
                Ok(BackgroundCycleResult::Completed { patterns_extracted, .. }) => {
                    log!(
                        "DAG learning cycle completed for '{}': {} patterns",
                        table_name,
                        patterns_extracted
                    );
                }
                Ok(BackgroundCycleResult::Skipped { reason, .. }) => {
                    // Normal - not enough data
                }
                Err(e) => {
                    elog!(
                        WARNING,
                        "DAG learning cycle failed for '{}': {}",
                        table_name,
                        e
                    );
                }
            }
        }
    }

    log!("DAG background worker shutting down");
}

/// Register the background worker
pub fn register_background_worker() {
    BackgroundWorkerBuilder::new("ruvector_dag_worker")
        .set_function("dag_background_worker_main")
        .set_library("ruvector")
        .set_argument(0.into())
        .enable_spi_access()
        .set_start_time(BgWorkerStartTime::RecoveryFinished)
        .set_restart_time(Some(Duration::from_secs(10)))
        .load();
}

Query Execution Hooks

// crates/ruvector-postgres/src/dag/hooks.rs

use pgrx::prelude::*;

/// Hook into query execution for learning
pub fn install_query_hooks() {
    // Install planner hook
    unsafe {
        prev_planner_hook = planner_hook;
        planner_hook = Some(dag_planner_hook);
    }

    // Install executor hooks
    unsafe {
        prev_ExecutorStart_hook = ExecutorStart_hook;
        ExecutorStart_hook = Some(dag_executor_start_hook);

        prev_ExecutorEnd_hook = ExecutorEnd_hook;
        ExecutorEnd_hook = Some(dag_executor_end_hook);
    }
}

/// Planner hook - optimize query plan
#[pg_guard]
unsafe extern "C" fn dag_planner_hook(
    parse: *mut pg_sys::Query,
    query_string: *const c_char,
    cursor_options: c_int,
    bound_params: *mut pg_sys::ParamListInfoData,
) -> *mut pg_sys::PlannedStmt {
    // Call original planner
    let stmt = if let Some(prev) = prev_planner_hook {
        prev(parse, query_string, cursor_options, bound_params)
    } else {
        pg_sys::standard_planner(parse, query_string, cursor_options, bound_params)
    };

    // Check if neural DAG optimization applies
    if !gucs::NEURAL_DAG_ENABLED.get() {
        return stmt;
    }

    // Extract table name from query
    if let Some(table_name) = extract_table_name(parse) {
        if let Some(engine) = DAG_ENGINES.get(&table_name) {
            // Build neural plan wrapper
            let mut neural_plan = NeuralDagPlan::from_planned_stmt(stmt);

            // Apply learned optimizations
            let _ = engine.pre_query(&mut neural_plan);

            // Store for post-query processing
            CONNECTION_STATE.with(|state| {
                state.borrow_mut().start_query(&table_name);
            });
        }
    }

    stmt
}

/// Executor start hook - record start time
#[pg_guard]
unsafe extern "C" fn dag_executor_start_hook(
    query_desc: *mut pg_sys::QueryDesc,
    eflags: c_int,
) {
    // Call original
    if let Some(prev) = prev_ExecutorStart_hook {
        prev(query_desc, eflags);
    } else {
        pg_sys::standard_ExecutorStart(query_desc, eflags);
    }

    // Record timing
    CONNECTION_STATE.with(|state| {
        if let Some(ref mut traj) = state.borrow_mut().current_trajectory {
            traj.mark_execution_start();
        }
    });
}

/// Executor end hook - record trajectory
#[pg_guard]
unsafe extern "C" fn dag_executor_end_hook(query_desc: *mut pg_sys::QueryDesc) {
    // Collect metrics before cleanup
    let metrics = extract_execution_metrics(query_desc);

    // Call original
    if let Some(prev) = prev_ExecutorEnd_hook {
        prev(query_desc);
    } else {
        pg_sys::standard_ExecutorEnd(query_desc);
    }

    // Record trajectory
    CONNECTION_STATE.with(|state| {
        let mut state = state.borrow_mut();
        if let (Some(trajectory), Some(table_name)) = (
            state.end_query(),
            state.current_table.as_ref(),
        ) {
            if let Some(engine) = DAG_ENGINES.get(table_name) {
                engine.post_query(&trajectory.plan, metrics);
            }
        }
    });
}

fn extract_execution_metrics(query_desc: *mut pg_sys::QueryDesc) -> ExecutionMetrics {
    unsafe {
        let estate = (*query_desc).estate;

        ExecutionMetrics {
            latency_us: 0,  // Computed from timestamps
            planning_us: 0,
            execution_us: 0,
            rows_processed: (*estate).es_processed as u64,
            memory_bytes: pg_sys::MemoryContextMemAllocated(
                (*estate).es_query_cxt,
                false,
            ) as u64,
            cache_hit_rate: 0.0,  // Would need buffer stats
            max_criticality: None,
        }
    }
}

Extension Initialization

// crates/ruvector-postgres/src/lib.rs (additions)

#[pg_guard]
pub extern "C" fn _PG_init() {
    // ... existing initialization ...

    // Register DAG GUCs
    dag::gucs::register_gucs();

    // Install query hooks
    dag::hooks::install_query_hooks();

    // Register background worker
    dag::worker::register_background_worker();

    pgrx::log!("RuVector Neural DAG module initialized");
}

SQL Installation Script

-- Extension installation additions

-- Create schema for DAG objects
CREATE SCHEMA IF NOT EXISTS ruvector_dag;

-- Pattern storage table (optional persistence)
CREATE TABLE IF NOT EXISTS ruvector_dag.patterns (
    id BIGSERIAL PRIMARY KEY,
    table_name TEXT NOT NULL,
    pattern_id BIGINT NOT NULL,
    centroid ruvector NOT NULL,
    attention_type TEXT NOT NULL,
    optimal_params JSONB NOT NULL,
    confidence FLOAT NOT NULL,
    sample_count INT NOT NULL,
    avg_latency_us FLOAT NOT NULL,
    avg_quality FLOAT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (table_name, pattern_id)
);

-- Trajectory history table (optional)
CREATE TABLE IF NOT EXISTS ruvector_dag.trajectory_history (
    id BIGSERIAL PRIMARY KEY,
    table_name TEXT NOT NULL,
    trajectory_id BIGINT NOT NULL,
    plan_embedding ruvector NOT NULL,
    attention_type TEXT NOT NULL,
    latency_us BIGINT NOT NULL,
    quality FLOAT NOT NULL,
    recorded_at TIMESTAMPTZ DEFAULT NOW()
);

-- Index for pattern lookup
CREATE INDEX IF NOT EXISTS idx_patterns_table
ON ruvector_dag.patterns (table_name);

-- Index for trajectory analysis
CREATE INDEX IF NOT EXISTS idx_trajectories_table_time
ON ruvector_dag.trajectory_history (table_name, recorded_at DESC);

-- Cleanup old trajectories (run periodically)
CREATE OR REPLACE FUNCTION ruvector_dag.cleanup_old_trajectories(
    retention_days INT DEFAULT 7
) RETURNS INT AS $$
DECLARE
    deleted_count INT;
BEGIN
    DELETE FROM ruvector_dag.trajectory_history
    WHERE recorded_at < NOW() - (retention_days || ' days')::INTERVAL;

    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;