
Test cancellation / coop of LanceRead, Index queries #5150

@wjones127

Description


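We should make sure that our FilteredReadExec and scalar index queries are properly cooperative. They should yield back to the async runtime regularly instead of blocking a worker thread for long stretches, since a future that never yields also cannot be cancelled promptly.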

We can test this by adding a tracing subscriber layer that records each future's `max_busy_ns`, the longest time a single poll of that future took. Then we run various queries and assert that no future was polled for longer than 10ms at a time.
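
The quantity being asserted on can be illustrated without any tracing machinery. Below is a minimal, std-only sketch that wraps a future, times every individual `poll` call, and compares the longest one against the 10ms budget. `MaxPollTimer`, `YieldNow`, `block_on`, `max_poll`, and the two simulated scans are hypothetical stand-ins (not Lance or tokio APIs), and `thread::sleep` stands in for CPU-bound work done inside a poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical wrapper that records the longest single `poll` of the inner future.
struct MaxPollTimer<F> {
    inner: Pin<Box<F>>,
    max_busy: Duration,
}

impl<F: Future> Future for MaxPollTimer<F> {
    // Yields the inner output together with the longest poll observed.
    type Output = (F::Output, Duration);

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let result = self.inner.as_mut().poll(cx);
        self.max_busy = self.max_busy.max(start.elapsed());
        match result {
            Poll::Ready(out) => Poll::Ready((out, self.max_busy)),
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Hypothetical "yield once" future: Pending on the first poll, Ready on the second.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Ask to be polled again, giving the executor a chance to run other work.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, for illustration only (not suitable for real I/O).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Simulated scan that does 20ms of work in a single poll (no await points).
async fn blocking_scan() {
    thread::sleep(Duration::from_millis(20));
}

/// Simulated scan doing the same 20ms of work in 1ms batches, yielding between batches.
async fn cooperative_scan() {
    for _ in 0..20 {
        thread::sleep(Duration::from_millis(1));
        yield_now().await;
    }
}

/// Drives `fut` to completion and returns its longest single poll.
fn max_poll<F: Future>(fut: F) -> Duration {
    let timer = MaxPollTimer { inner: Box::pin(fut), max_busy: Duration::ZERO };
    block_on(timer).1
}

const POLL_BUDGET: Duration = Duration::from_millis(10);

fn main() {
    let blocking = max_poll(blocking_scan());
    let cooperative = max_poll(cooperative_scan());
    println!("blocking: {blocking:?}, cooperative: {cooperative:?}");
    assert!(blocking > POLL_BUDGET, "blocking scan should exceed the budget");
    assert!(cooperative < POLL_BUDGET, "cooperative scan should stay within the budget");
}
```

Both scans do the same total amount of work; only the blocking one does it in a single poll, so only it breaks the budget. In the real test the futures would be driven by the tokio runtime and the per-span timing would come from a tracing layer rather than a wrapper future.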

Example of recording `max_busy_ns`

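```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use tracing::span::Id;
use tracing::Subscriber;
use tracing_subscriber::layer::{Context, Layer};
use tracing_subscriber::registry::LookupSpan;

/// Per-span timing state tracked by `RecordLongestPollLayer`
struct PollTimings {
    /// How many times the span is currently entered (spans can be entered re-entrantly)
    entered_count: usize,
    /// When the current poll started (set when `entered_count` goes from 0 to 1)
    last_enter: Instant,
    /// Busy time accumulated during the current poll
    current_poll_duration: Duration,
    /// Longest single poll observed so far
    max_busy: Duration,
}

impl PollTimings {
    fn new() -> Self {
        Self {
            entered_count: 0,
            last_enter: Instant::now(),
            current_poll_duration: Duration::ZERO,
            max_busy: Duration::ZERO,
        }
    }
}

/// A tracing layer that tracks the maximum continuous busy time for each span
/// and sets it as a `max_busy_ns` OpenTelemetry attribute
pub struct RecordLongestPollLayer {
    /// Map from span ID to timing information
    timings: Arc<Mutex<HashMap<Id, PollTimings>>>,
}

impl RecordLongestPollLayer {
    pub fn new() -> Self {
        Self {
            timings: Arc::new(Mutex::new(HashMap::new())),
        }
    }
}

impl Default for RecordLongestPollLayer {
    fn default() -> Self {
        Self::new()
    }
}

impl<S> Layer<S> for RecordLongestPollLayer
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    fn on_new_span(&self, _attrs: &tracing::span::Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
        let mut timings = self.timings.lock().unwrap();
        timings.insert(id.clone(), PollTimings::new());
    }

    fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
        let mut timings = self.timings.lock().unwrap();
        if let Some(timing) = timings.get_mut(id) {
            if timing.entered_count == 0 {
                // First entry (or re-entry after yielding): a new poll starts
                timing.last_enter = Instant::now();
                timing.current_poll_duration = Duration::ZERO;
            }
            timing.entered_count += 1;
        }
    }

    fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
        let mut timings = self.timings.lock().unwrap();
        if let Some(timing) = timings.get_mut(id) {
            if timing.entered_count > 0 {
                timing.entered_count -= 1;

                if timing.entered_count == 0 {
                    // Fully exiting this span: the current poll is over
                    let now = Instant::now();
                    let poll_duration = now - timing.last_enter;
                    timing.current_poll_duration += poll_duration;

                    // Update max_busy if this poll was longer
                    if timing.current_poll_duration > timing.max_busy {
                        timing.max_busy = timing.current_poll_duration;
                    }
                }
            }
        }
    }

    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
        // Get the final max_busy time
        let max_busy = {
            let mut timings = self.timings.lock().unwrap();
            timings.remove(&id).map(|t| t.max_busy)
        };

        if let Some(max_busy) = max_busy {
            if let Some(span_ref) = ctx.span(&id) {
                let mut extensions = span_ref.extensions_mut();
                // This only has an effect while `OtelData` is still in the span's extensions.
                // The OpenTelemetry layer removes it in its own `on_close`, so this layer
                // likely needs to be added to the subscriber before that layer.
                if let Some(otel_data) = extensions.get_mut::<tracing_opentelemetry::OtelData>() {
                    otel_data
                        .builder
                        .attributes
                        .get_or_insert_with(Vec::new)
                        .push(opentelemetry::KeyValue::new(
                            "max_busy_ns",
                            max_busy.as_nanos() as i64,
                        ));
                }
            }
        }
    }
}
```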
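
The enter/exit bookkeeping above can be checked in isolation. The following is a hypothetical std-only copy of that logic with the clock passed in, so timestamps are deterministic; it folds `current_poll_duration` into a local variable, which is a simplification of the layer rather than its exact code. It shows that a nested entry does not restart the clock and that idle time between polls is not counted toward `max_busy`.

```rust
use std::time::{Duration, Instant};

/// Hypothetical standalone copy of the layer's per-span bookkeeping,
/// taking the current time as a parameter instead of calling `Instant::now()`.
#[derive(Debug)]
struct PollTimings {
    entered_count: usize,
    last_enter: Instant,
    max_busy: Duration,
}

impl PollTimings {
    fn new(now: Instant) -> Self {
        Self { entered_count: 0, last_enter: now, max_busy: Duration::ZERO }
    }

    fn enter(&mut self, now: Instant) {
        // Only the outermost entry starts a new poll.
        if self.entered_count == 0 {
            self.last_enter = now;
        }
        self.entered_count += 1;
    }

    fn exit(&mut self, now: Instant) {
        if self.entered_count == 0 {
            return; // Unbalanced exit: ignore, as the layer does.
        }
        self.entered_count -= 1;
        // Only the outermost exit ends the poll and updates the maximum.
        if self.entered_count == 0 {
            self.max_busy = self.max_busy.max(now - self.last_enter);
        }
    }
}

fn main() {
    let t0 = Instant::now();
    let at = |ms: u64| t0 + Duration::from_millis(ms);
    let mut t = PollTimings::new(t0);

    // Poll 1: busy from 0ms to 3ms, with a nested enter/exit inside.
    t.enter(at(0));
    t.enter(at(1));
    t.exit(at(2));
    t.exit(at(3));

    // The span is idle from 3ms to 50ms (e.g. waiting on I/O); that is not busy time.

    // Poll 2: busy from 50ms to 57ms.
    t.enter(at(50));
    t.exit(at(57));

    assert_eq!(t.max_busy, Duration::from_millis(7));
    println!("max_busy = {:?}", t.max_busy);
}
```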

Consider reading up on:

Metadata


- Assignees: No one assigned
- Labels: ci (GitHub Action or Test issues)
- Type: No type
- Projects: No projects
- Milestone: No milestone
- Relationships: None yet
- Development: No branches or pull requests
