Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions core/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ pub struct Version {
///
/// Transactions must be sequentially consistent. That is, the results of transactions performed
/// in storage must be as if each were executed sequentially in some order. In particular,
/// un-committed changes must not be read by another transaction.
/// un-committed changes must not be read by another transaction, but committed changes must
/// be visible to subequent transations. Together, this guarantees that `add_version` reliably
/// constructs a linear sequence of versions.
///
/// Transactions with different client IDs cannot share any data, so it is safe to handle them
/// concurrently.
Expand Down Expand Up @@ -70,8 +72,10 @@ pub trait StorageTxn {
async fn get_version(&mut self, version_id: Uuid) -> anyhow::Result<Option<Version>>;

/// Add a version (that must not already exist), and
/// - update latest_version_id
/// - update latest_version_id from parent_version_id to version_id
/// - increment snapshot.versions_since
/// Fails if the existing `latest_version_id` is not equal to `parent_version_id`. Check
/// this by calling `get_client` earlier in the same transaction.
async fn add_version(
&mut self,
version_id: Uuid,
Expand Down
68 changes: 54 additions & 14 deletions sqlite/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
//! Tihs crate implements a SQLite storage backend for the TaskChampion sync server.
//! This crate implements a SQLite storage backend for the TaskChampion sync server.
//!
//! Use the [`SqliteStorage`] type as an implementation of the [`Storage`] trait.
//!
//! This crate is intended for small deployments of a sync server, supporting one or a small number
//! of users. The schema for the database is considered an implementation detail. For more robust
//! database support, consider `taskchampion-sync-server-storage-postgres`.

use anyhow::Context;
use chrono::{TimeZone, Utc};
use rusqlite::types::{FromSql, ToSql};
Expand Down Expand Up @@ -43,7 +50,7 @@ impl SqliteStorage {
/// Create a new instance using a database at the given directory.
///
/// The database will be stored in a file named `taskchampion-sync-server.sqlite3` in the given
/// directory.
/// directory. The database will be created if it does not exist.
pub fn new<P: AsRef<Path>>(directory: P) -> anyhow::Result<SqliteStorage> {
std::fs::create_dir_all(&directory)
.with_context(|| format!("Failed to create `{}`.", directory.as_ref().display()))?;
Expand Down Expand Up @@ -176,7 +183,7 @@ impl StorageTxn for Txn {
async fn new_client(&mut self, latest_version_id: Uuid) -> anyhow::Result<()> {
self.con
.execute(
"INSERT OR REPLACE INTO clients (client_id, latest_version_id) VALUES (?, ?)",
"INSERT INTO clients (client_id, latest_version_id) VALUES (?, ?)",
params![&StoredUuid(self.client_id), &StoredUuid(latest_version_id)],
)
.context("Error creating/updating client")?;
Expand Down Expand Up @@ -231,7 +238,6 @@ impl StorageTxn for Txn {

async fn get_version_by_parent(
&mut self,

parent_version_id: Uuid,
) -> anyhow::Result<Option<Version>> {
self.get_version_impl(
Expand All @@ -249,7 +255,6 @@ impl StorageTxn for Txn {

async fn add_version(
&mut self,

version_id: Uuid,
parent_version_id: Uuid,
history_segment: Vec<u8>,
Expand All @@ -264,17 +269,26 @@ impl StorageTxn for Txn {
]
)
.context("Error adding version")?;
self.con
let rows_changed = self
.con
.execute(
"UPDATE clients
SET
latest_version_id = ?,
versions_since_snapshot = versions_since_snapshot + 1
WHERE client_id = ?",
params![StoredUuid(version_id), StoredUuid(self.client_id),],
WHERE client_id = ? and latest_version_id = ?",
params![
StoredUuid(version_id),
StoredUuid(self.client_id),
StoredUuid(parent_version_id)
],
)
.context("Error updating client for new version")?;

if rows_changed == 0 {
anyhow::bail!("clients.latest_version_id does not match parent_version_id");
}

Ok(())
}

Expand Down Expand Up @@ -328,12 +342,12 @@ mod test {
assert_eq!(client.latest_version_id, latest_version_id);
assert!(client.snapshot.is_none());

let latest_version_id = Uuid::new_v4();
txn.add_version(latest_version_id, Uuid::new_v4(), vec![1, 1])
let new_version_id = Uuid::new_v4();
txn.add_version(new_version_id, latest_version_id, vec![1, 1])
.await?;

let client = txn.get_client().await?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert_eq!(client.latest_version_id, new_version_id);
assert!(client.snapshot.is_none());

let snap = Snapshot {
Expand All @@ -344,7 +358,7 @@ mod test {
txn.set_snapshot(snap.clone(), vec![1, 2, 3]).await?;

let client = txn.get_client().await?.unwrap();
assert_eq!(client.latest_version_id, latest_version_id);
assert_eq!(client.latest_version_id, new_version_id);
assert_eq!(client.snapshot.unwrap(), snap);

Ok(())
Expand All @@ -368,8 +382,10 @@ mod test {
let client_id = Uuid::new_v4();
let mut txn = storage.txn(client_id).await?;

let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
txn.new_client(parent_version_id).await?;

let version_id = Uuid::new_v4();
let history_segment = b"abc".to_vec();
txn.add_version(version_id, parent_version_id, history_segment.clone())
.await?;
Expand All @@ -396,11 +412,35 @@ mod test {
let client_id = Uuid::new_v4();
let mut txn = storage.txn(client_id).await?;

let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
txn.new_client(parent_version_id).await?;

let version_id = Uuid::new_v4();
let history_segment = b"abc".to_vec();
txn.add_version(version_id, parent_version_id, history_segment.clone())
.await?;
// Fails because the version already exists.
assert!(txn
.add_version(version_id, parent_version_id, history_segment.clone())
.await
.is_err());
Ok(())
}

#[tokio::test]
async fn test_add_version_mismatch() -> anyhow::Result<()> {
let tmp_dir = TempDir::new()?;
let storage = SqliteStorage::new(tmp_dir.path())?;
let client_id = Uuid::new_v4();
let mut txn = storage.txn(client_id).await?;

let latest_version_id = Uuid::new_v4();
txn.new_client(latest_version_id).await?;

let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4(); // != latest_version_id
let history_segment = b"abc".to_vec();
// Fails because the latest_version_id is not parent_version_id.
assert!(txn
.add_version(version_id, parent_version_id, history_segment.clone())
.await
Expand Down
Loading