Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
0350ba1
Introduce CompactedKeyEncoder, initial commit
leekeiabstraction Jan 3, 2026
a02719a
Introduce CompactedKeyEncoder: working test cases
leekeiabstraction Jan 3, 2026
bf9215d
Use Datum, remove Value
leekeiabstraction Jan 4, 2026
111b08f
All data type unit test
leekeiabstraction Jan 4, 2026
082257d
Use Result in KeyEncoder
leekeiabstraction Jan 4, 2026
f506ab0
Update todo comment
leekeiabstraction Jan 4, 2026
b6e308e
Mark test methods as cfg(test)
leekeiabstraction Jan 4, 2026
a71db15
ValueWriter documentation
leekeiabstraction Jan 4, 2026
006d593
Minor refactoring
leekeiabstraction Jan 4, 2026
7dcea05
Add null check
leekeiabstraction Jan 4, 2026
a6c26a6
Improve todo message
leekeiabstraction Jan 4, 2026
fe7da7b
Move licence to top
leekeiabstraction Jan 4, 2026
93ca8ab
More readable test case
leekeiabstraction Jan 5, 2026
c668dc5
Improve error message, use write_bytes for BytesWriter
leekeiabstraction Jan 5, 2026
ad3bd4a
Fix documentation
leekeiabstraction Jan 5, 2026
ac1d4b3
Use Result<> to return CompactedKeyEncoder and ValueWriter for better…
leekeiabstraction Jan 5, 2026
f29e2cc
More idiomatic implementation of encode_key
leekeiabstraction Jan 5, 2026
cbd0e0f
Improve documentation
leekeiabstraction Jan 5, 2026
352f157
Improve error message
leekeiabstraction Jan 5, 2026
16f4514
Improve error message
leekeiabstraction Jan 5, 2026
07d6105
Minor refactor
leekeiabstraction Jan 5, 2026
145830a
Formatting and clippy
leekeiabstraction Jan 5, 2026
e1d85a6
Formatting and clippy
leekeiabstraction Jan 5, 2026
1ec7693
Addressed PR comments
leekeiabstraction Jan 7, 2026
700bb47
Addressed PR comments
leekeiabstraction Jan 7, 2026
ca4036c
Improve error message
leekeiabstraction Jan 7, 2026
4df3a81
Clippy
leekeiabstraction Jan 7, 2026
bcb7e08
Improve error message
leekeiabstraction Jan 7, 2026
b496612
Improve error message
leekeiabstraction Jan 7, 2026
043e1d4
Improve and remove duplicate todos
leekeiabstraction Jan 7, 2026
0ac34be
Fix test
leekeiabstraction Jan 7, 2026
88ec810
More succinct code
leekeiabstraction Jan 7, 2026
d5a916b
More succinct code
leekeiabstraction Jan 7, 2026
eb70020
Use static dispatch for more performant code
leekeiabstraction Jan 7, 2026
eff7a88
Use static dispatch for better performance
leekeiabstraction Jan 8, 2026
b02233a
Move for_test_row_type function into test module
leekeiabstraction Jan 9, 2026
3f5b938
More idiomatic row type building
leekeiabstraction Jan 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/fluss/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ uuid = { version = "1.10", features = ["v4"] }
tempfile = "3.23.0"
snafu = "0.8.3"
scopeguard = "1.2.0"
delegate = "0.13.5"

[target.'cfg(target_arch = "wasm32")'.dependencies]
jiff = { workspace = true, features = ["js"] }
Expand Down
30 changes: 30 additions & 0 deletions crates/fluss/src/metadata/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,36 @@ impl RowType {
/// Returns a reference to the list of fields that make up this row type.
pub fn fields(&self) -> &Vec<DataField> {
&self.fields
}

pub fn get_field_index(&self, field_name: &str) -> Option<usize> {
self.fields.iter().position(|f| f.name == field_name)
}

/// Test helper: builds a row type from `data_types`, naming the fields
/// `f0`, `f1`, ... after their position in the input.
///
/// The input vector is consumed, so each `DataType` is moved into its field
/// instead of being cloned. The resulting fields are handed to
/// `with_nullable` with `true` as the first argument.
#[cfg(test)]
pub fn with_data_types(data_types: Vec<DataType>) -> Self {
    let fields: Vec<DataField> = data_types
        .into_iter()
        .enumerate()
        .map(|(idx, data_type)| DataField::new(format!("f{}", idx), data_type, None))
        .collect();

    Self::with_nullable(true, fields)
}

/// Test helper: builds a row type by pairing each entry of `data_types` with the
/// entry of `field_names` at the same position.
///
/// The pairing uses `zip`, so when the two vectors differ in length the extra
/// entries of the longer one are silently ignored. Each `DataType` is moved into
/// its field (no clone). The resulting fields are handed to `with_nullable` with
/// `true` as the first argument.
#[cfg(test)]
pub fn with_data_types_and_field_names(
    data_types: Vec<DataType>,
    field_names: Vec<&str>,
) -> Self {
    let fields = data_types
        .into_iter()
        .zip(field_names)
        .map(|(data_type, field_name)| DataField::new(field_name.to_string(), data_type, None))
        .collect::<Vec<_>>();

    Self::with_nullable(true, fields)
}
}

impl Display for RowType {
Expand Down
210 changes: 210 additions & 0 deletions crates/fluss/src/row/binary/binary_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::error::Error::IllegalArgument;
use crate::error::Result;
use crate::metadata::DataType;
use crate::row::Datum;
use crate::row::binary::BinaryRowFormat;

/// Writer for a composite binary data format, such as a row or an array.
///
/// Apart from [`BinaryWriter::set_null_at`], the write methods take no position
/// argument; values are appended in the order the methods are called.
// Not every method is called yet, hence the lint allowance.
#[allow(dead_code)]
pub trait BinaryWriter {
/// Reset writer to prepare next write
fn reset(&mut self);

/// Set null to this field
fn set_null_at(&mut self, pos: usize);

/// Writes a boolean value.
fn write_boolean(&mut self, value: bool);

/// Writes a single byte.
fn write_byte(&mut self, value: u8);

/// Writes a byte slice; used for `Bytes` values by `InnerValueWriter`.
fn write_bytes(&mut self, value: &[u8]);

/// Writes a `Char` value.
///
/// NOTE(review): `length` is presumably the declared CHAR length; the caller in
/// this file passes `value.len()` instead. Confirm the intended meaning.
fn write_char(&mut self, value: &str, length: usize);

/// Writes a string value.
fn write_string(&mut self, value: &str);

/// Writes a 16-bit signed integer.
fn write_short(&mut self, value: i16);

/// Writes a 32-bit signed integer.
fn write_int(&mut self, value: i32);

/// Writes a 64-bit signed integer.
fn write_long(&mut self, value: i64);

/// Writes a 32-bit float.
fn write_float(&mut self, value: f32);

/// Writes a 64-bit float.
fn write_double(&mut self, value: f64);

/// Writes a `Binary` value; the caller in this file passes `bytes.len()` as `length`.
fn write_binary(&mut self, bytes: &[u8], length: usize);

// The commented-out signatures below sketch methods that are planned but not yet
// part of the trait.

// TODO Decimal type
// fn write_decimal(&mut self, pos: i32, value: f64);

// TODO Timestamp type
// fn write_timestamp_ntz(&mut self, pos: i32, value: i64);

// TODO Timestamp type
// fn write_timestamp_ltz(&mut self, pos: i32, value: i64);

// TODO InternalArray, ArraySerializer
// fn write_array(&mut self, pos: i32, value: i64);

// TODO Row serializer
// fn write_row(&mut self, pos: i32, value: &InternalRow);

/// Finally, complete write to set real size to binary.
fn complete(&mut self);
}

/// Writes a single [`Datum`] to a [`BinaryWriter`], with or without null handling.
///
/// Derives `Debug` (as [`InnerValueWriter`] does) so that values holding a writer
/// can be logged and used in assertion messages.
#[derive(Debug)]
pub enum ValueWriter {
    /// A `Datum::Null` value is recorded as null on the writer; any other value is
    /// forwarded to the wrapped [`InnerValueWriter`].
    Nullable(InnerValueWriter),
    /// Every value, including `Datum::Null`, is forwarded to the wrapped
    /// [`InnerValueWriter`].
    NonNullable(InnerValueWriter),
}

impl ValueWriter {
    /// Builds the writer for `element_type`.
    ///
    /// The inner writer is created first; it is then wrapped as `Nullable` when
    /// `element_type.is_nullable()` holds and as `NonNullable` otherwise.
    ///
    /// # Errors
    /// Propagates any error returned by
    /// [`InnerValueWriter::create_inner_value_writer`].
    pub fn create_value_writer(
        element_type: &DataType,
        binary_row_format: Option<&BinaryRowFormat>,
    ) -> Result<ValueWriter> {
        let inner = InnerValueWriter::create_inner_value_writer(element_type, binary_row_format)?;
        Ok(if element_type.is_nullable() {
            Self::Nullable(inner)
        } else {
            Self::NonNullable(inner)
        })
    }

    /// Writes `value` through `writer`.
    ///
    /// For a `Nullable` writer, `Datum::Null` is recorded with
    /// `writer.set_null_at(pos)`. Every other combination is forwarded to the
    /// inner writer, whose result is returned unchanged.
    pub fn write_value<W: BinaryWriter>(
        &self,
        writer: &mut W,
        pos: usize,
        value: &Datum,
    ) -> Result<()> {
        match (self, value) {
            // Only nullable writers turn a null datum into a null marker.
            (Self::Nullable(_), Datum::Null) => {
                writer.set_null_at(pos);
                Ok(())
            }
            (Self::Nullable(inner) | Self::NonNullable(inner), _) => {
                inner.write_value(writer, pos, value)
            }
        }
    }
}

/// Type-specific value writer, selected from a `DataType` by
/// `InnerValueWriter::create_inner_value_writer`.
///
/// The variants carry no data; each one names the kind of value it writes.
#[derive(Debug)]
pub enum InnerValueWriter {
// Accepts `Datum::String`, written with `write_char`.
Char,
// Accepts `Datum::String`, written with `write_string`.
String,
// Accepts `Datum::Bool`.
Boolean,
// Accepts `Datum::Blob` and `Datum::BorrowedBlob`, written with `write_binary`.
Binary,
// Accepts `Datum::Blob` and `Datum::BorrowedBlob`, written with `write_bytes`.
Bytes,
// Accepts `Datum::Int8`.
TinyInt,
// Accepts `Datum::Int16`.
SmallInt,
// Accepts `Datum::Int32`.
Int,
// Accepts `Datum::Int64`.
BigInt,
// Accepts `Datum::Float32`.
Float,
// Accepts `Datum::Float64`.
Double,
// TODO Decimal, Date, TimeWithoutTimeZone, TimestampWithoutTimeZone, TimestampWithLocalTimeZone, Array, Row
}

/// Accessor for writing the fields/elements of a binary writer during runtime, the
/// fields/elements must be written in the order.
impl InnerValueWriter {
pub fn create_inner_value_writer(
data_type: &DataType,
_: Option<&BinaryRowFormat>,
) -> Result<InnerValueWriter> {
match data_type {
DataType::Char(_) => Ok(InnerValueWriter::Char),
DataType::String(_) => Ok(InnerValueWriter::String),
DataType::Boolean(_) => Ok(InnerValueWriter::Boolean),
DataType::Binary(_) => Ok(InnerValueWriter::Binary),
DataType::Bytes(_) => Ok(InnerValueWriter::Bytes),
DataType::TinyInt(_) => Ok(InnerValueWriter::TinyInt),
DataType::SmallInt(_) => Ok(InnerValueWriter::SmallInt),
DataType::Int(_) => Ok(InnerValueWriter::Int),
DataType::BigInt(_) => Ok(InnerValueWriter::BigInt),
DataType::Float(_) => Ok(InnerValueWriter::Float),
DataType::Double(_) => Ok(InnerValueWriter::Double),
_ => unimplemented!(
"ValueWriter for DataType {:?} is currently not implemented",
data_type
),
}
}
pub fn write_value<W: BinaryWriter>(
&self,
writer: &mut W,
_pos: usize,
value: &Datum,
) -> Result<()> {
match (self, value) {
(InnerValueWriter::Char, Datum::String(v)) => {
writer.write_char(v, v.len());
}
(InnerValueWriter::String, Datum::String(v)) => {
writer.write_string(v);
}
(InnerValueWriter::Boolean, Datum::Bool(v)) => {
writer.write_boolean(*v);
}
(InnerValueWriter::Binary, Datum::Blob(v)) => {
writer.write_binary(v.as_ref(), v.len());
}
(InnerValueWriter::Binary, Datum::BorrowedBlob(v)) => {
writer.write_binary(v.as_ref(), v.len());
}
(InnerValueWriter::Bytes, Datum::Blob(v)) => {
writer.write_bytes(v.as_ref());
}
(InnerValueWriter::Bytes, Datum::BorrowedBlob(v)) => {
writer.write_bytes(v.as_ref());
}
(InnerValueWriter::TinyInt, Datum::Int8(v)) => {
writer.write_byte(*v as u8);
}
(InnerValueWriter::SmallInt, Datum::Int16(v)) => {
writer.write_short(*v);
}
(InnerValueWriter::Int, Datum::Int32(v)) => {
writer.write_int(*v);
}
(InnerValueWriter::BigInt, Datum::Int64(v)) => {
writer.write_long(*v);
}
(InnerValueWriter::Float, Datum::Float32(v)) => {
writer.write_float(v.into_inner());
}
(InnerValueWriter::Double, Datum::Float64(v)) => {
writer.write_double(v.into_inner());
}
_ => {
return Err(IllegalArgument {
message: format!("{:?} used to write value {:?}", self, value),
});
}
}
Ok(())
}
}
28 changes: 28 additions & 0 deletions crates/fluss/src/row/binary/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod binary_writer;

pub use binary_writer::*;

/// The binary row format types; a format indicates which [`BinaryRow`] type is
/// generated by the [`BinaryWriter`].
// NOTE(review): only `Compacted` is constructed in the code reviewed alongside this
// enum, which is presumably why `dead_code` is allowed -- confirm and drop the
// attribute once the other variants are used.
#[allow(dead_code)]
pub enum BinaryRowFormat {
Compacted,
Aligned,
Indexed,
}
97 changes: 97 additions & 0 deletions crates/fluss/src/row/compacted/compacted_key_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::row::compacted::compacted_row_writer::CompactedRowWriter;
use bytes::Bytes;

use crate::error::Result;
use crate::metadata::DataType;
use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter};
use delegate::delegate;

/// A wrapper around [`CompactedRowWriter`] used to encode key columns.
///
/// The encoding is the same as that of [`CompactedRowWriter`], but without the header
/// of null bits that marks whether each field value is null, since key columns must
/// not be null.
pub struct CompactedKeyWriter {
// Underlying row writer that every call is forwarded to.
delegate: CompactedRowWriter,
}

impl CompactedKeyWriter {
    /// Creates a key writer backed by a [`CompactedRowWriter`] built with a field
    /// count of zero.
    pub fn new() -> Self {
        // Key columns must not be null, so the compacted key encoding needs no null
        // bits; initialising the row writer with a field count of 0 makes the null
        // bits header empty.
        let delegate = CompactedRowWriter::new(0);
        Self { delegate }
    }

    /// Creates the [`ValueWriter`] for a key column of type `field_type`, always
    /// requesting the compacted binary row format.
    pub fn create_value_writer(field_type: &DataType) -> Result<ValueWriter> {
        let row_format = BinaryRowFormat::Compacted;
        ValueWriter::create_value_writer(field_type, Some(&row_format))
    }

    // Inherent methods forwarded unchanged to the wrapped row writer.
    delegate! {
        to self.delegate {
            pub fn reset(&mut self);

            #[allow(dead_code)]
            pub fn position(&self) -> usize;

            #[allow(dead_code)]
            pub fn buffer(&self) -> &[u8];

            pub fn to_bytes(&self) -> Bytes;
        }
    }
}

/// [`BinaryWriter`] implementation that forwards every write to the wrapped
/// [`CompactedRowWriter`]; only `complete` is implemented directly, as a no-op.
impl BinaryWriter for CompactedKeyWriter {
// Each signature listed here is forwarded to the method of the same name on
// `self.delegate`.
delegate! {
to self.delegate {
fn reset(&mut self);

// NOTE(review): the delegate is created with a field count of 0 in `new`, so
// there are presumably no null bits to set -- confirm callers never reach
// this for key columns.
fn set_null_at(&mut self, pos: usize);

fn write_boolean(&mut self, value: bool);

fn write_byte(&mut self, value: u8);

fn write_binary(&mut self, bytes: &[u8], length: usize);

fn write_bytes(&mut self, value: &[u8]);

// The length argument is named `_length` here but is still passed on to the
// delegate.
fn write_char(&mut self, value: &str, _length: usize);

fn write_string(&mut self, value: &str);

fn write_short(&mut self, value: i16);

fn write_int(&mut self, value: i32);

fn write_long(&mut self, value: i64);

fn write_float(&mut self, value: f32);

fn write_double(&mut self, value: f64);


}
}

// Not forwarded to the delegate: completing a key write requires no work here.
fn complete(&mut self) {
// do nothing
}
}
3 changes: 3 additions & 0 deletions crates/fluss/src/row/compacted/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@
// specific language governing permissions and limitations
// under the License.

mod compacted_key_writer;
mod compacted_row_writer;

pub use compacted_key_writer::CompactedKeyWriter;
Loading
Loading