diff --git a/Cargo.lock b/Cargo.lock
index 0f3c7d5..6d69fcb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -246,7 +246,6 @@ dependencies = [
  "serde",
  "serde_json",
  "sqlite_nostd",
- "streaming-iterator",
  "uuid",
 ]
 
@@ -404,12 +403,6 @@ dependencies = [
  "sqlite3_capi",
 ]
 
-[[package]]
-name = "streaming-iterator"
-version = "0.1.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b2231b7c3057d5e4ad0156fb3dc807d900806020c5ffa3ee6ff2c8c76fb8520"
-
 [[package]]
 name = "syn"
 version = "1.0.109"
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 9aef67e..419e484 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -19,7 +19,6 @@ num-traits = { version = "0.2.15", default-features = false }
 num-derive = "0.3"
 serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
 serde = { version = "1.0", default-features = false, features = ["alloc", "derive"] }
-streaming-iterator = { version = "0.1.9", default-features = false, features = ["alloc"] }
 const_format = "0.2.34"
 
 [dependencies.uuid]
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index 61fb257..e73fdd8 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -66,3 +66,9 @@ impl From<serde_json::Error> for SQLiteError {
         SQLiteError(ResultCode::ABORT, Some(value.to_string()))
     }
 }
+
+impl From<core::fmt::Error> for SQLiteError {
+    fn from(value: core::fmt::Error) -> Self {
+        SQLiteError(ResultCode::INTERNAL, Some(format!("{}", value)))
+    }
+}
diff --git a/crates/core/src/fix_data.rs b/crates/core/src/fix_data.rs
index ec9c0da..8dcab1b 100644
--- a/crates/core/src/fix_data.rs
+++ b/crates/core/src/fix_data.rs
@@ -117,7 +117,7 @@ fn remove_duplicate_key_encoding(key: &str) -> Option<String> {
 }
 
 fn powersync_remove_duplicate_key_encoding_impl(
-    ctx: *mut sqlite::context,
+    _ctx: *mut sqlite::context,
     args: &[*mut sqlite::value],
 ) -> Result<Option<String>, SQLiteError> {
     let arg = args.get(0).ok_or(ResultCode::MISUSE)?;
diff --git a/crates/core/src/schema/management.rs b/crates/core/src/schema/management.rs
index 6f66d03..bccba3f 100644
--- a/crates/core/src/schema/management.rs
+++ b/crates/core/src/schema/management.rs
@@ -1,8 +1,8 @@
 extern crate alloc;
 
-use alloc::format;
 use alloc::string::String;
 use alloc::vec::Vec;
+use alloc::{format, vec};
 use core::ffi::c_int;
 
 use sqlite::{Connection, ResultCode, Value};
@@ -14,6 +14,8 @@ use crate::ext::ExtendedDatabase;
 use crate::util::{quote_identifier, quote_json_path};
 use crate::{create_auto_tx_function, create_sqlite_text_fn};
 
+use super::Schema;
+
 fn update_tables(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), SQLiteError> {
     {
         // In a block so that the statement is finalized before dropping tables
@@ -138,87 +140,83 @@ SELECT name, internal_name, local_only FROM powersync_tables WHERE name NOT IN (
 
 fn update_indexes(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), SQLiteError> {
     let mut statements: Vec<String> = alloc::vec![];
+    let schema = serde_json::from_str::<Schema>(schema)?;
+    let mut expected_index_names: Vec<String> = vec![];
 
     {
         // In a block so that the statement is finalized before dropping indexes
         // language=SQLite
-        let statement = db.prepare_v2("\
-SELECT
-    powersync_internal_table_name(tables.value) as table_name,
-    (powersync_internal_table_name(tables.value) || '__' || json_extract(indexes.value, '$.name')) as index_name,
-    json_extract(indexes.value, '$.columns') as index_columns,
-    ifnull(sqlite_master.sql, '') as sql
-    FROM json_each(json_extract(?, '$.tables')) tables
-    CROSS JOIN json_each(json_extract(tables.value, '$.indexes')) indexes
-    LEFT JOIN sqlite_master ON sqlite_master.name = index_name AND sqlite_master.type = 'index'
-    ").into_db_result(db)?;
-        statement.bind_text(1, schema, sqlite::Destructor::STATIC)?;
+        let find_index =
+            db.prepare_v2("SELECT sql FROM sqlite_master WHERE name = ? AND type = 'index'")?;
 
-        while statement.step().into_db_result(db)? == ResultCode::ROW {
-            let table_name = statement.column_text(0)?;
-            let index_name = statement.column_text(1)?;
-            let columns = statement.column_text(2)?;
-            let existing_sql = statement.column_text(3)?;
-
-            // language=SQLite
-            let stmt2 = db.prepare_v2("select json_extract(e.value, '$.name') as name, json_extract(e.value, '$.type') as type, json_extract(e.value, '$.ascending') as ascending from json_each(?) e")?;
-            stmt2.bind_text(1, columns, sqlite::Destructor::STATIC)?;
-
-            let mut column_values: Vec<String> = alloc::vec![];
-            while stmt2.step()? == ResultCode::ROW {
-                let name = stmt2.column_text(0)?;
-                let type_name = stmt2.column_text(1)?;
-                let ascending = stmt2.column_int(2) != 0;
-
-                if ascending {
-                    let value = format!(
+        for table in &schema.tables {
+            let table_name = table.internal_name();
+
+            for index in &table.indexes {
+                let index_name = format!("{}__{}", table_name, &index.name);
+
+                let existing_sql = {
+                    find_index.reset()?;
+                    find_index.bind_text(1, &index_name, sqlite::Destructor::STATIC)?;
+
+                    let result = if let ResultCode::ROW = find_index.step()? {
+                        Some(find_index.column_text(0)?)
+                    } else {
+                        None
+                    };
+
+                    result
+                };
+
+                let mut column_values: Vec<String> = alloc::vec![];
+                for indexed_column in &index.columns {
+                    let mut value = format!(
                         "CAST(json_extract(data, {:}) as {:})",
-                        quote_json_path(name),
-                        type_name
-                    );
-                    column_values.push(value);
-                } else {
-                    let value = format!(
-                        "CAST(json_extract(data, {:}) as {:}) DESC",
-                        quote_json_path(name),
-                        type_name
+                        quote_json_path(&indexed_column.name),
+                        &indexed_column.type_name
                     );
+
+                    if !indexed_column.ascending {
+                        value += " DESC";
+                    }
+                    column_values.push(value);
                 }
-            }
-            let sql = format!(
-                "CREATE INDEX {} ON {}({})",
-                quote_identifier(index_name),
-                quote_identifier(table_name),
-                column_values.join(", ")
-            );
-            if existing_sql == "" {
-                statements.push(sql);
-            } else if existing_sql != sql {
-                statements.push(format!("DROP INDEX {}", quote_identifier(index_name)));
-                statements.push(sql);
+                let sql = format!(
+                    "CREATE INDEX {} ON {}({})",
+                    quote_identifier(&index_name),
+                    quote_identifier(&table_name),
+                    column_values.join(", ")
+                );
+
+                if existing_sql.is_none() {
+                    statements.push(sql);
+                } else if existing_sql != Some(&sql) {
+                    statements.push(format!("DROP INDEX {}", quote_identifier(&index_name)));
+                    statements.push(sql);
+                }
+
+                expected_index_names.push(index_name);
             }
         }
 
         // In a block so that the statement is finalized before dropping indexes
         // language=SQLite
-        let statement = db.prepare_v2("\
-WITH schema_indexes AS (
-SELECT
-    powersync_internal_table_name(tables.value) as table_name,
-    (powersync_internal_table_name(tables.value) || '__' || json_extract(indexes.value, '$.name')) as index_name
-    FROM json_each(json_extract(?1, '$.tables')) tables
-    CROSS JOIN json_each(json_extract(tables.value, '$.indexes')) indexes
-)
+        let statement = db
+            .prepare_v2(
+                "\
 SELECT
     sqlite_master.name as index_name
     FROM sqlite_master
     WHERE sqlite_master.type = 'index'
       AND sqlite_master.name GLOB 'ps_data_*'
-      AND sqlite_master.name NOT IN (SELECT index_name FROM schema_indexes)
-").into_db_result(db)?;
-        statement.bind_text(1, schema, sqlite::Destructor::STATIC)?;
+      AND sqlite_master.name NOT IN (SELECT value FROM json_each(?))
+",
+            )
+            .into_db_result(db)?;
+        let json_names = serde_json::to_string(&expected_index_names)?;
+        statement.bind_text(1, &json_names, sqlite::Destructor::STATIC)?;
 
         while statement.step()? == ResultCode::ROW {
             let name = statement.column_text(0)?;
diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs
index a0a277e..76a8c4a 100644
--- a/crates/core/src/schema/mod.rs
+++ b/crates/core/src/schema/mod.rs
@@ -1,11 +1,16 @@
 mod management;
 mod table_info;
 
+use alloc::vec::Vec;
+use serde::Deserialize;
 use sqlite::ResultCode;
 use sqlite_nostd as sqlite;
 
-pub use table_info::{
-    ColumnInfo, ColumnNameAndTypeStatement, DiffIncludeOld, TableInfo, TableInfoFlags,
-};
+pub use table_info::{Column, DiffIncludeOld, Table, TableInfoFlags};
+
+#[derive(Deserialize)]
+pub struct Schema {
+    tables: Vec<Table>,
+}
 
 pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
     management::register(db)
 }
diff --git a/crates/core/src/schema/table_info.rs b/crates/core/src/schema/table_info.rs
index 0bfbfa5..4224221 100644
--- a/crates/core/src/schema/table_info.rs
+++ b/crates/core/src/schema/table_info.rs
@@ -1,103 +1,127 @@
-use core::marker::PhantomData;
+use alloc::{format, string::String, vec, vec::Vec};
+use serde::{de::Visitor, Deserialize};
 
-use alloc::{
-    string::{String, ToString},
-    vec::Vec,
-};
-use streaming_iterator::StreamingIterator;
-
-use crate::error::SQLiteError;
-use sqlite::{Connection, ResultCode};
-use sqlite_nostd::{self as sqlite, ManagedStmt};
-pub struct TableInfo {
+#[derive(Deserialize)]
+pub struct Table {
     pub name: String,
-    pub view_name: String,
+    #[serde(rename = "view_name")]
+    pub view_name_override: Option<String>,
+    pub columns: Vec<Column>,
+    #[serde(default)]
+    pub indexes: Vec<Index>,
+    #[serde(
+        default,
+        rename = "include_old",
+        deserialize_with = "deserialize_include_old"
+    )]
    pub diff_include_old: Option<DiffIncludeOld>,
+    #[serde(flatten)]
     pub flags: TableInfoFlags,
 }
 
-impl TableInfo {
-    pub fn parse_from(db: *mut sqlite::sqlite3, data: &str) -> Result<TableInfo, SQLiteError> {
-        // language=SQLite
-        let statement = db.prepare_v2(
-            "SELECT
-                json_extract(?1, '$.name'),
-                ifnull(json_extract(?1, '$.view_name'), json_extract(?1, '$.name')),
-                json_extract(?1, '$.local_only'),
-                json_extract(?1, '$.insert_only'),
-                json_extract(?1, '$.include_old'),
-                json_extract(?1, '$.include_metadata'),
-                json_extract(?1, '$.include_old_only_when_changed'),
-                json_extract(?1, '$.ignore_empty_update')",
-        )?;
-        statement.bind_text(1, data, sqlite::Destructor::STATIC)?;
-
-        let step_result = statement.step()?;
-        if step_result != ResultCode::ROW {
-            return Err(SQLiteError::from(ResultCode::SCHEMA));
-        }
+impl Table {
+    pub fn from_json(text: &str) -> Result<Self, serde_json::Error> {
+        serde_json::from_str(text)
+    }
 
-        let name = statement.column_text(0)?.to_string();
-        let view_name = statement.column_text(1)?.to_string();
-        let flags = {
-            let local_only = statement.column_int(2) != 0;
-            let insert_only = statement.column_int(3) != 0;
-            let include_metadata = statement.column_int(5) != 0;
-            let include_old_only_when_changed = statement.column_int(6) != 0;
-            let ignore_empty_update = statement.column_int(7) != 0;
-
-            let mut flags = TableInfoFlags::default();
-            flags = flags.set_flag(TableInfoFlags::LOCAL_ONLY, local_only);
-            flags = flags.set_flag(TableInfoFlags::INSERT_ONLY, insert_only);
-            flags = flags.set_flag(TableInfoFlags::INCLUDE_METADATA, include_metadata);
-            flags = flags.set_flag(
-                TableInfoFlags::INCLUDE_OLD_ONLY_WHEN_CHANGED,
-                include_old_only_when_changed,
-            );
-            flags = flags.set_flag(TableInfoFlags::IGNORE_EMPTY_UPDATE, ignore_empty_update);
-            flags
-        };
-
-        let include_old = match statement.column_type(4)? {
-            sqlite_nostd::ColumnType::Text => {
-                let columns: Vec<String> = serde_json::from_str(statement.column_text(4)?)?;
-                Some(DiffIncludeOld::OnlyForColumns { columns })
-            }
+    pub fn view_name(&self) -> &str {
+        self.view_name_override
+            .as_deref()
+            .unwrap_or(self.name.as_str())
+    }
 
-            sqlite_nostd::ColumnType::Integer => {
-                if statement.column_int(4) != 0 {
-                    Some(DiffIncludeOld::ForAllColumns)
-                } else {
-                    None
-                }
-            }
-            _ => None,
-        };
-
-        // Don't allow include_metadata for local_only tables, it breaks our trigger setup and makes
-        // no sense because these changes are never inserted into ps_crud.
-        if flags.include_metadata() && flags.local_only() {
-            return Err(SQLiteError(
-                ResultCode::ERROR,
-                Some("include_metadata and local_only are incompatible".to_string()),
-            ));
+    pub fn internal_name(&self) -> String {
+        if self.flags.local_only() {
+            format!("ps_data_local__{:}", self.name)
+        } else {
+            format!("ps_data__{:}", self.name)
         }
+    }
 
-        return Ok(TableInfo {
-            name,
-            view_name,
-            diff_include_old: include_old,
-            flags,
-        });
+    pub fn column_names(&self) -> impl Iterator<Item = &str> {
+        self.columns.iter().map(|c| c.name.as_str())
     }
 }
 
+#[derive(Deserialize)]
+pub struct Column {
+    pub name: String,
+    #[serde(rename = "type")]
+    pub type_name: String,
+}
+
+#[derive(Deserialize)]
+pub struct Index {
+    pub name: String,
+    pub columns: Vec<IndexedColumn>,
+}
+
+#[derive(Deserialize)]
+pub struct IndexedColumn {
+    pub name: String,
+    pub ascending: bool,
+    #[serde(rename = "type")]
+    pub type_name: String,
+}
+
 pub enum DiffIncludeOld {
     OnlyForColumns { columns: Vec<String> },
     ForAllColumns,
 }
 
+fn deserialize_include_old<'de, D: serde::Deserializer<'de>>(
+    deserializer: D,
+) -> Result<Option<DiffIncludeOld>, D::Error> {
+    struct IncludeOldVisitor;
+
+    impl<'de> Visitor<'de> for IncludeOldVisitor {
+        type Value = Option<DiffIncludeOld>;
+
+        fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
+            write!(formatter, "an array of columns, or true")
+        }
+
+        fn visit_some<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
+        where
+            D: serde::Deserializer<'de>,
+        {
+            deserializer.deserialize_any(self)
+        }
+
+        fn visit_none<E>(self) -> Result<Self::Value, E>
+        where
+            E: serde::de::Error,
+        {
+            return Ok(None);
+        }
+
+        fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
+        where
+            E: serde::de::Error,
+        {
+            Ok(if v {
+                Some(DiffIncludeOld::ForAllColumns)
+            } else {
+                None
+            })
+        }
+
+        fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
+        where
+            A: serde::de::SeqAccess<'de>,
+        {
+            let mut elements: Vec<String> = vec![];
+            while let Some(next) = seq.next_element::<String>()? {
+                elements.push(next);
+            }
+
+            Ok(Some(DiffIncludeOld::OnlyForColumns { columns: elements }))
+        }
+    }
+
+    deserializer.deserialize_option(IncludeOldVisitor)
+}
+
 #[derive(Clone, Copy)]
 #[repr(transparent)]
 pub struct TableInfoFlags(pub u32);
@@ -148,53 +172,56 @@ impl Default for TableInfoFlags {
     }
 }
 
-pub struct ColumnNameAndTypeStatement<'a> {
-    pub stmt: ManagedStmt,
-    table: PhantomData<&'a str>,
-}
+impl<'de> Deserialize<'de> for TableInfoFlags {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        struct FlagsVisitor;
 
-impl ColumnNameAndTypeStatement<'_> {
-    pub fn new(db: *mut sqlite::sqlite3, table: &str) -> Result<Self, ResultCode> {
-        let stmt = db.prepare_v2("select json_extract(e.value, '$.name'), json_extract(e.value, '$.type') from json_each(json_extract(?, '$.columns')) e")?;
-        stmt.bind_text(1, table, sqlite::Destructor::STATIC)?;
+        impl<'de> Visitor<'de> for FlagsVisitor {
+            type Value = TableInfoFlags;
 
-        Ok(Self {
-            stmt,
-            table: PhantomData,
-        })
-    }
+            fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
+                write!(formatter, "an object with table flags")
+            }
 
-    fn step(stmt: &ManagedStmt) -> Result<Option<ColumnInfo>, ResultCode> {
-        if stmt.step()? == ResultCode::ROW {
-            let name = stmt.column_text(0)?;
-            let type_name = stmt.column_text(1)?;
+            fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
+            where
+                A: serde::de::MapAccess<'de>,
+            {
+                let mut flags = TableInfoFlags::default();
+
+                while let Some((key, value)) = map.next_entry::<&'de str, bool>()? {
+                    flags = flags.set_flag(
+                        match key {
+                            "local_only" => TableInfoFlags::LOCAL_ONLY,
+                            "insert_only" => TableInfoFlags::INSERT_ONLY,
+                            "include_metadata" => TableInfoFlags::INCLUDE_METADATA,
+                            "include_old_only_when_changed" => {
+                                TableInfoFlags::INCLUDE_OLD_ONLY_WHEN_CHANGED
+                            }
+                            "ignore_empty_update" => TableInfoFlags::IGNORE_EMPTY_UPDATE,
+                            _ => continue,
+                        },
+                        value,
+                    );
+                }
 
-            return Ok(Some(ColumnInfo { name, type_name }));
+                Ok(flags)
+            }
         }
 
-        Ok(None)
+        deserializer.deserialize_struct(
+            "TableInfoFlags",
+            &[
+                "local_only",
+                "insert_only",
+                "include_metadata",
+                "include_old_only_when_changed",
+                "ignore_empty_update",
+            ],
+            FlagsVisitor,
+        )
     }
-
-    pub fn streaming_iter(
-        &mut self,
-    ) -> impl StreamingIterator<Item = Result<ColumnInfo, ResultCode>> {
-        streaming_iterator::from_fn(|| match Self::step(&self.stmt) {
-            Err(e) => Some(Err(e)),
-            Ok(Some(other)) => Some(Ok(other)),
-            Ok(None) => None,
-        })
-    }
-
-    pub fn names_iter(&mut self) -> impl StreamingIterator<Item = Result<&str, ResultCode>> {
-        self.streaming_iter().map(|item| match item {
-            Ok(row) => Ok(row.name),
-            Err(e) => Err(*e),
-        })
-    }
-}
-
-#[derive(Clone)]
-pub struct ColumnInfo<'a> {
-    pub name: &'a str,
-    pub type_name: &'a str,
 }
diff --git a/crates/core/src/views.rs b/crates/core/src/views.rs
index dea0b2b..03cbdd8 100644
--- a/crates/core/src/views.rs
+++ b/crates/core/src/views.rs
@@ -6,49 +6,41 @@ use alloc::string::String;
 use alloc::vec::Vec;
 use core::ffi::c_int;
 use core::fmt::Write;
-use streaming_iterator::StreamingIterator;
 
 use sqlite::{Connection, Context, ResultCode, Value};
 use sqlite_nostd::{self as sqlite};
 
 use crate::create_sqlite_text_fn;
 use crate::error::SQLiteError;
-use crate::schema::{ColumnInfo, ColumnNameAndTypeStatement, DiffIncludeOld, TableInfo};
+use crate::schema::{DiffIncludeOld, Table};
 use crate::util::*;
 
 fn powersync_view_sql_impl(
-    ctx: *mut sqlite::context,
+    _ctx: *mut sqlite::context,
     args: &[*mut sqlite::value],
 ) -> Result<String, SQLiteError> {
-    let db = ctx.db_handle();
-    let table = args[0].text();
-    let table_info = TableInfo::parse_from(db, table)?;
+    let table_info = Table::from_json(args[0].text())?;
 
     let name = &table_info.name;
-    let view_name = &table_info.view_name;
+    let view_name = &table_info.view_name();
     let local_only = table_info.flags.local_only();
     let include_metadata = table_info.flags.include_metadata();
 
     let quoted_name = quote_identifier(view_name);
     let internal_name = quote_internal_name(name, local_only);
 
-    let mut columns = ColumnNameAndTypeStatement::new(db, table)?;
-    let mut iter = columns.streaming_iter();
-
     let mut column_names_quoted: Vec<String> = alloc::vec![];
     let mut column_values: Vec<String> = alloc::vec![];
     column_names_quoted.push(quote_identifier("id"));
     column_values.push(String::from("id"));
 
-    while let Some(row) = iter.next() {
-        let ColumnInfo { name, type_name } = row.clone()?;
-        column_names_quoted.push(quote_identifier(name));
+    for column in &table_info.columns {
+        column_names_quoted.push(quote_identifier(&column.name));
 
-        let foo = format!(
+        column_values.push(format!(
             "CAST(json_extract(data, {:}) as {:})",
-            quote_json_path(name),
-            type_name
-        );
-        column_values.push(foo);
+            quote_json_path(&column.name),
+            &column.type_name
+        ));
     }
 
     if include_metadata {
@@ -77,14 +69,13 @@ create_sqlite_text_fn!(
 );
 
 fn powersync_trigger_delete_sql_impl(
-    ctx: *mut sqlite::context,
+    _ctx: *mut sqlite::context,
     args: &[*mut sqlite::value],
 ) -> Result<String, SQLiteError> {
-    let table = args[0].text();
-    let table_info = TableInfo::parse_from(ctx.db_handle(), table)?;
+    let table_info = Table::from_json(args[0].text())?;
 
     let name = &table_info.name;
-    let view_name = &table_info.view_name;
+    let view_name = &table_info.view_name();
 
     let local_only = table_info.flags.local_only();
     let insert_only = table_info.flags.insert_only();
@@ -93,23 +84,14 @@ fn powersync_trigger_delete_sql_impl(
     let trigger_name = quote_identifier_prefixed("ps_view_delete_", view_name);
     let type_string = quote_string(name);
 
-    let db = ctx.db_handle();
-    let old_fragment: Cow<'static, str> = match table_info.diff_include_old {
+    let old_fragment: Cow<'static, str> = match &table_info.diff_include_old {
         Some(include_old) => {
-            let mut columns = ColumnNameAndTypeStatement::new(db, table)?;
-
             let json = match include_old {
                 DiffIncludeOld::OnlyForColumns { columns } => {
-                    let mut iterator = columns.iter();
-                    let mut columns =
-                        streaming_iterator::from_fn(|| -> Option<Result<&str, ResultCode>> {
-                            Some(Ok(iterator.next()?.as_str()))
-                        });
-
-                    json_object_fragment("OLD", &mut columns)
+                    json_object_fragment("OLD", &mut columns.iter().map(|c| c.as_str()))
                 }
                 DiffIncludeOld::ForAllColumns => {
-                    json_object_fragment("OLD", &mut columns.names_iter())
+                    json_object_fragment("OLD", &mut table_info.column_names())
                 }
             }?;
 
@@ -179,15 +161,13 @@ create_sqlite_text_fn!(
 );
 
 fn powersync_trigger_insert_sql_impl(
-    ctx: *mut sqlite::context,
+    _ctx: *mut sqlite::context,
     args: &[*mut sqlite::value],
 ) -> Result<String, SQLiteError> {
-    let table = args[0].text();
-
-    let table_info = TableInfo::parse_from(ctx.db_handle(), table)?;
+    let table_info = Table::from_json(args[0].text())?;
 
     let name = &table_info.name;
-    let view_name = &table_info.view_name;
+    let view_name = &table_info.view_name();
 
     let local_only = table_info.flags.local_only();
     let insert_only = table_info.flags.insert_only();
@@ -196,10 +176,7 @@ fn powersync_trigger_insert_sql_impl(
     let trigger_name = quote_identifier_prefixed("ps_view_insert_", view_name);
     let type_string = quote_string(name);
 
-    let local_db = ctx.db_handle();
-
-    let mut columns = ColumnNameAndTypeStatement::new(local_db, table)?;
-    let json_fragment = json_object_fragment("NEW", &mut columns.names_iter())?;
+    let json_fragment = json_object_fragment("NEW", &mut table_info.column_names())?;
 
     let metadata_fragment = if table_info.flags.include_metadata() {
         ", 'metadata', NEW._metadata"
@@ -258,15 +235,13 @@ create_sqlite_text_fn!(
 );
 
 fn powersync_trigger_update_sql_impl(
-    ctx: *mut sqlite::context,
+    _ctx: *mut sqlite::context,
     args: &[*mut sqlite::value],
 ) -> Result<String, SQLiteError> {
-    let table = args[0].text();
-
-    let table_info = TableInfo::parse_from(ctx.db_handle(), table)?;
+    let table_info = Table::from_json(args[0].text())?;
 
     let name = &table_info.name;
-    let view_name = &table_info.view_name;
+    let view_name = &table_info.view_name();
 
     let insert_only = table_info.flags.insert_only();
     let local_only = table_info.flags.local_only();
@@ -275,23 +250,16 @@ fn powersync_trigger_update_sql_impl(
     let trigger_name = quote_identifier_prefixed("ps_view_update_", view_name);
     let type_string = quote_string(name);
 
-    let db = ctx.db_handle();
-    let mut columns = ColumnNameAndTypeStatement::new(db, table)?;
-    let json_fragment_new = json_object_fragment("NEW", &mut columns.names_iter())?;
-    let json_fragment_old = json_object_fragment("OLD", &mut columns.names_iter())?;
+    let json_fragment_new = json_object_fragment("NEW", &mut table_info.column_names())?;
+    let json_fragment_old = json_object_fragment("OLD", &mut table_info.column_names())?;
 
     let mut old_values_fragment = match &table_info.diff_include_old {
         None => None,
         Some(DiffIncludeOld::ForAllColumns) => Some(json_fragment_old.clone()),
-        Some(DiffIncludeOld::OnlyForColumns { columns }) => {
-            let mut iterator = columns.iter();
-            let mut columns =
-                streaming_iterator::from_fn(|| -> Option<Result<&str, ResultCode>> {
-                    Some(Ok(iterator.next()?.as_str()))
-                });
-
-            Some(json_object_fragment("OLD", &mut columns)?)
-        }
+        Some(DiffIncludeOld::OnlyForColumns { columns }) => Some(json_object_fragment(
+            "OLD",
+            &mut columns.iter().map(|c| c.as_str()),
+        )?),
     };
 
     if table_info.flags.include_old_only_when_changed() {
@@ -301,15 +269,9 @@ fn powersync_trigger_update_sql_impl(
         let filtered_new_fragment = match &table_info.diff_include_old {
             // When include_old_only_when_changed is combined with a column filter, make sure we
            // only include the powersync_diff of columns matched by the filter.
-            Some(DiffIncludeOld::OnlyForColumns { columns }) => {
-                let mut iterator = columns.iter();
-                let mut columns =
-                    streaming_iterator::from_fn(|| -> Option<Result<&str, ResultCode>> {
-                        Some(Ok(iterator.next()?.as_str()))
-                    });
-
-                Cow::Owned(json_object_fragment("NEW", &mut columns)?)
-            }
+            Some(DiffIncludeOld::OnlyForColumns { columns }) => Cow::Owned(
+                json_object_fragment("NEW", &mut columns.iter().map(|c| c.as_str()))?,
+            ),
             _ => Cow::Borrowed(json_fragment_new.as_str()),
         };
 
@@ -444,7 +406,7 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
 /// Example output with prefix "NEW": "json_object('id', NEW.id, 'name', NEW.name, 'age', NEW.age)".
 fn json_object_fragment<'a>(
     prefix: &str,
-    name_results: &mut dyn StreamingIterator<Item = Result<&'a str, ResultCode>>,
+    name_results: &mut dyn Iterator<Item = &'a str>,
 ) -> Result<String, SQLiteError> {
     // floor(SQLITE_MAX_FUNCTION_ARG / 2).
     // To keep databases portable, we use the default limit of 100 args for this,
@@ -452,9 +414,7 @@ fn json_object_fragment<'a>(
     const MAX_ARG_COUNT: usize = 50;
 
     let mut column_names_quoted: Vec<String> = alloc::vec![];
-    while let Some(row) = name_results.next() {
-        let name = (*row)?;
-
+    while let Some(name) = name_results.next() {
         let quoted: String = format!(
             "{:}, {:}.{:}",
             quote_string(name),