Skip to content

Commit 7559364

Browse files
committed
Make Storage methods async
This will better support concurrent requests.
1 parent 4de5c9a commit 7559364

File tree

13 files changed

+597
-412
lines changed

13 files changed

+597
-412
lines changed

Cargo.lock

Lines changed: 27 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ members = [
88
rust-version = "1.82.0" # MSRV
99

1010
[workspace.dependencies]
11+
async-trait = "0.1.88"
1112
uuid = { version = "^1.17.0", features = ["serde", "v4"] }
1213
actix-web = "^4.11.0"
1314
anyhow = "1.0"
@@ -24,3 +25,4 @@ actix-rt = "2"
2425
tempfile = "3"
2526
pretty_assertions = "1"
2627
temp-env = "0.3"
28+
tokio = { version = "*", features = ["rt", "macros"] }

core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ license = "MIT"
1010

1111
[dependencies]
1212
uuid.workspace = true
13+
async-trait.workspace = true
1314
anyhow.workspace = true
1415
thiserror.workspace = true
1516
log.workspace = true
@@ -18,3 +19,4 @@ chrono.workspace = true
1819

1920
[dev-dependencies]
2021
pretty_assertions.workspace = true
22+
tokio.workspace = true

core/src/inmemory.rs

Lines changed: 60 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ struct InnerTxn<'a> {
4444
committed: bool,
4545
}
4646

47+
#[async_trait::async_trait]
4748
impl Storage for InMemoryStorage {
48-
fn txn(&self, client_id: Uuid) -> anyhow::Result<Box<dyn StorageTxn + '_>> {
49+
async fn txn(&self, client_id: Uuid) -> anyhow::Result<Box<dyn StorageTxn + '_>> {
4950
Ok(Box::new(InnerTxn {
5051
client_id,
5152
guard: self.0.lock().expect("poisoned lock"),
@@ -55,12 +56,13 @@ impl Storage for InMemoryStorage {
5556
}
5657
}
5758

59+
#[async_trait::async_trait(?Send)]
5860
impl StorageTxn for InnerTxn<'_> {
59-
fn get_client(&mut self) -> anyhow::Result<Option<Client>> {
61+
async fn get_client(&mut self) -> anyhow::Result<Option<Client>> {
6062
Ok(self.guard.clients.get(&self.client_id).cloned())
6163
}
6264

63-
fn new_client(&mut self, latest_version_id: Uuid) -> anyhow::Result<()> {
65+
async fn new_client(&mut self, latest_version_id: Uuid) -> anyhow::Result<()> {
6466
if self.guard.clients.contains_key(&self.client_id) {
6567
return Err(anyhow::anyhow!("Client {} already exists", self.client_id));
6668
}
@@ -75,7 +77,7 @@ impl StorageTxn for InnerTxn<'_> {
7577
Ok(())
7678
}
7779

78-
fn set_snapshot(&mut self, snapshot: Snapshot, data: Vec<u8>) -> anyhow::Result<()> {
80+
async fn set_snapshot(&mut self, snapshot: Snapshot, data: Vec<u8>) -> anyhow::Result<()> {
7981
let client = self
8082
.guard
8183
.clients
@@ -87,7 +89,7 @@ impl StorageTxn for InnerTxn<'_> {
8789
Ok(())
8890
}
8991

90-
fn get_snapshot_data(&mut self, version_id: Uuid) -> anyhow::Result<Option<Vec<u8>>> {
92+
async fn get_snapshot_data(&mut self, version_id: Uuid) -> anyhow::Result<Option<Vec<u8>>> {
9193
// sanity check
9294
let client = self.guard.clients.get(&self.client_id);
9395
let client = client.ok_or_else(|| anyhow::anyhow!("no such client"))?;
@@ -97,7 +99,7 @@ impl StorageTxn for InnerTxn<'_> {
9799
Ok(self.guard.snapshots.get(&self.client_id).cloned())
98100
}
99101

100-
fn get_version_by_parent(
102+
async fn get_version_by_parent(
101103
&mut self,
102104
parent_version_id: Uuid,
103105
) -> anyhow::Result<Option<Version>> {
@@ -116,15 +118,15 @@ impl StorageTxn for InnerTxn<'_> {
116118
}
117119
}
118120

119-
fn get_version(&mut self, version_id: Uuid) -> anyhow::Result<Option<Version>> {
121+
async fn get_version(&mut self, version_id: Uuid) -> anyhow::Result<Option<Version>> {
120122
Ok(self
121123
.guard
122124
.versions
123125
.get(&(self.client_id, version_id))
124126
.cloned())
125127
}
126128

127-
fn add_version(
129+
async fn add_version(
128130
&mut self,
129131
version_id: Uuid,
130132
parent_version_id: Uuid,
@@ -174,7 +176,7 @@ impl StorageTxn for InnerTxn<'_> {
174176
Ok(())
175177
}
176178

177-
fn commit(&mut self) -> anyhow::Result<()> {
179+
async fn commit(&mut self) -> anyhow::Result<()> {
178180
self.committed = true;
179181
Ok(())
180182
}
@@ -193,32 +195,33 @@ mod test {
193195
use super::*;
194196
use chrono::Utc;
195197

196-
#[test]
197-
fn test_get_client_empty() -> anyhow::Result<()> {
198+
#[tokio::test]
199+
async fn test_get_client_empty() -> anyhow::Result<()> {
198200
let storage = InMemoryStorage::new();
199-
let mut txn = storage.txn(Uuid::new_v4())?;
200-
let maybe_client = txn.get_client()?;
201+
let mut txn = storage.txn(Uuid::new_v4()).await?;
202+
let maybe_client = txn.get_client().await?;
201203
assert!(maybe_client.is_none());
202204
Ok(())
203205
}
204206

205-
#[test]
206-
fn test_client_storage() -> anyhow::Result<()> {
207+
#[tokio::test]
208+
async fn test_client_storage() -> anyhow::Result<()> {
207209
let storage = InMemoryStorage::new();
208210
let client_id = Uuid::new_v4();
209-
let mut txn = storage.txn(client_id)?;
211+
let mut txn = storage.txn(client_id).await?;
210212

211213
let latest_version_id = Uuid::new_v4();
212-
txn.new_client(latest_version_id)?;
214+
txn.new_client(latest_version_id).await?;
213215

214-
let client = txn.get_client()?.unwrap();
216+
let client = txn.get_client().await?.unwrap();
215217
assert_eq!(client.latest_version_id, latest_version_id);
216218
assert!(client.snapshot.is_none());
217219

218220
let latest_version_id = Uuid::new_v4();
219-
txn.add_version(latest_version_id, Uuid::new_v4(), vec![1, 1])?;
221+
txn.add_version(latest_version_id, Uuid::new_v4(), vec![1, 1])
222+
.await?;
220223

221-
let client = txn.get_client()?.unwrap();
224+
let client = txn.get_client().await?.unwrap();
222225
assert_eq!(client.latest_version_id, latest_version_id);
223226
assert!(client.snapshot.is_none());
224227

@@ -227,113 +230,116 @@ mod test {
227230
timestamp: Utc::now(),
228231
versions_since: 4,
229232
};
230-
txn.set_snapshot(snap.clone(), vec![1, 2, 3])?;
233+
txn.set_snapshot(snap.clone(), vec![1, 2, 3]).await?;
231234

232-
let client = txn.get_client()?.unwrap();
235+
let client = txn.get_client().await?.unwrap();
233236
assert_eq!(client.latest_version_id, latest_version_id);
234237
assert_eq!(client.snapshot.unwrap(), snap);
235238

236-
txn.commit()?;
239+
txn.commit().await?;
237240
Ok(())
238241
}
239242

240-
#[test]
241-
fn test_gvbp_empty() -> anyhow::Result<()> {
243+
#[tokio::test]
244+
async fn test_gvbp_empty() -> anyhow::Result<()> {
242245
let storage = InMemoryStorage::new();
243246
let client_id = Uuid::new_v4();
244-
let mut txn = storage.txn(client_id)?;
245-
let maybe_version = txn.get_version_by_parent(Uuid::new_v4())?;
247+
let mut txn = storage.txn(client_id).await?;
248+
let maybe_version = txn.get_version_by_parent(Uuid::new_v4()).await?;
246249
assert!(maybe_version.is_none());
247250
Ok(())
248251
}
249252

250-
#[test]
251-
fn test_add_version_and_get_version() -> anyhow::Result<()> {
253+
#[tokio::test]
254+
async fn test_add_version_and_get_version() -> anyhow::Result<()> {
252255
let storage = InMemoryStorage::new();
253256
let client_id = Uuid::new_v4();
254-
let mut txn = storage.txn(client_id)?;
257+
let mut txn = storage.txn(client_id).await?;
255258

256259
let version_id = Uuid::new_v4();
257260
let parent_version_id = Uuid::new_v4();
258261
let history_segment = b"abc".to_vec();
259262

260-
txn.new_client(parent_version_id)?;
261-
txn.add_version(version_id, parent_version_id, history_segment.clone())?;
263+
txn.new_client(parent_version_id).await?;
264+
txn.add_version(version_id, parent_version_id, history_segment.clone())
265+
.await?;
262266

263267
let expected = Version {
264268
version_id,
265269
parent_version_id,
266270
history_segment,
267271
};
268272

269-
let version = txn.get_version_by_parent(parent_version_id)?.unwrap();
273+
let version = txn.get_version_by_parent(parent_version_id).await?.unwrap();
270274
assert_eq!(version, expected);
271275

272-
let version = txn.get_version(version_id)?.unwrap();
276+
let version = txn.get_version(version_id).await?.unwrap();
273277
assert_eq!(version, expected);
274278

275-
txn.commit()?;
279+
txn.commit().await?;
276280
Ok(())
277281
}
278282

279-
#[test]
280-
fn test_add_version_exists() -> anyhow::Result<()> {
283+
#[tokio::test]
284+
async fn test_add_version_exists() -> anyhow::Result<()> {
281285
let storage = InMemoryStorage::new();
282286
let client_id = Uuid::new_v4();
283-
let mut txn = storage.txn(client_id)?;
287+
let mut txn = storage.txn(client_id).await?;
284288

285289
let version_id = Uuid::new_v4();
286290
let parent_version_id = Uuid::new_v4();
287291
let history_segment = b"abc".to_vec();
288292

289-
txn.new_client(parent_version_id)?;
290-
txn.add_version(version_id, parent_version_id, history_segment.clone())?;
293+
txn.new_client(parent_version_id).await?;
294+
txn.add_version(version_id, parent_version_id, history_segment.clone())
295+
.await?;
291296
assert!(txn
292297
.add_version(version_id, parent_version_id, history_segment.clone())
298+
.await
293299
.is_err());
294-
txn.commit()?;
300+
txn.commit().await?;
295301
Ok(())
296302
}
297303

298-
#[test]
299-
fn test_snapshots() -> anyhow::Result<()> {
304+
#[tokio::test]
305+
async fn test_snapshots() -> anyhow::Result<()> {
300306
let storage = InMemoryStorage::new();
301307
let client_id = Uuid::new_v4();
302-
let mut txn = storage.txn(client_id)?;
308+
let mut txn = storage.txn(client_id).await?;
303309

304-
txn.new_client(Uuid::new_v4())?;
305-
assert!(txn.get_client()?.unwrap().snapshot.is_none());
310+
txn.new_client(Uuid::new_v4()).await?;
311+
assert!(txn.get_client().await?.unwrap().snapshot.is_none());
306312

307313
let snap = Snapshot {
308314
version_id: Uuid::new_v4(),
309315
timestamp: Utc::now(),
310316
versions_since: 3,
311317
};
312-
txn.set_snapshot(snap.clone(), vec![9, 8, 9])?;
318+
txn.set_snapshot(snap.clone(), vec![9, 8, 9]).await?;
313319

314320
assert_eq!(
315-
txn.get_snapshot_data(snap.version_id)?.unwrap(),
321+
txn.get_snapshot_data(snap.version_id).await?.unwrap(),
316322
vec![9, 8, 9]
317323
);
318-
assert_eq!(txn.get_client()?.unwrap().snapshot, Some(snap));
324+
assert_eq!(txn.get_client().await?.unwrap().snapshot, Some(snap));
319325

320326
let snap2 = Snapshot {
321327
version_id: Uuid::new_v4(),
322328
timestamp: Utc::now(),
323329
versions_since: 10,
324330
};
325-
txn.set_snapshot(snap2.clone(), vec![0, 2, 4, 6])?;
331+
txn.set_snapshot(snap2.clone(), vec![0, 2, 4, 6]).await?;
326332

327333
assert_eq!(
328-
txn.get_snapshot_data(snap2.version_id)?.unwrap(),
334+
txn.get_snapshot_data(snap2.version_id).await?.unwrap(),
329335
vec![0, 2, 4, 6]
330336
);
331-
assert_eq!(txn.get_client()?.unwrap().snapshot, Some(snap2));
337+
assert_eq!(txn.get_client().await?.unwrap().snapshot, Some(snap2));
332338

333339
// check that mismatched version is detected
334-
assert!(txn.get_snapshot_data(Uuid::new_v4()).is_err());
340+
assert!(txn.get_snapshot_data(Uuid::new_v4()).await.is_err());
335341

336-
txn.commit()?;
342+
txn.commit().await?;
337343
Ok(())
338344
}
339345
}

0 commit comments

Comments
 (0)