Skip to content

Commit 7b45d56

Browse files
committed
More documentation comments
1 parent 7296c85 commit 7b45d56

File tree

6 files changed

+118
-11
lines changed

6 files changed

+118
-11
lines changed

crates/core/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ pub extern "C" fn sqlite3_powersync_init(
4444
api: *mut sqlite::api_routines,
4545
) -> c_int {
4646
sqlite::EXTENSION_INIT2(api);
47-
unsafe { SQLITE3_API = api };
47+
unsafe {
48+
// SAFETY: This field is only assigned once, when the library is loaded.
49+
SQLITE3_API = api
50+
};
4851

4952
let result = init_extension(db);
5053

crates/core/src/sync/interface.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,39 @@ use crate::error::SQLiteError;
1616
use super::streaming_sync::SyncClient;
1717
use super::sync_status::DownloadSyncStatus;
1818

19+
/// A request sent from a client SDK to the [SyncClient] with a `powersync_control` invocation.
1920
pub enum SyncControlRequest<'a> {
21+
/// The client requests to start a sync iteration.
22+
///
23+
/// Earlier iterations are implicitly dropped when receiving this request.
2024
StartSyncStream {
25+
/// Bucket parameters to include in the request when opening a sync stream.
2126
parameters: Option<serde_json::Map<String, serde_json::Value>>,
2227
},
28+
/// The client requests to stop the current sync iteration.
2329
StopSyncStream,
30+
/// The client is forwading a sync event to the core extension.
2431
SyncEvent(SyncEvent<'a>),
2532
}
2633

2734
pub enum SyncEvent<'a> {
35+
/// A synthetic event forwarded to the [SyncClient] after being started.
2836
Initialize,
37+
/// An event requesting the sync client to shut down.
2938
TearDown,
39+
/// Notifies the sync client that a token has been refreshed.
40+
///
41+
/// In response, we'll stop the current iteration to begin another one with the new token.
3042
DidRefreshToken,
43+
/// Notifies the sync client that the current CRUD upload (for which the client SDK is
44+
/// responsible) has finished.
45+
///
46+
/// If pending CRUD entries have previously prevented a sync from completing, this even can be
47+
/// used to try again.
3148
UploadFinished,
49+
/// Forward a text line (JSON) received from the sync service.
3250
TextLine { data: &'a str },
51+
/// Forward a binary line (BSON) received from the sync service.
3352
BinaryLine { data: &'a [u8] },
3453
}
3554

@@ -40,19 +59,26 @@ pub enum Instruction {
4059
severity: LogSeverity,
4160
line: Cow<'static, str>,
4261
},
62+
/// Update the download status for the ongoing sync iteration.
4363
UpdateSyncStatus {
4464
status: Rc<RefCell<DownloadSyncStatus>>,
4565
},
46-
EstablishSyncStream {
47-
request: StreamingSyncRequest,
48-
},
66+
/// Connect to the sync service using the [StreamingSyncRequest] created by the core extension,
67+
/// and then forward received lines via [SyncEvent::TextLine] and [SyncEvent::BinaryLine].
68+
EstablishSyncStream { request: StreamingSyncRequest },
4969
FetchCredentials {
70+
/// Whether the credentials currently used have expired.
71+
///
72+
/// If false, this is a pre-fetch.
5073
did_expire: bool,
5174
},
5275
// These are defined like this because deserializers in Kotlin can't support either an
5376
// object or a literal value
77+
/// Close the websocket / HTTP stream to the sync service.
5478
CloseSyncStream {},
79+
/// Flush the file-system if it's non-durable (only applicable to the Dart SDK).
5580
FlushFileSystem {},
81+
/// Notify that a sync has been completed, prompting client SDKs to clear earlier errors.
5682
DidCompleteSync {},
5783
}
5884

@@ -79,6 +105,10 @@ pub struct BucketRequest {
79105
pub after: String,
80106
}
81107

108+
/// Wrapper around a [SyncClient].
109+
///
110+
/// We allocate one instance of this per database (in [register]) - the [SyncClient] has an initial
111+
/// empty state that doesn't consume any resources.
82112
struct SqlController {
83113
client: SyncClient,
84114
}

crates/core/src/sync/storage_adapter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ use super::{
2424
/// An adapter for storing sync state.
2525
///
2626
/// This is used to encapsulate some SQL queries used for the sync implementation, making the code
27-
/// in `streaming_sync.rs` easier to read.
27+
/// in `streaming_sync.rs` easier to read. It also allows caching some prepared statements that are
28+
/// used frequently as an optimization, but we're not taking advantage of that yet.
2829
pub struct StorageAdapter {
2930
pub db: *mut sqlite::sqlite3,
3031
progress_stmt: ManagedStmt,

crates/core/src/sync/streaming_sync.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,19 @@ use super::{
3232
Checksum,
3333
};
3434

35+
/// The sync client implementation, responsible for parsing lines received by the sync service and
36+
/// persisting them to the database.
37+
///
38+
/// The client consumes no resources and prepares no statements until a sync iteration is
39+
/// initialized.
3540
pub struct SyncClient {
3641
db: *mut sqlite::sqlite3,
42+
/// The current [ClientState] (essentially an optional [StreamingSyncIteration]).
43+
///
44+
/// This is guarded behind a mutex so that we can mutate the state without forcing callers to
45+
/// obtain a mutable reference to the [SyncClient] itself. It doesn't mean much in practice
46+
/// because it's impossible to run two `powersync_control` calls on the same database connection
47+
/// concurrently.
3748
state: Mutex<ClientState>,
3849
}
3950

@@ -91,7 +102,9 @@ impl SyncClient {
91102
}
92103

93104
enum ClientState {
105+
/// No sync iteration is currently active.
94106
Idle,
107+
/// A sync iteration has begun on the database.
95108
IterationActive(SyncIterationHandle),
96109
}
97110

@@ -108,11 +121,19 @@ impl ClientState {
108121
}
109122
}
110123

124+
/// A handle that allows progressing a [StreamingSyncIteration].
125+
///
126+
/// The sync itertion itself is implemented as an `async` function, as this allows us to treat it
127+
/// as a coroutine that preserves internal state between multiple `powersync_control` invocations.
128+
/// At each invocation, the future is polled once (and gets access to context that allows it to
129+
/// render [Instruction]s to return from the function).
111130
struct SyncIterationHandle {
112131
future: Pin<Box<dyn Future<Output = Result<(), SQLiteError>>>>,
113132
}
114133

115134
impl SyncIterationHandle {
135+
/// Creates a new sync iteration in a pending state by preparing statements for
136+
/// [StorageAdapter] and setting up the initial downloading state for [StorageAdapter] .
116137
fn new(
117138
db: *mut sqlite::sqlite3,
118139
parameters: Option<serde_json::Map<String, serde_json::Value>>,
@@ -128,6 +149,8 @@ impl SyncIterationHandle {
128149
Ok(Self { future })
129150
}
130151

152+
/// Forwards a [SyncEvent::Initialize] to the current sync iteration, returning the initial
153+
/// instructions generated.
131154
fn initialize(&mut self) -> Result<Vec<Instruction>, SQLiteError> {
132155
let mut event = ActiveEvent::new(SyncEvent::Initialize);
133156
let result = self.run(&mut event)?;
@@ -160,9 +183,12 @@ impl SyncIterationHandle {
160183
}
161184
}
162185

186+
/// A [SyncEvent] currently being handled by a [StreamingSyncIteration].
163187
struct ActiveEvent<'a> {
164188
handled: bool,
189+
/// The event to handle
165190
event: SyncEvent<'a>,
191+
/// Instructions to forward to the client when the `powersync_control` invocation completes.
166192
instructions: Vec<Instruction>,
167193
}
168194

@@ -420,6 +446,10 @@ impl StreamingSyncIteration {
420446
)?)
421447
}
422448

449+
/// Prepares a sync iteration by handling the initial [SyncEvent::Initialize].
450+
///
451+
/// This prepares a [StreamingSyncRequest] by fetching local sync state and the requested bucket
452+
/// parameters.
423453
async fn prepare_request(&mut self) -> Result<Vec<String>, SQLiteError> {
424454
let event = Self::receive_event().await;
425455
let SyncEvent::Initialize = event.event else {
@@ -484,6 +514,10 @@ impl SyncTarget {
484514
}
485515
}
486516

517+
/// Starts tracking the received `Checkpoint`.
518+
///
519+
/// This updates the internal state and returns a set of buckets to delete because they've been
520+
/// tracked locally but not in the new checkpoint.
487521
fn track_checkpoint<'a>(&mut self, checkpoint: &Checkpoint<'a>) -> BTreeSet<String> {
488522
let mut to_delete: BTreeSet<String> = match &self {
489523
SyncTarget::Tracking(checkpoint) => checkpoint.buckets.keys().cloned().collect(),

crates/core/src/sync/sync_status.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,25 @@ use super::{
1010
storage_adapter::PersistedBucketProgress, streaming_sync::OwnedCheckpoint,
1111
};
1212

13+
/// Information about a progressing download.
1314
#[derive(Serialize, Hash)]
1415
pub struct DownloadSyncStatus {
16+
/// Whether the socket to the sync service is currently open and connected.
17+
///
18+
/// This starts being true once we receive the first line, and is set to false as the iteration
19+
/// ends.
1520
pub connected: bool,
21+
/// Whether we've requested the client SDK to connect to the socket while not receiving sync
22+
/// lines yet.
1623
pub connecting: bool,
24+
/// Provides stats over which bucket priorities have already been synced (or when they've last
25+
/// been changed).
26+
///
1727
/// Always sorted by descending [BucketPriority] in [SyncPriorityStatus] (or, in other words,
1828
/// increasing priority numbers).
1929
pub priority_status: Vec<SyncPriorityStatus>,
30+
/// When a download is active (that is, a `checkpoint` or `checkpoint_diff` line has been
31+
/// received), information about how far the download has progressed.
2032
pub downloading: Option<SyncDownloadProgress>,
2133
}
2234

crates/core/src/util.rs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,19 @@ pub(crate) static mut SQLITE3_API: *mut api_routines = ptr::null_mut();
106106

107107
impl SqliteMutex {
108108
pub fn new() -> Self {
109-
let native_alloc = unsafe { (*SQLITE3_API).mutex_alloc };
109+
let native_alloc = unsafe {
110+
// SAFETY: SQLITE3_API is only set once when the library is loaded by SQLite.
111+
(*SQLITE3_API).mutex_alloc
112+
};
110113

111114
Self {
112115
ptr: match native_alloc {
113116
None => null_mut(),
114-
Some(mutex_alloc) => unsafe { mutex_alloc(SQLITE_MUTEX_FAST as i32) },
117+
Some(mutex_alloc) => unsafe {
118+
// SAFETY: We're allowed to call sqlite3_mutex_alloc with this bitmask:
119+
// https://sqlite.org/c3ref/mutex_alloc.html
120+
mutex_alloc(SQLITE_MUTEX_FAST as i32)
121+
},
115122
},
116123
}
117124
}
@@ -126,7 +133,11 @@ unsafe impl RawMutex for SqliteMutex {
126133
if self.ptr.is_null() {
127134
// Disable mutex code
128135
} else {
129-
unsafe { (*SQLITE3_API).mutex_enter.unwrap_unchecked()(self.ptr) }
136+
unsafe {
137+
// SAFETY: When we get here, we were able to allocate a mutex (so mutex methods
138+
// must be present).
139+
(*SQLITE3_API).mutex_enter.unwrap_unchecked()(self.ptr)
140+
}
130141
}
131142
}
132143

@@ -135,7 +146,11 @@ unsafe impl RawMutex for SqliteMutex {
135146
// Disable mutex code
136147
true
137148
} else {
138-
let res = unsafe { (*SQLITE3_API).mutex_try.unwrap_unchecked()(self.ptr) };
149+
let res = unsafe {
150+
// SAFETY: When we get here, we were able to allocate a mutex (so mutex methods
151+
// must be present).
152+
(*SQLITE3_API).mutex_try.unwrap_unchecked()(self.ptr)
153+
};
139154
res == 0
140155
}
141156
}
@@ -144,21 +159,33 @@ unsafe impl RawMutex for SqliteMutex {
144159
if self.ptr.is_null() {
145160
// Disable mutex code
146161
} else {
147-
unsafe { (*SQLITE3_API).mutex_leave.unwrap_unchecked()(self.ptr) }
162+
unsafe {
163+
// SAFETY: When we get here, we were able to allocate a mutex (so mutex methods
164+
// must be present). Also, this method is only allowed to be called after a caller
165+
// has locked the mutex before.
166+
(*SQLITE3_API).mutex_leave.unwrap_unchecked()(self.ptr)
167+
}
148168
}
149169
}
150170
}
151171

152172
impl Drop for SqliteMutex {
153173
fn drop(&mut self) {
154174
if !self.ptr.is_null() {
155-
unsafe { (*SQLITE3_API).mutex_free.unwrap_unchecked()(self.ptr) };
175+
unsafe {
176+
// SAFETY: The pointer points to a valid mutex we own. This means that we have been
177+
// able to allocate a mutex, so mutex methods must be present.
178+
(*SQLITE3_API).mutex_free.unwrap_unchecked()(self.ptr)
179+
};
156180
}
157181
}
158182
}
159183

160184
pub type Mutex<T> = MutexApi<SqliteMutex, T>;
161185

186+
/// Creates a [Mutex] implementation using `sqlite3_mutex_enter` and `sqlite3_mutex_free`.
187+
///
188+
/// When SQLite has been compiled without mutexes, the returned mutex doesn't do anything.
162189
pub fn sqlite3_mutex<T>(value: T) -> Mutex<T> {
163190
let raw = SqliteMutex::new();
164191
MutexApi::from_raw(raw, value)

0 commit comments

Comments
 (0)