Skip to content

Commit 7ec822f

Browse files
committed
Update last_synced_at for subscriptions
1 parent b3fd778 commit 7ec822f

File tree

5 files changed

+103
-35
lines changed

5 files changed

+103
-35
lines changed

crates/core/src/sync/streaming_sync.rs

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::{
3131
sync_status::Timestamp,
3232
},
3333
};
34-
use sqlite_nostd::{self as sqlite};
34+
use sqlite_nostd::{self as sqlite, Connection, ResultCode};
3535

3636
use super::{
3737
interface::{Instruction, LogSeverity, StreamingSyncRequest, SyncControlRequest, SyncEvent},
@@ -307,9 +307,7 @@ impl StreamingSyncIteration {
307307
));
308308
};
309309
let target = &checkpoint.checkpoint;
310-
let result =
311-
self.adapter
312-
.sync_local(&self.state, target, None, &self.options.schema)?;
310+
let result = self.sync_local(target, None)?;
313311

314312
match result {
315313
SyncLocalResult::ChecksumFailure(checkpoint_result) => {
@@ -352,12 +350,7 @@ impl StreamingSyncIteration {
352350
"Received checkpoint complete without previous checkpoint",
353351
));
354352
};
355-
let result = self.adapter.sync_local(
356-
&self.state,
357-
&target.checkpoint,
358-
Some(priority),
359-
&self.options.schema,
360-
)?;
353+
let result = self.sync_local(&target.checkpoint, Some(priority))?;
361354

362355
match result {
363356
SyncLocalResult::ChecksumFailure(checkpoint_result) => {
@@ -503,12 +496,7 @@ impl StreamingSyncIteration {
503496
.map_err(|e| PowerSyncError::sync_protocol_error("invalid binary line", e))?,
504497
SyncEvent::UploadFinished => {
505498
if let Some(checkpoint) = self.validated_but_not_applied.take() {
506-
let result = self.adapter.sync_local(
507-
&self.state,
508-
&checkpoint,
509-
None,
510-
&self.options.schema,
511-
)?;
499+
let result = self.sync_local(&checkpoint, None)?;
512500

513501
match result {
514502
SyncLocalResult::ChangesApplied => {
@@ -629,17 +617,13 @@ impl StreamingSyncIteration {
629617
if let Ok(index) =
630618
tracked_subscriptions.binary_search_by_key(subscription_id, |s| s.id)
631619
{
632-
resolved[index]
633-
.associated_buckets
634-
.push(bucket.bucket.clone());
620+
resolved[index].mark_associated_with_bucket(&bucket);
635621
}
636622
}
637623
}
638624
BucketSubscriptionReason::IsDefault { stream_name } => {
639625
if let Some(index) = default_stream_subscriptions.get(stream_name.as_str()) {
640-
resolved[*index]
641-
.associated_buckets
642-
.push(bucket.bucket.clone());
626+
resolved[*index].mark_associated_with_bucket(&bucket);
643627
}
644628
}
645629
BucketSubscriptionReason::Unknown => {}
@@ -649,6 +633,41 @@ impl StreamingSyncIteration {
649633
Ok(resolved)
650634
}
651635

636+
/// Performs a partial or a complete local sync.
637+
fn sync_local(
638+
&self,
639+
target: &OwnedCheckpoint,
640+
priority: Option<BucketPriority>,
641+
) -> Result<SyncLocalResult, PowerSyncError> {
642+
let result =
643+
self.adapter
644+
.sync_local(&self.state, target, priority, &self.options.schema)?;
645+
646+
if matches!(&result, SyncLocalResult::ChangesApplied) {
647+
// Update affected stream subscriptions to mark them as synced.
648+
let mut status = self.status.inner().borrow_mut();
649+
if let Some(ref mut streams) = status.streams {
650+
let stmt = self.adapter.db.prepare_v2(
651+
"UPDATE ps_stream_subscriptions SET last_synced_at = unixepoch() WHERE id = ? RETURNING last_synced_at",
652+
)?;
653+
654+
for stream in streams {
655+
if stream.is_in_priority(priority) {
656+
stmt.bind_int64(1, stream.id)?;
657+
if stmt.step()? == ResultCode::ROW {
658+
let timestamp = Timestamp(stmt.column_int64(0));
659+
stream.last_synced_at = Some(timestamp);
660+
}
661+
662+
stmt.reset()?;
663+
}
664+
}
665+
}
666+
}
667+
668+
Ok(result)
669+
}
670+
652671
/// Prepares a sync iteration by handling the initial [SyncEvent::Initialize].
653672
///
654673
/// This prepares a [StreamingSyncRequest] by fetching local sync state and the requested bucket

crates/core/src/sync/sync_status.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
use alloc::{boxed::Box, collections::btree_map::BTreeMap, rc::Rc, string::String, vec::Vec};
22
use core::{
33
cell::RefCell,
4+
cmp::min,
45
hash::{BuildHasher, Hash},
56
};
67
use rustc_hash::FxBuildHasher;
78
use serde::Serialize;
89
use sqlite_nostd::ResultCode;
910

1011
use crate::{
11-
sync::{storage_adapter::StorageAdapter, subscriptions::LocallyTrackedSubscription},
12+
sync::{
13+
checkpoint::OwnedBucketChecksum, storage_adapter::StorageAdapter,
14+
subscriptions::LocallyTrackedSubscription,
15+
},
1216
util::JsonString,
1317
};
1418

@@ -37,7 +41,7 @@ pub struct DownloadSyncStatus {
3741
/// When a download is active (that is, a `checkpoint` or `checkpoint_diff` line has been
3842
/// received), information about how far the download has progressed.
3943
pub downloading: Option<SyncDownloadProgress>,
40-
pub streams: Vec<ActiveStreamSubscription>,
44+
pub streams: Option<Vec<ActiveStreamSubscription>>,
4145
}
4246

4347
impl DownloadSyncStatus {
@@ -78,7 +82,7 @@ impl DownloadSyncStatus {
7882
self.mark_connected();
7983

8084
self.downloading = Some(progress);
81-
self.streams = subscriptions;
85+
self.streams = Some(subscriptions);
8286
}
8387

8488
/// Increments [SyncDownloadProgress] progress for the given [DataLine].
@@ -122,7 +126,7 @@ impl Default for DownloadSyncStatus {
122126
connecting: false,
123127
downloading: None,
124128
priority_status: Vec::new(),
125-
streams: Vec::new(),
129+
streams: None,
126130
}
127131
}
128132
}
@@ -140,6 +144,10 @@ impl SyncStatusContainer {
140144
}
141145
}
142146

147+
pub fn inner(&self) -> &Rc<RefCell<DownloadSyncStatus>> {
148+
&self.status
149+
}
150+
143151
/// Invokes a function to update the sync status, then emits an [Instruction::UpdateSyncStatus]
144152
/// if the function did indeed change the status.
145153
pub fn update<F: FnOnce(&mut DownloadSyncStatus) -> ()>(
@@ -265,9 +273,12 @@ impl SyncDownloadProgress {
265273

266274
#[derive(Serialize, Hash)]
267275
pub struct ActiveStreamSubscription {
276+
#[serde(skip)]
277+
pub id: i64,
268278
pub name: String,
269279
pub parameters: Option<Box<JsonString>>,
270280
pub associated_buckets: Vec<String>,
281+
pub priority: Option<BucketPriority>,
271282
pub active: bool,
272283
pub is_default: bool,
273284
pub expires_at: Option<Timestamp>,
@@ -277,13 +288,30 @@ pub struct ActiveStreamSubscription {
277288
impl ActiveStreamSubscription {
278289
pub fn from_local(local: &LocallyTrackedSubscription) -> Self {
279290
Self {
291+
id: local.id,
280292
name: local.stream_name.clone(),
281293
parameters: local.local_params.clone(),
282294
is_default: local.is_default,
295+
priority: None,
283296
associated_buckets: Vec::new(),
284297
active: local.active,
285298
expires_at: local.expires_at.clone().map(|e| Timestamp(e)),
286299
last_synced_at: local.last_synced_at.map(|e| Timestamp(e)),
287300
}
288301
}
302+
303+
pub fn mark_associated_with_bucket(&mut self, bucket: &OwnedBucketChecksum) {
304+
self.associated_buckets.push(bucket.bucket.clone());
305+
self.priority = Some(match self.priority {
306+
None => bucket.priority,
307+
Some(prio) => min(prio, bucket.priority),
308+
});
309+
}
310+
311+
pub fn is_in_priority(&self, prio: Option<BucketPriority>) -> bool {
312+
match prio {
313+
None => true,
314+
Some(prio) => self.priority >= Some(prio),
315+
}
316+
}
289317
}

dart/test/sync_stream_test.dart

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ void main() {
5858
}
5959

6060
group('default streams', () {
61-
test('are created on-demand', () {
61+
syncTest('are created on-demand', (_) {
6262
control('start', null);
6363
control(
6464
'line_text',
@@ -86,11 +86,28 @@ void main() {
8686
'active': true,
8787
'is_default': true,
8888
'expires_at': null,
89-
'last_synced_at': null
89+
'last_synced_at': null,
90+
'priority': 1,
9091
}
9192
],
9293
),
9394
);
95+
96+
control(
97+
'line_text',
98+
json.encode(checkpointComplete(priority: 1)),
99+
);
100+
101+
expect(
102+
lastStatus,
103+
containsPair(
104+
'streams',
105+
[containsPair('last_synced_at', 1740823200)],
106+
),
107+
);
108+
109+
final [stored] = db.select('SELECT * FROM ps_stream_subscriptions');
110+
expect(stored, containsPair('last_synced_at', 1740823200));
94111
});
95112
});
96113
}

dart/test/sync_test.dart

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,7 @@ void _syncTests<T>({
126126
}
127127

128128
List<Object?> pushCheckpointComplete({int? priority, String lastOpId = '1'}) {
129-
return syncLine({
130-
priority == null ? 'checkpoint_complete' : 'partial_checkpoint_complete':
131-
{
132-
'last_op_id': lastOpId,
133-
if (priority != null) 'priority': priority,
134-
},
135-
});
129+
return syncLine(checkpointComplete(priority: priority, lastOpId: lastOpId));
136130
}
137131

138132
ResultSet fetchRows() {

dart/test/utils/test_utils.dart

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@ Object checkpoint({
2121
};
2222
}
2323

24+
/// Creates a `checkpoint_complete` or `partial_checkpoint_complete` line.
25+
Object checkpointComplete({int? priority, String lastOpId = '1'}) {
26+
return {
27+
priority == null ? 'checkpoint_complete' : 'partial_checkpoint_complete': {
28+
'last_op_id': lastOpId,
29+
if (priority != null) 'priority': priority,
30+
},
31+
};
32+
}
33+
2434
Object bucketDescription(
2535
String name, {
2636
int checksum = 0,

0 commit comments

Comments
 (0)