diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index 27604eec..e8c851f7 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -59,6 +59,7 @@ uuid = { version = "1.10", features = ["v4"] } tempfile = "3.23.0" snafu = "0.8.3" scopeguard = "1.2.0" +delegate = "0.13.5" [target.'cfg(target_arch = "wasm32")'.dependencies] jiff = { workspace = true, features = ["js"] } diff --git a/crates/fluss/src/metadata/datatype.rs b/crates/fluss/src/metadata/datatype.rs index 8ad4f7e5..e5ccb9a8 100644 --- a/crates/fluss/src/metadata/datatype.rs +++ b/crates/fluss/src/metadata/datatype.rs @@ -852,6 +852,36 @@ impl RowType { pub fn fields(&self) -> &Vec { &self.fields } + + pub fn get_field_index(&self, field_name: &str) -> Option { + self.fields.iter().position(|f| f.name == field_name) + } + + #[cfg(test)] + pub fn with_data_types(data_types: Vec) -> Self { + let mut fields: Vec = Vec::new(); + data_types.iter().enumerate().for_each(|(idx, data_type)| { + fields.push(DataField::new(format!("f{}", idx), data_type.clone(), None)); + }); + + Self::with_nullable(true, fields) + } + + #[cfg(test)] + pub fn with_data_types_and_field_names( + data_types: Vec, + field_names: Vec<&str>, + ) -> Self { + let fields = data_types + .into_iter() + .zip(field_names) + .map(|(data_type, field_name)| { + DataField::new(field_name.to_string(), data_type.clone(), None) + }) + .collect::>(); + + Self::with_nullable(true, fields) + } } impl Display for RowType { diff --git a/crates/fluss/src/row/binary/binary_writer.rs b/crates/fluss/src/row/binary/binary_writer.rs new file mode 100644 index 00000000..a296777a --- /dev/null +++ b/crates/fluss/src/row/binary/binary_writer.rs @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO Timestamp type + // fn write_timestamp_ntz(&mut self, pos: i32, value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, complete write to set real size to binary. + fn complete(&mut self); +} + +pub enum ValueWriter { + Nullable(InnerValueWriter), + NonNullable(InnerValueWriter), +} + +impl ValueWriter { + pub fn create_value_writer( + element_type: &DataType, + binary_row_format: Option<&BinaryRowFormat>, + ) -> Result { + let value_writer = + InnerValueWriter::create_inner_value_writer(element_type, binary_row_format)?; + if element_type.is_nullable() { + Ok(Self::Nullable(value_writer)) + } else { + Ok(Self::NonNullable(value_writer)) + } + } + + pub fn write_value( + &self, + writer: &mut W, + pos: usize, + value: &Datum, + ) -> Result<()> { + match self { + Self::Nullable(inner_value_writer) => { + if let Datum::Null = value { + writer.set_null_at(pos); + Ok(()) + } else { + inner_value_writer.write_value(writer, pos, value) + } + } + Self::NonNullable(inner_value_writer) => { + inner_value_writer.write_value(writer, pos, value) + } + } + } +} + +#[derive(Debug)] +pub enum InnerValueWriter { + Char, + String, + Boolean, + Binary, + Bytes, + TinyInt, + SmallInt, + Int, + BigInt, + Float, + Double, + // TODO Decimal, Date, TimeWithoutTimeZone, TimestampWithoutTimeZone, TimestampWithLocalTimeZone, Array, Row +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +impl InnerValueWriter { + pub fn create_inner_value_writer( + data_type: &DataType, + _: Option<&BinaryRowFormat>, + ) -> Result { + match data_type { + DataType::Char(_) => Ok(InnerValueWriter::Char), + DataType::String(_) => Ok(InnerValueWriter::String), + DataType::Boolean(_) => Ok(InnerValueWriter::Boolean), + DataType::Binary(_) => Ok(InnerValueWriter::Binary), + DataType::Bytes(_) => Ok(InnerValueWriter::Bytes), + DataType::TinyInt(_) => Ok(InnerValueWriter::TinyInt), + DataType::SmallInt(_) => Ok(InnerValueWriter::SmallInt), + DataType::Int(_) => Ok(InnerValueWriter::Int), + DataType::BigInt(_) => Ok(InnerValueWriter::BigInt), + DataType::Float(_) => Ok(InnerValueWriter::Float), + DataType::Double(_) => Ok(InnerValueWriter::Double), + _ => unimplemented!( + "ValueWriter for DataType {:?} is currently not implemented", + data_type + ), + } + } + pub fn write_value( + &self, + writer: &mut W, + _pos: usize, + value: &Datum, + ) -> Result<()> { + match (self, value) { + (InnerValueWriter::Char, Datum::String(v)) => { + writer.write_char(v, v.len()); + } + (InnerValueWriter::String, Datum::String(v)) => { + writer.write_string(v); + } + (InnerValueWriter::Boolean, Datum::Bool(v)) => { + writer.write_boolean(*v); + } + (InnerValueWriter::Binary, Datum::Blob(v)) => { + writer.write_binary(v.as_ref(), v.len()); + } + (InnerValueWriter::Binary, Datum::BorrowedBlob(v)) => { + writer.write_binary(v.as_ref(), v.len()); + } + (InnerValueWriter::Bytes, Datum::Blob(v)) => { + writer.write_bytes(v.as_ref()); + } + (InnerValueWriter::Bytes, Datum::BorrowedBlob(v)) => { + writer.write_bytes(v.as_ref()); + } + (InnerValueWriter::TinyInt, Datum::Int8(v)) => { + writer.write_byte(*v as u8); + } + (InnerValueWriter::SmallInt, Datum::Int16(v)) => { + writer.write_short(*v); + } + (InnerValueWriter::Int, Datum::Int32(v)) => { + writer.write_int(*v); + } + (InnerValueWriter::BigInt, Datum::Int64(v)) => { + writer.write_long(*v); + } + (InnerValueWriter::Float, Datum::Float32(v)) => { + writer.write_float(v.into_inner()); + } + (InnerValueWriter::Double, Datum::Float64(v)) => { + writer.write_double(v.into_inner()); + } + _ => { + return Err(IllegalArgument { + message: format!("{:?} used to write value {:?}", self, value), + }); + } + } + Ok(()) + } +} diff --git a/crates/fluss/src/row/binary/mod.rs b/crates/fluss/src/row/binary/mod.rs new file mode 100644 index 00000000..c31cbd59 --- /dev/null +++ b/crates/fluss/src/row/binary/mod.rs @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod binary_writer; + +pub use binary_writer::*; + +/// The binary row format types, it indicates the generated [`BinaryRow`] type by the [`BinaryWriter`] +#[allow(dead_code)] +pub enum BinaryRowFormat { + Compacted, + Aligned, + Indexed, +} diff --git a/crates/fluss/src/row/compacted/compacted_key_writer.rs b/crates/fluss/src/row/compacted/compacted_key_writer.rs new file mode 100644 index 00000000..84a6b227 --- /dev/null +++ b/crates/fluss/src/row/compacted/compacted_key_writer.rs @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::row::compacted::compacted_row_writer::CompactedRowWriter; +use bytes::Bytes; + +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter}; +use delegate::delegate; + +/// A wrapping of [`CompactedRowWriter`] used to encode key columns. +/// The encoding is the same as [`CompactedRowWriter`], but is without header of null bits to +/// represent whether the field value is null or not since the key columns must be not null. +pub struct CompactedKeyWriter { + delegate: CompactedRowWriter, +} + +impl CompactedKeyWriter { + pub fn new() -> CompactedKeyWriter { + CompactedKeyWriter { + // in compacted key encoder, we don't need to set null bits as the key columns must be not + // null, to use field count 0 to init to make the null bits 0 + delegate: CompactedRowWriter::new(0), + } + } + + pub fn create_value_writer(field_type: &DataType) -> Result { + ValueWriter::create_value_writer(field_type, Some(&BinaryRowFormat::Compacted)) + } + + delegate! { + to self.delegate { + pub fn reset(&mut self); + + #[allow(dead_code)] + pub fn position(&self) -> usize; + + #[allow(dead_code)] + pub fn buffer(&self) -> &[u8]; + + pub fn to_bytes(&self) -> Bytes; + } + } +} + +impl BinaryWriter for CompactedKeyWriter { + delegate! { + to self.delegate { + fn reset(&mut self); + + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, _length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + + } + } + + fn complete(&mut self) { + // do nothing + } +} diff --git a/crates/fluss/src/row/compacted/mod.rs b/crates/fluss/src/row/compacted/mod.rs index 695cdad9..c81eb5a5 100644 --- a/crates/fluss/src/row/compacted/mod.rs +++ b/crates/fluss/src/row/compacted/mod.rs @@ -15,4 +15,7 @@ // specific language governing permissions and limitations // under the License. +mod compacted_key_writer; mod compacted_row_writer; + +pub use compacted_key_writer::CompactedKeyWriter; diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs index 1ea39334..28a378fd 100644 --- a/crates/fluss/src/row/datum.rs +++ b/crates/fluss/src/row/datum.rs @@ -55,6 +55,8 @@ pub enum Datum<'a> { String(&'a str), #[display("{0}")] Blob(Blob), + #[display("{:?}")] + BorrowedBlob(&'a [u8]), #[display("{0}")] Decimal(Decimal), #[display("{0}")] @@ -80,6 +82,7 @@ impl Datum<'_> { pub fn as_blob(&self) -> &[u8] { match self { Self::Blob(blob) => blob.as_ref(), + Self::BorrowedBlob(blob) => blob, _ => panic!("not a blob: {self:?}"), } } @@ -289,6 +292,7 @@ impl Datum<'_> { Datum::Float64(v) => append_value_to_arrow!(Float64Builder, v.into_inner()), Datum::String(v) => append_value_to_arrow!(StringBuilder, *v), Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, v.as_ref()), + Datum::BorrowedBlob(v) => append_value_to_arrow!(BinaryBuilder, *v), Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) | Datum::TimestampTz(_) => { return Err(RowConvertError { message: format!( @@ -406,6 +410,12 @@ impl From> for Blob { } } +impl<'a> From<&'a [u8]> for Datum<'a> { + fn from(bytes: &'a [u8]) -> Datum<'a> { + Datum::BorrowedBlob(bytes) + } +} + const UNIX_EPOCH_DAY: jiff::civil::Date = jiff::civil::date(1970, 1, 1); impl Date { diff --git a/crates/fluss/src/row/encode/compacted_key_encoder.rs b/crates/fluss/src/row/encode/compacted_key_encoder.rs new file mode 100644 index 00000000..b9335a3c --- /dev/null +++ b/crates/fluss/src/row/encode/compacted_key_encoder.rs @@ -0,0 +1,329 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::RowType; +use crate::row::binary::ValueWriter; +use crate::row::compacted::CompactedKeyWriter; +use crate::row::encode::KeyEncoder; +use crate::row::field_getter::FieldGetter; +use crate::row::{Datum, InternalRow}; +use bytes::Bytes; + +#[allow(dead_code)] +pub struct CompactedKeyEncoder { + field_getters: Vec, + field_encoders: Vec, + compacted_encoder: CompactedKeyWriter, +} + +impl CompactedKeyEncoder { + /// Create a key encoder to encode the key of the input row. + /// + /// # Arguments + /// * `row_type` - the row type of the input row + /// * `keys` - the key fields to encode + /// + /// # Returns + /// * key_encoder - the [`KeyEncoder`] + pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result { + let mut encode_col_indexes = Vec::with_capacity(keys.len()); + + for key in keys { + match row_type.get_field_index(key) { + Some(idx) => encode_col_indexes.push(idx), + None => { + return Err(IllegalArgument { + message: format!( + "Field {:?} not found in input row type {:?}", + key, row_type + ), + }); + } + } + } + + Self::new(row_type, encode_col_indexes) + } + + pub fn new(row_type: &RowType, encode_field_pos: Vec) -> Result { + let mut field_getters: Vec = Vec::with_capacity(encode_field_pos.len()); + let mut field_encoders: Vec = Vec::with_capacity(encode_field_pos.len()); + + for pos in &encode_field_pos { + let data_type = row_type.fields().get(*pos).unwrap().data_type(); + field_getters.push(FieldGetter::create(data_type, *pos)); + field_encoders.push(CompactedKeyWriter::create_value_writer(data_type)?); + } + + Ok(CompactedKeyEncoder { + field_encoders, + field_getters, + compacted_encoder: CompactedKeyWriter::new(), + }) + } +} + +#[allow(dead_code)] +impl KeyEncoder for CompactedKeyEncoder { + fn encode_key(&mut self, row: &dyn InternalRow) -> Result { + self.compacted_encoder.reset(); + + // iterate all the fields of the row, and encode each field + for (pos, field_getter) in self.field_getters.iter().enumerate() { + match &field_getter.get_field(row) { + Datum::Null => { + return Err(IllegalArgument { + message: format!( + "Cannot encode key with null value at position: {:?}", + pos + ), + }); + } + value => self.field_encoders.get(pos).unwrap().write_value( + &mut self.compacted_encoder, + pos, + value, + )?, + } + } + + Ok(self.compacted_encoder.to_bytes()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::DataTypes; + use crate::row::{Datum, GenericRow}; + + pub fn for_test_row_type(row_type: &RowType) -> CompactedKeyEncoder { + CompactedKeyEncoder::new(row_type, (0..row_type.fields().len()).collect()) + .expect("CompactedKeyEncoder initialization failed") + } + + #[test] + fn test_encode_key() { + let row_type = RowType::with_data_types(vec![ + DataTypes::int(), + DataTypes::bigint(), + DataTypes::int(), + ]); + let row = GenericRow::from_data(vec![ + Datum::from(1i32), + Datum::from(3i64), + Datum::from(2i32), + ]); + + let mut encoder = for_test_row_type(&row_type); + + assert_eq!( + encoder.encode_key(&row).unwrap().iter().as_slice(), + [1u8, 3u8, 2u8] + ); + + let row = GenericRow::from_data(vec![ + Datum::from(2i32), + Datum::from(5i64), + Datum::from(6i32), + ]); + + assert_eq!( + encoder.encode_key(&row).unwrap().iter().as_slice(), + [2u8, 5u8, 6u8] + ); + } + + #[test] + fn test_encode_key_with_key_names() { + let data_types = vec![ + DataTypes::string(), + DataTypes::bigint(), + DataTypes::string(), + ]; + let field_names = vec!["partition", "f1", "f2"]; + + let row_type = RowType::with_data_types_and_field_names(data_types, field_names); + + let primary_keys = &["f2".to_string()]; + + let mut encoder = CompactedKeyEncoder::create_key_encoder(&row_type, primary_keys).unwrap(); + + let row = GenericRow::from_data(vec![ + Datum::from("p1"), + Datum::from(1i64), + Datum::from("a2"), + ]); + + // should only get "a2" 's ASCII representation + assert_eq!( + encoder.encode_key(&row).unwrap().iter().as_slice(), + // 2 (start of text), 97 (the letter a), 50 (the number 2) + [2u8, 97u8, 50u8] + ); + } + + #[test] + #[should_panic(expected = "Cannot encode key with null value at position: 2")] + fn test_null_primary_key() { + let row_type = RowType::with_data_types(vec![ + DataTypes::int(), + DataTypes::bigint(), + DataTypes::int(), + DataTypes::string(), + ]); + + let primary_key_indices = vec![0, 1, 2]; + + let mut encoder = CompactedKeyEncoder::new(&row_type, primary_key_indices) + .expect("CompactedKeyEncoder initialization failed"); + + let row = GenericRow::from_data(vec![ + Datum::from(1i32), + Datum::from(3i64), + Datum::from(2i32), + Datum::from("a2"), + ]); + + assert_eq!( + encoder.encode_key(&row).unwrap().iter().as_slice(), + [1u8, 3u8, 2u8] + ); + + let row = GenericRow::from_data(vec![ + Datum::from(1i32), + Datum::from(3i64), + Datum::Null, + Datum::from("a2"), + ]); + + encoder.encode_key(&row).unwrap(); + } + + #[test] + fn test_int_string_as_primary_key() { + let row_type = RowType::with_data_types(vec![ + DataTypes::string(), + DataTypes::int(), + DataTypes::string(), + DataTypes::string(), + ]); + + let primary_key_indices = vec![1, 2]; + let mut encoder = CompactedKeyEncoder::new(&row_type, primary_key_indices) + .expect("CompactedKeyEncoder initialization failed"); + + let row = GenericRow::from_data(vec![ + Datum::from("a1"), + Datum::from(1i32), + Datum::from("a2"), + Datum::from("a3"), + ]); + + assert_eq!( + encoder.encode_key(&row).unwrap().iter().as_slice(), + // 1 (1i32), 2 (start of text), 97 (the letter a), 50 (the number 2) + [1u8, 2u8, 97u8, 50u8] + ); + } + + #[test] + fn test_all_data_types() { + let row_type = RowType::with_data_types(vec![ + DataTypes::boolean(), + DataTypes::tinyint(), + DataTypes::smallint(), + DataTypes::int(), + DataTypes::bigint(), + DataTypes::float(), + DataTypes::double(), + // TODO Date + // TODO Time + DataTypes::binary(20), + DataTypes::bytes(), + DataTypes::char(2), + DataTypes::string(), + // TODO Decimal + // TODO Timestamp + // TODO Timestamp LTZ + // TODO Array of Int + // TODO Array of Float + // TODO Array of String + // TODO: Add Map and Row fields in Issue #1973 + ]); + + let row = GenericRow::from_data(vec![ + Datum::from(true), + Datum::from(2i8), + Datum::from(10i16), + Datum::from(100i32), + Datum::from(-6101065172474983726i64), // from Java test case: new BigInteger("12345678901234567890").longValue() + Datum::from(13.2f32), + Datum::from(15.21f64), + // TODO Date + // TODO Time + Datum::from("1234567890".as_bytes()), + Datum::from("20".as_bytes()), + Datum::from("1"), + Datum::from("hello"), + // TODO Decimal + // TODO Timestamp + // TODO Timestamp LTZ + // TODO Array of Int + // TODO Array of Float + // TODO Array of String + // TODO: Add Map and Row fields in Issue #1973 + ]); + + let mut encoder = for_test_row_type(&row_type); + + let mut expected: Vec = Vec::new(); + // BOOLEAN: true + expected.extend(vec![0x01]); + // TINYINT: 2 + expected.extend(vec![0x02]); + // SMALLINT: 10 + expected.extend(vec![0x0A]); + // INT: 100 + expected.extend(vec![0x00, 0x64]); + // BIGINT: -6101065172474983726 + expected.extend(vec![ + 0xD2, 0x95, 0xFC, 0xD8, 0xCE, 0xB1, 0xAA, 0xAA, 0xAB, 0x01, + ]); + // FLOAT: 13.2 + expected.extend(vec![0x33, 0x33, 0x53, 0x41]); + // DOUBLE: 15.21 + expected.extend(vec![0xEC, 0x51, 0xB8, 0x1E, 0x85, 0x6B, 0x2E, 0x40]); + // BINARY(20): "1234567890".getBytes() + expected.extend(vec![ + 0x0A, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30, + ]); + + // BYTES: "20".getBytes() + expected.extend(vec![0x02, 0x32, 0x30]); + // CHAR(2): "1" + expected.extend(vec![0x01, 0x31]); + // STRING: String: "hello" + expected.extend(vec![0x05, 0x68, 0x65, 0x6C, 0x6C, 0x6F]); + assert_eq!( + encoder.encode_key(&row).unwrap().iter().as_slice(), + expected.as_slice() + ); + } +} diff --git a/crates/fluss/src/row/encode/mod.rs b/crates/fluss/src/row/encode/mod.rs new file mode 100644 index 00000000..6c6eed99 --- /dev/null +++ b/crates/fluss/src/row/encode/mod.rs @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod compacted_key_encoder; + +use crate::error::Result; +use crate::metadata::{DataLakeFormat, RowType}; +use crate::row::InternalRow; +use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder; +use bytes::Bytes; + +/// An interface for encoding key of row into bytes. +#[allow(dead_code)] +pub trait KeyEncoder { + fn encode_key(&mut self, row: &dyn InternalRow) -> Result; +} + +#[allow(dead_code)] +impl dyn KeyEncoder { + /// Create a key encoder to encode the key bytes of the input row. + /// # Arguments + /// * `row_type` - the row type of the input row + /// * `key_fields` - the key fields to encode + /// * `lake_format` - the data lake format + /// + /// # Returns + /// key encoder + pub fn of( + row_type: &RowType, + key_fields: Vec, + data_lake_format: Option, + ) -> Result> { + match data_lake_format { + Some(DataLakeFormat::Paimon) => { + unimplemented!("KeyEncoder for Paimon format is currently unimplemented") + } + Some(DataLakeFormat::Lance) => Ok(Box::new(CompactedKeyEncoder::create_key_encoder( + row_type, + key_fields.as_slice(), + )?)), + Some(DataLakeFormat::Iceberg) => { + unimplemented!("KeyEncoder for Iceberg format is currently unimplemented") + } + None => Ok(Box::new(CompactedKeyEncoder::create_key_encoder( + row_type, + key_fields.as_slice(), + )?)), + } + } +} diff --git a/crates/fluss/src/row/field_getter.rs b/crates/fluss/src/row/field_getter.rs new file mode 100644 index 00000000..3a9cf0fa --- /dev/null +++ b/crates/fluss/src/row/field_getter.rs @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::DataType; +use crate::row::{Datum, InternalRow}; + +pub enum FieldGetter { + Nullable(InnerFieldGetter), + NonNullable(InnerFieldGetter), +} +impl FieldGetter { + pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Datum<'a> { + match self { + FieldGetter::Nullable(getter) => { + if row.is_null_at(getter.pos()) { + Datum::Null + } else { + getter.get_field(row) + } + } + FieldGetter::NonNullable(getter) => getter.get_field(row), + } + } + + pub fn create(data_type: &DataType, pos: usize) -> FieldGetter { + let inner_field_getter = match data_type { + DataType::Char(t) => InnerFieldGetter::Char { + pos, + len: t.length() as usize, + }, + DataType::String(_) => InnerFieldGetter::String { pos }, + DataType::Boolean(_) => InnerFieldGetter::Bool { pos }, + DataType::Binary(t) => InnerFieldGetter::Binary { + pos, + len: t.length(), + }, + DataType::Bytes(_) => InnerFieldGetter::Bytes { pos }, + DataType::TinyInt(_) => InnerFieldGetter::TinyInt { pos }, + DataType::SmallInt(_) => InnerFieldGetter::SmallInt { pos }, + DataType::Int(_) => InnerFieldGetter::Int { pos }, + DataType::BigInt(_) => InnerFieldGetter::BigInt { pos }, + DataType::Float(_) => InnerFieldGetter::Float { pos }, + DataType::Double(_) => InnerFieldGetter::Double { pos }, + _ => unimplemented!("DataType {:?} is currently unimplemented", data_type), + }; + + if data_type.is_nullable() { + Self::Nullable(inner_field_getter) + } else { + Self::NonNullable(inner_field_getter) + } + } +} + +pub enum InnerFieldGetter { + Char { pos: usize, len: usize }, + String { pos: usize }, + Bool { pos: usize }, + Binary { pos: usize, len: usize }, + Bytes { pos: usize }, + TinyInt { pos: usize }, + SmallInt { pos: usize }, + Int { pos: usize }, + BigInt { pos: usize }, + Float { pos: usize }, + Double { pos: usize }, +} + +impl InnerFieldGetter { + pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Datum<'a> { + match self { + InnerFieldGetter::Char { pos, len } => Datum::String(row.get_char(*pos, *len)), + InnerFieldGetter::String { pos } => Datum::from(row.get_string(*pos)), + InnerFieldGetter::Bool { pos } => Datum::from(row.get_boolean(*pos)), + InnerFieldGetter::Binary { pos, len } => Datum::from(row.get_binary(*pos, *len)), + InnerFieldGetter::Bytes { pos } => Datum::from(row.get_bytes(*pos)), + InnerFieldGetter::TinyInt { pos } => Datum::from(row.get_byte(*pos)), + InnerFieldGetter::SmallInt { pos } => Datum::from(row.get_short(*pos)), + InnerFieldGetter::Int { pos } => Datum::from(row.get_int(*pos)), + InnerFieldGetter::BigInt { pos } => Datum::from(row.get_long(*pos)), + InnerFieldGetter::Float { pos } => Datum::from(row.get_float(*pos)), + InnerFieldGetter::Double { pos } => Datum::from(row.get_double(*pos)), + //TODO Decimal, Date, Time, Timestamp, TimestampLTZ, Array, Map, Row + } + } + + pub fn pos(&self) -> usize { + match self { + Self::Char { pos, .. } + | Self::String { pos } + | Self::Bool { pos } + | Self::Binary { pos, .. } + | Self::Bytes { pos } + | Self::TinyInt { pos } + | Self::SmallInt { pos, .. } + | Self::Int { pos } + | Self::BigInt { pos } + | Self::Float { pos, .. } + | Self::Double { pos } => *pos, + } + } +} diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index 86fdf90c..c321ab9d 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -19,11 +19,15 @@ mod column; mod datum; +mod binary; mod compacted; +mod encode; +mod field_getter; pub use column::*; pub use datum::*; +// TODO make functions return Result for better error handling pub trait InternalRow { /// Returns the number of fields in this row fn get_field_count(&self) -> usize; @@ -143,6 +147,11 @@ impl<'a> Default for GenericRow<'a> { } impl<'a> GenericRow<'a> { + pub fn from_data(data: Vec>>) -> GenericRow<'a> { + GenericRow { + values: data.into_iter().map(Into::into).collect(), + } + } pub fn new() -> GenericRow<'a> { GenericRow { values: vec![] } }