# Query Cancellation

## The Challenge of Cancelling Long-Running Queries

Have you ever tried to cancel a query that just wouldn't stop?
This document explores why that used to happen in DataFusion from time to time, and what the community did to squash the problem once and for all.

### Understanding Rust's Async Model

To really understand the cancellation problem you need to be somewhat familiar with Rust's asynchronous programming model.
This model is a bit different from that of many other ecosystems, so let's go over the basics as a refresher.
If you already know the ins and outs of `Future` and friends, you can skip this section.

#### Futures are inert

Rust's asynchronous programming model is built around the `Future<T>` trait, which works quite differently from async models in other languages.
Unlike systems where async code runs continuously on background threads, a Rust `Future<T>` represents a lazy computation that only makes progress when explicitly polled.
If nothing tells a `Future` to try to make progress, it remains an inert object.

You ask a `Future<T>` to advance its computation as far as possible by calling its `poll` method.
The future responds with either:
- `Poll::Pending` if it needs to wait for something (like I/O) before it can continue
- `Poll::Ready(value)` when it has completed and produced a value

When a future returns `Pending`, it saves its internal state so it can pick up where it left off the next time you poll it.
This state management is what makes Rust's futures memory-efficient and composable.

Rust's `async` keyword provides syntactic sugar over this model.
When you write an `async` function or block, the compiler transforms it into an implementation of the `Future` trait.
This transformation makes async code much more readable while maintaining the same underlying mechanics.

The `await` keyword complements this by letting you pause execution until a future completes.
When you write `.await` after a future, you're essentially telling the compiler to poll that future until it's ready, and then continue with the result.

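To make this concrete, here is a hand-rolled `Future` together with a minimal polling loop built on `std` only. This is a hypothetical sketch, not DataFusion code; the `CountDown` future and the no-op waker helper are our own inventions for illustration:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A hand-written future: it stays inert until polled, and returns
// Pending a few times before completing with a value.
struct CountDown {
    remaining: u32,
}

impl Future for CountDown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            // Save state (just the counter here) and ask to be polled again.
            self.remaining -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// A waker that does nothing; good enough for a busy-polling toy executor.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = CountDown { remaining: 3 };
    let mut fut = Pin::new(&mut fut);
    let mut polls = 0;
    // Nothing has happened yet: the future is inert until we poll it.
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            println!("{out} after {polls} polls");
            break;
        }
    }
}
```

Note that creating `CountDown` performs no work at all; the computation only advances inside the polling loop.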
#### From Futures to Streams

The `futures` crate extends this model with the `Stream` trait.
Streams represent a sequence of values produced asynchronously rather than just a single value.
The `Stream` trait has one method named `poll_next` that returns:
- `Poll::Pending` when the next value isn't ready yet, just like a `Future` would
- `Poll::Ready(Some(value))` when a new value is available
- `Poll::Ready(None)` when the stream is exhausted

### How DataFusion Executes Queries

In DataFusion, queries are executed as follows:

1. First the query is compiled into a tree of `ExecutionPlan` nodes
2. `ExecutionPlan::execute` is called on the root of the tree. This method returns a `SendableRecordBatchStream` (a pinned `Box<dyn Stream<RecordBatch>>`)
3. `Stream::poll_next` is called in a loop to get the results

The stream we get in step 2 is actually the root of a tree of streams that mostly mirrors the execution plan tree.
Each stream tree node processes the record batches it gets from its children.
The leaves of the tree produce record batches themselves.

Query execution progresses each time you call `poll_next` on the root stream.
This call typically cascades down the tree, with each node calling `poll_next` on its children to get the data it needs to process.

Here's where the first signs of problems start to show up: some operations (like aggregations, sorts, or certain join phases) need to process a lot of data before producing any output.
When `poll_next` encounters one of these operations, it might need to perform a substantial amount of work before it can return a record batch.

### Tokio and Cooperative Scheduling

We need to make a small detour via Tokio's scheduler before we can get to the query cancellation problem.
DataFusion makes use of the Tokio asynchronous runtime, which uses a cooperative scheduling model.
This is fundamentally different from the preemptive scheduling that you might be used to:

- In preemptive scheduling, the system can interrupt a task at any time to run something else
- In cooperative scheduling, tasks must voluntarily yield control back to the scheduler

This distinction is crucial for understanding our cancellation problem.
When a Tokio task is running, it can't be forcibly interrupted; it must cooperate by periodically yielding control.

Similarly, when you try to abort a task by calling `JoinHandle::abort()`, the Tokio runtime can't immediately force it to stop.
You're just telling Tokio: "When this task next yields control, don't resume it."
If the task never yields, it can't be aborted.

### The Cancellation Problem

With all the necessary background in place, let's now look at how the DataFusion CLI tries to run and cancel a query.
The code below paraphrases what the CLI actually does:

```rust
fn exec_query() {
    let runtime: tokio::runtime::Runtime = ...;
    let stream: SendableRecordBatchStream = ...;

    runtime.block_on(async {
        tokio::select! {
            next_batch = stream.next() => ...,
            _ = signal::ctrl_c() => ...,
        }
    })
}
```

First the CLI sets up a Tokio runtime instance.
It then reads the query you want to execute from standard input or a file and turns it into a stream.
Then it calls `next` on the stream, which is an `async` wrapper for `poll_next`.
It passes this to the `select!` macro along with a Ctrl+C handler.

The `select!` macro is supposed to race these two futures and complete when either one finishes.
When you press Ctrl+C, the `signal::ctrl_c()` future should complete, allowing the query to be cancelled.

But there's a catch: `select!` still follows cooperative scheduling rules.
It polls each future in sequence, and if the first one (our query) gets stuck in a long computation, it never gets around to polling the cancellation signal.

Imagine a query that needs to calculate something intensive, like sorting billions of rows.
The `poll_next` call might run for hours without returning.
During this time, Tokio can't check if you've pressed Ctrl+C, and the query continues running despite your cancellation request.

## A Closer Look at Blocking Operators

Let's peel back a layer of the onion and look at what's happening in a blocking `poll_next` implementation.
Here's a drastically simplified version of a `COUNT(*)` aggregation, something you might use in a query like `SELECT COUNT(*) FROM table`:

```rust
struct BlockingStream {
    stream: SendableRecordBatchStream,
    count: usize,
    finished: bool,
}

impl Stream for BlockingStream {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.finished {
            return Poll::Ready(None);
        }

        loop {
            match ready!(self.stream.poll_next_unpin(cx)) {
                Some(Ok(batch)) => self.count += batch.num_rows(),
                None => {
                    self.finished = true;
                    return Poll::Ready(Some(Ok(create_record_batch(self.count))));
                }
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
            }
        }
    }
}
```

How does this code work? Let's break it down step by step:

1. **Initial check**: We first check if we've already finished processing. If so, we return `Ready(None)` to signal the end of our stream:
   ```rust
   if self.finished {
       return Poll::Ready(None);
   }
   ```

2. **Processing loop**: If we're not done yet, we enter a loop to process incoming batches from our input stream:
   ```rust
   loop {
       match ready!(self.stream.poll_next_unpin(cx)) {
           // Handle different cases...
       }
   }
   ```
   The `ready!` macro checks if the input stream returned `Pending` and, if so, immediately returns `Pending` from our function as well.

3. **Processing data**: For each batch we receive, we simply add its row count to our running total:
   ```rust
   Some(Ok(batch)) => self.count += batch.num_rows(),
   ```

4. **End of input**: When the child stream is exhausted (returns `None`), we calculate our final result and convert it into a record batch (omitted for brevity):
   ```rust
   None => {
       self.finished = true;
       return Poll::Ready(Some(Ok(create_record_batch(self.count))));
   }
   ```

5. **Error handling**: If we encounter an error, we pass it along immediately:
   ```rust
   Some(Err(e)) => return Poll::Ready(Some(Err(e))),
   ```

This code looks perfectly reasonable at first glance.
But there's a subtle issue lurking here: what happens if the input stream consistently returns `Ready` and never returns `Pending`?

In that case, the processing loop will keep running without ever yielding control back to Tokio's scheduler.
This means we could be stuck in a single `poll_next` call for minutes or even hours, exactly the scenario that prevents query cancellation from working!

## Unblocking Operators

Now let's look at how we can ensure we return `Pending` every now and then.

### Independent Cooperative Operators

One simple way to achieve this is using a loop counter.
We do the exact same thing as before, but on each call we increment our counter.
If the counter reaches its limit we reset it and return `Pending`.
This ensures we produce at most 128 batches before yielding.

```rust
struct CountingSourceStream {
    counter: usize,
}

impl Stream for CountingSourceStream {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.counter >= 128 {
            self.counter = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }

        self.counter += 1;
        let batch = ...;
        Poll::Ready(Some(Ok(batch)))
    }
}
```

This would be a simple solution for this source operator.
If our simple aggregation operator consumes this stream, it will receive a `Pending` periodically, causing it to yield.
Could it be that simple?

Unfortunately, no.
Let's look at what happens when we start combining operators in more complex configurations.
Suppose we create a plan like this:

```
          Merge
     ┌──────┴──────┐
  Filter     CountingSource
     │
CountingSource
```

Each source produces a `Pending` every 128 batches.
The filter is the standard DataFusion filter, with a filter expression that drops a batch every 50 record batches.
Merge is a simple combining operator that uses `futures::stream::select` to combine two streams.

When we set this in motion, the merge operator will poll the left and right branch in a round-robin fashion.
The sources will each emit `Pending` every 128 batches, but the filter dropping batches makes it so that these arrive out-of-phase at the merge operator.
As a consequence the merge operator will always have the opportunity to poll the other stream when one of the two returns `Pending`.
The output stream of merge is again an always-ready stream.
If we pass this to our aggregating operator we're right back where we started.

### Coordinated Cooperation

Wouldn't it be great if we could get all the operators to coordinate amongst each other?
When one of them determines that it's time to yield, all the other operators agree and start returning `Pending` as well.
That way our task would be coaxed towards yielding even if it tried to poll many different operators.

Luckily the [developers of Tokio ran into the exact same problem](https://tokio.rs/blog/2020-04-preemption) described above when network servers were under heavy load, and they came up with a solution.
Already back in 2020, Tokio 0.2.14 introduced a per-task operation budget.
Rather than having individual counters littered throughout the code, the runtime manages a per-task counter which is decremented by Tokio resources.
When the counter hits zero, all resources start returning `Pending`.
The task will then yield, after which the Tokio runtime resets the counter.

As it turns out, DataFusion was already using this mechanism implicitly.
Every exchange-like operator internally makes use of a Tokio multi-producer, single-consumer channel.
When calling `Receiver::recv` on one of these channels, a unit of Tokio task budget is consumed.
As a consequence, query plans that made use of exchange-like operators were already mostly cancelable.

### Depleting The Budget

Let's revisit our original counting stream and adapt it to use Tokio's budget system.

```rust
struct BudgetSourceStream {}

impl Stream for BudgetSourceStream {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let coop = ready!(tokio::task::coop::poll_proceed(cx));
        let batch: Poll<Option<Self::Item>> = ...;
        if batch.is_ready() {
            coop.made_progress();
        }
        batch
    }
}
```

The stream now goes through the following steps:

1. **Try to consume budget**: the first thing the operator does is use `poll_proceed` to try to consume a unit of budget.
   If the budget is depleted, this function will return `Pending`.
   Otherwise, we consumed one budget unit and we can continue.
   ```rust
   let coop = ready!(tokio::task::coop::poll_proceed(cx));
   ```
2. **Try to do some work**: next we try to produce a record batch.
   That might not be possible if we're reading from some asynchronous resource that's not ready.
   ```rust
   let batch: Poll<Option<Self::Item>> = ...;
   ```
3. **Commit the budget consumption**: finally, if we did produce a batch, we need to tell Tokio that we were able to make progress.
   That's done by calling the `made_progress` method on the value `poll_proceed` returned.
   ```rust
   if batch.is_ready() {
       coop.made_progress();
   }
   ```

You might be wondering why that `made_progress` construct is necessary.
This clever construct actually makes it easier to manage the budget.
The value returned by `poll_proceed` will restore the budget to its original value when it is dropped, unless `made_progress` is called.
This ensures that if we exit early from our `poll_next` implementation by returning `Pending`, the budget we had consumed becomes available again.
The task that invoked `poll_next` can then use that budget to try to make some other `Stream` (or any resource for that matter) make progress.

### Making It Automatic

The next version of DataFusion integrates the Tokio task budget based fix into all built-in source operators.
This ensures that going forward, most queries will automatically be cancelable.

On top of that, a new `ExecutionPlan` property was introduced that indicates whether an operator participates in cooperative scheduling or not.
A new `EnsureCooperative` optimizer rule can inspect query plans and insert wrapper `CooperativeExec` nodes as needed to ensure custom source operators also participate.

These two changes combined already make it very unlikely you'll encounter another query that refuses to stop.
For those situations where the automatic mechanisms are still not sufficient, there's a third addition in the form of the `datafusion::physical_plan::coop` module.
This new module provides utility functions that make it easy to adopt cooperative scheduling in your custom operators as well.