Skip to content
89 changes: 89 additions & 0 deletions crates/core/src/bucket_priority.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use serde::{de::Visitor, Deserialize};
use sqlite_nostd::ResultCode;

use crate::error::SQLiteError;

#[repr(transparent)]
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct BucketPriority {
pub number: i32,
}

impl BucketPriority {
pub fn may_publish_with_outstanding_uploads(self) -> bool {
self == BucketPriority::HIGHEST
}

pub const HIGHEST: BucketPriority = BucketPriority { number: 0 };

/// A low priority used to represent fully-completed sync operations across all priorities.
pub const SENTINEL: BucketPriority = BucketPriority { number: i32::MAX };
}

impl TryFrom<i32> for BucketPriority {
type Error = SQLiteError;

fn try_from(value: i32) -> Result<Self, Self::Error> {
if value < BucketPriority::HIGHEST.number || value == Self::SENTINEL.number {
return Err(SQLiteError(
ResultCode::MISUSE,
Some("Invalid bucket priority".into()),
));
}

return Ok(BucketPriority { number: value });
}
}

impl Into<i32> for BucketPriority {
fn into(self) -> i32 {
self.number
}
}

impl PartialOrd<BucketPriority> for BucketPriority {
fn partial_cmp(&self, other: &BucketPriority) -> Option<core::cmp::Ordering> {
Some(self.number.partial_cmp(&other.number)?.reverse())
}
}

impl<'de> Deserialize<'de> for BucketPriority {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct PriorityVisitor;
impl<'de> Visitor<'de> for PriorityVisitor {
type Value = BucketPriority;

fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
formatter.write_str("a priority as an integer between 0 and 3 (inclusive)")
}

fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
BucketPriority::try_from(v).map_err(|e| E::custom(e.1.unwrap_or_default()))
}

fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
let i: i32 = v.try_into().map_err(|_| E::custom("int too large"))?;
Self::visit_i32(self, i)
}

fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
let i: i32 = v.try_into().map_err(|_| E::custom("int too large"))?;
Self::visit_i32(self, i)
}
}

deserializer.deserialize_i32(PriorityVisitor)
}
}
8 changes: 5 additions & 3 deletions crates/core/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use sqlite::ResultCode;
use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, Context};

use crate::bucket_priority::BucketPriority;
use crate::create_sqlite_optional_text_fn;
use crate::create_sqlite_text_fn;
use crate::error::SQLiteError;
Expand Down Expand Up @@ -46,13 +47,14 @@ fn powersync_last_synced_at_impl(
let db = ctx.db_handle();

// language=SQLite
let statement = db.prepare_v2("select value from ps_kv where key = 'last_synced_at'")?;
let statement = db.prepare_v2("select last_synced_at from ps_sync_state where priority = ?")?;
statement.bind_int(1, BucketPriority::SENTINEL.into())?;

if statement.step()? == ResultCode::ROW {
let client_id = statement.column_text(0)?;
return Ok(Some(client_id.to_string()));
Ok(Some(client_id.to_string()))
} else {
return Ok(None);
Ok(None)
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use core::ffi::{c_char, c_int};
use sqlite::ResultCode;
use sqlite_nostd as sqlite;

mod bucket_priority;
mod checkpoint;
mod crud_vtab;
mod diff;
Expand Down
23 changes: 23 additions & 0 deletions crates/core/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use sqlite::ResultCode;
use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, Context};

use crate::bucket_priority::BucketPriority;
use crate::error::{PSResult, SQLiteError};
use crate::fix035::apply_v035_fix;

Expand Down Expand Up @@ -310,5 +311,27 @@ json_array(
.into_db_result(local_db)?;
}

if current_version < 7 && target_version >= 7 {
const SENTINEL_PRIORITY: i32 = BucketPriority::SENTINEL.number;
let stmt = format!("\
CREATE TABLE ps_sync_state (
priority INTEGER NOT NULL,
last_synced_at TEXT NOT NULL
) STRICT;
INSERT OR IGNORE INTO ps_sync_state (priority, last_synced_at)
SELECT {}, value from ps_kv where key = 'last_synced_at';

INSERT INTO ps_migration(id, down_migrations)
VALUES(7,
json_array(
json_object('sql', 'INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = {}'),
json_object('sql', 'DROP TABLE ps_sync_state'),
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 7')
));
", SENTINEL_PRIORITY, SENTINEL_PRIORITY);

local_db.exec_safe(&stmt).into_db_result(local_db)?;
}

Ok(())
}
2 changes: 1 addition & 1 deletion crates/core/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ SELECT
json_extract(e.value, '$.has_more') as has_more,
json_extract(e.value, '$.after') as after,
json_extract(e.value, '$.next_after') as next_after
FROM json_each(json_extract(?, '$.buckets')) e",
FROM json_each(json_extract(?1, '$.buckets')) e",
)?;
statement.bind_text(1, data, sqlite::Destructor::STATIC)?;

Expand Down
11 changes: 5 additions & 6 deletions crates/core/src/operations_vtab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,30 +76,29 @@ extern "C" fn update(
} else if rowid.value_type() == sqlite::ColumnType::Null {
// INSERT
let op = args[2].text();
let data = args[3].text();

let tab = unsafe { &mut *vtab.cast::<VirtualTable>() };
let db = tab.db;

if op == "save" {
let result = insert_operation(db, data);
let result = insert_operation(db, args[3].text());
vtab_result(vtab, result)
} else if op == "sync_local" {
let result = sync_local(db, data);
let result = sync_local(db, &args[3]);
if let Ok(result_row) = result {
unsafe {
*p_row_id = result_row;
}
}
vtab_result(vtab, result)
} else if op == "clear_remove_ops" {
let result = clear_remove_ops(db, data);
let result = clear_remove_ops(db, args[3].text());
vtab_result(vtab, result)
} else if op == "delete_pending_buckets" {
let result = delete_pending_buckets(db, data);
let result = delete_pending_buckets(db, args[3].text());
vtab_result(vtab, result)
} else if op == "delete_bucket" {
let result = delete_bucket(db, data);
let result = delete_bucket(db, args[3].text());
vtab_result(vtab, result)
} else {
ResultCode::MISUSE as c_int
Expand Down
Loading