Skip to content

Commit 319c542

Browse files
committed
Expire subscriptions after TTL
1 parent adc3614 commit 319c542

File tree

5 files changed

+154
-4
lines changed

5 files changed

+154
-4
lines changed

crates/core/src/sync/interface.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use sqlite_nostd::{self as sqlite, ColumnType};
2121
use sqlite_nostd::{Connection, Context};
2222

2323
use crate::sync::BucketPriority;
24+
use crate::util::JsonString;
2425

2526
/// Payload provided by SDKs when requesting a sync iteration.
2627
#[derive(Deserialize)]
@@ -143,7 +144,7 @@ pub struct RequestedStreamSubscription {
143144
/// The name of the sync stream to subscribe to.
144145
pub stream: String,
145146
/// Parameters to make available in the stream's definition.
146-
pub parameters: Box<serde_json::value::RawValue>,
147+
pub parameters: Option<Box<JsonString>>,
147148
pub override_priority: Option<BucketPriority>,
148149
#[serde_as(as = "DisplayFromStr")]
149150
pub client_id: i64,

crates/core/src/sync/storage_adapter.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode};
88
use crate::{
99
error::{PSResult, PowerSyncError},
1010
ext::SafeManagedStmt,
11+
kv::client_id,
1112
operations::delete_bucket,
1213
schema::Schema,
1314
state::DatabaseState,
@@ -37,6 +38,7 @@ pub struct StorageAdapter {
3738
pub progress_stmt: ManagedStmt,
3839
time_stmt: ManagedStmt,
3940
delete_subscription: ManagedStmt,
41+
update_subscription: ManagedStmt,
4042
}
4143

4244
impl StorageAdapter {
@@ -53,11 +55,16 @@ impl StorageAdapter {
5355
let delete_subscription =
5456
db.prepare_v2("DELETE FROM ps_stream_subscriptions WHERE id = ?")?;
5557

58+
// language=SQLite
59+
let update_subscription =
60+
db.prepare_v2("UPDATE ps_stream_subscriptions SET active = ?2, is_default = ?3, ttl = ?, expires_at = ?, last_synced_at = ? WHERE id = ?1")?;
61+
5662
Ok(Self {
5763
db,
5864
progress_stmt: progress,
5965
time_stmt: time,
6066
delete_subscription,
67+
update_subscription,
6168
})
6269
}
6370

@@ -283,7 +290,23 @@ impl StorageAdapter {
283290
&self,
284291
include_defaults: bool,
285292
) -> Result<StreamSubscriptionRequest, PowerSyncError> {
293+
self.delete_outdated_subscriptions()?;
294+
286295
let mut subscriptions: Vec<RequestedStreamSubscription> = Vec::new();
296+
let stmt = self
297+
.db
298+
.prepare_v2("SELECT * FROM ps_stream_subscriptions WHERE NOT is_default;")?;
299+
300+
while let ResultCode::ROW = stmt.step()? {
301+
let subscription = Self::read_stream_subscription(&stmt)?;
302+
303+
subscriptions.push(RequestedStreamSubscription {
304+
stream: subscription.stream_name,
305+
parameters: subscription.local_params,
306+
override_priority: subscription.local_priority,
307+
client_id: subscription.id,
308+
});
309+
}
287310

288311
Ok(StreamSubscriptionRequest {
289312
include_defaults,
@@ -323,6 +346,12 @@ impl StorageAdapter {
323346
})
324347
}
325348

349+
fn delete_outdated_subscriptions(&self) -> Result<(), PowerSyncError> {
350+
self.db
351+
.exec_safe("DELETE FROM ps_stream_subscriptions WHERE expires_at < unixepoch()")?;
352+
Ok(())
353+
}
354+
326355
pub fn iterate_local_subscriptions<F: FnMut(LocallyTrackedSubscription) -> ()>(
327356
&self,
328357
mut action: F,
@@ -351,6 +380,39 @@ impl StorageAdapter {
351380
}
352381
}
353382

383+
pub fn update_subscription(
384+
&self,
385+
subscription: &LocallyTrackedSubscription,
386+
) -> Result<(), PowerSyncError> {
387+
let _ = self.update_subscription.reset();
388+
389+
self.update_subscription.bind_int64(1, subscription.id)?;
390+
self.update_subscription
391+
.bind_int(2, if subscription.active { 1 } else { 0 })?;
392+
self.update_subscription
393+
.bind_int(3, if subscription.is_default { 1 } else { 0 })?;
394+
if let Some(ttl) = subscription.ttl {
395+
self.update_subscription.bind_int64(4, ttl)?;
396+
} else {
397+
self.update_subscription.bind_null(4)?;
398+
}
399+
400+
if let Some(expires_at) = subscription.expires_at {
401+
self.update_subscription.bind_int64(5, expires_at)?;
402+
} else {
403+
self.update_subscription.bind_null(5)?;
404+
}
405+
406+
if let Some(last_synced_at) = subscription.last_synced_at {
407+
self.update_subscription.bind_int64(6, last_synced_at)?;
408+
} else {
409+
self.update_subscription.bind_null(6)?;
410+
}
411+
412+
self.update_subscription.exec()?;
413+
Ok(())
414+
}
415+
354416
pub fn delete_subscription(&self, id: i64) -> Result<(), PowerSyncError> {
355417
let _ = self.delete_subscription.reset();
356418
self.delete_subscription.bind_int64(1, id)?;

crates/core/src/sync/streaming_sync.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,7 @@ impl StreamingSyncIteration {
569569
tracked: &TrackedCheckpoint,
570570
) -> Result<Vec<ActiveStreamSubscription>, PowerSyncError> {
571571
let mut tracked_subscriptions: Vec<LocallyTrackedSubscription> = Vec::new();
572+
let now = self.adapter.now()?;
572573

573574
// Load known subscriptions from database
574575
self.adapter.iterate_local_subscriptions(|mut sub| {
@@ -586,9 +587,14 @@ impl StreamingSyncIteration {
586587
.filter(|s| s.stream_name == subscription.name);
587588

588589
let mut has_local = false;
589-
for subscription in matching_local_subscriptions {
590-
subscription.active = true;
590+
for local in matching_local_subscriptions {
591+
local.active = true;
592+
local.is_default = subscription.is_default;
591593
has_local = true;
594+
595+
if let Some(ttl) = local.ttl {
596+
local.expires_at = Some(now.0 + ttl);
597+
}
592598
}
593599

594600
if !has_local && subscription.is_default {
@@ -599,8 +605,10 @@ impl StreamingSyncIteration {
599605

600606
// Clean up default subscriptions that are no longer active.
601607
for subscription in &tracked_subscriptions {
602-
if subscription.is_default && !subscription.active {
608+
if !subscription.has_subscribed_manually() && !subscription.active {
603609
self.adapter.delete_subscription(subscription.id)?;
610+
} else {
611+
self.adapter.update_subscription(subscription)?;
604612
}
605613
}
606614
tracked_subscriptions

crates/core/src/sync/subscriptions.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,19 @@ pub struct LocallyTrackedSubscription {
3333
}
3434

3535
impl LocallyTrackedSubscription {
36+
/// The default TTL of non-default subscriptions if none is set: One day.
37+
pub const DEFAULT_TTL: i64 = 60 * 60 * 24;
38+
3639
pub fn key(&self) -> SubscriptionKey {
3740
SubscriptionKey {
3841
stream_name: self.stream_name.clone(),
3942
params: self.local_params.clone(),
4043
}
4144
}
45+
46+
pub fn has_subscribed_manually(&self) -> bool {
47+
self.ttl.is_some()
48+
}
4249
}
4350

4451
/// A request sent from a PowerSync SDK to alter the subscriptions managed by this client.

dart/test/sync_stream_test.dart

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,5 +175,77 @@ void main() {
175175
expect(stored, containsPair('active', 1));
176176
expect(stored, containsPair('is_default', 0));
177177
});
178+
179+
syncTest('ttl', (controller) {
180+
db.execute(
181+
'INSERT INTO ps_stream_subscriptions (stream_name, ttl) VALUES (?, ?);',
182+
['my_stream', 3600]);
183+
184+
var startInstructions = control('start', null);
185+
expect(
186+
startInstructions,
187+
contains(
188+
containsPair(
189+
'EstablishSyncStream',
190+
containsPair(
191+
'request',
192+
containsPair(
193+
'streams',
194+
{
195+
'include_defaults': true,
196+
'subscriptions': [
197+
{
198+
'stream': 'my_stream',
199+
'parameters': null,
200+
'override_priority': null,
201+
'client_id': '1',
202+
}
203+
],
204+
},
205+
),
206+
),
207+
),
208+
),
209+
);
210+
211+
// Send a checkpoint containing the stream, increasing the TTL.
212+
control(
213+
'line_text',
214+
json.encode(
215+
checkpoint(
216+
lastOpId: 1,
217+
buckets: [],
218+
streams: [('my_stream', false)],
219+
),
220+
),
221+
);
222+
223+
final [row] = db.select('SELECT * FROM ps_stream_subscriptions');
224+
expect(row, containsPair('expires_at', 1740826800));
225+
control('stop', null);
226+
227+
// Elapse beyond end of TTL
228+
controller.elapse(const Duration(hours: 2));
229+
startInstructions = control('start', null);
230+
expect(
231+
startInstructions,
232+
contains(
233+
containsPair(
234+
'EstablishSyncStream',
235+
containsPair(
236+
'request',
237+
containsPair(
238+
'streams',
239+
{
240+
'include_defaults': true,
241+
// Outdated subscription should no longer be included.
242+
'subscriptions': isEmpty,
243+
},
244+
),
245+
),
246+
),
247+
),
248+
);
249+
});
178250
});
179251
}

0 commit comments

Comments
 (0)