Skip to content

Commit 4b95fc2

Browse files
feat: introduce CompactedKeyEncoder (#124)
1 parent ff7e869 commit 4b95fc2

File tree

11 files changed

+897
-0
lines changed

11 files changed

+897
-0
lines changed

crates/fluss/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ uuid = { version = "1.10", features = ["v4"] }
5959
tempfile = "3.23.0"
6060
snafu = "0.8.3"
6161
scopeguard = "1.2.0"
62+
delegate = "0.13.5"
6263

6364
[target.'cfg(target_arch = "wasm32")'.dependencies]
6465
jiff = { workspace = true, features = ["js"] }

crates/fluss/src/metadata/datatype.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,36 @@ impl RowType {
852852
pub fn fields(&self) -> &Vec<DataField> {
853853
&self.fields
854854
}
855+
856+
pub fn get_field_index(&self, field_name: &str) -> Option<usize> {
857+
self.fields.iter().position(|f| f.name == field_name)
858+
}
859+
860+
/// Test-only constructor that builds a row type from bare data types.
///
/// Fields are named positionally (`f0`, `f1`, ...) in input order, `None` is
/// passed as the third `DataField::new` argument, and the result is created
/// through `Self::with_nullable(true, ..)`.
#[cfg(test)]
pub fn with_data_types(data_types: Vec<DataType>) -> Self {
    // The vector is owned, so each data type is moved into its field instead
    // of being cloned out of a borrowed iterator.
    let fields: Vec<DataField> = data_types
        .into_iter()
        .enumerate()
        .map(|(idx, data_type)| DataField::new(format!("f{}", idx), data_type, None))
        .collect();

    Self::with_nullable(true, fields)
}
869+
870+
/// Test-only constructor that builds a row type from data types and
/// explicit field names.
///
/// `data_types[i]` is paired with `field_names[i]`. The pairing uses `zip`,
/// so when the two vectors differ in length the surplus entries of the
/// longer one are silently ignored. `None` is passed as the third
/// `DataField::new` argument, and the result is created through
/// `Self::with_nullable(true, ..)`.
#[cfg(test)]
pub fn with_data_types_and_field_names(
    data_types: Vec<DataType>,
    field_names: Vec<&str>,
) -> Self {
    let fields = data_types
        .into_iter()
        .zip(field_names)
        // `into_iter` already yields owned data types, so no clone is needed.
        .map(|(data_type, field_name)| DataField::new(field_name.to_string(), data_type, None))
        .collect::<Vec<_>>();

    Self::with_nullable(true, fields)
}
855885
}
856886

857887
impl Display for RowType {
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::error::Error::IllegalArgument;
19+
use crate::error::Result;
20+
use crate::metadata::DataType;
21+
use crate::row::Datum;
22+
use crate::row::binary::BinaryRowFormat;
23+
24+
/// Writer for composite binary data formats, such as rows and arrays.
///
/// The write methods take no position argument; only [`BinaryWriter::set_null_at`]
/// is addressed by position.
#[allow(dead_code)]
pub trait BinaryWriter {
    /// Resets the writer so it is ready for the next write.
    fn reset(&mut self);

    /// Marks the field at `pos` as null.
    fn set_null_at(&mut self, pos: usize);

    /// Writes a `bool` value.
    fn write_boolean(&mut self, value: bool);

    /// Writes a single `u8` value.
    fn write_byte(&mut self, value: u8);

    /// Writes a byte slice.
    fn write_bytes(&mut self, value: &[u8]);

    /// Writes a string value together with a caller-supplied `length`; how
    /// `length` is interpreted is left to the implementation.
    fn write_char(&mut self, value: &str, length: usize);

    /// Writes a string value.
    fn write_string(&mut self, value: &str);

    /// Writes an `i16` value.
    fn write_short(&mut self, value: i16);

    /// Writes an `i32` value.
    fn write_int(&mut self, value: i32);

    /// Writes an `i64` value.
    fn write_long(&mut self, value: i64);

    /// Writes an `f32` value.
    fn write_float(&mut self, value: f32);

    /// Writes an `f64` value.
    fn write_double(&mut self, value: f64);

    /// Writes a byte slice together with a caller-supplied `length`; how
    /// `length` is interpreted is left to the implementation.
    fn write_binary(&mut self, bytes: &[u8], length: usize);

    // TODO Decimal type
    // fn write_decimal(&mut self, pos: i32, value: f64);

    // TODO Timestamp type
    // fn write_timestamp_ntz(&mut self, pos: i32, value: i64);

    // TODO Timestamp type
    // fn write_timestamp_ltz(&mut self, pos: i32, value: i64);

    // TODO InternalArray, ArraySerializer
    // fn write_array(&mut self, pos: i32, value: i64);

    // TODO Row serializer
    // fn write_row(&mut self, pos: i32, value: &InternalRow);

    /// Completes the write, setting the real size on the binary output.
    fn complete(&mut self);
}
73+
74+
pub enum ValueWriter {
75+
Nullable(InnerValueWriter),
76+
NonNullable(InnerValueWriter),
77+
}
78+
79+
impl ValueWriter {
80+
pub fn create_value_writer(
81+
element_type: &DataType,
82+
binary_row_format: Option<&BinaryRowFormat>,
83+
) -> Result<ValueWriter> {
84+
let value_writer =
85+
InnerValueWriter::create_inner_value_writer(element_type, binary_row_format)?;
86+
if element_type.is_nullable() {
87+
Ok(Self::Nullable(value_writer))
88+
} else {
89+
Ok(Self::NonNullable(value_writer))
90+
}
91+
}
92+
93+
pub fn write_value<W: BinaryWriter>(
94+
&self,
95+
writer: &mut W,
96+
pos: usize,
97+
value: &Datum,
98+
) -> Result<()> {
99+
match self {
100+
Self::Nullable(inner_value_writer) => {
101+
if let Datum::Null = value {
102+
writer.set_null_at(pos);
103+
Ok(())
104+
} else {
105+
inner_value_writer.write_value(writer, pos, value)
106+
}
107+
}
108+
Self::NonNullable(inner_value_writer) => {
109+
inner_value_writer.write_value(writer, pos, value)
110+
}
111+
}
112+
}
113+
}
114+
115+
/// The kind of value a writer handles, one variant per supported data type.
///
/// `Clone`, `PartialEq` and `Eq` are derived so writer kinds can be compared.
/// `Copy` is intentionally not derived: the variants still to be added (see
/// the TODO below) may need to hold non-`Copy` data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InnerValueWriter {
    /// Writer selected for `DataType::Char`.
    Char,
    /// Writer selected for `DataType::String`.
    String,
    /// Writer selected for `DataType::Boolean`.
    Boolean,
    /// Writer selected for `DataType::Binary`.
    Binary,
    /// Writer selected for `DataType::Bytes`.
    Bytes,
    /// Writer selected for `DataType::TinyInt`.
    TinyInt,
    /// Writer selected for `DataType::SmallInt`.
    SmallInt,
    /// Writer selected for `DataType::Int`.
    Int,
    /// Writer selected for `DataType::BigInt`.
    BigInt,
    /// Writer selected for `DataType::Float`.
    Float,
    /// Writer selected for `DataType::Double`.
    Double,
    // TODO Decimal, Date, TimeWithoutTimeZone, TimestampWithoutTimeZone, TimestampWithLocalTimeZone, Array, Row
}
130+
131+
/// Accessor for writing the fields/elements of a binary writer during runtime, the
132+
/// fields/elements must be written in the order.
133+
impl InnerValueWriter {
134+
pub fn create_inner_value_writer(
135+
data_type: &DataType,
136+
_: Option<&BinaryRowFormat>,
137+
) -> Result<InnerValueWriter> {
138+
match data_type {
139+
DataType::Char(_) => Ok(InnerValueWriter::Char),
140+
DataType::String(_) => Ok(InnerValueWriter::String),
141+
DataType::Boolean(_) => Ok(InnerValueWriter::Boolean),
142+
DataType::Binary(_) => Ok(InnerValueWriter::Binary),
143+
DataType::Bytes(_) => Ok(InnerValueWriter::Bytes),
144+
DataType::TinyInt(_) => Ok(InnerValueWriter::TinyInt),
145+
DataType::SmallInt(_) => Ok(InnerValueWriter::SmallInt),
146+
DataType::Int(_) => Ok(InnerValueWriter::Int),
147+
DataType::BigInt(_) => Ok(InnerValueWriter::BigInt),
148+
DataType::Float(_) => Ok(InnerValueWriter::Float),
149+
DataType::Double(_) => Ok(InnerValueWriter::Double),
150+
_ => unimplemented!(
151+
"ValueWriter for DataType {:?} is currently not implemented",
152+
data_type
153+
),
154+
}
155+
}
156+
pub fn write_value<W: BinaryWriter>(
157+
&self,
158+
writer: &mut W,
159+
_pos: usize,
160+
value: &Datum,
161+
) -> Result<()> {
162+
match (self, value) {
163+
(InnerValueWriter::Char, Datum::String(v)) => {
164+
writer.write_char(v, v.len());
165+
}
166+
(InnerValueWriter::String, Datum::String(v)) => {
167+
writer.write_string(v);
168+
}
169+
(InnerValueWriter::Boolean, Datum::Bool(v)) => {
170+
writer.write_boolean(*v);
171+
}
172+
(InnerValueWriter::Binary, Datum::Blob(v)) => {
173+
writer.write_binary(v.as_ref(), v.len());
174+
}
175+
(InnerValueWriter::Binary, Datum::BorrowedBlob(v)) => {
176+
writer.write_binary(v.as_ref(), v.len());
177+
}
178+
(InnerValueWriter::Bytes, Datum::Blob(v)) => {
179+
writer.write_bytes(v.as_ref());
180+
}
181+
(InnerValueWriter::Bytes, Datum::BorrowedBlob(v)) => {
182+
writer.write_bytes(v.as_ref());
183+
}
184+
(InnerValueWriter::TinyInt, Datum::Int8(v)) => {
185+
writer.write_byte(*v as u8);
186+
}
187+
(InnerValueWriter::SmallInt, Datum::Int16(v)) => {
188+
writer.write_short(*v);
189+
}
190+
(InnerValueWriter::Int, Datum::Int32(v)) => {
191+
writer.write_int(*v);
192+
}
193+
(InnerValueWriter::BigInt, Datum::Int64(v)) => {
194+
writer.write_long(*v);
195+
}
196+
(InnerValueWriter::Float, Datum::Float32(v)) => {
197+
writer.write_float(v.into_inner());
198+
}
199+
(InnerValueWriter::Double, Datum::Float64(v)) => {
200+
writer.write_double(v.into_inner());
201+
}
202+
_ => {
203+
return Err(IllegalArgument {
204+
message: format!("{:?} used to write value {:?}", self, value),
205+
});
206+
}
207+
}
208+
Ok(())
209+
}
210+
}

crates/fluss/src/row/binary/mod.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
mod binary_writer;
19+
20+
pub use binary_writer::*;
21+
22+
/// The binary row format kinds. Each one indicates which [`BinaryRow`] type
/// a [`BinaryWriter`] generates.
///
/// The enum is a plain tag without payload, so the common value traits are
/// derived to let it be printed, copied and compared.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BinaryRowFormat {
    Compacted,
    Aligned,
    Indexed,
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::row::compacted::compacted_row_writer::CompactedRowWriter;
19+
use bytes::Bytes;
20+
21+
use crate::error::Result;
22+
use crate::metadata::DataType;
23+
use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter};
24+
use delegate::delegate;
25+
26+
/// A wrapping of [`CompactedRowWriter`] used to encode key columns.
27+
/// The encoding is the same as [`CompactedRowWriter`], but is without header of null bits to
28+
/// represent whether the field value is null or not since the key columns must be not null.
29+
pub struct CompactedKeyWriter {
30+
delegate: CompactedRowWriter,
31+
}
32+
33+
impl CompactedKeyWriter {
34+
pub fn new() -> CompactedKeyWriter {
35+
CompactedKeyWriter {
36+
// in compacted key encoder, we don't need to set null bits as the key columns must be not
37+
// null, to use field count 0 to init to make the null bits 0
38+
delegate: CompactedRowWriter::new(0),
39+
}
40+
}
41+
42+
pub fn create_value_writer(field_type: &DataType) -> Result<ValueWriter> {
43+
ValueWriter::create_value_writer(field_type, Some(&BinaryRowFormat::Compacted))
44+
}
45+
46+
delegate! {
47+
to self.delegate {
48+
pub fn reset(&mut self);
49+
50+
#[allow(dead_code)]
51+
pub fn position(&self) -> usize;
52+
53+
#[allow(dead_code)]
54+
pub fn buffer(&self) -> &[u8];
55+
56+
pub fn to_bytes(&self) -> Bytes;
57+
}
58+
}
59+
}
60+
61+
impl BinaryWriter for CompactedKeyWriter {
62+
delegate! {
63+
to self.delegate {
64+
fn reset(&mut self);
65+
66+
fn set_null_at(&mut self, pos: usize);
67+
68+
fn write_boolean(&mut self, value: bool);
69+
70+
fn write_byte(&mut self, value: u8);
71+
72+
fn write_binary(&mut self, bytes: &[u8], length: usize);
73+
74+
fn write_bytes(&mut self, value: &[u8]);
75+
76+
fn write_char(&mut self, value: &str, _length: usize);
77+
78+
fn write_string(&mut self, value: &str);
79+
80+
fn write_short(&mut self, value: i16);
81+
82+
fn write_int(&mut self, value: i32);
83+
84+
fn write_long(&mut self, value: i64);
85+
86+
fn write_float(&mut self, value: f32);
87+
88+
fn write_double(&mut self, value: f64);
89+
90+
91+
}
92+
}
93+
94+
fn complete(&mut self) {
95+
// do nothing
96+
}
97+
}

crates/fluss/src/row/compacted/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
mod compacted_key_writer;
1819
mod compacted_row_writer;
20+
21+
pub use compacted_key_writer::CompactedKeyWriter;

0 commit comments

Comments
 (0)