|
1 | 1 | use alloc::borrow::Cow; |
| 2 | +use alloc::string::{String, ToString}; |
2 | 3 | use alloc::vec::Vec; |
3 | | -use serde::de::{IgnoredAny, VariantAccess, Visitor}; |
| 4 | +use serde::de::{Error, IgnoredAny, VariantAccess, Visitor}; |
4 | 5 | use serde::Deserialize; |
5 | 6 | use serde_with::{serde_as, DisplayFromStr}; |
6 | 7 |
|
@@ -82,6 +83,14 @@ pub struct Checkpoint<'a> { |
82 | 83 | pub write_checkpoint: Option<i64>, |
83 | 84 | #[serde(borrow)] |
84 | 85 | pub buckets: Vec<BucketChecksum<'a>>, |
| 86 | + #[serde(default, borrow)] |
| 87 | + pub streams: Vec<StreamDefinition<'a>>, |
| 88 | +} |
| 89 | + |
| 90 | +#[derive(Deserialize, Debug)] |
| 91 | +pub struct StreamDefinition<'a> { |
| 92 | + pub name: SyncLineStr<'a>, |
| 93 | + pub is_default: bool, |
85 | 94 | } |
86 | 95 |
|
87 | 96 | #[serde_as] |
@@ -120,14 +129,67 @@ pub struct BucketChecksum<'a> { |
120 | 129 | pub priority: Option<BucketPriority>, |
121 | 130 | #[serde(default)] |
122 | 131 | pub count: Option<i64>, |
123 | | - #[serde_as(as = "Vec<Option<DisplayFromStr>>")] |
124 | 132 | #[serde(default)] |
125 | | - pub subscriptions: Vec<Option<i64>>, |
| 133 | + pub subscriptions: BucketSubscriptionReason, |
126 | 134 | // #[serde(default)] |
127 | 135 | // #[serde(deserialize_with = "deserialize_optional_string_to_i64")] |
128 | 136 | // pub last_op_id: Option<i64>, |
129 | 137 | } |
130 | 138 |
|
| 139 | +/// The reason for why a bucket was included in a checkpoint. |
| 140 | +#[derive(Debug, Default, Clone)] |
| 141 | +pub enum BucketSubscriptionReason { |
| 142 | + /// A bucket was created for all of the subscription ids we've explicitly requested in the sync |
| 143 | + /// request. |
| 144 | + ExplicitlySubscribed { subscriptions: Vec<i64> }, |
| 145 | + /// A bucket was created from a default stream. |
| 146 | + IsDefault { stream_name: String }, |
| 147 | + /// We're talking to an older sync service not sending the reason. |
| 148 | + #[default] |
| 149 | + Unknown, |
| 150 | +} |
| 151 | + |
| 152 | +impl<'de> Deserialize<'de> for BucketSubscriptionReason { |
| 153 | + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> |
| 154 | + where |
| 155 | + D: serde::Deserializer<'de>, |
| 156 | + { |
| 157 | + struct MyVisitor; |
| 158 | + |
| 159 | + impl<'de> Visitor<'de> for MyVisitor { |
| 160 | + type Value = BucketSubscriptionReason; |
| 161 | + |
| 162 | + fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { |
| 163 | + write!(formatter, "a subscription reason") |
| 164 | + } |
| 165 | + |
| 166 | + fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> |
| 167 | + where |
| 168 | + A: serde::de::SeqAccess<'de>, |
| 169 | + { |
| 170 | + let mut subscriptions = Vec::<i64>::new(); |
| 171 | + |
| 172 | + while let Some(item) = seq.next_element::<&'de str>()? { |
| 173 | + subscriptions.push(item.parse().map_err(|_| A::Error::custom("not an int"))?); |
| 174 | + } |
| 175 | + |
| 176 | + Ok(BucketSubscriptionReason::ExplicitlySubscribed { subscriptions }) |
| 177 | + } |
| 178 | + |
| 179 | + fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> |
| 180 | + where |
| 181 | + E: serde::de::Error, |
| 182 | + { |
| 183 | + Ok(BucketSubscriptionReason::IsDefault { |
| 184 | + stream_name: v.to_string(), |
| 185 | + }) |
| 186 | + } |
| 187 | + } |
| 188 | + |
| 189 | + deserializer.deserialize_any(MyVisitor) |
| 190 | + } |
| 191 | +} |
| 192 | + |
131 | 193 | #[derive(Deserialize, Debug)] |
132 | 194 | pub struct DataLine<'a> { |
133 | 195 | #[serde(borrow)] |
@@ -229,6 +291,7 @@ mod tests { |
229 | 291 | last_op_id: 10, |
230 | 292 | write_checkpoint: None, |
231 | 293 | buckets: _, |
| 294 | + streams: _, |
232 | 295 | }) |
233 | 296 | ); |
234 | 297 |
|
@@ -264,6 +327,7 @@ mod tests { |
264 | 327 | last_op_id: 1, |
265 | 328 | write_checkpoint: None, |
266 | 329 | buckets: _, |
| 330 | + streams: _, |
267 | 331 | }) |
268 | 332 | ); |
269 | 333 | } |
|
0 commit comments