Skip to content

Commit 15d0e3a

Browse files
committed
[FIX/ADD] fix and changes to accommodate new variant types
1 parent d1b3774 commit 15d0e3a

File tree

3 files changed

+97
-18
lines changed

3 files changed

+97
-18
lines changed

parquet-variant/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,6 @@ rust-version = { workspace = true }
3434
arrow-schema = "55.1.0"
3535
chrono = { workspace = true }
3636
serde_json = "1.0"
37+
base64 = "0.21"
3738

3839
[lib]

parquet-variant/src/encoder/variant_to_json.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! Module for converting Variant data to JSON format
1919
2020
use arrow_schema::ArrowError;
21+
use base64::{Engine as _, engine::general_purpose};
2122
use serde_json::Value;
2223
use std::io::Write;
2324

@@ -67,6 +68,55 @@ pub fn variant_to_json<W: Write>(
6768
Variant::Int8(i) => {
6869
write!(json_buffer, "{}", i)?;
6970
}
71+
Variant::Int16(i) => {
72+
write!(json_buffer, "{}", i)?;
73+
}
74+
Variant::Int32(i) => {
75+
write!(json_buffer, "{}", i)?;
76+
}
77+
Variant::Int64(i) => {
78+
write!(json_buffer, "{}", i)?;
79+
}
80+
Variant::Float(f) => {
81+
write!(json_buffer, "{}", f)?;
82+
}
83+
Variant::Double(f) => {
84+
write!(json_buffer, "{}", f)?;
85+
}
86+
Variant::Decimal4 { integer, scale } => {
87+
// Convert decimal to string representation
88+
let divisor = 10_i32.pow(*scale as u32);
89+
let decimal_value = *integer as f64 / divisor as f64;
90+
write!(json_buffer, "{}", decimal_value)?;
91+
}
92+
Variant::Decimal8 { integer, scale } => {
93+
// Convert decimal to string representation
94+
let divisor = 10_i64.pow(*scale as u32);
95+
let decimal_value = *integer as f64 / divisor as f64;
96+
write!(json_buffer, "{}", decimal_value)?;
97+
}
98+
Variant::Decimal16 { integer, scale } => {
99+
// Convert decimal to string representation
100+
let divisor = 10_i128.pow(*scale as u32);
101+
let decimal_value = *integer as f64 / divisor as f64;
102+
write!(json_buffer, "{}", decimal_value)?;
103+
}
104+
Variant::Date(date) => {
105+
write!(json_buffer, "\"{}\"", date.format("%Y-%m-%d"))?;
106+
}
107+
Variant::TimestampMicros(ts) => {
108+
write!(json_buffer, "\"{}\"", ts.to_rfc3339())?;
109+
}
110+
Variant::TimestampNtzMicros(ts) => {
111+
write!(json_buffer, "\"{}\"", ts.format("%Y-%m-%dT%H:%M:%S%.6f"))?;
112+
}
113+
Variant::Binary(bytes) => {
114+
// Encode binary as base64 string
115+
let base64_str = general_purpose::STANDARD.encode(bytes);
116+
let json_str = serde_json::to_string(&base64_str)
117+
.map_err(|e| ArrowError::InvalidArgumentError(format!("JSON encoding error: {}", e)))?;
118+
write!(json_buffer, "{}", json_str)?;
119+
}
70120
Variant::String(s) | Variant::ShortString(s) => {
71121
// Use serde_json to properly escape the string
72122
let json_str = serde_json::to_string(s)
@@ -198,6 +248,44 @@ pub fn variant_to_json_value(variant: &Variant) -> Result<Value, ArrowError> {
198248
Variant::BooleanTrue => Ok(Value::Bool(true)),
199249
Variant::BooleanFalse => Ok(Value::Bool(false)),
200250
Variant::Int8(i) => Ok(Value::Number((*i).into())),
251+
Variant::Int16(i) => Ok(Value::Number((*i).into())),
252+
Variant::Int32(i) => Ok(Value::Number((*i).into())),
253+
Variant::Int64(i) => Ok(Value::Number((*i).into())),
254+
Variant::Float(f) => {
255+
serde_json::Number::from_f64(*f as f64)
256+
.map(Value::Number)
257+
.ok_or_else(|| ArrowError::InvalidArgumentError("Invalid float value".to_string()))
258+
}
259+
Variant::Double(f) => {
260+
serde_json::Number::from_f64(*f)
261+
.map(Value::Number)
262+
.ok_or_else(|| ArrowError::InvalidArgumentError("Invalid double value".to_string()))
263+
}
264+
Variant::Decimal4 { integer, scale } => {
265+
let divisor = 10_i32.pow(*scale as u32);
266+
let decimal_value = *integer as f64 / divisor as f64;
267+
serde_json::Number::from_f64(decimal_value)
268+
.map(Value::Number)
269+
.ok_or_else(|| ArrowError::InvalidArgumentError("Invalid decimal value".to_string()))
270+
}
271+
Variant::Decimal8 { integer, scale } => {
272+
let divisor = 10_i64.pow(*scale as u32);
273+
let decimal_value = *integer as f64 / divisor as f64;
274+
serde_json::Number::from_f64(decimal_value)
275+
.map(Value::Number)
276+
.ok_or_else(|| ArrowError::InvalidArgumentError("Invalid decimal value".to_string()))
277+
}
278+
Variant::Decimal16 { integer, scale } => {
279+
let divisor = 10_i128.pow(*scale as u32);
280+
let decimal_value = *integer as f64 / divisor as f64;
281+
serde_json::Number::from_f64(decimal_value)
282+
.map(Value::Number)
283+
.ok_or_else(|| ArrowError::InvalidArgumentError("Invalid decimal value".to_string()))
284+
}
285+
Variant::Date(date) => Ok(Value::String(date.format("%Y-%m-%d").to_string())),
286+
Variant::TimestampMicros(ts) => Ok(Value::String(ts.to_rfc3339())),
287+
Variant::TimestampNtzMicros(ts) => Ok(Value::String(ts.format("%Y-%m-%dT%H:%M:%S%.6f").to_string())),
288+
Variant::Binary(bytes) => Ok(Value::String(general_purpose::STANDARD.encode(bytes))),
201289
Variant::String(s) | Variant::ShortString(s) => Ok(Value::String(s.to_string())),
202290
Variant::Object(obj) => {
203291
let mut map = serde_json::Map::new();

parquet-variant/src/variant.rs

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ pub struct VariantObject<'m, 'v> {
310310
impl<'m, 'v> VariantObject<'m, 'v> {
311311
pub fn fields(&self) -> Result<impl Iterator<Item = (&'m str, Variant<'m, 'v>)>, ArrowError> {
312312
// Parse the object header to get information about field layout
313-
let value_header = *first_byte_from_slice(self.value)? >> 2;
313+
let value_header = *first_byte_from_slice(self.value_data)? >> 2;
314314
let is_large = (value_header & 0x08) != 0; // 4th bit from the right
315315
let field_id_size_minus_one = (value_header >> 2) & 0x03; // bits 4-5
316316
let field_offset_size_minus_one = value_header & 0x03; // Last two bits
@@ -326,30 +326,30 @@ impl<'m, 'v> VariantObject<'m, 'v> {
326326
};
327327

328328
// Skip the header byte to read the num_fields
329-
let num_fields = num_fields_size.unpack_usize(self.value, 1, 0)?;
329+
let num_fields = num_fields_size.unpack_usize(self.value_data, 1, 0)?;
330330
let first_field_id_byte = 1 + num_fields_size as usize;
331331

332332
// Collect all field information
333333
let mut fields = Vec::new();
334334

335335
for i in 0..num_fields {
336336
// Get field ID
337-
let field_id = field_id_size.unpack_usize(self.value, first_field_id_byte, i)?;
337+
let field_id = field_id_size.unpack_usize(self.value_data, first_field_id_byte, i)?;
338338

339339
// Get field name from metadata
340340
let field_name = self.metadata.get_field_by(field_id)?;
341341

342342
// Calculate offset positions
343343
let first_offset_byte = first_field_id_byte + num_fields * (field_id_size as usize);
344-
let start_offset = field_offset_size.unpack_usize(self.value, first_offset_byte, i)?;
345-
let end_offset = field_offset_size.unpack_usize(self.value, first_offset_byte, i + 1)?;
344+
let start_offset = field_offset_size.unpack_usize(self.value_data, first_offset_byte, i)?;
345+
let end_offset = field_offset_size.unpack_usize(self.value_data, first_offset_byte, i + 1)?;
346346

347347
// Calculate the start of the values section
348348
let first_value_byte = first_offset_byte + (num_fields + 1) * (field_offset_size as usize);
349349

350350
// Extract field value bytes
351351
let field_value_bytes = slice_from_slice(
352-
self.value,
352+
self.value_data,
353353
first_value_byte + start_offset..first_value_byte + end_offset,
354354
)?;
355355

@@ -385,15 +385,15 @@ impl<'m, 'v> VariantArray<'m, 'v> {
385385
/// Return the length of this array
386386
pub fn len(&self) -> usize {
387387
// Parse the array header to get the number of elements
388-
if let Ok(value_header) = first_byte_from_slice(self.value).map(|b| *b >> 2) {
388+
if let Ok(value_header) = first_byte_from_slice(self.value_data).map(|b| *b >> 2) {
389389
let is_large = (value_header & 0x04) != 0; // 3rd bit from the right
390390
let num_elements_size = match is_large {
391391
true => OffsetSizeBytes::Four,
392392
false => OffsetSizeBytes::One,
393393
};
394394

395395
// Skip the header byte to read the num_elements
396-
num_elements_size.unpack_usize(self.value, 1, 0).unwrap_or(0)
396+
num_elements_size.unpack_usize(self.value_data, 1, 0).unwrap_or(0)
397397
} else {
398398
0 // Return 0 if we can't read the header
399399
}
@@ -1152,16 +1152,6 @@ impl From<f64> for Variant<'_, '_> {
11521152
}
11531153
}
11541154

1155-
impl From<bool> for Variant<'_, '_> {
1156-
fn from(value: bool) -> Self {
1157-
if value {
1158-
Variant::BooleanTrue
1159-
} else {
1160-
Variant::BooleanFalse
1161-
}
1162-
}
1163-
}
1164-
11651155
impl From<NaiveDate> for Variant<'_, '_> {
11661156
fn from(value: NaiveDate) -> Self {
11671157
Variant::Date(value)

0 commit comments

Comments
 (0)