Skip to content

Commit 0e6842e

Browse files
committed
Start tracking subscriptions
1 parent 4389aaf commit 0e6842e

File tree

6 files changed

+47
-10
lines changed

6 files changed

+47
-10
lines changed

crates/core/src/migrations.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -394,11 +394,11 @@ CREATE TABLE ps_stream_subscriptions (
394394
local_params TEXT,
395395
ttl INTEGER
396396
) STRICT;
397-
ALTER TABLE ps_buckets ADD COLUMN derived_from INTEGER REFERENCES ps_streams (id);
397+
ALTER TABLE ps_buckets ADD COLUMN from_subscriptions TEXT NOT NULL DEFAULT '[null]';
398398
399-
INSERT INTO ps_migration(id, down_migrations) VALUES(9, json_array(
399+
INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array(
400400
json_object('sql', 'todo down migration'),
401-
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 10')
401+
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11')
402402
));
403403
";
404404
local_db.exec_safe(stmt)?;

crates/core/src/sync/interface.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,31 @@ use super::streaming_sync::SyncClient;
2020
use super::sync_status::DownloadSyncStatus;
2121

2222
/// Payload provided by SDKs when requesting a sync iteration.
23-
#[derive(Default, Deserialize)]
23+
#[derive(Deserialize)]
2424
pub struct StartSyncStream {
2525
/// Bucket parameters to include in the request when opening a sync stream.
2626
#[serde(default)]
2727
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
2828
#[serde(default)]
2929
pub schema: Schema,
30+
#[serde(default = "StartSyncStream::include_defaults_by_default")]
31+
pub include_defaults: bool,
32+
}
33+
34+
impl StartSyncStream {
35+
pub const fn include_defaults_by_default() -> bool {
36+
true
37+
}
38+
}
39+
40+
impl Default for StartSyncStream {
41+
fn default() -> Self {
42+
Self {
43+
parameters: Default::default(),
44+
schema: Default::default(),
45+
include_defaults: Self::include_defaults_by_default(),
46+
}
47+
}
3048
}
3149

3250
/// A request sent from a client SDK to the [SyncClient] with a `powersync_control` invocation.
@@ -107,6 +125,7 @@ pub struct StreamingSyncRequest {
107125
pub binary_data: bool,
108126
pub client_id: String,
109127
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
128+
pub streams: StreamSubscriptionRequest,
110129
}
111130

112131
#[derive(Serialize)]

crates/core/src/sync/line.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,9 @@ pub struct BucketChecksum<'a> {
120120
pub priority: Option<BucketPriority>,
121121
#[serde(default)]
122122
pub count: Option<i64>,
123-
#[serde_as(as = "Option<Vec<DisplayFromStr>>")]
123+
#[serde_as(as = "Vec<Option<DisplayFromStr>>")]
124124
#[serde(default)]
125-
pub subscriptions: Option<Vec<i64>>,
125+
pub subscriptions: Vec<Option<i64>>,
126126
// #[serde(default)]
127127
// #[serde(deserialize_with = "deserialize_optional_string_to_i64")]
128128
// pub last_op_id: Option<i64>,

crates/core/src/sync/storage_adapter.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ use crate::{
1010
operations::delete_bucket,
1111
schema::Schema,
1212
state::DatabaseState,
13-
sync::checkpoint::{validate_checkpoint, ChecksumMismatch},
13+
sync::{
14+
checkpoint::{validate_checkpoint, ChecksumMismatch},
15+
interface::{RequestedStreamSubscription, StreamSubscriptionRequest},
16+
},
1417
sync_local::{PartialSyncOperation, SyncOperation},
1518
};
1619

@@ -239,6 +242,18 @@ impl StorageAdapter {
239242
}
240243
}
241244

245+
pub fn collect_subscription_requests(
246+
&self,
247+
include_defaults: bool,
248+
) -> Result<StreamSubscriptionRequest, PowerSyncError> {
249+
let mut subscriptions: Vec<RequestedStreamSubscription> = Vec::new();
250+
251+
Ok(StreamSubscriptionRequest {
252+
include_defaults,
253+
subscriptions,
254+
})
255+
}
256+
242257
pub fn now(&self) -> Result<Timestamp, ResultCode> {
243258
self.time_stmt.step()?;
244259
let res = Timestamp(self.time_stmt.column_int64(0));

crates/core/src/sync/streaming_sync.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,9 @@ impl StreamingSyncIteration {
589589
binary_data: true,
590590
client_id: client_id(self.db)?,
591591
parameters: self.options.parameters.take(),
592+
streams: self
593+
.adapter
594+
.collect_subscription_requests(self.options.include_defaults)?,
592595
};
593596

594597
event

crates/core/src/sync/subscriptions.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use core::time::Duration;
22

3-
use alloc::string::String;
3+
use alloc::{boxed::Box, string::String};
44
use serde::Deserialize;
55
use serde_with::{serde_as, DurationSeconds};
66

@@ -16,7 +16,7 @@ pub enum SubscriptionChangeRequest {
1616
#[derive(Deserialize)]
1717
pub struct SubscribeToStream {
1818
pub stream: String,
19-
pub params: Option<serde_json::value::RawValue>,
19+
pub params: Option<Box<serde_json::value::RawValue>>,
2020
#[serde_as(as = "Option<DurationSeconds>")]
2121
pub ttl: Option<Duration>,
2222
pub priority: Option<BucketPriority>,
@@ -25,6 +25,6 @@ pub struct SubscribeToStream {
2525
#[derive(Deserialize)]
2626
pub struct UnsubscribeFromStream {
2727
pub stream: String,
28-
pub params: Option<serde_json::value::RawValue>,
28+
pub params: Option<Box<serde_json::value::RawValue>>,
2929
pub immediate: bool,
3030
}

0 commit comments

Comments
 (0)