Skip to content

Commit 5e93cfc

Browse files
committed
wrap result stream
1 parent c24f4b1 commit 5e93cfc

File tree

2 files changed

+26
-12
lines changed

2 files changed

+26
-12
lines changed

datafusion-examples/examples/thread_pools.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
use arrow::util::pretty::pretty_format_batches;
2929
use datafusion::error::Result;
3030
use datafusion::execution::SendableRecordBatchStream;
31-
use datafusion::physical_plan::DedicatedExecutor;
31+
use datafusion::physical_plan::{dedicated_executor, DedicatedExecutor};
3232
use datafusion::prelude::*;
3333
use futures::stream::StreamExt;
3434
use object_store::http::HttpBuilder;
@@ -172,6 +172,7 @@ async fn different_runtime_advanced() -> Result<()> {
172172

173173
let http_store = DedicatedExecutor::wrap_object_store(http_store);
174174

175+
// Tell datafusion about processing http:// urls with this wrapped object store
175176
ctx.register_object_store(&base_url, http_store);
176177

177178
// Plan (and execute) the query on the dedicated runtime
@@ -184,19 +185,28 @@ async fn different_runtime_advanced() -> Result<()> {
184185
.await?;
185186
let stream: SendableRecordBatchStream = df.execute_stream().await?;
186187

187-
Ok(stream) as Result<SendableRecordBatchStream>
188+
Ok(stream) as Result<_>
188189
}).await??;
189190

190-
// We have now planned the query on the dedicated runtime, but we still need to
191-
// drive the stream (aka call `next()` to get the results.
191+
// We have now planned the query on the dedicated runtime — yay! — but we still need to
192+
// drive the stream (aka call `next()` to get the results).
192193

193-
// as mentioned above, calling `next()` (including indirectly by using
194-
// FlightDataEncoder to convert the results to flight to send it over the
195-
// network), will *still* result in the CPU work (and a bunch of spawned
196-
// tasks) being done on the runtime calling next() (aka the current runtime)
197-
// and not on the dedicated runtime.
198-
199-
// to drive the stream on the dedicated runtime, we need to wrap it using a XXX stream function
194+
// However, as mentioned above, calling `next()` resolves the Stream (and
195+
// any work it may do) on a thread in the current (default) runtime.
196+
//
197+
// To drive the stream on the dedicated runtime, we need to wrap it using a
198+
// `DedicatedExecutor::wrap_stream` stream function
199+
//
200+
// Note: if you don't do this, you will likely see a panic about `No IO runtime registered.`
201+
// because the threads in the current (main) tokio runtime have not had the IO runtime
202+
// installed
203+
let mut stream = dedicated_executor.run_sendable_record_batch_stream(stream);
204+
205+
// Note you can run other streams on the DedicatedExecutor as well using the
206+
// `DedicatedExecutor::run_stream` function (placeholder left in commit — confirm exact API name). This is helpful, for example, if you
207+
// need to do non-trivial CPU work on the results of the stream (e.g.
208+
// calling a FlightDataEncoder to convert the results to flight to send it
209+
// over the network).
200210

201211
while let Some(batch) = stream.next().await {
202212
println!("{}", pretty_format_batches(&[batch?]).unwrap());

datafusion/physical-plan/src/dedicated_executor.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,12 +218,16 @@ impl DedicatedExecutor {
218218

219219
/// Returns an ObjectStore instance that will always perform I/O work on the
220220
/// IO_RUNTIME.
221+
///
222+
/// Note that this object store will only work correctly if run on this
223+
/// dedicated executor. If you try and use it on another executor, it will
224+
/// panic with "no IO runtime registered" type error.
221225
pub fn wrap_object_store(object_store: Arc<dyn ObjectStore>) -> Arc<IoObjectStore> {
222226
Arc::new(IoObjectStore::new(object_store))
223227
}
224228

225229
/// Returns a SendableRecordBatchStream that will run on this executor's thread pool
226-
pub fn wrap_stream(
230+
pub fn run_sendable_record_batch_stream(
227231
&self,
228232
stream: SendableRecordBatchStream,
229233
) -> SendableRecordBatchStream {

0 commit comments

Comments
 (0)