Skip to content

Commit 62e231a

Browse files
authored
Support converting TimestampTZ to and from duckdb (#4226)
fix #4208 Signed-off-by: Robert Kruszewski <[email protected]>
1 parent fde6f42 commit 62e231a

File tree

20 files changed

+604
-680
lines changed

20 files changed

+604
-680
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bench-vortex/src/engines/ddb/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::time::{Duration, Instant};
66
use anyhow::Result;
77
use log::trace;
88
use url::Url;
9+
use vortex::error::VortexExpect;
910
use vortex_duckdb::duckdb::{Connection, Database};
1011

1112
use crate::{BenchmarkDataset, Format, IdempotentPath};
@@ -71,7 +72,10 @@ impl DuckDBCtx {
7172
let query_time = time_instant.elapsed();
7273
trace!("query completed in {:.3}s", query_time.as_secs_f64());
7374

74-
Ok((query_time, result.row_count()?))
75+
Ok((
76+
query_time,
77+
usize::try_from(result.row_count()).vortex_expect("row count overflow"),
78+
))
7579
}
7680

7781
/// Register tables for benchmarks using the internal connection

encodings/fastlanes/src/bitpacking/compress.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
use std::mem::MaybeUninit;
55

6-
use arrow_buffer::ArrowNativeType;
76
use fastlanes::BitPacking;
87
use itertools::Itertools;
98
use num_traits::{AsPrimitive, PrimInt};
@@ -133,10 +132,7 @@ pub unsafe fn bitpack_unchecked(
133132
/// Bitpack a slice of primitives down to the given width.
134133
///
135134
/// See `bitpack` for more caller information.
136-
pub fn bitpack_primitive<T: NativePType + BitPacking + ArrowNativeType>(
137-
array: &[T],
138-
bit_width: u8,
139-
) -> Buffer<T> {
135+
pub fn bitpack_primitive<T: NativePType + BitPacking>(array: &[T], bit_width: u8) -> Buffer<T> {
140136
if bit_width == 0 {
141137
return Buffer::<T>::empty();
142138
}

encodings/fastlanes/src/bitpacking/compute/filter.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
use std::mem;
55
use std::mem::MaybeUninit;
66

7-
use arrow_buffer::ArrowNativeType;
87
use fastlanes::BitPacking;
98
use vortex_array::arrays::PrimitiveArray;
109
use vortex_array::compute::{FilterKernel, FilterKernelAdapter, filter};
@@ -43,12 +42,12 @@ register_kernel!(FilterKernelAdapter(BitPackedVTable).lift());
4342
/// FastLanes decompression is so fast and the bookkeepping necessary to decompress individual
4443
/// elements is relatively slow. If you prefer to never fully decompress, use
4544
/// [filter_primitive_no_decompression].
46-
fn filter_primitive<T: NativePType + BitPacking + ArrowNativeType>(
45+
fn filter_primitive<T: NativePType + BitPacking>(
4746
array: &BitPackedArray,
4847
mask: &Mask,
4948
) -> VortexResult<PrimitiveArray> {
5049
// Short-circuit if the selectivity is high enough.
51-
let full_decompression_threshold = match T::get_byte_width() {
50+
let full_decompression_threshold = match size_of::<T>() {
5251
1 => 0.03,
5352
2 => 0.03,
5453
4 => 0.075,
@@ -67,7 +66,7 @@ fn filter_primitive<T: NativePType + BitPacking + ArrowNativeType>(
6766
/// Filter a bit-packed array, without using full decompression.
6867
///
6968
/// You should probably use [filter_primitive].
70-
fn filter_primitive_no_decompression<T: NativePType + BitPacking + ArrowNativeType>(
69+
fn filter_primitive_no_decompression<T: NativePType + BitPacking>(
7170
array: &BitPackedArray,
7271
mask: &Mask,
7372
) -> VortexResult<PrimitiveArray> {
@@ -96,7 +95,7 @@ fn filter_primitive_no_decompression<T: NativePType + BitPacking + ArrowNativeTy
9695
Ok(values)
9796
}
9897

99-
fn filter_indices<T: NativePType + BitPacking + ArrowNativeType>(
98+
fn filter_indices<T: NativePType + BitPacking>(
10099
array: &BitPackedArray,
101100
indices_len: usize,
102101
indices: impl Iterator<Item = usize>,

fuzz/src/array/take.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use arrow_buffer::ArrowNativeType;
54
use vortex_array::accessor::ArrayAccessor;
65
use vortex_array::arrays::{BoolArray, DecimalArray, PrimitiveArray, StructArray, VarBinViewArray};
76
use vortex_array::builders::{ArrayBuilderExt, builder_with_capacity};
@@ -131,7 +130,7 @@ pub fn take_canonical_array(
131130
}
132131
}
133132

134-
fn take_primitive<T: NativePType + ArrowNativeType>(
133+
fn take_primitive<T: NativePType>(
135134
primitive_array: PrimitiveArray,
136135
validity: Validity,
137136
indices: &[usize],

vortex-array/src/arrays/decimal/patch.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use arrow_buffer::ArrowNativeType;
54
use itertools::Itertools as _;
5+
use num_traits::AsPrimitive;
66
use vortex_buffer::{Buffer, BufferMut};
77
use vortex_dtype::{DecimalDType, NativePType, match_each_integer_ptype};
88
use vortex_error::{VortexExpect as _, VortexResult, vortex_bail};
@@ -58,7 +58,7 @@ fn patch_typed<I, ValuesDVT, PatchDVT>(
5858
patched_validity: Validity,
5959
) -> VortexResult<DecimalArray>
6060
where
61-
I: NativePType + ArrowNativeType,
61+
I: NativePType + AsPrimitive<usize>,
6262
PatchDVT: NativeDecimalType,
6363
ValuesDVT: NativeDecimalType,
6464
{
@@ -71,7 +71,7 @@ where
7171
}
7272

7373
for (idx, value) in patch_indices.iter().zip_eq(patch_values.into_iter()) {
74-
buffer[idx.as_usize() - patch_indices_offset] = <ValuesDVT as BigCast>::from(value).vortex_expect(
74+
buffer[idx.as_() - patch_indices_offset] = <ValuesDVT as BigCast>::from(value).vortex_expect(
7575
"values of a given DecimalDType are representable in all compatible NativeDecimalType",
7676
);
7777
}

vortex-array/src/arrays/primitive/patch.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use arrow_buffer::ArrowNativeType;
4+
use num_traits::AsPrimitive;
55
use vortex_dtype::{NativePType, match_each_integer_ptype, match_each_native_ptype};
66
use vortex_error::VortexResult;
77

@@ -43,15 +43,15 @@ impl PrimitiveArray {
4343
patched_validity: Validity,
4444
) -> VortexResult<Self>
4545
where
46-
T: NativePType + ArrowNativeType,
47-
I: NativePType + ArrowNativeType,
46+
T: NativePType,
47+
I: NativePType + AsPrimitive<usize>,
4848
{
4949
let mut own_values = self.into_buffer_mut::<T>();
5050

5151
let patch_indices = patch_indices.as_slice::<I>();
5252
let patch_values = patch_values.as_slice::<T>();
5353
for (idx, value) in itertools::zip_eq(patch_indices, patch_values) {
54-
own_values[idx.as_usize() - patch_indices_offset] = *value;
54+
own_values[idx.as_() - patch_indices_offset] = *value;
5555
}
5656
Ok(Self::new(own_values, patched_validity))
5757
}

vortex-duckdb/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ vortex = { workspace = true, features = ["files"] }
4040
vortex-file = { workspace = true, features = ["tokio", "object_store"] }
4141

4242
[dev-dependencies]
43+
jiff = { workspace = true }
4344
rstest = { workspace = true }
4445

4546
[lints]

vortex-duckdb/src/convert/dtype.rs

Lines changed: 44 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl LogicalType {
9393
unsafe {
9494
let ptr = cpp::duckdb_create_list_type(element_type.as_ptr());
9595
if ptr.is_null() {
96-
return Err(vortex_err!("Failed to create list type"));
96+
vortex_bail!("Failed to create list type");
9797
}
9898
Ok(Self::own(ptr))
9999
}
@@ -119,24 +119,28 @@ impl LogicalType {
119119
let duckdb_type = match temporal_metadata {
120120
TemporalMetadata::Date(TimeUnit::D) => DUCKDB_TYPE::DUCKDB_TYPE_DATE,
121121
TemporalMetadata::Date(time_unit) => {
122-
return Err(vortex_err!("Invalid TimeUnit {} for date", time_unit));
122+
vortex_bail!("Invalid TimeUnit {} for date", time_unit);
123123
}
124124
TemporalMetadata::Time(TimeUnit::Us) => DUCKDB_TYPE::DUCKDB_TYPE_TIME,
125125
TemporalMetadata::Time(time_unit) => {
126-
return Err(vortex_err!("Invalid TimeUnit {} for time", time_unit));
126+
vortex_bail!("Invalid TimeUnit {} for time", time_unit);
127127
}
128-
TemporalMetadata::Timestamp(time_unit, tz) => {
129-
if tz.is_some() {
130-
return Err(vortex_err!("Timestamp with timezone is not yet supported"));
128+
TemporalMetadata::Timestamp(time_unit, tz) => match time_unit {
129+
TimeUnit::Ns => DUCKDB_TYPE::DUCKDB_TYPE_TIMESTAMP_NS,
130+
TimeUnit::Us => {
131+
if let Some(tz) = tz {
132+
if tz != "UTC" {
133+
vortex_bail!("Invalid timezone for timestamp: {tz}");
134+
}
135+
DUCKDB_TYPE::DUCKDB_TYPE_TIMESTAMP_TZ
136+
} else {
137+
DUCKDB_TYPE::DUCKDB_TYPE_TIMESTAMP
138+
}
131139
}
132-
match time_unit {
133-
TimeUnit::Ns => DUCKDB_TYPE::DUCKDB_TYPE_TIMESTAMP_NS,
134-
TimeUnit::Us => DUCKDB_TYPE::DUCKDB_TYPE_TIMESTAMP,
135-
TimeUnit::Ms => DUCKDB_TYPE::DUCKDB_TYPE_TIMESTAMP_MS,
136-
TimeUnit::S => DUCKDB_TYPE::DUCKDB_TYPE_TIMESTAMP_S,
137-
_ => return Err(vortex_err!("Invalid TimeUnit {} for timestamp", time_unit)),
138-
}
139-
}
140+
TimeUnit::Ms => DUCKDB_TYPE::DUCKDB_TYPE_TIMESTAMP_MS,
141+
TimeUnit::S => DUCKDB_TYPE::DUCKDB_TYPE_TIMESTAMP_S,
142+
_ => vortex_bail!("Invalid TimeUnit {} for timestamp", time_unit),
143+
},
140144
};
141145

142146
Ok(Self::new(duckdb_type))
@@ -171,7 +175,6 @@ impl FromLogicalType for DType {
171175
DUCKDB_TYPE::DUCKDB_TYPE_UHUGEINT => todo!(),
172176
DUCKDB_TYPE::DUCKDB_TYPE_FLOAT => DType::Primitive(F32, nullability),
173177
DUCKDB_TYPE::DUCKDB_TYPE_DOUBLE => DType::Primitive(F64, nullability),
174-
175178
DUCKDB_TYPE::DUCKDB_TYPE_VARCHAR => DType::Utf8(nullability),
176179
DUCKDB_TYPE::DUCKDB_TYPE_BLOB => DType::Binary(nullability),
177180
DUCKDB_TYPE::DUCKDB_TYPE_DECIMAL => {
@@ -206,11 +209,17 @@ impl FromLogicalType for DType {
206209
Arc::new(DType::Primitive(I64, nullability)),
207210
Some(TemporalMetadata::Timestamp(TimeUnit::Us, None).into()),
208211
))),
212+
DUCKDB_TYPE::DUCKDB_TYPE_TIMESTAMP_TZ => DType::Extension(Arc::new(ExtDType::new(
213+
TIMESTAMP_ID.clone(),
214+
Arc::new(DType::Primitive(I64, nullability)),
215+
Some(TemporalMetadata::Timestamp(TimeUnit::Us, Some("UTC".to_string())).into()),
216+
))),
209217
DUCKDB_TYPE::DUCKDB_TYPE_TIMESTAMP_NS => DType::Extension(Arc::new(ExtDType::new(
210218
TIMESTAMP_ID.clone(),
211219
Arc::new(DType::Primitive(I64, nullability)),
212220
Some(TemporalMetadata::Timestamp(TimeUnit::Ns, None).into()),
213221
))),
222+
DUCKDB_TYPE::DUCKDB_TYPE_TIME_TZ => todo!(),
214223
DUCKDB_TYPE::DUCKDB_TYPE_INTERVAL => todo!(),
215224
DUCKDB_TYPE::DUCKDB_TYPE_ENUM => todo!(),
216225
DUCKDB_TYPE::DUCKDB_TYPE_LIST => todo!(),
@@ -220,8 +229,6 @@ impl FromLogicalType for DType {
220229
DUCKDB_TYPE::DUCKDB_TYPE_UUID => todo!(),
221230
DUCKDB_TYPE::DUCKDB_TYPE_UNION => todo!(),
222231
DUCKDB_TYPE::DUCKDB_TYPE_BIT => todo!(),
223-
DUCKDB_TYPE::DUCKDB_TYPE_TIME_TZ => todo!(),
224-
DUCKDB_TYPE::DUCKDB_TYPE_TIMESTAMP_TZ => todo!(),
225232
DUCKDB_TYPE::DUCKDB_TYPE_ANY => todo!(),
226233
DUCKDB_TYPE::DUCKDB_TYPE_VARINT => todo!(),
227234
DUCKDB_TYPE::DUCKDB_TYPE_STRING_LITERAL => todo!(),
@@ -526,6 +533,26 @@ mod tests {
526533
}
527534
}
528535

536+
#[test]
537+
fn test_timestamp_with_timezone() {
538+
use std::sync::Arc;
539+
540+
use vortex::dtype::datetime::{TIMESTAMP_ID, TemporalMetadata, TimeUnit};
541+
use vortex::dtype::{ExtDType, PType};
542+
543+
let ext_dtype = ExtDType::new(
544+
TIMESTAMP_ID.clone(),
545+
Arc::new(DType::Primitive(PType::I64, Nullability::NonNullable)),
546+
Some(TemporalMetadata::Timestamp(TimeUnit::Us, Some("UTC".to_string())).into()),
547+
);
548+
let dtype = DType::Extension(Arc::new(ext_dtype));
549+
550+
assert_eq!(
551+
LogicalType::try_from(&dtype).unwrap().as_type_id(),
552+
cpp::DUCKDB_TYPE::DUCKDB_TYPE_TIMESTAMP_TZ
553+
);
554+
}
555+
529556
#[test]
530557
fn test_temporal_extension_invalid_time_units() {
531558
use std::sync::Arc;
@@ -552,23 +579,6 @@ mod tests {
552579
assert!(LogicalType::try_from(&dtype).is_err());
553580
}
554581

555-
#[test]
556-
fn test_timestamp_with_timezone_unsupported() {
557-
use std::sync::Arc;
558-
559-
use vortex::dtype::datetime::{TIMESTAMP_ID, TemporalMetadata, TimeUnit};
560-
use vortex::dtype::{ExtDType, PType};
561-
562-
let ext_dtype = ExtDType::new(
563-
TIMESTAMP_ID.clone(),
564-
Arc::new(DType::Primitive(PType::I64, Nullability::NonNullable)),
565-
Some(TemporalMetadata::Timestamp(TimeUnit::Us, Some("UTC".to_string())).into()),
566-
);
567-
let dtype = DType::Extension(Arc::new(ext_dtype));
568-
569-
assert!(LogicalType::try_from(&dtype).is_err());
570-
}
571-
572582
#[test]
573583
fn test_unsupported_extension_type() {
574584
use vortex::dtype::{ExtDType, ExtID, PType};

vortex-duckdb/src/convert/vector.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@ use std::sync::Arc;
66

77
use arrow_array::builder::GenericBinaryBuilder;
88
use arrow_array::types::{
9-
Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, Time64MicrosecondType,
10-
UInt8Type, UInt16Type, UInt32Type, UInt64Type,
9+
Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, UInt8Type, UInt16Type,
10+
UInt32Type, UInt64Type,
1111
};
1212
use arrow_array::{
1313
Array, BooleanArray, Date32Array, Decimal128Array, PrimitiveArray, StringArray,
14-
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
15-
TimestampSecondArray,
14+
Time64MicrosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
15+
TimestampNanosecondArray, TimestampSecondArray,
1616
};
1717
use arrow_buffer::buffer::BooleanBuffer;
18-
use bitvec::macros::internal::funty::Fundamental;
18+
use num_traits::AsPrimitive;
1919
use vortex::ArrayRef;
2020
use vortex::arrays::StructArray;
2121
use vortex::arrow::FromArrowArray;
@@ -116,13 +116,13 @@ pub fn flat_vector_to_arrow_array(
116116

117117
Ok(Arc::new(structs))
118118
}
119-
DUCKDB_TYPE::DUCKDB_TYPE_TIME_TZ => {
119+
DUCKDB_TYPE::DUCKDB_TYPE_TIMESTAMP_TZ => {
120120
let data = vector.as_slice_with_len::<duckdb_timestamp>(len);
121-
let micros = data.iter().map(|duckdb_timestamp { micros }| *micros);
122121
let structs = TimestampMicrosecondArray::from_iter_values_with_nulls(
123-
micros,
122+
data.iter().map(|duckdb_timestamp { micros }| *micros),
124123
vector.validity_ref(data.len()).to_null_buffer(),
125-
);
124+
)
125+
.with_timezone("UTC");
126126

127127
Ok(Arc::new(structs))
128128
}
@@ -181,7 +181,7 @@ pub fn flat_vector_to_arrow_array(
181181
let data = vector.as_slice_with_len::<duckdb_time>(len);
182182

183183
Ok(Arc::new(
184-
PrimitiveArray::<Time64MicrosecondType>::from_iter_values_with_nulls(
184+
Time64MicrosecondArray::from_iter_values_with_nulls(
185185
data.iter().map(|duckdb_time { micros }| *micros),
186186
vector.validity_ref(data.len()).to_null_buffer(),
187187
),
@@ -311,7 +311,7 @@ pub fn flat_vector_to_arrow_array(
311311

312312
Ok(Arc::new(decimal_array))
313313
}
314-
_ => todo!("missing impl for {:?}", type_id),
314+
_ => todo!("missing impl for {type_id:?}"),
315315
}
316316
}
317317

@@ -323,12 +323,13 @@ pub fn data_chunk_to_arrow(field_names: &FieldNames, chunk: &DataChunk) -> Vorte
323323
.map(|(i, name)| {
324324
let mut vector = chunk.get_vector(i);
325325
vector.flatten(len);
326-
flat_vector_to_arrow_array(&mut vector, len.as_usize())
326+
flat_vector_to_arrow_array(&mut vector, len.as_())
327327
.map(|array_data| {
328-
assert_eq!(array_data.len(), chunk.len().as_usize());
328+
let chunk_len: usize = chunk.len().as_();
329+
assert_eq!(array_data.len(), chunk_len);
329330
(name, ArrayRef::from_arrow(array_data.as_ref(), true))
330331
})
331-
.map_err(|e| vortex_err!("duckdb to arrow conversion failure {}", e.to_string()))
332+
.map_err(|e| vortex_err!("duckdb to arrow conversion failure {e}"))
332333
})
333334
.collect::<VortexResult<Vec<_>>>()?;
334335
StructArray::try_from_iter(columns).map(|a| a.to_array())

0 commit comments

Comments
 (0)