Skip to content

Commit f214257

Browse files
committed
refactor(SyncEngine): introduce a TransportTrait: Sink<ClientMsg> + Stream<ServerMsg> and use that
This makes it easier to do testing (hopefully).
1 parent 1d6eb6d commit f214257

18 files changed

+270
-261
lines changed

shared/src/sync_engine/changes.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::sync_engine::websocket_updates::typed_transport::TypedTransportTrait;
21
use std::collections::hash_map::Entry;
32
use std::collections::HashMap;
43
use typesafe_idb::{ReadOnly, Store};
@@ -10,6 +9,7 @@ use crate::types::issue_comment::{IssueComment, IssueCommentId};
109
use crate::types::label::{Label, LabelId};
1110

1211
use super::optimistic::db::{DbWithOptimisticChanges, TxnBuilderWithOptimisticChanges, TxnWithOptimisticChanges};
12+
use super::websocket_updates::transport::TransportTrait;
1313
use super::{
1414
super::types::{
1515
github_app::{GithubApp, GithubAppId},
@@ -425,7 +425,7 @@ where
425425
}
426426
}
427427

428-
impl<W: TypedTransportTrait, GithubApi> SyncEngine<W, GithubApi> {
428+
impl<W: TransportTrait, GithubApi> SyncEngine<W, GithubApi> {
429429
pub async fn persist_changes<
430430
Marker: StoreMarkersForChanges,
431431
Mode: TxnMode<SupportsReadOnly = Present, SupportsReadWrite = Present>,
@@ -457,7 +457,7 @@ impl<W: TypedTransportTrait, GithubApi> SyncEngine<W, GithubApi> {
457457
}
458458
}
459459

460-
async fn persist_changes_to_issues<W: TypedTransportTrait, Marker, Mode>(
460+
async fn persist_changes_to_issues<W: TransportTrait, Marker, Mode>(
461461
txn: &TxnWithOptimisticChanges<Marker, Mode>,
462462
issues: HashMap<IssueId, ExistingOrDeleted<Issue>>,
463463
) -> SyncResult<(), W>
@@ -485,7 +485,7 @@ where
485485
Ok(())
486486
}
487487

488-
async fn persist_changes_to_issue_comments<W: TypedTransportTrait, Marker, Mode>(
488+
async fn persist_changes_to_issue_comments<W: TransportTrait, Marker, Mode>(
489489
txn: &TxnWithOptimisticChanges<Marker, Mode>,
490490
issue_comments: HashMap<IssueCommentId, ExistingOrDeleted<IssueComment>>,
491491
) -> SyncResult<(), W>
@@ -514,7 +514,7 @@ where
514514
Ok(())
515515
}
516516

517-
async fn persist_changes_to_github_apps<W: TypedTransportTrait, Marker, Mode>(
517+
async fn persist_changes_to_github_apps<W: TransportTrait, Marker, Mode>(
518518
txn: &TxnWithOptimisticChanges<Marker, Mode>,
519519
github_apps: HashMap<GithubAppId, ExistingOrDeleted<GithubApp>>,
520520
) -> SyncResult<(), W>
@@ -541,7 +541,7 @@ where
541541
Ok(())
542542
}
543543

544-
async fn persist_changes_to_users<W: TypedTransportTrait, Marker, Mode>(
544+
async fn persist_changes_to_users<W: TransportTrait, Marker, Mode>(
545545
txn: &TxnWithOptimisticChanges<Marker, Mode>,
546546
users: HashMap<UserId, ExistingOrDeleted<User>>,
547547
) -> SyncResult<(), W>
@@ -569,7 +569,7 @@ where
569569
Ok(())
570570
}
571571

572-
async fn persist_changes_to_licenses<W: TypedTransportTrait, Marker, Mode>(
572+
async fn persist_changes_to_licenses<W: TransportTrait, Marker, Mode>(
573573
txn: &TxnWithOptimisticChanges<Marker, Mode>,
574574
licenses: HashMap<LicenseId, ExistingOrDeleted<License>>,
575575
) -> SyncResult<(), W>
@@ -597,7 +597,7 @@ where
597597
Ok(())
598598
}
599599

600-
async fn persist_changes_to_milestones<W: TypedTransportTrait, Marker, Mode>(
600+
async fn persist_changes_to_milestones<W: TransportTrait, Marker, Mode>(
601601
txn: &TxnWithOptimisticChanges<Marker, Mode>,
602602
milestones: HashMap<MilestoneId, ExistingOrDeleted<Milestone>>,
603603
) -> SyncResult<(), W>
@@ -625,7 +625,7 @@ where
625625
Ok(())
626626
}
627627

628-
async fn persist_changes_to_repositorys<W: TypedTransportTrait, Marker, Mode>(
628+
async fn persist_changes_to_repositorys<W: TransportTrait, Marker, Mode>(
629629
txn: &TxnWithOptimisticChanges<Marker, Mode>,
630630
repositorys: HashMap<RepositoryId, ExistingOrDeleted<Repository>>,
631631
) -> SyncResult<(), W>
@@ -653,7 +653,7 @@ where
653653
Ok(())
654654
}
655655

656-
async fn upsert_labels<W: TypedTransportTrait, Marker, Mode>(
656+
async fn upsert_labels<W: TransportTrait, Marker, Mode>(
657657
txn: &TxnWithOptimisticChanges<Marker, Mode>,
658658
labels: HashMap<LabelId, ExistingOrDeleted<Label>>,
659659
) -> SyncResult<(), W>

shared/src/sync_engine/error.rs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ use std::{fmt::Debug, panic::Location};
44
use super::{
55
conversions::conversion_error::ConversionError,
66
optimistic::db::Error,
7-
websocket_updates::typed_transport::{TypedTransportError, TypedTransportTrait},
7+
websocket_updates::transport::TransportTrait,
88
};
99

10-
pub enum SyncErrorSrc<T: TypedTransportTrait> {
10+
pub enum SyncErrorSrc<T: TransportTrait> {
1111
OwnApi(OwnApiError),
1212
Github(github_api::simple_error::SimpleError),
1313
Db(Error),
@@ -18,11 +18,11 @@ pub enum SyncErrorSrc<T: TypedTransportTrait> {
1818
Ewebsock(ewebsock::Error),
1919
/// These are things like: the user that owns a repository in our db not existing in our db.
2020
DataModel(String),
21-
WebSocket(TypedTransportError<T::ConnError>),
21+
Transport(T::TransportError),
2222
NotAvailable(NotAvailableError)
2323
}
2424

25-
impl<T: TypedTransportTrait> Debug for SyncErrorSrc<T> {
25+
impl<T: TransportTrait> Debug for SyncErrorSrc<T> {
2626
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2727
match self {
2828
SyncErrorSrc::OwnApi(err) => write!(f, "SyncErrorSrc::OwnApi({:?})", err),
@@ -34,13 +34,13 @@ impl<T: TypedTransportTrait> Debug for SyncErrorSrc<T> {
3434
SyncErrorSrc::Merge(err) => write!(f, "SyncErrorSrc::Merge({:?})", err),
3535
SyncErrorSrc::Ewebsock(err) => write!(f, "SyncErrorSrc::Ewebsock({:?})", err),
3636
SyncErrorSrc::DataModel(msg) => write!(f, "SyncErrorSrc::DataModel({})", msg),
37-
SyncErrorSrc::WebSocket(err) => write!(f, "SyncErrorSrc::WebSocket({:?})", err),
37+
SyncErrorSrc::Transport(err) => write!(f, "SyncErrorSrc::WebSocket({:?})", err),
3838
SyncErrorSrc::NotAvailable(err) => write!(f, "SyncErrorSrc::NotAvailable({:?})", err),
3939
}
4040
}
4141
}
4242

43-
impl<T: TypedTransportTrait> From<SyncErrorSrc<T>> for SyncError<T> {
43+
impl<T: TransportTrait> From<SyncErrorSrc<T>> for SyncError<T> {
4444
#[track_caller]
4545
fn from(value: SyncErrorSrc<T>) -> Self {
4646
Self {
@@ -51,12 +51,12 @@ impl<T: TypedTransportTrait> From<SyncErrorSrc<T>> for SyncError<T> {
5151
}
5252

5353
#[allow(dead_code)]
54-
pub struct SyncError<T: TypedTransportTrait> {
54+
pub struct SyncError<T: TransportTrait> {
5555
source: SyncErrorSrc<T>,
5656
location: &'static Location<'static>
5757
}
5858

59-
impl<T: TypedTransportTrait> Debug for SyncError<T> {
59+
impl<T: TransportTrait> Debug for SyncError<T> {
6060
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
6161
f.debug_struct("SyncError")
6262
.field("source", &self.source)
@@ -65,7 +65,7 @@ impl<T: TypedTransportTrait> Debug for SyncError<T> {
6565
}
6666
}
6767

68-
impl<T: TypedTransportTrait> From<ConversionError> for SyncError<T> {
68+
impl<T: TransportTrait> From<ConversionError> for SyncError<T> {
6969
fn from(value: ConversionError) -> Self {
7070
match value {
7171
ConversionError::Merge(merge_error) => SyncErrorSrc::Merge(merge_error),
@@ -76,7 +76,7 @@ impl<T: TypedTransportTrait> From<ConversionError> for SyncError<T> {
7676
}
7777
}
7878

79-
impl<T: TypedTransportTrait> SyncError<T> {
79+
impl<T: TransportTrait> SyncError<T> {
8080
/// We don't derive `From<>` because a `String` might accidentally get converted (which is what
8181
/// `ewebsock::Error` really is).
8282
pub fn from_ewebsock(error: ewebsock::Error) -> Self {
@@ -86,48 +86,42 @@ impl<T: TypedTransportTrait> SyncError<T> {
8686

8787
pub type SyncResult<T, W> = Result<T, SyncError<W>>;
8888

89-
impl<W: TypedTransportTrait, T> From<github_api::apis::Error<T>> for SyncError<W> {
89+
impl<W: TransportTrait, T> From<github_api::apis::Error<T>> for SyncError<W> {
9090
#[track_caller]
9191
fn from(value: github_api::apis::Error<T>) -> Self {
9292
SyncErrorSrc::Github(value.into()).into()
9393
}
9494
}
9595

96-
impl<W: TypedTransportTrait> From<TypedTransportError<W::ConnError>> for SyncError<W> {
97-
#[track_caller]
98-
fn from(value: TypedTransportError<W::ConnError>) -> Self {
99-
SyncErrorSrc::WebSocket(value).into()
100-
}
101-
}
10296

103-
impl<W: TypedTransportTrait> From<OwnApiError> for SyncError<W> {
97+
impl<W: TransportTrait> From<OwnApiError> for SyncError<W> {
10498
#[track_caller]
10599
fn from(value: OwnApiError) -> Self {
106100
SyncErrorSrc::OwnApi(value).into()
107101
}
108102
}
109103

110-
impl<W: TypedTransportTrait> From<Error> for SyncError<W> {
104+
impl<W: TransportTrait> From<Error> for SyncError<W> {
111105
#[track_caller]
112106
fn from(value: Error) -> Self {
113107
SyncErrorSrc::Db(value).into()
114108
}
115109
}
116110

117-
impl<W: TypedTransportTrait> From<serde_json::Error> for SyncError<W> {
111+
impl<W: TransportTrait> From<serde_json::Error> for SyncError<W> {
118112
#[track_caller]
119113
fn from(value: serde_json::Error) -> Self {
120114
SyncErrorSrc::SerdeToString(value).into()
121115
}
122116
}
123-
impl<W: TypedTransportTrait> From<MergeError> for SyncError<W> {
117+
impl<W: TransportTrait> From<MergeError> for SyncError<W> {
124118
#[track_caller]
125119
fn from(value: MergeError) -> Self {
126120
SyncErrorSrc::Merge(value).into()
127121
}
128122
}
129123

130-
impl<W: TypedTransportTrait> From<NotAvailableError> for SyncError<W> {
124+
impl<W: TransportTrait> From<NotAvailableError> for SyncError<W> {
131125
#[track_caller]
132126
fn from(value: NotAvailableError) -> Self {
133127
SyncErrorSrc::NotAvailable(value).into()

shared/src/sync_engine/initial_sync/ensure_initial_sync_issue_comments.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use super::super::{
1010
SyncEngine, MAX_PER_PAGE,
1111
};
1212
use crate::{
13-
sync_engine::websocket_updates::typed_transport::TypedTransportTrait,
13+
sync_engine::websocket_updates::transport::TransportTrait,
1414
types::{
1515
installation::InstallationId,
1616
issue::{Issue, IssueId, NumberIndex},
@@ -22,7 +22,7 @@ use crate::{
2222
},
2323
};
2424

25-
impl<W: TypedTransportTrait, GithubApi: GithubApiTrait> SyncEngine<W, GithubApi> {
25+
impl<W: TransportTrait, GithubApi: GithubApiTrait> SyncEngine<W, GithubApi> {
2626
/// This function will try to find issue ids in the db by using the issue number in `issue_url`
2727
/// of issue_comment`.
2828
pub async fn ensure_initial_sync_issue_comments(

shared/src/sync_engine/initial_sync/ensure_initial_sync_issues.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use super::super::{
44
changes::{AddChanges, Changes},
55
conversions::ToDb,
66
error::{SyncErrorSrc, SyncResult},
7-
SyncEngine, TypedTransportTrait, MAX_PER_PAGE,
7+
SyncEngine, MAX_PER_PAGE,
88
};
99
use crate::{
1010
avail::MergeError,
@@ -17,8 +17,9 @@ use crate::{
1717
},
1818
};
1919
use github_api::github_api_trait::GithubApiTrait;
20+
use crate::sync_engine::websocket_updates::transport::TransportTrait;
2021

21-
impl<W: TypedTransportTrait, GithubApi: GithubApiTrait> SyncEngine<W, GithubApi> {
22+
impl<W: TransportTrait, GithubApi: GithubApiTrait> SyncEngine<W, GithubApi> {
2223
pub async fn ensure_initial_sync_issues(
2324
&self,
2425
id: &RepositoryId,

shared/src/sync_engine/initial_sync/ensure_initial_sync_repository.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use github_api::github_api_trait::GithubApiTrait;
22

3-
use crate::{sync_engine::websocket_updates::typed_transport, types::{
3+
use crate::{sync_engine::websocket_updates::transport::TransportTrait, types::{
44
repository::{Repository, RepositoryId},
55
repository_initial_sync_status::{RepoSyncStatus, RepositoryInitialSyncStatus},
66
}};
77

88
use super::super::{error::SyncResult, SyncEngine};
99

10-
impl<T: typed_transport::TypedTransportTrait, GithubApi: GithubApiTrait> SyncEngine<T, GithubApi> {
10+
impl<T: TransportTrait, GithubApi: GithubApiTrait> SyncEngine<T, GithubApi> {
1111
/// `force_initial_sync` means we ignore the RepositoryInitialSyncStatus. This will come into
1212
/// play when we implement the "if the last time we were in touch is less than 7 days, do a
1313
/// full resync."

shared/src/sync_engine/initial_sync/fetch_repositorys_for_installation_id.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use futures::future::try_join_all;
22
use github_api::github_api_trait::GithubApiTrait;
33

44
use crate::{
5-
avail::MergeError, sync_engine::websocket_updates::typed_transport::TypedTransportTrait,
5+
avail::MergeError, sync_engine::websocket_updates::transport::TransportTrait,
66
types::installation::InstallationId,
77
};
88

@@ -12,7 +12,7 @@ use super::super::{
1212
SyncEngine, SyncResult, MAX_PER_PAGE,
1313
};
1414

15-
impl<W: TypedTransportTrait, GithubApi: GithubApiTrait> SyncEngine<W, GithubApi> {
15+
impl<W: TransportTrait, GithubApi: GithubApiTrait> SyncEngine<W, GithubApi> {
1616
pub async fn fetch_repositorys_for_installation_id(
1717
&self,
1818
id: &InstallationId,

shared/src/sync_engine/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use optimistic::db::{DbWithOptimisticChanges, ReactivityTrackers};
22
use registry::Registry;
33
use send_wrapper::SendWrapper;
44
use std::{marker::PhantomData, sync::Arc};
5-
pub use websocket_updates::typed_transport::*;
5+
pub use websocket_updates::transport::*;
66
mod conversions;
77
mod initial_sync;
88

@@ -12,7 +12,7 @@ pub mod mutations;
1212
pub mod optimistic;
1313
mod registry;
1414
pub mod storage_traits;
15-
mod websocket_updates;
15+
pub mod websocket_updates;
1616

1717
use std::{cmp::Ordering, rc::Rc};
1818

@@ -49,7 +49,7 @@ pub struct SyncEngine<Transport, GithubApi> {
4949

5050
const MAX_PER_PAGE: i32 = 100;
5151

52-
impl<W: TypedTransportTrait, GithubApi> SyncEngine<W, GithubApi> {
52+
impl<W: TransportTrait, GithubApi> SyncEngine<W, GithubApi> {
5353
async fn get_api_conf(
5454
&self,
5555
id: &InstallationId,

shared/src/sync_engine/mutations/issues.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@ use jiff::Timestamp;
44
use crate::{
55
random::random, sync_engine::{
66
error::{SyncError, SyncErrorSrc},
7-
SyncEngine, TypedTransportTrait,
7+
SyncEngine,
88
}, types::{
99
installation::InstallationId,
1010
issue::{Issue, IssueId},
1111
repository::{Repository, RepositoryId},
1212
user::{User, UserId},
13-
}
13+
}, sync_engine::websocket_updates::transport::TransportTrait,
1414
};
1515

16-
impl<T: TypedTransportTrait, GithubApi: GithubApiTrait> SyncEngine<T, GithubApi> {
16+
impl<T: TransportTrait, GithubApi: GithubApiTrait> SyncEngine<T, GithubApi> {
1717
/// Returns the optimistic id of the issue.
1818
///
1919
/// Invariant upheld: The issue number and id will be a negative number for the optimistic issue.

shared/src/sync_engine/new_defn.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use typesafe_idb::{StoreMarker, TypesafeDb};
2222

2323
use super::optimistic::db::DbWithOptimisticChanges;
2424
use super::registry::Registry;
25-
use super::websocket_updates::typed_transport::TypedTransportTrait;
25+
use super::websocket_updates::transport::TransportTrait;
2626
use super::DbSubscription;
2727
use super::{error::SyncResult, SyncEngine};
2828

@@ -41,7 +41,7 @@ pub type DbStoreMarkers = impl StoreMarker<IssueCommentsInitialSyncStatus>
4141
+ StoreMarker<LastWebhookUpdateAt>
4242
+ Default;
4343

44-
impl<W: TypedTransportTrait, GithubApi> SyncEngine<W, GithubApi> {
44+
impl<W: TransportTrait, GithubApi> SyncEngine<W, GithubApi> {
4545
pub async fn new(endpoint_client: EndpointClient) -> SyncResult<Self, W> {
4646
let db = TypesafeDb::builder("heimisch".into())
4747
.with_store::<Issue>()

shared/src/sync_engine/optimistic/db/tests.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use wasm_bindgen_test::wasm_bindgen_test;
99
use crate::{
1010
endpoints::endpoint_client::EndpointClient,
1111
sync_engine::{
12-
websocket_updates::tests::MockTypedTransport, DbStoreMarkers, DbSubscription, SyncEngine,
12+
websocket_updates::tests::MockTransport, DbStoreMarkers, DbSubscription, SyncEngine,
1313
},
1414
types::{
1515
issue::{Issue, RepositoryIdIndex},
@@ -19,8 +19,8 @@ use crate::{
1919

2020
use super::{TxnBuilderWithOptimisticChanges, TxnWithOptimisticChanges};
2121

22-
async fn get_sync_engine() -> SyncEngine<MockTypedTransport, ()> {
23-
SyncEngine::<MockTypedTransport, ()>::new(EndpointClient::new(
22+
async fn get_sync_engine() -> SyncEngine<MockTransport, ()> {
23+
SyncEngine::<MockTransport, ()>::new(EndpointClient::new(
2424
|_| (),
2525
Url::parse("https://www.example.com/").unwrap(),
2626
))

0 commit comments

Comments
 (0)