Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use sqlite::ResultCode;
use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, Context};

use crate::bucket_priority::BucketPriority;
use crate::create_sqlite_optional_text_fn;
use crate::create_sqlite_text_fn;
use crate::error::SQLiteError;
use crate::sync::BucketPriority;

fn powersync_client_id_impl(
ctx: *mut sqlite::context,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use core::ffi::{c_char, c_int};
use sqlite::ResultCode;
use sqlite_nostd as sqlite;

mod bucket_priority;
mod checkpoint;
mod crud_vtab;
mod diff;
Expand All @@ -26,6 +25,7 @@ mod migrations;
mod operations;
mod operations_vtab;
mod schema;
mod sync;
mod sync_local;
mod sync_types;
mod util;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use sqlite::ResultCode;
use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, Context};

use crate::bucket_priority::BucketPriority;
use crate::error::{PSResult, SQLiteError};
use crate::fix_data::apply_v035_fix;
use crate::sync::BucketPriority;

pub const LATEST_VERSION: i32 = 9;

Expand Down
32 changes: 17 additions & 15 deletions crates/core/src/operations.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use alloc::format;
use alloc::string::String;
use num_traits::Zero;

use crate::error::{PSResult, SQLiteError};
use crate::sync::Checksum;
use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, ResultCode};

Expand Down Expand Up @@ -101,16 +103,16 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
bucket_statement.reset()?;

let mut last_op: Option<i64> = None;
let mut add_checksum: i32 = 0;
let mut op_checksum: i32 = 0;
let mut add_checksum = Checksum::zero();
let mut op_checksum = Checksum::zero();
let mut added_ops: i32 = 0;

while iterate_statement.step()? == ResultCode::ROW {
let op_id = iterate_statement.column_int64(0);
let op = iterate_statement.column_text(1)?;
let object_type = iterate_statement.column_text(2);
let object_id = iterate_statement.column_text(3);
let checksum = iterate_statement.column_int(4);
let checksum = Checksum::from_i32(iterate_statement.column_int(4));
let op_data = iterate_statement.column_text(5);

last_op = Some(op_id);
Expand All @@ -131,9 +133,9 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",

while supersede_statement.step()? == ResultCode::ROW {
// Superseded (deleted) a previous operation, add the checksum
let supersede_checksum = supersede_statement.column_int(1);
add_checksum = add_checksum.wrapping_add(supersede_checksum);
op_checksum = op_checksum.wrapping_sub(supersede_checksum);
let supersede_checksum = Checksum::from_i32(supersede_statement.column_int(1));
add_checksum += supersede_checksum;
op_checksum -= supersede_checksum;

// Superseded an operation, only skip if the bucket was empty
// Previously this checked "superseded_op <= last_applied_op".
Expand All @@ -149,7 +151,7 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
if op == "REMOVE" {
let should_skip_remove = !superseded;

add_checksum = add_checksum.wrapping_add(checksum);
add_checksum += checksum;

if !should_skip_remove {
if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) {
Expand Down Expand Up @@ -190,12 +192,12 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
insert_statement.bind_null(6)?;
}

insert_statement.bind_int(7, checksum)?;
insert_statement.bind_int(7, checksum.bitcast_i32())?;
insert_statement.exec()?;

op_checksum = op_checksum.wrapping_add(checksum);
op_checksum += checksum;
} else if op == "MOVE" {
add_checksum = add_checksum.wrapping_add(checksum);
add_checksum += checksum;
} else if op == "CLEAR" {
// Any remaining PUT operations should get an implicit REMOVE
// language=SQLite
Expand Down Expand Up @@ -223,12 +225,12 @@ WHERE bucket = ?1",
"UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2",
)?;
clear_statement2.bind_int64(2, bucket_id)?;
clear_statement2.bind_int(1, checksum)?;
clear_statement2.bind_int(1, checksum.bitcast_i32())?;
clear_statement2.exec()?;

add_checksum = 0;
add_checksum = Checksum::zero();
is_empty = true;
op_checksum = 0;
op_checksum = Checksum::zero();
}
}

Expand All @@ -244,8 +246,8 @@ WHERE bucket = ?1",
)?;
statement.bind_int64(1, bucket_id)?;
statement.bind_int64(2, *last_op)?;
statement.bind_int(3, add_checksum)?;
statement.bind_int(4, op_checksum)?;
statement.bind_int(3, add_checksum.bitcast_i32())?;
statement.bind_int(4, op_checksum.bitcast_i32())?;
statement.bind_int(5, added_ops)?;

statement.exec()?;
Expand Down
190 changes: 190 additions & 0 deletions crates/core/src/sync/checksum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use core::{
fmt::Display,
num::Wrapping,
ops::{Add, AddAssign, Sub, SubAssign},
};

use num_traits::float::FloatCore;
use num_traits::Zero;
use serde::{de::Visitor, Deserialize, Serialize};

/// A checksum as received from the sync service.
///
/// Conceptually, we use unsigned 32 bit integers to represent checksums, and adding checksums
/// should be a wrapping add.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
pub struct Checksum(Wrapping<u32>);

impl Checksum {
pub const fn value(self) -> u32 {
self.0 .0
}

pub const fn from_value(value: u32) -> Self {
Self(Wrapping(value))
}

pub const fn from_i32(value: i32) -> Self {
Self::from_value(value as u32)
}

pub const fn bitcast_i32(self) -> i32 {
self.value() as i32
}
}

impl Zero for Checksum {
fn zero() -> Self {
const { Self::from_value(0) }
}

fn is_zero(&self) -> bool {
self.value() == 0
}
}

impl Add for Checksum {
type Output = Self;

#[inline]
fn add(self, rhs: Self) -> Self::Output {
Self(self.0 + rhs.0)
}
}

impl AddAssign for Checksum {
#[inline]
fn add_assign(&mut self, rhs: Self) {
self.0 += rhs.0
}
}

impl Sub for Checksum {
type Output = Self;

fn sub(self, rhs: Self) -> Self::Output {
Self(self.0 - rhs.0)
}
}

impl SubAssign for Checksum {
fn sub_assign(&mut self, rhs: Self) {
self.0 -= rhs.0;
}
}

impl From<u32> for Checksum {
fn from(value: u32) -> Self {
Self::from_value(value)
}
}

impl Display for Checksum {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{:#010x}", self.value())
}
}

impl<'de> Deserialize<'de> for Checksum {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct MyVisitor;

impl<'de> Visitor<'de> for MyVisitor {
type Value = Checksum;

fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
write!(formatter, "a number to interpret as a checksum")
}

fn visit_u32<E>(self, v: u32) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(v.into())
}

fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
let as_u32: u32 = v.try_into().map_err(|_| {
E::invalid_value(serde::de::Unexpected::Unsigned(v), &"a 32-bit int")
})?;
Ok(as_u32.into())
}

fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Checksum::from_i32(v))
}

fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
// This is supposed to be an u32, but it could also be a i32 that we need to
// normalize.
let min: i64 = u32::MIN.into();
let max: i64 = u32::MAX.into();

if v >= min && v <= max {
return Ok(Checksum::from(v as u32));
}

let as_i32: i32 = v.try_into().map_err(|_| {
E::invalid_value(serde::de::Unexpected::Signed(v), &"a 32-bit int")
})?;
Ok(Checksum::from_i32(as_i32))
}

fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
if !v.is_finite() || f64::trunc(v) != v {
return Err(E::invalid_value(
serde::de::Unexpected::Float(v),
&"a whole number",
));
}

self.visit_i64(v as i64)
}
}

deserializer.deserialize_u32(MyVisitor)
}
}

#[cfg(test)]
mod test {
use super::Checksum;

#[test]
pub fn test_binary_representation() {
assert_eq!(Checksum::from_i32(-1).value(), u32::MAX);
assert_eq!(Checksum::from(u32::MAX).value(), u32::MAX);
assert_eq!(Checksum::from(u32::MAX).bitcast_i32(), -1);
}

fn deserialize(from: &str) -> Checksum {
serde_json::from_str(from).expect("should deserialize")
}

#[test]
pub fn test_deserialize() {
assert_eq!(deserialize("0").value(), 0);
assert_eq!(deserialize("-1").value(), u32::MAX);
assert_eq!(deserialize("-1.0").value(), u32::MAX);

assert_eq!(deserialize("3573495687").value(), 3573495687);
assert_eq!(deserialize("3573495687.0").value(), 3573495687);
assert_eq!(deserialize("-721471609.0").value(), 3573495687);
}
}
5 changes: 5 additions & 0 deletions crates/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod bucket_priority;
mod checksum;

pub use bucket_priority::BucketPriority;
pub use checksum::Checksum;
2 changes: 1 addition & 1 deletion crates/core/src/sync_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use alloc::string::String;
use alloc::vec::Vec;
use serde::Deserialize;

use crate::bucket_priority::BucketPriority;
use crate::error::{PSResult, SQLiteError};
use crate::sync::BucketPriority;
use sqlite_nostd::{self as sqlite, Destructor, ManagedStmt, Value};
use sqlite_nostd::{ColumnType, Connection, ResultCode};

Expand Down