Skip to content

Commit 247283c

Browse files
committed
various changes for ArrowRow and PropRef
1 parent a183369 commit 247283c

File tree

9 files changed

+570
-64
lines changed

9 files changed

+570
-64
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

db4-storage/src/properties/mod.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
use crate::error::StorageError;
22
use arrow_array::{
33
ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array, Int64Array,
4-
StringViewArray, StructArray, TimestampMillisecondArray, UInt8Array, UInt16Array, UInt32Array,
5-
UInt64Array,
4+
StringViewArray, TimestampMillisecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
65
};
7-
use arrow_schema::{DECIMAL128_MAX_PRECISION, Field, Fields};
6+
use arrow_schema::DECIMAL128_MAX_PRECISION;
87
use bigdecimal::ToPrimitive;
98
use raphtory_api::core::entities::properties::{
109
meta::PropMapper,
11-
prop::{
12-
Prop, PropType, SerdeMap, SerdeProp, arrow_dtype_from_prop_type, struct_array_from_props,
13-
},
10+
prop::{Prop, PropType, SerdeMap, arrow_dtype_from_prop_type, struct_array_from_props},
1411
};
1512
use raphtory_core::{
1613
entities::{
@@ -246,7 +243,7 @@ impl Properties {
246243
}
247244
}
248245

249-
pub(crate) fn t_len(&self) -> usize {
246+
pub fn t_len(&self) -> usize {
250247
self.t_properties.len()
251248
}
252249

raphtory-api/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ display-error-chain = { workspace = true, optional = true }
4747
proptest.workspace = true
4848

4949
[features]
50-
default = []
50+
default = ["arrow"]
5151
# Enables generating the pyo3 python bindings
5252
python = [
5353
"dep:pyo3", "dep:pyo3-arrow", "dep:display-error-chain"
Lines changed: 333 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,333 @@
1-
use crate::core::{
2-
entities::properties::{prop::Prop, prop_array::PropArray},
3-
PropType,
4-
};
5-
use std::sync::Arc;
1+
use arrow_array::{cast::AsArray, types::*, Array, ArrowPrimitiveType, StructArray};
2+
use arrow_schema::{DataType, TimeUnit};
3+
use chrono::DateTime;
4+
use serde::{ser::SerializeMap, Serialize};
5+
6+
use crate::core::entities::properties::prop::{Prop, PropRef};
7+
8+
#[derive(Debug, Clone, Copy)]
9+
pub struct ArrowRow<'a> {
10+
array: &'a StructArray,
11+
index: usize,
12+
}
13+
14+
impl<'a> PartialEq for ArrowRow<'a> {
15+
// this has the downside of returning false for rows with same fields but different order of columns
16+
fn eq(&self, other: &Self) -> bool {
17+
if self.array.num_columns() != other.array.num_columns() {
18+
return false;
19+
}
20+
21+
for col in 0..self.array.num_columns() {
22+
let self_prop = self.prop_ref(col);
23+
let other_prop = other.prop_ref(col);
24+
if self_prop != other_prop {
25+
return false;
26+
}
27+
}
28+
true
29+
}
30+
}
31+
32+
impl<'a> Serialize for ArrowRow<'a> {
33+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
34+
where
35+
S: serde::Serializer,
36+
{
37+
let mut state = serializer.serialize_map(Some(self.array.num_columns()))?;
38+
for col in 0..self.array.num_columns() {
39+
let field = &self.array.fields()[col];
40+
let key = field.name();
41+
let value = self.prop_ref(col);
42+
state.serialize_entry(key, &value)?;
43+
}
44+
state.end()
45+
}
46+
}
47+
48+
impl<'a> ArrowRow<'a> {
49+
pub fn primitive_value<T: ArrowPrimitiveType>(&self, col: usize) -> Option<T::Native> {
50+
let primitive_array = self.array.column(col).as_primitive_opt::<T>()?;
51+
(primitive_array.len() > self.index && !primitive_array.is_null(self.index)).then(|| primitive_array.value(self.index))
52+
}
53+
54+
fn primitive_dt<T: DirectConvert>(&self, col: usize) -> Option<(T::Native, &DataType)> {
55+
let col = self.array.column(col).as_primitive_opt::<T>()?;
56+
(col.len() > self.index && !col.is_null(self.index)).then(|| (col.value(self.index), col.data_type()))
57+
}
58+
59+
fn primitive_prop<T: DirectConvert>(&self, col: usize) -> Option<Prop> {
60+
let (value, dt) = self.primitive_dt::<T>(col)?;
61+
let prop = T::prop(value, dt);
62+
Some(prop)
63+
}
64+
65+
fn primitive_prop_ref<T: DirectConvert>(self, col: usize) -> Option<PropRef<'static>> {
66+
let col = self.array.column(col).as_primitive_opt::<T>()?;
67+
let (value, dt) =
68+
(col.len() > self.index && !col.is_null(self.index)).then(|| (col.value(self.index), col.data_type()))?;
69+
let prop_ref = T::prop_ref(value, dt);
70+
Some(prop_ref)
71+
}
72+
73+
fn struct_prop(&self, col: usize) -> Option<Prop> {
74+
let column = self.array.column(col).as_struct_opt()?;
75+
let row = ArrowRow::new(column, self.index);
76+
row.into_prop()
77+
}
78+
79+
fn struct_prop_ref(&self, col: usize) -> Option<PropRef<'a>> {
80+
let column = self.array.column(col).as_struct_opt()?;
81+
let row = ArrowRow::new(column, self.index);
82+
(column.len() > self.index).then(|| PropRef::from(row))
83+
}
84+
85+
pub fn bool_value(&self, col: usize) -> Option<bool> {
86+
let column = self.array.column(col);
87+
match column.data_type() {
88+
DataType::Boolean => {
89+
let col = column.as_boolean();
90+
(col.len() > self.index && !col.is_null(self.index)).then(|| col.value(self.index))
91+
}
92+
_ => None,
93+
}
94+
}
95+
96+
pub fn str_value(self, col: usize) -> Option<&'a str> {
97+
let column = self.array.column(col);
98+
let len = column.len();
99+
let valid = len > self.index && !column.is_null(self.index);
100+
match column.data_type() {
101+
DataType::Utf8 => {
102+
valid.then(|| column.as_string::<i32>().value(self.index))
103+
}
104+
DataType::LargeUtf8 => {
105+
valid.then(|| column.as_string::<i64>().value(self.index))
106+
}
107+
DataType::Utf8View => {
108+
valid.then(|| column.as_string_view().value(self.index))
109+
}
110+
_ => None,
111+
}
112+
}
113+
114+
pub fn prop_value(self, col: usize) -> Option<Prop> {
115+
let dtype = self.array.fields().get(col)?.data_type();
116+
match dtype {
117+
DataType::Null => None,
118+
DataType::Boolean => self.bool_value(col).map(|b| b.into()),
119+
DataType::Int32 => self.primitive_prop::<Int32Type>(col),
120+
DataType::Int64 => self.primitive_prop::<Int64Type>(col),
121+
DataType::UInt8 => self.primitive_prop::<UInt8Type>(col),
122+
DataType::UInt16 => self.primitive_prop::<UInt16Type>(col),
123+
DataType::UInt32 => self.primitive_prop::<UInt32Type>(col),
124+
DataType::UInt64 => self.primitive_prop::<UInt64Type>(col),
125+
DataType::Float32 => self.primitive_prop::<Float32Type>(col),
126+
DataType::Float64 => self.primitive_prop::<Float64Type>(col),
127+
DataType::Timestamp(unit, _) => match unit {
128+
TimeUnit::Second => self.primitive_prop::<TimestampSecondType>(col),
129+
TimeUnit::Millisecond => self.primitive_prop::<TimestampMillisecondType>(col),
130+
TimeUnit::Microsecond => self.primitive_prop::<TimestampMicrosecondType>(col),
131+
TimeUnit::Nanosecond => self.primitive_prop::<TimestampNanosecondType>(col),
132+
},
133+
DataType::Date32 => self.primitive_prop::<Date32Type>(col),
134+
DataType::Date64 => self.primitive_prop::<Date64Type>(col),
135+
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
136+
self.str_value(col).map(|v| v.into())
137+
}
138+
DataType::Decimal128(_, _) => self.primitive_prop::<Decimal128Type>(col),
139+
DataType::Struct(_) => self.struct_prop(col),
140+
_ => None,
141+
}
142+
}
143+
144+
pub fn prop_ref(self, col: usize) -> Option<PropRef<'a>> {
145+
let dtype = self.array.fields().get(col)?.data_type();
146+
match dtype {
147+
DataType::Null => None,
148+
DataType::Boolean => self.bool_value(col).map(|b| b.into()),
149+
DataType::Int32 => self.primitive_prop_ref::<Int32Type>(col),
150+
DataType::Int64 => self.primitive_prop_ref::<Int64Type>(col),
151+
DataType::UInt8 => self.primitive_prop_ref::<UInt8Type>(col),
152+
DataType::UInt16 => self.primitive_prop_ref::<UInt16Type>(col),
153+
DataType::UInt32 => self.primitive_prop_ref::<UInt32Type>(col),
154+
DataType::UInt64 => self.primitive_prop_ref::<UInt64Type>(col),
155+
DataType::Float32 => self.primitive_prop_ref::<Float32Type>(col),
156+
DataType::Float64 => self.primitive_prop_ref::<Float64Type>(col),
157+
DataType::Timestamp(unit, _) => match unit {
158+
TimeUnit::Second => self.primitive_prop_ref::<TimestampSecondType>(col),
159+
TimeUnit::Millisecond => self.primitive_prop_ref::<TimestampMillisecondType>(col),
160+
TimeUnit::Microsecond => self.primitive_prop_ref::<TimestampMicrosecondType>(col),
161+
TimeUnit::Nanosecond => self.primitive_prop_ref::<TimestampNanosecondType>(col),
162+
},
163+
DataType::Date32 => self.primitive_prop_ref::<Date32Type>(col),
164+
DataType::Date64 => self.primitive_prop_ref::<Date64Type>(col),
165+
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
166+
self.str_value(col).map(|v| v.into())
167+
}
168+
DataType::Decimal128(_, _) => self.primitive_prop_ref::<Decimal128Type>(col),
169+
DataType::Struct(_) => self.struct_prop_ref(col),
170+
_ => None,
171+
}
172+
}
173+
174+
pub fn into_prop(self) -> Option<Prop> {
175+
let map = Prop::map(
176+
self.array
177+
.fields()
178+
.iter()
179+
.enumerate()
180+
.filter_map(|(col, field)| Some((field.name().as_ref(), self.prop_value(col)?))),
181+
);
182+
match map {
183+
Prop::Map(m) if m.is_empty() => None,
184+
_ => Some(map),
185+
}
186+
}
187+
188+
pub fn is_valid(&self, col: usize) -> bool {
189+
self.array.column(col).is_valid(self.index)
190+
}
191+
}
192+
193+
impl<'a> ArrowRow<'a> {
194+
pub fn new(array: &'a StructArray, index: usize) -> Self {
195+
Self { array, index }
196+
}
197+
198+
pub fn get<T: 'static>(&self, column: usize) -> Option<&T> {
199+
self.array.column(column).as_any().downcast_ref()
200+
}
201+
}
202+
203+
pub trait DirectConvert: ArrowPrimitiveType {
204+
fn prop_ref(native: Self::Native, dtype: &DataType) -> PropRef<'static>;
205+
fn prop(native: Self::Native, dtype: &DataType) -> Prop {
206+
Self::prop_ref(native, dtype).into()
207+
}
208+
}
209+
210+
impl DirectConvert for UInt8Type {
211+
fn prop_ref(native: Self::Native, _dtype: &DataType) -> PropRef<'static> {
212+
PropRef::from(native)
213+
}
214+
}
215+
216+
impl DirectConvert for UInt16Type {
217+
fn prop_ref(native: Self::Native, _dtype: &DataType) -> PropRef<'static> {
218+
PropRef::from(native)
219+
}
220+
}
221+
222+
impl DirectConvert for UInt32Type {
223+
fn prop_ref(native: Self::Native, _dtype: &DataType) -> PropRef<'static> {
224+
PropRef::from(native)
225+
}
226+
}
227+
228+
impl DirectConvert for UInt64Type {
229+
fn prop_ref(native: Self::Native, _dtype: &DataType) -> PropRef<'static> {
230+
PropRef::from(native)
231+
}
232+
}
233+
234+
impl DirectConvert for Int32Type {
235+
fn prop_ref(native: Self::Native, _dtype: &DataType) -> PropRef<'static> {
236+
PropRef::from(native)
237+
}
238+
}
239+
240+
impl DirectConvert for Int64Type {
241+
fn prop_ref(native: Self::Native, _dtype: &DataType) -> PropRef<'static> {
242+
PropRef::from(native)
243+
}
244+
}
245+
246+
impl DirectConvert for Float32Type {
247+
fn prop_ref(native: Self::Native, _dtype: &DataType) -> PropRef<'static> {
248+
PropRef::from(native)
249+
}
250+
}
251+
252+
impl DirectConvert for Float64Type {
253+
fn prop_ref(native: Self::Native, _dtype: &DataType) -> PropRef<'static> {
254+
PropRef::from(native)
255+
}
256+
}
257+
258+
impl DirectConvert for Date64Type {
259+
fn prop_ref(native: Self::Native, _dtype: &DataType) -> PropRef<'static> {
260+
PropRef::from(DateTime::from_timestamp_millis(native).unwrap())
261+
}
262+
}
263+
264+
impl DirectConvert for Date32Type {
265+
fn prop_ref(native: Self::Native, _dtype: &DataType) -> PropRef<'static> {
266+
PropRef::from(
267+
Date32Type::to_naive_date(native)
268+
.and_hms_opt(0, 0, 0)
269+
.unwrap()
270+
.and_utc(),
271+
)
272+
}
273+
}
274+
275+
impl DirectConvert for TimestampNanosecondType {
276+
fn prop_ref(native: Self::Native, dtype: &DataType) -> PropRef<'static> {
277+
match dtype {
278+
DataType::Timestamp(_, tz) => match tz {
279+
None => PropRef::from(DateTime::from_timestamp_nanos(native).naive_utc()),
280+
Some(_) => PropRef::from(DateTime::from_timestamp_nanos(native)),
281+
},
282+
_ => unreachable!(),
283+
}
284+
}
285+
}
286+
287+
impl DirectConvert for TimestampMicrosecondType {
288+
fn prop_ref(native: Self::Native, dtype: &DataType) -> PropRef<'static> {
289+
match dtype {
290+
DataType::Timestamp(_, tz) => match tz {
291+
None => PropRef::from(DateTime::from_timestamp_micros(native).unwrap().naive_utc()),
292+
Some(_) => PropRef::from(DateTime::from_timestamp_micros(native).unwrap()),
293+
},
294+
_ => unreachable!(),
295+
}
296+
}
297+
}
298+
299+
impl DirectConvert for TimestampMillisecondType {
300+
fn prop_ref(native: Self::Native, dtype: &DataType) -> PropRef<'static> {
301+
match dtype {
302+
DataType::Timestamp(_, tz) => match tz {
303+
None => PropRef::from(DateTime::from_timestamp_millis(native).unwrap().naive_utc()),
304+
Some(_) => PropRef::from(DateTime::from_timestamp_millis(native).unwrap()),
305+
},
306+
_ => unreachable!(),
307+
}
308+
}
309+
}
310+
311+
impl DirectConvert for TimestampSecondType {
312+
fn prop_ref(native: Self::Native, dtype: &DataType) -> PropRef<'static> {
313+
match dtype {
314+
DataType::Timestamp(_, tz) => match tz {
315+
None => PropRef::from(DateTime::from_timestamp(native, 0).unwrap().naive_utc()),
316+
Some(_) => PropRef::from(DateTime::from_timestamp(native, 0).unwrap()),
317+
},
318+
_ => unreachable!(),
319+
}
320+
}
321+
}
322+
323+
impl DirectConvert for Decimal128Type {
324+
fn prop_ref(native: Self::Native, dtype: &DataType) -> PropRef<'static> {
325+
match dtype {
326+
DataType::Decimal128(_, scale) => PropRef::Decimal {
327+
num: native,
328+
scale: *scale as i8,
329+
},
330+
_ => unreachable!(),
331+
}
332+
}
333+
}

0 commit comments

Comments
 (0)