Skip to content
This repository was archived by the owner on Mar 23, 2021. It is now read-only.

Commit 9dbee00

Browse files
Tobin C. Hardingthomaseizinger
authored andcommitted
Load swaps from the database
Load each accepted swap from the database and: - Populate state store - Spawn Alice/Bob
1 parent 479ab9a commit 9dbee00

File tree

5 files changed

+96
-6
lines changed

5 files changed

+96
-6
lines changed

cnd/src/db/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub mod with_swap_types;
1414
embed_migrations!("./migrations");
1515

1616
pub use self::{
17+
load_swaps::*,
1718
save_message::{SaveMessage, SaveRfc003Messages},
1819
swap::*,
1920
swap_types::*,

cnd/src/db/swap_types.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ struct QueryableSwap {
110110
swap_id: Text<SwapId>,
111111
}
112112

113-
#[derive(Clone, Debug, PartialEq)]
113+
#[derive(Clone, Copy, Debug, PartialEq)]
114114
pub struct SwapTypes {
115115
pub alpha_ledger: LedgerKind,
116116
pub beta_ledger: LedgerKind,
@@ -119,7 +119,7 @@ pub struct SwapTypes {
119119
pub role: Role,
120120
}
121121

122-
#[derive(Debug, Clone, Display, EnumString, PartialEq)]
122+
#[derive(Debug, Clone, Copy, Display, EnumString, PartialEq)]
123123
pub enum LedgerKind {
124124
Bitcoin,
125125
Ethereum,
@@ -137,7 +137,7 @@ impl From<ledger::LedgerKind> for LedgerKind {
137137
}
138138
}
139139

140-
#[derive(Clone, Debug, Display, EnumString, PartialEq)]
140+
#[derive(Clone, Copy, Debug, Display, EnumString, PartialEq)]
141141
pub enum AssetKind {
142142
Bitcoin,
143143
Ether,

cnd/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub mod bitcoin;
2222
pub mod comit_api;
2323
pub mod config;
2424
pub mod http_api;
25+
pub mod load_swaps;
2526
pub mod logging;
2627
pub mod network;
2728
#[cfg(test)]

cnd/src/load_swaps.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#![allow(clippy::type_repetition_in_bounds)]
2+
use crate::{
3+
db::{AcceptedSwap, DetermineTypes, LoadAcceptedSwap, Retrieve, Sqlite},
4+
seed::Seed,
5+
swap_protocols::{
6+
ledger::LedgerConnectors,
7+
rfc003::{
8+
state_store::{InMemoryStateStore, StateStore},
9+
Spawn,
10+
},
11+
},
12+
};
13+
use futures::Stream;
14+
use std::sync::Arc;
15+
16+
#[allow(clippy::cognitive_complexity)]
17+
pub async fn load_swaps_from_database(
18+
ledger_events: LedgerConnectors,
19+
state_store: Arc<InMemoryStateStore>,
20+
seed: Seed,
21+
db: Sqlite,
22+
) -> anyhow::Result<()> {
23+
log::debug!("loading swaps from database ...");
24+
25+
for swap in db.all().await?.iter() {
26+
let swap_id = swap.swap_id;
27+
log::debug!("got swap from database: {}", swap_id);
28+
29+
let types = db.determine_types(&swap_id).await?;
30+
31+
with_swap_types!(types, {
32+
let accepted: Result<AcceptedSwap<AL, BL, AA, BA>, anyhow::Error> =
33+
db.load_accepted_swap(swap_id.clone()).await;
34+
35+
match accepted {
36+
Ok((request, accept)) => {
37+
match types.role {
38+
Role::Alice => {
39+
let state = alice::State::accepted(request.clone(), accept, seed);
40+
state_store.insert(swap_id, state);
41+
42+
let receiver = ledger_events.spawn(request, accept);
43+
44+
tokio::spawn(receiver.for_each({
45+
let state_store = state_store.clone();
46+
move |update| {
47+
state_store
48+
.update::<alice::State<AL, BL, AA, BA>>(&swap_id, update);
49+
Ok(())
50+
}
51+
}));
52+
}
53+
Role::Bob => {
54+
let state = bob::State::accepted(request.clone(), accept, seed);
55+
state_store.insert(swap_id, state);
56+
57+
let receiver = ledger_events.spawn(request, accept);
58+
59+
tokio::spawn(receiver.for_each({
60+
let state_store = state_store.clone();
61+
move |update| {
62+
state_store
63+
.update::<bob::State<AL, BL, AA, BA>>(&swap_id, update);
64+
Ok(())
65+
}
66+
}));
67+
}
68+
};
69+
}
70+
Err(e) => log::error!("failed to load swap: {}, continuing ...", e),
71+
};
72+
});
73+
}
74+
Ok(())
75+
}

cnd/src/main.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use cnd::{
77
config::{self, Settings},
88
db::{DetermineTypes, Retrieve, Save, SaveRfc003Messages, Sqlite},
99
http_api::{self, route_factory},
10+
load_swaps,
1011
network::{self, Network, SendRequest},
1112
seed::{Seed, SwapSeed},
1213
swap_protocols::{
@@ -18,6 +19,7 @@ use cnd::{
1819
},
1920
};
2021
use futures::{stream, Future, Stream};
22+
use futures_core::{FutureExt, TryFutureExt};
2123
use libp2p::{
2224
identity::{self, ed25519},
2325
PeerId, Swarm,
@@ -53,8 +55,6 @@ fn main() -> anyhow::Result<()> {
5355

5456
let mut runtime = tokio::runtime::Runtime::new()?;
5557

56-
let state_store = Arc::new(InMemoryStateStore::default());
57-
5858
let bitcoin_connector = {
5959
let config::Bitcoin { node_url, network } = settings.clone().bitcoin;
6060
BitcoindConnector::new(node_url, network)?
@@ -68,11 +68,24 @@ fn main() -> anyhow::Result<()> {
6868
ethereum_connector,
6969
};
7070

71+
let state_store = Arc::new(InMemoryStateStore::default());
72+
73+
let database = Sqlite::new(&settings.database.sqlite)?;
74+
7175
let local_key_pair = derive_key_pair(&seed);
7276
let local_peer_id = PeerId::from(local_key_pair.clone().public());
7377
log::info!("Starting with peer_id: {}", local_peer_id);
7478

75-
let database = Sqlite::new(&settings.database.sqlite)?;
79+
runtime.block_on(
80+
load_swaps::load_swaps_from_database(
81+
ledger_events.clone(),
82+
state_store.clone(),
83+
seed,
84+
database.clone(),
85+
)
86+
.boxed()
87+
.compat(),
88+
)?;
7689

7790
let transport = libp2p::build_development_transport(local_key_pair);
7891
let behaviour = network::ComitNode::new(

0 commit comments

Comments
 (0)