Skip to content

Commit 24bf773

Browse files
committed
refactor(SyncEngine): make it easier to test
1 parent c9384c7 commit 24bf773

File tree

13 files changed

+183
-132
lines changed

13 files changed

+183
-132
lines changed

rustc-ice-2025-02-13T11_42_50-54470.txt

Lines changed: 73 additions & 0 deletions
Large diffs are not rendered by default.

shared/src/retry.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
use std::fmt::Debug;
1+
use std::{fmt::Debug, future::Future};
22

33
/// # Panics
44
///
55
/// Panics if `n` = 0.
6-
pub async fn try_n_times<T, E: Debug>(
7-
func: impl AsyncFn() -> Result<T, E>,
8-
mut n: usize,
9-
) -> Result<T, E> {
6+
pub async fn try_n_times<Fut, T, E>(func: impl Fn() -> Fut, mut n: usize) -> Result<T, E>
7+
where
8+
Fut: Future<Output = Result<T, E>>,
9+
E: Debug,
10+
{
1011
if n == 0 {
1112
panic!("try_n_times() received a non-positive integer.");
1213
}

shared/src/sync_engine/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use optimistic::db::{DbWithOptimisticChanges, ReactivityTrackers};
22
use registry::Registry;
33
use send_wrapper::SendWrapper;
4-
use std::{marker::PhantomData, sync::Arc};
4+
use url::Url;
5+
use std::{future::Future, pin::Pin, sync::Arc};
56
pub use websocket_updates::transport::*;
67
mod conversions;
78
mod initial_sync;
@@ -40,11 +41,12 @@ mod new_defn;
4041

4142
pub use new_defn::DbStoreMarkers;
4243

43-
pub struct SyncEngine<Transport, GithubApi> {
44+
pub struct SyncEngine<Transport: TransportTrait, GithubApi> {
4445
pub db: Rc<DbWithOptimisticChanges<DbStoreMarkers>>,
4546
pub db_subscriptions: SendWrapper<Rc<Registry<DbSubscription>>>,
4647
endpoint_client: EndpointClient,
47-
_transport: PhantomData<(Transport, GithubApi)>,
48+
_github_api: GithubApi,
49+
make_transport: Rc<dyn Fn(Url) -> Pin<Box<dyn Future<Output = Result<Transport, Transport::TransportError>>>>>,
4850
}
4951

5052
const MAX_PER_PAGE: i32 = 100;

shared/src/sync_engine/new_defn.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
13
/// Without this isolation, our `impl` definition for the `DbStoreMarkers` type will not have one
24
/// "defining use."
3-
4-
use std::marker::PhantomData;
55
use std::rc::Rc;
66

77
use crate::types::label::Label;
@@ -19,6 +19,7 @@ use crate::{
1919
};
2020
use send_wrapper::SendWrapper;
2121
use typesafe_idb::{StoreMarker, TypesafeDb};
22+
use url::Url;
2223

2324
use super::optimistic::db::DbWithOptimisticChanges;
2425
use super::registry::Registry;
@@ -41,8 +42,19 @@ pub type DbStoreMarkers = impl StoreMarker<IssueCommentsInitialSyncStatus>
4142
+ StoreMarker<LastWebhookUpdateAt>
4243
+ Default;
4344

44-
impl<W: TransportTrait, GithubApi> SyncEngine<W, GithubApi> {
45-
pub async fn new(endpoint_client: EndpointClient) -> SyncResult<Self, W> {
45+
impl<Transport: TransportTrait, GithubApi> SyncEngine<Transport, GithubApi> {
46+
pub async fn new<F, Fut>(
47+
endpoint_client: EndpointClient,
48+
make_transport: F,
49+
github_api: GithubApi,
50+
) -> SyncResult<Self, Transport>
51+
where
52+
F: Fn(Url) -> Fut + 'static,
53+
Fut: Future<Output = Result<Transport, Transport::TransportError>> + 'static,
54+
{
55+
// Convert the nice generic function into the boxed version we need internally
56+
let make_transport = Rc::new(move |url| Box::pin(make_transport(url)) as Pin<Box<dyn Future<Output = _>>>);
57+
4658
let db = TypesafeDb::builder("heimisch".into())
4759
.with_store::<Issue>()
4860
.with_store::<User>()
@@ -72,14 +84,15 @@ impl<W: TransportTrait, GithubApi> SyncEngine<W, GithubApi> {
7284
})
7385
.for_each(|sub| (sub.func)());
7486
}),
75-
).await?;
87+
)
88+
.await?;
7689

7790
Ok(Self {
7891
db: Rc::new(db),
7992
db_subscriptions: SendWrapper::new(db_subscriptions),
8093
endpoint_client,
81-
_transport: PhantomData,
94+
_github_api: github_api,
95+
make_transport,
8296
})
8397
}
8498
}
85-

shared/src/sync_engine/optimistic/db/tests.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ use wasm_bindgen_test::wasm_bindgen_test;
88

99
use crate::{
1010
endpoints::endpoint_client::EndpointClient,
11-
sync_engine::{
12-
websocket_updates::tests::MockTransport, DbStoreMarkers, DbSubscription, SyncEngine,
13-
},
11+
sync_engine::{tests::MockTransport, DbStoreMarkers, DbSubscription, SyncEngine},
1412
types::{
1513
issue::{Issue, RepositoryIdIndex},
1614
repository::Repository,
@@ -20,10 +18,11 @@ use crate::{
2018
use super::{TxnBuilderWithOptimisticChanges, TxnWithOptimisticChanges};
2119

2220
async fn get_sync_engine() -> SyncEngine<MockTransport, ()> {
23-
SyncEngine::<MockTransport, ()>::new(EndpointClient::new(
24-
|_| (),
25-
Url::parse("https://www.example.com/").unwrap(),
26-
))
21+
SyncEngine::<MockTransport, ()>::new(
22+
EndpointClient::new(|_| (), Url::parse("https://www.example.com/").unwrap()),
23+
|_| async { Ok(MockTransport::new().0) },
24+
(),
25+
)
2726
.await
2827
.unwrap()
2928
}
@@ -263,7 +262,7 @@ pub async fn get_all_no_optimisim_create_overlapping() {
263262
.with_txn_2(async |txn| {
264263
txn.object_store::<Issue>()
265264
.unwrap()
266-
.create(Default::default(), async {Ok( Default::default()) });
265+
.create(Default::default(), async { Ok(Default::default()) });
267266
})
268267
.should_overlap(true)
269268
.call()

shared/src/sync_engine/websocket_updates/binary_transport.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,5 +67,5 @@ pub trait BinaryTransportTrait:
6767
type ConnError: Debug;
6868

6969
#[allow(async_fn_in_trait)]
70-
async fn establish_conn(url: &Url) -> Result<Self, Self::ConnError>;
70+
async fn establish_conn(url: Url) -> Result<Self, Self::ConnError>;
7171
}

shared/src/sync_engine/websocket_updates/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
pub mod applying_error;
22
pub mod binary_transport;
33

4-
#[cfg(test)]
5-
pub mod tests;
64
pub mod transport;
75

86
use applying_error::{ApplyingError, ApplyingResult};
@@ -48,7 +46,7 @@ where
4846
})
4947
.expect(""),
5048
));
51-
let websocket_conn = try_n_times(async || Transport::establish(&url).await, 3)
49+
let websocket_conn = try_n_times(|| (self.make_transport)(url.clone()), 3)
5250
.await
5351
.map_err(|e| SyncErrorSrc::Transport(e))?;
5452
pin_mut!(websocket_conn);

shared/src/sync_engine/websocket_updates/tests.rs

Lines changed: 0 additions & 68 deletions
This file was deleted.

shared/src/sync_engine/websocket_updates/transport.rs

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
use codee::{Decoder, Encoder};
2-
use futures::{Sink, Stream};
3-
use pin_project::pin_project;
4-
use std::fmt::Debug;
5-
use std::task::{ready, Poll};
6-
use url::Url;
7-
81
use crate::{
92
endpoints::defns::api::websocket_updates::{ClientMsg, ServerMsg},
103
sync_engine::websocket_updates::binary_transport::{
114
BinaryTransportError, BinaryTransportTrait, ConnOrClosedError, JsonSerdeToBinaryCodec,
125
},
136
};
7+
use codee::{Decoder, Encoder};
8+
use futures::{Sink, Stream};
9+
use pin_project::pin_project;
10+
use std::fmt::Debug;
11+
use std::task::{ready, Poll};
12+
use url::Url;
1413

14+
#[derive(Clone)]
1515
#[pin_project]
1616
pub struct Transport<I> {
1717
#[pin]
@@ -100,18 +100,12 @@ pub trait TransportTrait:
100100
Sized + Sink<ClientMsg> + Stream<Item = Result<ServerMsg, Self::TransportError>>
101101
{
102102
type TransportError: Debug;
103-
104-
#[allow(async_fn_in_trait)]
105-
async fn establish(url: &Url) -> Result<Self, Self::TransportError>;
106103
}
107104

108-
impl<T> TransportTrait for Transport<T>
109-
where
110-
T: BinaryTransportTrait,
111-
{
112-
type TransportError = BinaryTransportError<<T as Sink<Vec<u8>>>::Error>;
113-
114-
async fn establish(url: &Url) -> Result<Self, Self::TransportError> {
105+
impl<T: BinaryTransportTrait> Transport<T> {
106+
pub async fn new(
107+
url: Url,
108+
) -> Result<Self, <Transport<T> as TransportTrait>::TransportError> {
115109
let inner = T::establish_conn(url)
116110
.await
117111
.map_err(BinaryTransportError::Conn)?;
@@ -120,45 +114,69 @@ where
120114
}
121115
}
122116

117+
impl<T> TransportTrait for Transport<T>
118+
where
119+
T: BinaryTransportTrait,
120+
{
121+
type TransportError = BinaryTransportError<<T as Sink<Vec<u8>>>::Error>;
122+
}
123+
123124
#[cfg(test)]
124125
pub mod tests {
125-
use std::{pin::Pin, task::{Context, Poll}};
126+
use std::{
127+
pin::Pin,
128+
task::{Context, Poll},
129+
};
126130

127131
use futures::{channel::mpsc, Sink, Stream};
128132

129133
use crate::endpoints::defns::api::websocket_updates::{ClientMsg, ServerMsg};
130134

135+
use super::TransportTrait;
136+
131137
pub struct MockTransportHandler {
132138
send: mpsc::Sender<ServerMsg>,
133139
recv: mpsc::Receiver<ClientMsg>,
134140
}
135141

136-
pub struct TestTransport {
142+
pub struct MockTransport {
137143
recv: mpsc::Receiver<ServerMsg>,
138144
send: mpsc::Sender<ClientMsg>,
139145
}
140146

141-
impl TestTransport {
147+
impl TransportTrait for MockTransport {
148+
type TransportError = mpsc::SendError;
149+
}
150+
151+
impl MockTransport {
142152
pub fn new() -> (Self, MockTransportHandler) {
143153
let (server_msg_sender, server_msg_receiver) = mpsc::channel(100);
144154
let (client_msg_sender, client_msg_receiver) = mpsc::channel(100);
145155

146156
(
147-
Self { recv: server_msg_receiver, send: client_msg_sender },
148-
MockTransportHandler { send: server_msg_sender, recv: client_msg_receiver },
157+
Self {
158+
recv: server_msg_receiver,
159+
send: client_msg_sender,
160+
},
161+
MockTransportHandler {
162+
send: server_msg_sender,
163+
recv: client_msg_receiver,
164+
},
149165
)
150166
}
151167
}
152168

153-
impl Stream for TestTransport {
169+
impl Stream for MockTransport {
154170
type Item = Result<ServerMsg, mpsc::SendError>;
155171

156172
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157-
Pin::new(&mut self.recv).poll_next(cx).map(|opt| opt.map(Ok))
173+
Pin::new(&mut self.recv)
174+
.poll_next(cx)
175+
.map(|opt| opt.map(Ok))
158176
}
159177
}
160178

161-
impl Sink<ClientMsg> for TestTransport {
179+
impl Sink<ClientMsg> for MockTransport {
162180
type Error = mpsc::SendError;
163181

164182
fn poll_ready(
@@ -186,4 +204,4 @@ pub mod tests {
186204
Pin::new(&mut self.send).poll_close(cx)
187205
}
188206
}
189-
}
207+
}

web/src/app/mod.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::rc::Rc;
1717
use leptos::prelude::*;
1818

1919
use routing::Routed;
20+
use shared::sync_engine::Transport;
2021
use sync_engine_provider::SyncEngine;
2122

2223
use crate::consts::ENDPOINT_CLIENT;
@@ -25,9 +26,13 @@ use crate::consts::ENDPOINT_CLIENT;
2526
pub fn App() -> impl IntoView {
2627
let sync_engine = LocalResource::new(move || async move {
2728
Rc::new(
28-
SyncEngine::new(ENDPOINT_CLIENT.with(|e| e.clone()))
29-
.await
30-
.unwrap(),
29+
SyncEngine::new(
30+
ENDPOINT_CLIENT.with(|e| e.clone()),
31+
|url| async { Transport::new(url).await },
32+
github_api::github_api_trait::GithubApi,
33+
)
34+
.await
35+
.unwrap(),
3136
)
3237
});
3338

0 commit comments

Comments
 (0)