# Query Cancellation

## The Challenge of Cancelling Long-Running Queries

Have you ever tried to cancel a query that just wouldn't stop?
In this post, we'll look at why that can happen in DataFusion and examine in depth what the community did to resolve the problem.

### Understanding Rust's Async Model

To really understand the cancellation problem you need to be somewhat familiar with Rust's asynchronous programming model.
This is a bit different from what you might be used to from other ecosystems.
Let's go over the basics again as a refresher.
If you're familiar with the ins and outs of `Future` and `async` you can skip this section.

#### Futures Are Inert

Rust's asynchronous programming model is built around the `Future<T>` trait.
In contrast to, for instance, JavaScript's `Promise` or Java's `Future`, a Rust `Future` does not necessarily represent an actively running asynchronous job.
Instead, a `Future<T>` represents a lazy calculation that only makes progress when explicitly polled.
If nothing explicitly tells a `Future` to try and make progress, it is an inert object.

You ask a `Future<T>` to advance its calculation as much as possible by calling the `poll` method.
The future responds with either:
- `Poll::Pending` if it needs to wait for something (like I/O) before it can continue
- `Poll::Ready(value)` when it has completed and produced a value

When a future returns `Pending`, it saves its internal state so it can pick up where it left off the next time you poll it.
This state management is what makes Rust's futures memory-efficient and composable.
The future also needs to set up the necessary signaling so that the caller gets notified when it should try to call `poll` again.
This avoids having to call `poll` in a busy-waiting loop.
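
The poll contract can be sketched with a hand-written future. This is illustrative code, not from DataFusion: `TwoStep` is a hypothetical future that pends once (immediately waking the task so it gets polled again) and completes on the second poll, and `noop_waker` stands in for the signaling machinery a real runtime provides.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// A hypothetical future that needs two polls: the first returns `Pending`
/// (after arranging a wake-up), the second returns `Ready`.
struct TwoStep {
    polled_once: bool,
}

impl Future for TwoStep {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled_once {
            Poll::Ready(42)
        } else {
            self.polled_once = true;
            // A real resource (timer, socket) would store the waker and call
            // it later; here we wake immediately so the caller re-polls.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// A waker that does nothing, good enough for polling by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
```

Polling `TwoStep` by hand yields `Pending` on the first call and `Ready(42)` on the second; nothing happens at all until someone calls `poll`.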

Rust's `async` keyword provides syntactic sugar over this model.
When you write an `async` function or block, the compiler transforms it into an implementation of the `Future` trait for you.
Since all the state management is compiler-generated and hidden from sight, async code tends to be more readable while maintaining the same underlying mechanics.

The `await` keyword complements this by letting you pause execution until a future completes.
When you write `.await` after a future, you're essentially telling the compiler to poll that future until it's ready before execution continues with the statement after the `await`.
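
To make that "poll until ready" behavior concrete, here is a toy single-future executor. This is a hedged sketch, not how Tokio works: it busy-polls with a no-op waker, which is fine for futures that become ready quickly but would spin on real I/O.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// A waker that does nothing; our executor polls in a loop instead of parking.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Drive a single future to completion by polling it repeatedly.
/// This is roughly what `.await` conceptually boils down to.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // A real executor would park here until the waker fires.
            Poll::Pending => continue,
        }
    }
}
```

`block_on(async { 2 + 2 })` returns `4`; the async block does no work at all until `block_on` starts polling it.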

#### From Futures to Streams

The `futures` crate extends the `Future` model with the `Stream` trait.
Streams represent a sequence of values produced asynchronously rather than just a single value.
The `Stream` trait has a single method named `poll_next` that returns:
- `Poll::Pending` when the next value isn't ready yet, just like a `Future` would
- `Poll::Ready(Some(value))` when a new value is available
- `Poll::Ready(None)` when the stream is exhausted
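
A tiny stream illustrates the `poll_next` contract. To keep the example dependency-free, the `Stream` trait below is a local stand-in mirroring the one in the `futures` crate, and `Counter` is a hypothetical stream that yields three numbers and then reports exhaustion.

```rust
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Local stand-in for `futures::Stream`, so the example needs only std.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// A stream that yields 0, 1, 2 and is then exhausted.
struct Counter {
    next: u32,
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next < 3 {
            self.next += 1;
            // A new value is available.
            Poll::Ready(Some(self.next - 1))
        } else {
            // The stream is exhausted; all further polls return `None`.
            Poll::Ready(None)
        }
    }
}

/// A waker that does nothing, for polling by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
```

Note that `Counter` never returns `Pending`: a stream over in-memory data is always ready, a detail that becomes important later in this post.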

### How DataFusion Executes Queries

In DataFusion, queries are executed as follows:

1. First the query is compiled into a tree of `ExecutionPlan` nodes
2. `ExecutionPlan::execute` is called on the root of the tree. This method returns a `SendableRecordBatchStream` (a pinned `Box<dyn Stream<RecordBatch>>`)
3. `Stream::poll_next` is called in a loop to get the results

The stream we get in step 2 is actually the root of a tree of streams that mostly mirrors the execution plan tree.
Each stream tree node processes the record batches it gets from its children.
The leaves of the tree produce record batches themselves.

Query execution progresses each time you call `poll_next` on the root stream.
This call typically cascades down the tree, with each node calling `poll_next` on its children to get the data it needs to process.
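
This cascade can be sketched with stand-in types. None of this is DataFusion code: the `Stream` trait mirrors the one from the `futures` crate, `Leaf` plays the role of a leaf node producing "batches" (plain numbers here), and `Double` is a hypothetical intermediate operator whose `poll_next` polls its child and transforms the result, just as a root-level `poll_next` call cascades down the real stream tree.

```rust
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Local stand-in for `futures::Stream`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// A leaf node: produces the "batches" 1, 2, 3 itself.
struct Leaf {
    next: u32,
}

impl Stream for Leaf {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next < 3 {
            self.next += 1;
            Poll::Ready(Some(self.next))
        } else {
            Poll::Ready(None)
        }
    }
}

/// An intermediate node: polling it cascades into a poll of its child.
struct Double {
    child: Leaf,
}

impl Stream for Double {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        // Ask the child for its next batch and process it.
        match Pin::new(&mut self.child).poll_next(cx) {
            Poll::Ready(Some(v)) => Poll::Ready(Some(v * 2)),
            other => other, // pass `Pending` and end-of-stream through unchanged
        }
    }
}

/// A waker that does nothing, for polling by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
```

Draining the root (`Double`) yields the child's batches transformed one by one; every poll of the root turns into a poll of the leaf.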

Here's where the first signs of trouble show up: some operations (like aggregations, sorts, or certain join phases) need to process a lot of data before producing any output.
When `poll_next` encounters one of these operations, it might need to perform a substantial amount of work before it can return a record batch.

### Tokio and Cooperative Scheduling

We need to make a small detour via Tokio's scheduler before we can get to the query cancellation problem.
DataFusion makes use of the Tokio asynchronous runtime, which uses a cooperative scheduling model.
This is fundamentally different from the preemptive scheduling you might be used to:

- In preemptive scheduling, the system can interrupt a task at any time to run something else
- In cooperative scheduling, tasks must voluntarily yield control back to the scheduler

This distinction is crucial for understanding our cancellation problem.
When a Tokio task is running, it can't be forcibly interrupted - it must cooperate by periodically yielding control.

Similarly, when you try to abort a task by calling `JoinHandle::abort()`, the Tokio runtime can't immediately force it to stop.
You're just telling Tokio: "When this task next yields control, don't resume it."
If the task never yields, it can't be aborted.
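
A toy model makes these abort semantics concrete. This is purely illustrative, not Tokio code: a "task" is a closure that runs until its next yield point, and the "runtime" can only honor an abort request between steps, never in the middle of one.

```rust
/// One step of a cooperative task: either it yields control or it is done.
enum Step {
    Yielded,
    Done,
}

/// Run a task to completion, honoring an abort request only at yield points.
/// Returns the number of steps that actually ran.
fn run<F: FnMut() -> Step>(mut task: F, abort_after: usize) -> usize {
    let mut steps = 0;
    loop {
        // The "runtime" can only act between polls: once the task has been
        // aborted, it is simply never resumed again.
        if steps >= abort_after {
            return steps;
        }
        steps += 1;
        if let Step::Done = task() {
            return steps;
        }
    }
}
```

A task that yields frequently is stopped promptly, while a task that does all its work in a single step finishes that work no matter when the abort was requested.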

### The Cancellation Problem

With all the necessary background in place, let's now look at how the DataFusion CLI tries to run and cancel a query.
The code below paraphrases what the CLI actually does:

```rust
fn exec_query() {
    let runtime: tokio::runtime::Runtime = ...;
    let stream: SendableRecordBatchStream = ...;

    runtime.block_on(async {
        tokio::select! {
            next_batch = stream.next() => ...,
            _ = signal::ctrl_c() => ...,
        }
    })
}
```

First the CLI sets up a Tokio runtime instance.
It then reads the query you want to execute from standard input or a file and turns it into a stream.
Then it calls `next` on the stream, which is an `async` wrapper for `poll_next`.
It passes this to the `select!` macro along with a Ctrl-C handler.

The `select!` macro is supposed to race these two futures and complete when either one finishes.
When you press Ctrl-C, the `signal::ctrl_c()` future should complete, allowing the query to be cancelled.

But there's a catch: `select!` still follows cooperative scheduling rules.
It polls each future in sequence, and if the first one (our query) gets stuck in a long computation, it never gets around to polling the cancellation signal.

Imagine a query that needs to calculate something intensive, like sorting billions of rows.
The `poll_next` call might run for hours without returning.
During this time, Tokio can't check if you've pressed Ctrl-C, and the query continues running despite your cancellation request.

## A Closer Look at Blocking Operators

Let's peel back a layer of the onion and look at what's happening in a blocking `poll_next` implementation.
Here's a drastically simplified version of a `COUNT(*)` aggregation - something you might use in a query like `SELECT COUNT(*) FROM table`:

```rust
struct BlockingStream {
    stream: SendableRecordBatchStream,
    count: usize,
    finished: bool,
}

impl Stream for BlockingStream {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.finished {
            return Poll::Ready(None);
        }

        loop {
            match ready!(self.stream.poll_next_unpin(cx)) {
                Some(Ok(batch)) => self.count += batch.num_rows(),
                None => {
                    self.finished = true;
                    return Poll::Ready(Some(Ok(create_record_batch(self.count))));
                }
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
            }
        }
    }
}
```

How does this code work? Let's break it down step by step:

1. **Initial check**: We first check if we've already finished processing. If so, we return `Ready(None)` to signal the end of our stream:
   ```rust
   if self.finished {
       return Poll::Ready(None);
   }
   ```

2. **Processing loop**: If we're not done yet, we enter a loop to process incoming batches from our input stream:
   ```rust
   loop {
       match ready!(self.stream.poll_next_unpin(cx)) {
           // Handle different cases...
       }
   }
   ```
   The `ready!` macro checks if the input stream returned `Pending` and if so, immediately returns `Pending` from our function as well.

3. **Processing data**: For each batch we receive, we simply add its row count to our running total:
   ```rust
   Some(Ok(batch)) => self.count += batch.num_rows(),
   ```

4. **End of input**: When the child stream is exhausted (returns `None`), we calculate our final result and convert it into a record batch (omitted for brevity):
   ```rust
   None => {
       self.finished = true;
       return Poll::Ready(Some(Ok(create_record_batch(self.count))));
   }
   ```

5. **Error handling**: If we encounter an error, we pass it along immediately:
   ```rust
   Some(Err(e)) => return Poll::Ready(Some(Err(e))),
   ```

This code looks perfectly reasonable at first glance.
But there's a subtle issue lurking here: what happens if the input stream consistently returns `Ready` and never returns `Pending`?

In that case, the processing loop will keep running without ever yielding control back to Tokio's scheduler.
This means we could be stuck in a single `poll_next` call for minutes or even hours - exactly the scenario that prevents query cancellation from working!
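
We can reproduce this starvation with a dependency-free model. The `Stream` trait is a local stand-in for the `futures` trait, `ReadySource` is a hypothetical input that always returns `Ready`, and `CountingAggregate` mirrors the blocking loop above: a single `poll_next` call drains the entire input before returning.

```rust
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Local stand-in for `futures::Stream`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// An input that is always `Ready`: it hands out single-row "batches"
/// until `remaining` runs out.
struct ReadySource {
    remaining: usize,
}

impl Stream for ReadySource {
    type Item = usize;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<usize>> {
        if self.remaining == 0 {
            Poll::Ready(None)
        } else {
            self.remaining -= 1;
            Poll::Ready(Some(1)) // a "batch" of one row
        }
    }
}

/// The blocking COUNT(*) pattern: loop until the input is exhausted.
struct CountingAggregate {
    input: ReadySource,
    count: usize,
}

impl Stream for CountingAggregate {
    type Item = usize;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<usize>> {
        loop {
            match Pin::new(&mut self.input).poll_next(cx) {
                // The input never pends, so this arm never runs.
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Some(rows)) => self.count += rows,
                Poll::Ready(None) => return Poll::Ready(Some(self.count)),
            }
        }
    }
}

/// A waker that does nothing, for polling by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
```

A single call to `poll_next` on `CountingAggregate` chews through the entire input before returning; a runtime driving this as a task gets no chance to check for cancellation in the meantime.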

## Unblocking Operators

Now let's look at how we can ensure we return `Pending` every now and then.

### Independent Cooperative Operators

One simple way to achieve this is using a loop counter.
We do the exact same thing as before, but each time we produce a batch we increment the counter.
When the counter reaches 128 we reset it, wake the task, and return `Pending`.
This ensures we produce at most 128 batches before yielding.

```rust
struct CountingSourceStream {
    counter: usize,
}

impl Stream for CountingSourceStream {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.counter >= 128 {
            self.counter = 0;
            // Wake immediately so the task is scheduled to be polled again.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }

        self.counter += 1;
        let batch = ...;
        Poll::Ready(Some(Ok(batch)))
    }
}
```

This would be a simple solution for this source operator.
If our simple aggregation operator consumes this stream, it will periodically receive a `Pending`, causing it to yield.
Could it be that simple?

Unfortunately, no.
Let's look at what happens when we start combining operators in more complex configurations.
Suppose we create a plan like this:

```
         Merge
    ┌──────┴──────┐
  Filter    CountingSource
    │
CountingSource
```

Each source produces a `Pending` every 128 batches.
The filter is the standard DataFusion filter, with a filter expression that drops one batch every 50 record batches.
Merge is a simple combining operator that uses `futures::stream::select` to combine two streams.

When we set this in motion, the merge operator will poll the left and right branches in a round-robin fashion.
The sources will each emit `Pending` every 128 batches, but the filter dropping batches makes it so that these arrive out of phase at the merge operator.
As a consequence, the merge operator will always have the opportunity to poll the other stream when one of the two returns `Pending`.
The output stream of merge is again an always-ready stream.
If we pass this to our aggregating operator, we're right back where we started.
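
This out-of-phase effect can be reproduced in a self-contained simulation. Everything below is a toy model rather than DataFusion code: poll results are simplified to an enum, `Source` pends every 129th poll (128 batches, then one `Pending`), `Filter` drops every 50th batch and polls its input again to replace it, and `Merge` falls back to the other side when one side pends, like `futures::stream::select` does.

```rust
/// The result of polling a node in our toy plan: a batch or `Pending`.
#[derive(PartialEq)]
enum Out {
    Batch,
    Pending,
}

/// Emits 128 batches, then one `Pending`, then repeats forever.
struct Source {
    polls: u64,
}

impl Source {
    fn poll(&mut self) -> Out {
        self.polls += 1;
        if self.polls % 129 == 0 { Out::Pending } else { Out::Batch }
    }
}

/// Passes batches through but silently drops every 50th one,
/// polling its input again to replace it.
struct Filter {
    input: Source,
    seen: u64,
}

impl Filter {
    fn poll(&mut self) -> Out {
        loop {
            match self.input.poll() {
                Out::Pending => return Out::Pending,
                Out::Batch => {
                    self.seen += 1;
                    if self.seen % 50 != 0 {
                        return Out::Batch;
                    }
                    // Batch dropped: poll the input again for a replacement.
                }
            }
        }
    }
}

/// Round-robin merge: when one side pends, try the other side.
struct Merge {
    left: Filter,
    right: Source,
    flip: bool,
}

impl Merge {
    fn poll(&mut self) -> Out {
        self.flip = !self.flip;
        let order = if self.flip { [true, false] } else { [false, true] };
        for take_left in order {
            let out = if take_left { self.left.poll() } else { self.right.poll() };
            if out == Out::Batch {
                return Out::Batch;
            }
        }
        // Only when *both* sides pend does the merge itself pend.
        Out::Pending
    }
}
```

Driving this merge shows runs of far more than 128 consecutive batches without a single `Pending`, even though each source yields dutifully: per-operator yielding does not compose.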

### Coordinated Cooperation

Wouldn't it be great if we could get all the operators to coordinate amongst each other?
When one of them determines that it's time to yield, all the other operators agree and start returning `Pending` as well.
That way our task would be coaxed towards yielding even if it tried to poll many different operators.

Luckily the [developers of Tokio ran into the exact same problem](https://tokio.rs/blog/2020-04-preemption) described above when network servers were under heavy load, and they came up with a solution.
Back in 2020, Tokio 0.2.14 already introduced a per-task operation budget.
Rather than having individual counters littered throughout the code, the runtime manages a per-task counter which is decremented by Tokio resources.
When the counter hits zero, all resources start returning `Pending`.
The task will then yield, after which the Tokio runtime resets the counter.

As it turns out, DataFusion was already using this mechanism implicitly.
Every exchange-like operator internally makes use of a Tokio multi-producer, single-consumer channel.
When calling `Receiver::recv` on one of these channels, a unit of Tokio task budget is consumed.
As a consequence, query plans that made use of exchange-like operators were already mostly cancelable.

### Depleting The Budget

Let's revisit our original counting stream and adapt it to use Tokio's budget system.

```rust
struct BudgetSourceStream {}

impl Stream for BudgetSourceStream {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let coop = ready!(tokio::task::coop::poll_proceed(cx));
        let batch: Poll<Option<Self::Item>> = ...;
        if batch.is_ready() {
            coop.made_progress();
        }
        batch
    }
}
```

The stream now goes through the following steps:

1. **Try to consume budget**: the first thing the operator does is use `poll_proceed` to try to consume a unit of budget.
   If the budget is depleted, this function will return `Pending`.
   Otherwise, we consumed one budget unit and we can continue.
   ```rust
   let coop = ready!(tokio::task::coop::poll_proceed(cx));
   ```
2. **Try to do some work**: next we try to produce a record batch.
   That might not be possible if we're reading from some asynchronous resource that's not ready.
   ```rust
   let batch: Poll<Option<Self::Item>> = ...;
   ```
3. **Commit the budget consumption**: finally, if we did produce a batch, we need to tell Tokio that we were able to make progress.
   That's done by calling the `made_progress` method on the value `poll_proceed` returned.
   ```rust
   if batch.is_ready() {
       coop.made_progress();
   }
   ```

You might be wondering why that `made_progress` construct is necessary.
This clever construct actually makes budget management easier.
The value returned by `poll_proceed` restores the budget to its original value when it is dropped, unless `made_progress` has been called.
This ensures that if we exit early from our `poll_next` implementation by returning `Pending`, the budget we had consumed becomes available again.
The task that invoked `poll_next` can then use that budget to try to make some other `Stream` (or any other resource, for that matter) make progress.
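
The restore-on-drop behavior can be modeled in miniature. This is a toy sketch of the idea, not Tokio's implementation: `Budget` plays the role of the per-task counter, and the `Permit` guard returns its unit of budget on drop unless `made_progress` was called.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// A toy per-task budget counter.
struct Budget {
    remaining: Rc<Cell<u32>>,
}

/// Guard returned by a successful budget acquisition. On drop it gives the
/// unit back, unless `made_progress` marked it as actually spent.
struct Permit {
    remaining: Rc<Cell<u32>>,
    spent: bool,
}

impl Budget {
    fn new(units: u32) -> Self {
        Budget { remaining: Rc::new(Cell::new(units)) }
    }

    /// Try to consume one unit; `None` models returning `Pending`
    /// on a depleted budget.
    fn proceed(&self) -> Option<Permit> {
        let left = self.remaining.get();
        if left == 0 {
            return None;
        }
        self.remaining.set(left - 1);
        Some(Permit { remaining: Rc::clone(&self.remaining), spent: false })
    }

    fn remaining(&self) -> u32 {
        self.remaining.get()
    }
}

impl Permit {
    /// Commit the consumption: the unit stays spent.
    fn made_progress(mut self) {
        self.spent = true;
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        if !self.spent {
            // We exited early without producing anything: give the unit back.
            self.remaining.set(self.remaining.get() + 1);
        }
    }
}
```

Acquiring a permit and committing it keeps the unit spent; acquiring one and bailing out early hands the unit back for some other resource to use, which is exactly the behavior described above.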

### Making It Automatic

The next version of DataFusion integrates the Tokio task budget based fix in all built-in source operators.
This ensures that going forward, most queries will automatically be cancelable.

On top of that, a new `ExecutionPlan` property was introduced that indicates whether an operator participates in cooperative scheduling.
A new `EnsureCooperative` optimizer rule can inspect query plans and insert wrapper `CooperativeExec` nodes as needed to ensure custom source operators also participate.

These two changes combined already make it very unlikely you'll encounter another query that refuses to stop.
For those situations where the automatic mechanisms are still not sufficient, there's a third addition in the form of the `datafusion::physical_plan::coop` module.
This new module provides utility functions that make it easy to adopt cooperative scheduling in your custom operators as well.