Skip to content

Commit 82931e1

Browse files
committed
execution failing test
2 parents 907c8f3 + 0ad682e commit 82931e1

39 files changed

+851
-1263
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@ tokio = { version = "1.46.1", features = ["full"] }
1313
tonic = { version = "0.12.3", features = ["transport"] }
1414
tower = "0.5.2"
1515
http = "1.3.1"
16+
itertools = "0.14.0"
1617
futures = "0.3.31"
1718
url = "2.5.4"
1819
uuid = "1.17.0"
1920
delegate = "0.13.4"
2021
dashmap = "6.1.0"
2122
prost = "0.13.5"
23+
rand = "0.8.5"
2224
object_store = "0.12.3"
2325

2426
[dev-dependencies]
25-
insta = { version = "1.43.1", features = ["filters"] }
27+
insta = { version = "1.43.1", features = ["filters"] }

context.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use url::Url;
2+
use uuid::Uuid;
3+
4+
#[derive(Debug, Clone)]
5+
pub struct StageContext {
6+
/// Unique identifier of the Stage.
7+
pub id: Uuid,
8+
/// Number of tasks involved in the query.
9+
pub n_tasks: usize,
10+
/// Unique identifier of the input Stage.
11+
pub input_id: Uuid,
12+
/// Urls from which the current stage will need to read data.
13+
pub input_urls: Vec<Url>,
14+
}
15+
16+
#[derive(Debug, Clone)]
17+
pub struct StageTaskContext {
18+
/// Index of the current task in a stage
19+
pub task_idx: usize,
20+
}

rust-toolchain.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[toolchain]
2+
channel = "1.83.0"
3+
profile = "default"

src/channel_manager.rs

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
use async_trait::async_trait;
22
use datafusion::common::internal_datafusion_err;
33
use datafusion::error::DataFusionError;
4-
use datafusion::prelude::SessionConfig;
4+
use datafusion::execution::TaskContext;
5+
use datafusion::prelude::{SessionConfig, SessionContext};
56
use delegate::delegate;
67
use std::sync::Arc;
78
use tonic::body::BoxBody;
89
use url::Url;
910

11+
#[derive(Clone)]
1012
pub struct ChannelManager(Arc<dyn ChannelResolver + Send + Sync>);
1113

1214
impl ChannelManager {
@@ -21,29 +23,49 @@ pub type BoxCloneSyncChannel = tower::util::BoxCloneSyncService<
2123
tonic::transport::Error,
2224
>;
2325

24-
#[derive(Clone, Debug)]
25-
pub struct ArrowFlightChannel {
26-
pub url: Url,
27-
pub channel: BoxCloneSyncChannel,
28-
}
29-
26+
/// Abstracts networking details so that users can implement their own network resolution
27+
/// mechanism.
3028
#[async_trait]
3129
pub trait ChannelResolver {
32-
async fn get_n_channels(&self, n: usize) -> Result<Vec<ArrowFlightChannel>, DataFusionError>;
33-
async fn get_channel_for_url(&self, url: &Url) -> Result<ArrowFlightChannel, DataFusionError>;
30+
/// Gets all available worker URLs. Used during stage assignment.
31+
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError>;
32+
/// For a given URL, get a channel for communicating to it.
33+
async fn get_channel_for_url(&self, url: &Url) -> Result<BoxCloneSyncChannel, DataFusionError>;
3434
}
3535

3636
impl ChannelManager {
37-
pub fn try_from_session(session: &SessionConfig) -> Result<Arc<Self>, DataFusionError> {
38-
session
39-
.get_extension::<ChannelManager>()
40-
.ok_or_else(|| internal_datafusion_err!("No extension ChannelManager"))
41-
}
42-
4337
delegate! {
4438
to self.0 {
45-
pub async fn get_n_channels(&self, n: usize) -> Result<Vec<ArrowFlightChannel>, DataFusionError>;
46-
pub async fn get_channel_for_url(&self, url: &Url) -> Result<ArrowFlightChannel, DataFusionError>;
39+
pub fn get_urls(&self) -> Result<Vec<Url>, DataFusionError>;
40+
pub async fn get_channel_for_url(&self, url: &Url) -> Result<BoxCloneSyncChannel, DataFusionError>;
4741
}
4842
}
4943
}
44+
45+
impl TryInto<ChannelManager> for &SessionConfig {
46+
type Error = DataFusionError;
47+
48+
fn try_into(self) -> Result<ChannelManager, Self::Error> {
49+
Ok(self
50+
.get_extension::<ChannelManager>()
51+
.ok_or_else(|| internal_datafusion_err!("No extension ChannelManager"))?
52+
.as_ref()
53+
.clone())
54+
}
55+
}
56+
57+
impl TryInto<ChannelManager> for &TaskContext {
58+
type Error = DataFusionError;
59+
60+
fn try_into(self) -> Result<ChannelManager, Self::Error> {
61+
self.session_config().try_into()
62+
}
63+
}
64+
65+
impl TryInto<ChannelManager> for &SessionContext {
66+
type Error = DataFusionError;
67+
68+
fn try_into(self) -> Result<ChannelManager, Self::Error> {
69+
self.task_ctx().as_ref().try_into()
70+
}
71+
}

src/errors/io_error.rs

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,17 @@ impl IoErrorProto {
4040
ErrorKind::WriteZero => 22,
4141
ErrorKind::StorageFull => 23,
4242
ErrorKind::NotSeekable => 24,
43-
ErrorKind::QuotaExceeded => 25,
44-
ErrorKind::FileTooLarge => 26,
45-
ErrorKind::ResourceBusy => 27,
46-
ErrorKind::ExecutableFileBusy => 28,
47-
ErrorKind::Deadlock => 29,
48-
ErrorKind::CrossesDevices => 30,
49-
ErrorKind::TooManyLinks => 31,
50-
ErrorKind::ArgumentListTooLong => 32,
51-
ErrorKind::Interrupted => 33,
52-
ErrorKind::Unsupported => 34,
53-
ErrorKind::UnexpectedEof => 35,
54-
ErrorKind::OutOfMemory => 36,
55-
ErrorKind::Other => 37,
43+
ErrorKind::FileTooLarge => 25,
44+
ErrorKind::ResourceBusy => 26,
45+
ErrorKind::ExecutableFileBusy => 27,
46+
ErrorKind::Deadlock => 28,
47+
ErrorKind::TooManyLinks => 29,
48+
ErrorKind::ArgumentListTooLong => 30,
49+
ErrorKind::Interrupted => 31,
50+
ErrorKind::Unsupported => 32,
51+
ErrorKind::UnexpectedEof => 33,
52+
ErrorKind::OutOfMemory => 34,
53+
ErrorKind::Other => 35,
5654
_ => -1,
5755
},
5856
err: err.to_string(),
@@ -86,19 +84,17 @@ impl IoErrorProto {
8684
22 => ErrorKind::WriteZero,
8785
23 => ErrorKind::StorageFull,
8886
24 => ErrorKind::NotSeekable,
89-
25 => ErrorKind::QuotaExceeded,
90-
26 => ErrorKind::FileTooLarge,
91-
27 => ErrorKind::ResourceBusy,
92-
28 => ErrorKind::ExecutableFileBusy,
93-
29 => ErrorKind::Deadlock,
94-
30 => ErrorKind::CrossesDevices,
95-
31 => ErrorKind::TooManyLinks,
96-
32 => ErrorKind::ArgumentListTooLong,
97-
33 => ErrorKind::Interrupted,
98-
34 => ErrorKind::Unsupported,
99-
35 => ErrorKind::UnexpectedEof,
100-
36 => ErrorKind::OutOfMemory,
101-
37 => ErrorKind::Other,
87+
25 => ErrorKind::FileTooLarge,
88+
26 => ErrorKind::ResourceBusy,
89+
27 => ErrorKind::ExecutableFileBusy,
90+
28 => ErrorKind::Deadlock,
91+
29 => ErrorKind::TooManyLinks,
92+
30 => ErrorKind::ArgumentListTooLong,
93+
31 => ErrorKind::Interrupted,
94+
32 => ErrorKind::Unsupported,
95+
33 => ErrorKind::UnexpectedEof,
96+
34 => ErrorKind::OutOfMemory,
97+
35 => ErrorKind::Other,
10298
_ => ErrorKind::Other,
10399
};
104100
(

src/exec/distributed.rs

Lines changed: 0 additions & 98 deletions
This file was deleted.

src/exec/mod.rs

Lines changed: 0 additions & 2 deletions
This file was deleted.

0 commit comments

Comments
 (0)