Skip to content

Commit adc3614

Browse files
committed
Allow subscribing to streams
1 parent 7ec822f commit adc3614

File tree

6 files changed

+172
-24
lines changed

6 files changed

+172
-24
lines changed

crates/core/src/migrations.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,10 +392,11 @@ CREATE TABLE ps_stream_subscriptions (
392392
active INTEGER NOT NULL DEFAULT FALSE,
393393
is_default INTEGER NOT NULL DEFAULT FALSE,
394394
local_priority INTEGER,
395-
local_params TEXT,
395+
local_params TEXT NOT NULL DEFAULT 'null',
396396
ttl INTEGER,
397397
expires_at INTEGER,
398-
last_synced_at INTEGER
398+
last_synced_at INTEGER,
399+
UNIQUE (stream_name, local_params)
399400
) STRICT;
400401
401402
INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array(

crates/core/src/sync/interface.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::constants::SUBTYPE_JSON;
77
use crate::error::PowerSyncError;
88
use crate::schema::Schema;
99
use crate::state::DatabaseState;
10+
use crate::sync::subscriptions::apply_subscriptions;
1011
use alloc::borrow::Cow;
1112
use alloc::boxed::Box;
1213
use alloc::rc::Rc;
@@ -217,6 +218,11 @@ pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(
217218
}),
218219
"refreshed_token" => SyncControlRequest::SyncEvent(SyncEvent::DidRefreshToken),
219220
"completed_upload" => SyncControlRequest::SyncEvent(SyncEvent::UploadFinished),
221+
"subscriptions" => {
222+
let request = serde_json::from_str(payload.text())
223+
.map_err(PowerSyncError::as_argument_error)?;
224+
return apply_subscriptions(ctx.db_handle(), request);
225+
}
220226
_ => {
221227
return Err(PowerSyncError::argument_error("Unknown operation"));
222228
}

crates/core/src/sync/storage_adapter.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub struct StorageAdapter {
3636
pub db: *mut sqlite::sqlite3,
3737
pub progress_stmt: ManagedStmt,
3838
time_stmt: ManagedStmt,
39+
delete_subscription: ManagedStmt,
3940
}
4041

4142
impl StorageAdapter {
@@ -48,10 +49,15 @@ impl StorageAdapter {
4849
// language=SQLite
4950
let time = db.prepare_v2("SELECT unixepoch()")?;
5051

52+
// language=SQLite
53+
let delete_subscription =
54+
db.prepare_v2("DELETE FROM ps_stream_subscriptions WHERE id = ?")?;
55+
5156
Ok(Self {
5257
db,
5358
progress_stmt: progress,
5459
time_stmt: time,
60+
delete_subscription,
5561
})
5662
}
5763

@@ -296,6 +302,8 @@ impl StorageAdapter {
296302
fn read_stream_subscription(
297303
stmt: &ManagedStmt,
298304
) -> Result<LocallyTrackedSubscription, PowerSyncError> {
305+
let raw_params = stmt.column_text(5)?;
306+
299307
Ok(LocallyTrackedSubscription {
300308
id: stmt.column_int64(0),
301309
stream_name: stmt.column_text(1)?.to_string(),
@@ -304,9 +312,11 @@ impl StorageAdapter {
304312
local_priority: column_nullable(&stmt, 4, || {
305313
BucketPriority::try_from(stmt.column_int(4))
306314
})?,
307-
local_params: column_nullable(&stmt, 5, || {
308-
JsonString::from_string(stmt.column_text(5)?.to_string())
309-
})?,
315+
local_params: if raw_params == "null" {
316+
None
317+
} else {
318+
Some(JsonString::from_string(stmt.column_text(5)?.to_string())?)
319+
},
310320
ttl: column_nullable(&stmt, 6, || Ok(stmt.column_int64(6)))?,
311321
expires_at: column_nullable(&stmt, 7, || Ok(stmt.column_int64(7)))?,
312322
last_synced_at: column_nullable(&stmt, 8, || Ok(stmt.column_int64(8)))?,
@@ -340,6 +350,13 @@ impl StorageAdapter {
340350
Err(PowerSyncError::unknown_internal())
341351
}
342352
}
353+
354+
pub fn delete_subscription(&self, id: i64) -> Result<(), PowerSyncError> {
355+
let _ = self.delete_subscription.reset();
356+
self.delete_subscription.bind_int64(1, id)?;
357+
self.delete_subscription.exec()?;
358+
Ok(())
359+
}
343360
}
344361

345362
pub struct BucketInfo {

crates/core/src/sync/streaming_sync.rs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -571,25 +571,41 @@ impl StreamingSyncIteration {
571571
let mut tracked_subscriptions: Vec<LocallyTrackedSubscription> = Vec::new();
572572

573573
// Load known subscriptions from database
574-
self.adapter.iterate_local_subscriptions(|sub| {
574+
self.adapter.iterate_local_subscriptions(|mut sub| {
575+
// We will mark it as active again if it's part of the streams included in the
576+
// checkpoint.
577+
sub.active = false;
578+
575579
tracked_subscriptions.push(sub);
576580
})?;
577581

578582
// If they don't exist already, create default subscriptions included in checkpoint
579583
for subscription in &tracked.streams {
580-
if subscription.is_default {
581-
let found = tracked_subscriptions
582-
.iter()
583-
.filter(|s| s.stream_name == subscription.name && s.local_params.is_none())
584-
.next();
584+
let matching_local_subscriptions = tracked_subscriptions
585+
.iter_mut()
586+
.filter(|s| s.stream_name == subscription.name);
587+
588+
let mut has_local = false;
589+
for subscription in matching_local_subscriptions {
590+
subscription.active = true;
591+
has_local = true;
592+
}
585593

586-
if found.is_none() {
587-
let subscription = self.adapter.create_default_subscription(subscription)?;
588-
tracked_subscriptions.push(subscription);
589-
}
594+
if !has_local && subscription.is_default {
595+
let subscription = self.adapter.create_default_subscription(subscription)?;
596+
tracked_subscriptions.push(subscription);
590597
}
591598
}
592599

600+
// Clean up default subscriptions that are no longer active.
601+
for subscription in &tracked_subscriptions {
602+
if subscription.is_default && !subscription.active {
603+
self.adapter.delete_subscription(subscription.id)?;
604+
}
605+
}
606+
tracked_subscriptions
607+
.retain(|subscription| !subscription.is_default || subscription.active);
608+
593609
debug_assert!(tracked_subscriptions.is_sorted_by_key(|s| s.id));
594610

595611
let mut resolved: Vec<ActiveStreamSubscription> =
@@ -607,8 +623,6 @@ impl StreamingSyncIteration {
607623
}
608624
}
609625

610-
// TODO: Cleanup old default subscriptions?
611-
612626
// Iterate over buckets to associate them with subscriptions
613627
for bucket in tracked.checkpoint.buckets.values() {
614628
match &bucket.subscriptions {

crates/core/src/sync/subscriptions.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,14 @@ use core::{cmp::Ordering, hash::Hash, time::Duration};
33
use alloc::{boxed::Box, string::String};
44
use serde::Deserialize;
55
use serde_with::{serde_as, DurationSeconds};
6+
use sqlite_nostd::{self as sqlite, Connection};
67

7-
use crate::{sync::BucketPriority, util::JsonString};
8+
use crate::{
9+
error::{PSResult, PowerSyncError},
10+
ext::SafeManagedStmt,
11+
sync::BucketPriority,
12+
util::JsonString,
13+
};
814

915
/// A key that uniquely identifies a stream subscription.
1016
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord)]
@@ -38,16 +44,20 @@ impl LocallyTrackedSubscription {
3844
/// A request sent from a PowerSync SDK to alter the subscriptions managed by this client.
3945
#[derive(Deserialize)]
4046
pub enum SubscriptionChangeRequest {
47+
#[serde(rename = "subscribe")]
4148
Subscribe(SubscribeToStream),
4249
}
4350

4451
#[serde_as]
4552
#[derive(Deserialize)]
4653
pub struct SubscribeToStream {
4754
pub stream: String,
55+
#[serde(default)]
4856
pub params: Option<Box<serde_json::value::RawValue>>,
4957
#[serde_as(as = "Option<DurationSeconds>")]
58+
#[serde(default)]
5059
pub ttl: Option<Duration>,
60+
#[serde(default)]
5161
pub priority: Option<BucketPriority>,
5262
}
5363

@@ -57,3 +67,37 @@ pub struct UnsubscribeFromStream {
5767
pub params: Option<Box<serde_json::value::RawValue>>,
5868
pub immediate: bool,
5969
}
70+
71+
pub fn apply_subscriptions(
72+
db: *mut sqlite::sqlite3,
73+
subscription: SubscriptionChangeRequest,
74+
) -> Result<(), PowerSyncError> {
75+
match subscription {
76+
SubscriptionChangeRequest::Subscribe(subscription) => {
77+
let stmt = db
78+
.prepare_v2("INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params, ttl) VALUES (?, ?2, ?, ?4) ON CONFLICT DO UPDATE SET local_priority = min(coalesce(?2, local_priority), local_priority), ttl = ?4, is_default = FALSE")
79+
.into_db_result(db)?;
80+
81+
stmt.bind_text(1, &subscription.stream, sqlite::Destructor::STATIC)?;
82+
match &subscription.priority {
83+
Some(priority) => stmt.bind_int(2, priority.number),
84+
None => stmt.bind_null(2),
85+
}?;
86+
stmt.bind_text(
87+
3,
88+
match &subscription.params {
89+
Some(params) => params.get(),
90+
None => "null",
91+
},
92+
sqlite::Destructor::STATIC,
93+
)?;
94+
match &subscription.ttl {
95+
Some(ttl) => stmt.bind_int64(4, ttl.as_secs() as i64),
96+
None => stmt.bind_null(4),
97+
}?;
98+
stmt.exec()?;
99+
}
100+
}
101+
102+
Ok(())
103+
}

dart/test/sync_stream_test.dart

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,19 @@ void main() {
4747

4848
db.execute('commit');
4949
final [row] = result;
50-
final instructions = jsonDecode(row.columnAt(0)) as List;
51-
for (final instruction in instructions) {
52-
if (instruction case {'UpdateSyncStatus': final status}) {
53-
lastStatus = status['status']!;
50+
51+
final rawResult = row.columnAt(0);
52+
if (rawResult is String) {
53+
final instructions = jsonDecode(row.columnAt(0)) as List;
54+
for (final instruction in instructions) {
55+
if (instruction case {'UpdateSyncStatus': final status}) {
56+
lastStatus = status['status']!;
57+
}
5458
}
59+
return instructions;
60+
} else {
61+
return const [];
5562
}
56-
57-
return instructions;
5863
}
5964

6065
group('default streams', () {
@@ -109,5 +114,66 @@ void main() {
109114
final [stored] = db.select('SELECT * FROM ps_stream_subscriptions');
110115
expect(stored, containsPair('last_synced_at', 1740823200));
111116
});
117+
118+
syncTest('are deleted', (_) {
119+
control('start', null);
120+
121+
for (final stream in ['s1', 's2']) {
122+
control(
123+
'line_text',
124+
json.encode(
125+
checkpoint(
126+
lastOpId: 1,
127+
buckets: [
128+
bucketDescription('a', subscriptions: stream, priority: 1),
129+
],
130+
streams: [(stream, true)],
131+
),
132+
),
133+
);
134+
control(
135+
'line_text',
136+
json.encode(checkpointComplete(priority: 1)),
137+
);
138+
}
139+
140+
expect(
141+
lastStatus,
142+
containsPair(
143+
'streams',
144+
[containsPair('name', 's2')],
145+
),
146+
);
147+
});
148+
149+
syncTest('can be made explicit', (_) {
150+
control('start', null);
151+
control(
152+
'line_text',
153+
json.encode(
154+
checkpoint(
155+
lastOpId: 1,
156+
buckets: [
157+
bucketDescription('a', subscriptions: 'a', priority: 1),
158+
],
159+
streams: [('a', true)],
160+
),
161+
),
162+
);
163+
164+
var [stored] = db.select('SELECT * FROM ps_stream_subscriptions');
165+
expect(stored, containsPair('is_default', 1));
166+
167+
control(
168+
'subscriptions',
169+
json.encode({
170+
'subscribe': {'stream': 'a'},
171+
}),
172+
);
173+
174+
[stored] = db.select('SELECT * FROM ps_stream_subscriptions');
175+
expect(stored, containsPair('active', 1));
176+
expect(stored, containsPair('is_default', 0));
177+
});
112178
});
113179
}

0 commit comments

Comments
 (0)