Skip to content

Commit 907c8f3

Browse files
committed
merge origin/main
2 parents f6fe903 + ebde2bf commit 907c8f3

File tree

15 files changed

+2186
-112
lines changed

15 files changed

+2186
-112
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,6 @@ datafusion = { version = "48.0.0" }
88
datafusion-proto = { version = "48.0.0" }
99
arrow-flight = "55.2.0"
1010
async-trait = "0.1.88"
11-
itertools = "0.14.0"
1211
tokio = { version = "1.46.1", features = ["full"] }
1312
# Fixed to 0.12.3 because of arrow-flight
1413
tonic = { version = "0.12.3", features = ["transport"] }
@@ -20,7 +19,7 @@ uuid = "1.17.0"
2019
delegate = "0.13.4"
2120
dashmap = "6.1.0"
2221
prost = "0.13.5"
22+
object_store = "0.12.3"
2323

2424
[dev-dependencies]
25-
insta = { version = "1.43.1", features = ["filters"] }
26-
25+
insta = { version = "1.43.1", features = ["filters"] }

src/errors/arrow_error.rs

Lines changed: 256 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,256 @@
1+
use crate::errors::io_error::IoErrorProto;
2+
use datafusion::arrow::error::ArrowError;
3+
4+
#[derive(Clone, PartialEq, ::prost::Message)]
5+
pub struct ArrowErrorProto {
6+
#[prost(string, optional, tag = "1")]
7+
pub ctx: Option<String>,
8+
#[prost(
9+
oneof = "ArrowErrorInnerProto",
10+
tags = "2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19"
11+
)]
12+
pub inner: Option<ArrowErrorInnerProto>,
13+
}
14+
15+
#[derive(Clone, PartialEq, prost::Oneof)]
16+
pub enum ArrowErrorInnerProto {
17+
#[prost(string, tag = "2")]
18+
NotYetImplemented(String),
19+
#[prost(string, tag = "3")]
20+
ExternalError(String),
21+
#[prost(string, tag = "4")]
22+
CastError(String),
23+
#[prost(string, tag = "5")]
24+
MemoryError(String),
25+
#[prost(string, tag = "6")]
26+
ParseError(String),
27+
#[prost(string, tag = "7")]
28+
SchemaError(String),
29+
#[prost(string, tag = "8")]
30+
ComputeError(String),
31+
#[prost(bool, tag = "9")]
32+
DivideByZero(bool),
33+
#[prost(string, tag = "10")]
34+
ArithmeticOverflow(String),
35+
#[prost(string, tag = "11")]
36+
CsvError(String),
37+
#[prost(string, tag = "12")]
38+
JsonError(String),
39+
#[prost(message, tag = "13")]
40+
IoError(IoErrorProto),
41+
#[prost(message, tag = "14")]
42+
IpcError(String),
43+
#[prost(message, tag = "15")]
44+
InvalidArgumentError(String),
45+
#[prost(message, tag = "16")]
46+
ParquetError(String),
47+
#[prost(message, tag = "17")]
48+
CDataInterface(String),
49+
#[prost(bool, tag = "18")]
50+
DictionaryKeyOverflowError(bool),
51+
#[prost(bool, tag = "19")]
52+
RunEndIndexOverflowError(bool),
53+
}
54+
55+
impl ArrowErrorProto {
56+
pub fn from_arrow_error(err: &ArrowError, ctx: Option<&String>) -> Self {
57+
match err {
58+
ArrowError::NotYetImplemented(msg) => ArrowErrorProto {
59+
inner: Some(ArrowErrorInnerProto::NotYetImplemented(msg.to_string())),
60+
ctx: ctx.cloned(),
61+
},
62+
ArrowError::ExternalError(msg) => ArrowErrorProto {
63+
inner: Some(ArrowErrorInnerProto::ExternalError(msg.to_string())),
64+
ctx: ctx.cloned(),
65+
},
66+
ArrowError::CastError(msg) => ArrowErrorProto {
67+
inner: Some(ArrowErrorInnerProto::CastError(msg.to_string())),
68+
ctx: ctx.cloned(),
69+
},
70+
ArrowError::MemoryError(msg) => ArrowErrorProto {
71+
inner: Some(ArrowErrorInnerProto::MemoryError(msg.to_string())),
72+
ctx: ctx.cloned(),
73+
},
74+
ArrowError::ParseError(msg) => ArrowErrorProto {
75+
inner: Some(ArrowErrorInnerProto::ParseError(msg.to_string())),
76+
ctx: ctx.cloned(),
77+
},
78+
ArrowError::SchemaError(msg) => ArrowErrorProto {
79+
inner: Some(ArrowErrorInnerProto::SchemaError(msg.to_string())),
80+
ctx: ctx.cloned(),
81+
},
82+
ArrowError::ComputeError(msg) => ArrowErrorProto {
83+
inner: Some(ArrowErrorInnerProto::ComputeError(msg.to_string())),
84+
ctx: ctx.cloned(),
85+
},
86+
ArrowError::DivideByZero => ArrowErrorProto {
87+
inner: Some(ArrowErrorInnerProto::DivideByZero(true)),
88+
ctx: ctx.cloned(),
89+
},
90+
ArrowError::ArithmeticOverflow(msg) => ArrowErrorProto {
91+
inner: Some(ArrowErrorInnerProto::ArithmeticOverflow(msg.to_string())),
92+
ctx: ctx.cloned(),
93+
},
94+
ArrowError::CsvError(msg) => ArrowErrorProto {
95+
inner: Some(ArrowErrorInnerProto::CsvError(msg.to_string())),
96+
ctx: ctx.cloned(),
97+
},
98+
ArrowError::JsonError(msg) => ArrowErrorProto {
99+
inner: Some(ArrowErrorInnerProto::JsonError(msg.to_string())),
100+
ctx: ctx.cloned(),
101+
},
102+
ArrowError::IoError(msg, err) => ArrowErrorProto {
103+
inner: Some(ArrowErrorInnerProto::IoError(IoErrorProto::from_io_error(
104+
msg, err,
105+
))),
106+
ctx: ctx.cloned(),
107+
},
108+
ArrowError::IpcError(msg) => ArrowErrorProto {
109+
inner: Some(ArrowErrorInnerProto::IpcError(msg.to_string())),
110+
ctx: ctx.cloned(),
111+
},
112+
ArrowError::InvalidArgumentError(msg) => ArrowErrorProto {
113+
inner: Some(ArrowErrorInnerProto::InvalidArgumentError(msg.to_string())),
114+
ctx: ctx.cloned(),
115+
},
116+
ArrowError::ParquetError(msg) => ArrowErrorProto {
117+
inner: Some(ArrowErrorInnerProto::ParquetError(msg.to_string())),
118+
ctx: ctx.cloned(),
119+
},
120+
ArrowError::CDataInterface(msg) => ArrowErrorProto {
121+
inner: Some(ArrowErrorInnerProto::CDataInterface(msg.to_string())),
122+
ctx: ctx.cloned(),
123+
},
124+
ArrowError::DictionaryKeyOverflowError => ArrowErrorProto {
125+
inner: Some(ArrowErrorInnerProto::DictionaryKeyOverflowError(true)),
126+
ctx: ctx.cloned(),
127+
},
128+
ArrowError::RunEndIndexOverflowError => ArrowErrorProto {
129+
inner: Some(ArrowErrorInnerProto::RunEndIndexOverflowError(true)),
130+
ctx: ctx.cloned(),
131+
},
132+
}
133+
}
134+
135+
pub fn to_arrow_error(&self) -> (ArrowError, Option<String>) {
136+
let Some(ref inner) = self.inner else {
137+
return (
138+
ArrowError::ExternalError(Box::from("Malformed protobuf message".to_string())),
139+
None,
140+
);
141+
};
142+
let err = match inner {
143+
ArrowErrorInnerProto::NotYetImplemented(msg) => {
144+
ArrowError::NotYetImplemented(msg.to_string())
145+
}
146+
ArrowErrorInnerProto::ExternalError(msg) => {
147+
ArrowError::ExternalError(Box::from(msg.to_string()))
148+
}
149+
ArrowErrorInnerProto::CastError(msg) => ArrowError::CastError(msg.to_string()),
150+
ArrowErrorInnerProto::MemoryError(msg) => ArrowError::MemoryError(msg.to_string()),
151+
ArrowErrorInnerProto::ParseError(msg) => ArrowError::ParseError(msg.to_string()),
152+
ArrowErrorInnerProto::SchemaError(msg) => ArrowError::SchemaError(msg.to_string()),
153+
ArrowErrorInnerProto::ComputeError(msg) => ArrowError::ComputeError(msg.to_string()),
154+
ArrowErrorInnerProto::DivideByZero(_) => ArrowError::DivideByZero,
155+
ArrowErrorInnerProto::ArithmeticOverflow(msg) => {
156+
ArrowError::ArithmeticOverflow(msg.to_string())
157+
}
158+
ArrowErrorInnerProto::CsvError(msg) => ArrowError::CsvError(msg.to_string()),
159+
ArrowErrorInnerProto::JsonError(msg) => ArrowError::JsonError(msg.to_string()),
160+
ArrowErrorInnerProto::IoError(msg) => {
161+
let (msg, err) = msg.to_io_error();
162+
ArrowError::IoError(err, msg)
163+
}
164+
ArrowErrorInnerProto::IpcError(msg) => ArrowError::IpcError(msg.to_string()),
165+
ArrowErrorInnerProto::InvalidArgumentError(msg) => {
166+
ArrowError::InvalidArgumentError(msg.to_string())
167+
}
168+
ArrowErrorInnerProto::ParquetError(msg) => ArrowError::ParquetError(msg.to_string()),
169+
ArrowErrorInnerProto::CDataInterface(msg) => {
170+
ArrowError::CDataInterface(msg.to_string())
171+
}
172+
ArrowErrorInnerProto::DictionaryKeyOverflowError(_) => {
173+
ArrowError::DictionaryKeyOverflowError
174+
}
175+
ArrowErrorInnerProto::RunEndIndexOverflowError(_) => {
176+
ArrowError::RunEndIndexOverflowError
177+
}
178+
};
179+
(err, self.ctx.clone())
180+
}
181+
}
182+
183+
#[cfg(test)]
mod tests {
    use super::*;
    use prost::Message;
    use std::io::{Error as IoError, ErrorKind};

    /// Serializes `proto` to bytes and parses it back, so each assertion
    /// exercises the real wire encoding rather than the in-memory struct.
    fn through_wire(proto: ArrowErrorProto) -> ArrowErrorProto {
        let bytes = proto.encode_to_vec();
        ArrowErrorProto::decode(bytes.as_slice()).unwrap()
    }

    /// Every `ArrowError` variant must keep its `Display` text and its context
    /// (present or absent) across an encode/decode round trip.
    #[test]
    fn test_arrow_error_roundtrip() {
        let context = "test context".to_string();

        let samples = [
            ArrowError::NotYetImplemented("test not implemented".to_string()),
            ArrowError::ExternalError(Box::new(IoError::new(
                ErrorKind::Other,
                "external error",
            ))),
            ArrowError::CastError("cast error".to_string()),
            ArrowError::MemoryError("memory error".to_string()),
            ArrowError::ParseError("parse error".to_string()),
            ArrowError::SchemaError("schema error".to_string()),
            ArrowError::ComputeError("compute error".to_string()),
            ArrowError::DivideByZero,
            ArrowError::ArithmeticOverflow("overflow".to_string()),
            ArrowError::CsvError("csv error".to_string()),
            ArrowError::JsonError("json error".to_string()),
            ArrowError::IoError(
                "io message".to_string(),
                IoError::new(ErrorKind::NotFound, "file not found"),
            ),
            ArrowError::IpcError("ipc error".to_string()),
            ArrowError::InvalidArgumentError("invalid arg".to_string()),
            ArrowError::ParquetError("parquet error".to_string()),
            ArrowError::CDataInterface("cdata error".to_string()),
            ArrowError::DictionaryKeyOverflowError,
            ArrowError::RunEndIndexOverflowError,
        ];

        for original_error in samples {
            let expected_text = original_error.to_string();

            // With a context attached.
            let with_ctx =
                through_wire(ArrowErrorProto::from_arrow_error(&original_error, Some(&context)));
            let (recovered_error, recovered_ctx) = with_ctx.to_arrow_error();
            let recovered_text = recovered_error.to_string();

            // Print both renderings before asserting so a failure shows them
            // on separate labelled lines.
            if expected_text != recovered_text {
                println!("original error: {expected_text}");
                println!("recovered error: {recovered_text}");
            }

            assert_eq!(expected_text, recovered_text);
            assert_eq!(recovered_ctx, Some("test context".to_string()));

            // Without a context.
            let without_ctx = through_wire(ArrowErrorProto::from_arrow_error(&original_error, None));
            let (recovered_error_no_ctx, recovered_ctx_no_ctx) = without_ctx.to_arrow_error();

            assert_eq!(expected_text, recovered_error_no_ctx.to_string());
            assert_eq!(recovered_ctx_no_ctx, None);
        }
    }

    /// A message with no oneof case set decodes to an `ExternalError`.
    #[test]
    fn test_malformed_protobuf_message() {
        let empty = ArrowErrorProto {
            ctx: None,
            inner: None,
        };

        let (recovered_error, _) = empty.to_arrow_error();

        assert!(matches!(recovered_error, ArrowError::ExternalError(_)));
    }
}

0 commit comments

Comments
 (0)