Skip to content

Commit e1e7ba3

Browse files
chore: cleanup and fix partial update in python (#292)
1 parent f8ebc5e commit e1e7ba3

File tree

7 files changed

+128
-165
lines changed

7 files changed

+128
-165
lines changed

bindings/python/example/example.py

Lines changed: 43 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -569,6 +569,49 @@ async def main():
569569
print(f"Error during delete: {e}")
570570
traceback.print_exc()
571571

572+
# --- Test Partial Update by column names ---
573+
print("\n--- Testing Partial Update (by column names) ---")
574+
try:
575+
partial_writer = pk_table.new_upsert(columns=["user_id", "balance"])
576+
handle = partial_writer.upsert({"user_id": 1, "balance": Decimal("9999.99")})
577+
await handle.wait()
578+
print("Partial update: set balance=9999.99 for user_id=1")
579+
580+
lookuper = pk_table.new_lookup()
581+
result = await lookuper.lookup({"user_id": 1})
582+
if result:
583+
print(f"Partial update verified:"
584+
f"\n name={result['name']} (unchanged)"
585+
f"\n balance={result['balance']} (updated)")
586+
else:
587+
print("ERROR: Expected to find user_id=1")
588+
589+
except Exception as e:
590+
print(f"Error during partial update by names: {e}")
591+
traceback.print_exc()
592+
593+
# --- Test Partial Update by column indices ---
594+
print("\n--- Testing Partial Update (by column indices) ---")
595+
try:
596+
# Columns: 0=user_id (PK), 1=name — update name only
597+
partial_writer_idx = pk_table.new_upsert(column_indices=[0, 1])
598+
handle = partial_writer_idx.upsert([1, "Alice Renamed"])
599+
await handle.wait()
600+
print("Partial update by indices: set name='Alice Renamed' for user_id=1")
601+
602+
lookuper = pk_table.new_lookup()
603+
result = await lookuper.lookup({"user_id": 1})
604+
if result:
605+
print(f"Partial update by indices verified:"
606+
f"\n name={result['name']} (updated)"
607+
f"\n balance={result['balance']} (unchanged)")
608+
else:
609+
print("ERROR: Expected to find user_id=1")
610+
611+
except Exception as e:
612+
print(f"Error during partial update by indices: {e}")
613+
traceback.print_exc()
614+
572615
# Demo: Column projection using builder pattern
573616
print("\n--- Testing Column Projection ---")
574617
try:

bindings/python/fluss/__init__.pyi

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -378,15 +378,14 @@ class AppendWriter:
378378
WriteResultHandle: Ignore for fire-and-forget, or await handle.wait() for acknowledgement.
379379
380380
Supported Types:
381-
Currently supports primitive types only:
382381
- Boolean, TinyInt, SmallInt, Int, BigInt (integers)
383382
- Float, Double (floating point)
384383
- String, Char (text)
385384
- Bytes, Binary (binary data)
385+
- Date, Time, Timestamp, TimestampLTZ (temporal)
386+
- Decimal (arbitrary precision)
386387
- Null values
387388
388-
Temporal types (Date, Timestamp, Decimal) are not yet supported.
389-
390389
Example:
391390
writer.append({'id': 1, 'name': 'Alice', 'score': 95.5})
392391
writer.append([1, 'Alice', 95.5])
@@ -712,5 +711,7 @@ class OffsetType:
712711

713712
# Constant for earliest offset (-2)
714713
EARLIEST_OFFSET: int
714+
# Constant for latest offset (-1)
715+
LATEST_OFFSET: int
715716

716717
__version__: str

bindings/python/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -96,6 +96,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
9696

9797
// Register constants
9898
m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?;
99+
m.add("LATEST_OFFSET", fcore::client::LATEST_OFFSET)?;
99100

100101
// Register exception types
101102
m.add_class::<FlussError>()?;

bindings/python/src/lookup.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::table::{internal_row_to_dict, python_pk_to_generic_row};
18+
use crate::table::{internal_row_to_dict, python_to_sparse_generic_row};
1919
use crate::*;
2020
use pyo3_async_runtimes::tokio::future_into_py;
2121
use std::sync::Arc;
@@ -52,7 +52,8 @@ impl Lookuper {
5252
py: Python<'py>,
5353
pk: &Bound<'_, PyAny>,
5454
) -> PyResult<Bound<'py, PyAny>> {
55-
let generic_row = python_pk_to_generic_row(pk, &self.table_info)?;
55+
let pk_indices = self.table_info.get_schema().primary_key_indexes();
56+
let generic_row = python_to_sparse_generic_row(pk, &self.table_info, &pk_indices)?;
5657
let inner = self.inner.clone();
5758
let table_info = self.table_info.clone();
5859

bindings/python/src/table.rs

Lines changed: 46 additions & 149 deletions
Original file line number | Diff line number | Diff line change
@@ -639,203 +639,100 @@ enum RowInput<'py> {
639639
List(Bound<'py, pyo3::types::PyList>),
640640
}
641641

642-
/// Helper function to process sequence types (list/tuple) into datums
643-
fn process_sequence_to_datums<'a, I>(
644-
values: I,
645-
len: usize,
646-
fields: &[fcore::metadata::DataField],
647-
) -> PyResult<Vec<fcore::row::Datum<'static>>>
648-
where
649-
I: Iterator<Item = Bound<'a, PyAny>>,
650-
{
651-
if len != fields.len() {
652-
return Err(FlussError::new_err(format!(
653-
"Expected {} values, got {}",
654-
fields.len(),
655-
len
656-
)));
657-
}
658-
659-
let mut datums = Vec::with_capacity(fields.len());
660-
for (i, (field, value)) in fields.iter().zip(values).enumerate() {
661-
datums.push(
662-
python_value_to_datum(&value, field.data_type()).map_err(|e| {
663-
FlussError::new_err(format!("Field '{}' (index {}): {}", field.name(), i, e))
664-
})?,
665-
);
666-
}
667-
Ok(datums)
668-
}
669-
670-
/// Convert Python row (dict/list/tuple) to GenericRow based on schema
642+
/// Convert Python row (dict/list/tuple) to GenericRow requiring all schema columns.
671643
pub fn python_to_generic_row(
672644
row: &Bound<PyAny>,
673645
table_info: &fcore::metadata::TableInfo,
674646
) -> PyResult<fcore::row::GenericRow<'static>> {
675-
// Extract with user-friendly error message
676-
let row_input: RowInput = row.extract().map_err(|_| {
677-
let type_name = row
678-
.get_type()
679-
.name()
680-
.map(|n| n.to_string())
681-
.unwrap_or_else(|_| "unknown".to_string());
682-
FlussError::new_err(format!(
683-
"Row must be a dict, list, or tuple; got {type_name}"
684-
))
685-
})?;
686-
let schema = table_info.row_type();
687-
let fields = schema.fields();
688-
689-
let datums = match row_input {
690-
RowInput::Dict(dict) => {
691-
// Strict: reject unknown keys (and also reject non-str keys nicely)
692-
for (k, _) in dict.iter() {
693-
let key_str = k.extract::<&str>().map_err(|_| {
694-
let key_type = k
695-
.get_type()
696-
.name()
697-
.map(|n| n.to_string())
698-
.unwrap_or_else(|_| "unknown".to_string());
699-
FlussError::new_err(format!("Row dict keys must be strings; got {key_type}"))
700-
})?;
701-
702-
if fields.iter().all(|f| f.name() != key_str) {
703-
let expected = fields
704-
.iter()
705-
.map(|f| f.name())
706-
.collect::<Vec<_>>()
707-
.join(", ");
708-
return Err(FlussError::new_err(format!(
709-
"Unknown field '{key_str}'. Expected fields: {expected}"
710-
)));
711-
}
712-
}
713-
714-
let mut datums = Vec::with_capacity(fields.len());
715-
for field in fields {
716-
let value = dict.get_item(field.name())?.ok_or_else(|| {
717-
FlussError::new_err(format!("Missing field: {}", field.name()))
718-
})?;
719-
datums.push(
720-
python_value_to_datum(&value, field.data_type()).map_err(|e| {
721-
FlussError::new_err(format!("Field '{}': {}", field.name(), e))
722-
})?,
723-
);
724-
}
725-
datums
726-
}
727-
728-
RowInput::List(list) => process_sequence_to_datums(list.iter(), list.len(), fields)?,
729-
730-
RowInput::Tuple(tuple) => process_sequence_to_datums(tuple.iter(), tuple.len(), fields)?,
731-
};
647+
let all_indices: Vec<usize> = (0..table_info.row_type().fields().len()).collect();
648+
python_to_sparse_generic_row(row, table_info, &all_indices)
649+
}
732650

733-
Ok(fcore::row::GenericRow { values: datums })
651+
/// Process a Python sequence (list or tuple) into datums at the target column positions.
652+
fn process_sequence(
653+
seq: &Bound<pyo3::types::PySequence>,
654+
target_indices: &[usize],
655+
fields: &[fcore::metadata::DataField],
656+
datums: &mut [fcore::row::Datum<'static>],
657+
) -> PyResult<()> {
658+
if seq.len()? != target_indices.len() {
659+
return Err(FlussError::new_err(format!(
660+
"Expected {} elements, got {}",
661+
target_indices.len(),
662+
seq.len()?
663+
)));
664+
}
665+
for (i, &col_idx) in target_indices.iter().enumerate() {
666+
let field = &fields[col_idx];
667+
let value = seq.get_item(i)?;
668+
datums[col_idx] = python_value_to_datum(&value, field.data_type())
669+
.map_err(|e| FlussError::new_err(format!("Field '{}': {}", field.name(), e)))?;
670+
}
671+
Ok(())
734672
}
735673

736-
/// Convert Python primary key values (dict/list/tuple) to GenericRow.
737-
/// Only requires PK columns; non-PK columns are filled with Null.
738-
/// For dict: keys should be PK column names.
739-
/// For list/tuple: values should be PK values in PK column order.
740-
pub fn python_pk_to_generic_row(
674+
/// Build a full-width GenericRow filling only the specified column
675+
/// indices from user input; all other columns are set to Null.
676+
pub fn python_to_sparse_generic_row(
741677
row: &Bound<PyAny>,
742678
table_info: &fcore::metadata::TableInfo,
679+
target_indices: &[usize],
743680
) -> PyResult<fcore::row::GenericRow<'static>> {
744-
let schema = table_info.get_schema();
745681
let row_type = table_info.row_type();
746682
let fields = row_type.fields();
747-
let pk_indexes = schema.primary_key_indexes();
748-
let pk_names: Vec<&str> = schema.primary_key_column_names();
749-
750-
if pk_indexes.is_empty() {
751-
return Err(FlussError::new_err(
752-
"Table has no primary key; cannot use PK-only row",
753-
));
754-
}
683+
let target_names: Vec<&str> = target_indices.iter().map(|&i| fields[i].name()).collect();
755684

756-
// Initialize all datums as Null
757685
let mut datums: Vec<fcore::row::Datum<'static>> = vec![fcore::row::Datum::Null; fields.len()];
758686

759-
// Extract with user-friendly error message
760687
let row_input: RowInput = row.extract().map_err(|_| {
761688
let type_name = row
762689
.get_type()
763690
.name()
764691
.map(|n| n.to_string())
765692
.unwrap_or_else(|_| "unknown".to_string());
766693
FlussError::new_err(format!(
767-
"PK row must be a dict, list, or tuple; got {type_name}"
694+
"Row must be a dict, list, or tuple; got {type_name}"
768695
))
769696
})?;
770697

771698
match row_input {
772699
RowInput::Dict(dict) => {
773-
// Validate keys are PK columns
774700
for (k, _) in dict.iter() {
775701
let key_str = k.extract::<&str>().map_err(|_| {
776702
let key_type = k
777703
.get_type()
778704
.name()
779705
.map(|n| n.to_string())
780706
.unwrap_or_else(|_| "unknown".to_string());
781-
FlussError::new_err(format!("PK dict keys must be strings; got {key_type}"))
707+
FlussError::new_err(format!("Dict keys must be strings; got {key_type}"))
782708
})?;
783-
784-
if !pk_names.contains(&key_str) {
709+
if !target_names.contains(&key_str) {
785710
return Err(FlussError::new_err(format!(
786-
"Unknown PK field '{}'. Expected PK fields: {}",
711+
"Unknown field '{}'. Expected: {}",
787712
key_str,
788-
pk_names.join(", ")
713+
target_names.join(", ")
789714
)));
790715
}
791716
}
792-
793-
// Extract PK values
794-
for (i, pk_idx) in pk_indexes.iter().enumerate() {
795-
let pk_name = pk_names[i];
796-
let field: &fcore::metadata::DataField = &fields[*pk_idx];
717+
for (i, &col_idx) in target_indices.iter().enumerate() {
718+
let name = target_names[i];
719+
let field = &fields[col_idx];
797720
let value = dict
798-
.get_item(pk_name)?
799-
.ok_or_else(|| FlussError::new_err(format!("Missing PK field: {pk_name}")))?;
800-
datums[*pk_idx] = python_value_to_datum(&value, field.data_type())
801-
.map_err(|e| FlussError::new_err(format!("PK field '{pk_name}': {e}")))?;
721+
.get_item(name)?
722+
.ok_or_else(|| FlussError::new_err(format!("Missing field: {name}")))?;
723+
datums[col_idx] = python_value_to_datum(&value, field.data_type())
724+
.map_err(|e| FlussError::new_err(format!("Field '{name}': {e}")))?;
802725
}
803726
}
804727

805728
RowInput::List(list) => {
806-
if list.len() != pk_indexes.len() {
807-
return Err(FlussError::new_err(format!(
808-
"PK list must have {} elements (PK columns), got {}",
809-
pk_indexes.len(),
810-
list.len()
811-
)));
812-
}
813-
for (i, pk_idx) in pk_indexes.iter().enumerate() {
814-
let field: &fcore::metadata::DataField = &fields[*pk_idx];
815-
let value = list.get_item(i)?;
816-
datums[*pk_idx] =
817-
python_value_to_datum(&value, field.data_type()).map_err(|e| {
818-
FlussError::new_err(format!("PK field '{}': {}", field.name(), e))
819-
})?;
820-
}
729+
let seq = list.as_sequence();
730+
process_sequence(seq, target_indices, fields, &mut datums)?;
821731
}
822732

823733
RowInput::Tuple(tuple) => {
824-
if tuple.len() != pk_indexes.len() {
825-
return Err(FlussError::new_err(format!(
826-
"PK tuple must have {} elements (PK columns), got {}",
827-
pk_indexes.len(),
828-
tuple.len()
829-
)));
830-
}
831-
for (i, pk_idx) in pk_indexes.iter().enumerate() {
832-
let field: &fcore::metadata::DataField = &fields[*pk_idx];
833-
let value = tuple.get_item(i)?;
834-
datums[*pk_idx] =
835-
python_value_to_datum(&value, field.data_type()).map_err(|e| {
836-
FlussError::new_err(format!("PK field '{}': {}", field.name(), e))
837-
})?;
838-
}
734+
let seq = tuple.as_sequence();
735+
process_sequence(seq, target_indices, fields, &mut datums)?;
839736
}
840737
}
841738

0 commit comments

Comments (0)