Skip to content

Commit 7c8f20b

Browse files
committed
refactor: use fsm and actions pattern in CCH
- Implement a state machine for CCH orders to manage transitions based on incoming invoice and outgoing payment events. - Introduce action dispatchers for handling various order actions, such as sending payments and settling invoices. - Refactor existing code to improve modularity and maintainability, including the separation of event handling and order state management.
1 parent fb51baa commit 7c8f20b

25 files changed

+919
-604
lines changed

.cursor/rules/base-branch.mdc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
alwaysApply: true
3+
---
4+
The base branch is develop

.cursor/rules/nextest.mdc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
alwaysApply: true
3+
---
4+
# Run tests using `cargo nextest`
5+
6+
Always execute the Rust test suite with `cargo nextest` instead of the default `cargo test`.
7+
8+
## How to run
9+
10+
- `cargo nextest run --all-targets --all-features` for a full local verification run.
11+
- Use the `Makefile` target `make test` when you want the exact flags used in automation (`RUST_LOG=off cargo nextest run --no-fail-fast -p fnn -p fiber-bin`).
12+
13+
Running tests this way avoids discrepancies with CI and ensures flaky tests are surfaced quickly.

.cursor/worktrees.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"setup-worktree": [
3+
"cp $ROOT_WORKTREE_PATH/mise.toml mise.toml",
4+
"mkdir -p .vscode && cp -f $ROOT_WORKTREE_PATH/.vscode/settings.json .vscode/settings.json"
5+
]
6+
}

crates/fiber-bin/src/main.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use fnn::tasks::{
2020
use fnn::watchtower::{
2121
WatchtowerActor, WatchtowerMessage, DEFAULT_WATCHTOWER_CHECK_INTERVAL_SECONDS,
2222
};
23-
use fnn::{start_cch, start_network, Config, NetworkServiceEvent};
23+
use fnn::{start_network, CchActor, Config, NetworkServiceEvent};
2424
use jsonrpsee::http_client::HttpClientBuilder;
2525
use jsonrpsee::ws_client::{HeaderMap, HeaderValue};
2626
use ractor::{port::OutputPortSubscriberTrait as _, Actor, ActorRef, OutputPort};
@@ -327,7 +327,9 @@ pub async fn main() -> Result<(), ExitMessage> {
327327
})?
328328
.read_or_generate_secret_key()
329329
.map_err(|err| ExitMessage(format!("failed to read secret key: {}", err)))?;
330-
match start_cch(
330+
match Actor::spawn_linked(
331+
Some("cch actor".to_string()),
332+
CchActor,
331333
CchArgs {
332334
config: cch_config,
333335
tracker: new_tokio_task_tracker(),
@@ -350,7 +352,7 @@ pub async fn main() -> Result<(), ExitMessage> {
350352
));
351353
}
352354
}
353-
Ok(actor) => {
355+
Ok((actor, _handle)) => {
354356
if let Some(port) = cch_fiber_store_event_port {
355357
actor.subscribe_to_port(&port);
356358
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use crate::cch::{CchInvoice, CchOrder};
2+
3+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
4+
pub enum PaymentHandlerType {
5+
Fiber,
6+
Lightning,
7+
}
8+
9+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10+
pub enum InvoiceHandlerType {
11+
Fiber,
12+
Lightning,
13+
}
14+
15+
pub fn dispatch_invoice_handler(order: &CchOrder) -> InvoiceHandlerType {
16+
match order.incoming_invoice {
17+
CchInvoice::Fiber(_) => InvoiceHandlerType::Fiber,
18+
CchInvoice::Lightning(_) => InvoiceHandlerType::Lightning,
19+
}
20+
}
21+
pub fn dispatch_payment_handler(order: &CchOrder) -> PaymentHandlerType {
22+
// Payment use the inverse handler of the invoice.
23+
match order.incoming_invoice {
24+
CchInvoice::Fiber(_) => PaymentHandlerType::Lightning,
25+
CchInvoice::Lightning(_) => PaymentHandlerType::Fiber,
26+
}
27+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
mod backend_dispatchers;
2+
mod send_outgoing_payment;
3+
mod settle_incoming_invoice;
4+
mod track_incoming_invoice;
5+
mod track_outgoing_payment;
6+
use send_outgoing_payment::SendOutgoingPaymentDispatcher;
7+
use settle_incoming_invoice::SettleIncomingInvoiceDispatcher;
8+
use track_incoming_invoice::TrackIncomingInvoiceDispatcher;
9+
use track_outgoing_payment::TrackOutgoingPaymentDispatcher;
10+
11+
use anyhow::Result;
12+
use ractor::ActorRef;
13+
14+
use crate::cch::{actor::CchState, order::CchOrderAction, CchMessage, CchOrder};
15+
16+
#[async_trait::async_trait]
17+
pub trait ActionExecutor: Send + Sync {
18+
async fn execute(self: Box<Self>) -> Result<()>;
19+
}
20+
21+
pub struct ActionDispatcher;
22+
23+
impl ActionDispatcher {
24+
fn dispatch(
25+
state: &mut CchState,
26+
cch_actor_ref: &ActorRef<CchMessage>,
27+
order: &CchOrder,
28+
action: CchOrderAction,
29+
) -> Option<Box<dyn ActionExecutor>> {
30+
match action {
31+
CchOrderAction::TrackIncomingInvoice => {
32+
TrackIncomingInvoiceDispatcher::dispatch(state, cch_actor_ref, order)
33+
}
34+
CchOrderAction::SendOutgoingPayment => {
35+
SendOutgoingPaymentDispatcher::dispatch(state, cch_actor_ref, order)
36+
}
37+
CchOrderAction::TrackOutgoingPayment => {
38+
TrackOutgoingPaymentDispatcher::dispatch(state, cch_actor_ref, order)
39+
}
40+
CchOrderAction::SettleIncomingInvoice => {
41+
SettleIncomingInvoiceDispatcher::dispatch(state, cch_actor_ref, order)
42+
}
43+
}
44+
}
45+
46+
/// Execute an action.
47+
///
48+
/// Executor cannot modify the order directly, but can send events to the actor.
49+
pub async fn execute(
50+
state: &mut CchState,
51+
cch_actor_ref: &ActorRef<CchMessage>,
52+
order: &CchOrder,
53+
action: CchOrderAction,
54+
) -> Result<()> {
55+
if let Some(executor) = Self::dispatch(state, cch_actor_ref, order, action) {
56+
return executor.execute().await;
57+
}
58+
Ok(())
59+
}
60+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use anyhow::{anyhow, Result};
2+
use futures::StreamExt as _;
3+
use lnd_grpc_tonic_client::routerrpc;
4+
use ractor::{call, ActorRef};
5+
6+
use crate::{
7+
cch::{
8+
actions::{
9+
backend_dispatchers::{dispatch_payment_handler, PaymentHandlerType},
10+
ActionExecutor,
11+
},
12+
actor::CchState,
13+
trackers::{map_lnd_payment_changed_event, CchTrackingEvent, LndConnectionInfo},
14+
CchMessage, CchOrder, CchOrderStatus,
15+
},
16+
fiber::{
17+
network::SendPaymentCommand, NetworkActorCommand, NetworkActorMessage,
18+
ASSUME_NETWORK_ACTOR_ALIVE,
19+
},
20+
};
21+
22+
const BTC_PAYMENT_TIMEOUT_SECONDS: i32 = 60;
23+
24+
pub struct SendOutgoingPaymentDispatcher;
25+
26+
pub struct SendFiberOutgoingPaymentExecutor {
27+
cch_actor_ref: ActorRef<CchMessage>,
28+
network_actor_ref: ActorRef<NetworkActorMessage>,
29+
outgoing_pay_req: String,
30+
}
31+
32+
#[async_trait::async_trait]
33+
impl ActionExecutor for SendFiberOutgoingPaymentExecutor {
34+
async fn execute(self: Box<Self>) -> Result<()> {
35+
let outgoing_pay_req = self.outgoing_pay_req;
36+
let message = |rpc_reply| -> NetworkActorMessage {
37+
NetworkActorMessage::Command(NetworkActorCommand::SendPayment(
38+
SendPaymentCommand {
39+
invoice: Some(outgoing_pay_req),
40+
..Default::default()
41+
},
42+
rpc_reply,
43+
))
44+
};
45+
46+
let payment = call!(self.network_actor_ref, message)
47+
.expect(ASSUME_NETWORK_ACTOR_ALIVE)
48+
.map_err(|err| anyhow!("{}", err))?;
49+
50+
self.cch_actor_ref.send_message(CchMessage::TrackingEvent(
51+
CchTrackingEvent::PaymentChanged {
52+
payment_hash: payment.payment_hash,
53+
payment_preimage: None,
54+
status: payment.status,
55+
},
56+
))?;
57+
58+
Ok(())
59+
}
60+
}
61+
62+
pub struct SendLightningOutgoingPaymentExecutor {
63+
cch_actor_ref: ActorRef<CchMessage>,
64+
outgoing_pay_req: String,
65+
lnd_connection: LndConnectionInfo,
66+
}
67+
68+
#[async_trait::async_trait]
69+
impl ActionExecutor for SendLightningOutgoingPaymentExecutor {
70+
async fn execute(self: Box<Self>) -> Result<()> {
71+
let req = routerrpc::SendPaymentRequest {
72+
payment_request: self.outgoing_pay_req,
73+
timeout_seconds: BTC_PAYMENT_TIMEOUT_SECONDS,
74+
..Default::default()
75+
};
76+
tracing::debug!("[inbounding tlc] SendPaymentRequest: {:?}", req);
77+
78+
let mut client = self.lnd_connection.create_router_client().await?;
79+
// TODO: set a fee
80+
let mut stream = client.send_payment_v2(req).await?.into_inner();
81+
// Wait for the first message then quit
82+
let payment_result_opt = stream.next().await;
83+
tracing::debug!("[inbounding tlc] payment result: {:?}", payment_result_opt);
84+
if let Some(Ok(payment)) = payment_result_opt {
85+
self.cch_actor_ref.send_message(CchMessage::TrackingEvent(
86+
map_lnd_payment_changed_event(payment)?,
87+
))?;
88+
}
89+
Ok(())
90+
}
91+
}
92+
93+
impl SendOutgoingPaymentDispatcher {
94+
pub fn should_dispatch(order: &CchOrder) -> bool {
95+
order.status == CchOrderStatus::IncomingAccepted
96+
}
97+
98+
pub fn dispatch(
99+
state: &mut CchState,
100+
cch_actor_ref: &ActorRef<CchMessage>,
101+
order: &CchOrder,
102+
) -> Option<Box<dyn ActionExecutor>> {
103+
if !Self::should_dispatch(order) {
104+
return None;
105+
}
106+
107+
match dispatch_payment_handler(order) {
108+
PaymentHandlerType::Fiber => Some(Box::new(SendFiberOutgoingPaymentExecutor {
109+
cch_actor_ref: cch_actor_ref.clone(),
110+
network_actor_ref: state.network_actor.clone(),
111+
outgoing_pay_req: order.outgoing_pay_req.clone(),
112+
})),
113+
PaymentHandlerType::Lightning => Some(Box::new(SendLightningOutgoingPaymentExecutor {
114+
cch_actor_ref: cch_actor_ref.clone(),
115+
outgoing_pay_req: order.outgoing_pay_req.clone(),
116+
lnd_connection: state.lnd_connection.clone(),
117+
})),
118+
}
119+
}
120+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use anyhow::Result;
2+
use lnd_grpc_tonic_client::invoicesrpc;
3+
use ractor::{call, ActorRef};
4+
5+
use crate::{
6+
cch::{
7+
actions::{
8+
backend_dispatchers::{dispatch_invoice_handler, InvoiceHandlerType},
9+
ActionExecutor,
10+
},
11+
actor::CchState,
12+
trackers::LndConnectionInfo,
13+
CchMessage, CchOrder, CchOrderStatus,
14+
},
15+
fiber::{types::Hash256, NetworkActorCommand, NetworkActorMessage, ASSUME_NETWORK_ACTOR_ALIVE},
16+
};
17+
18+
pub struct SettleIncomingInvoiceDispatcher;
19+
20+
pub struct SettleFiberIncomingInvoiceExecutor {
21+
payment_hash: Hash256,
22+
payment_preimage: Hash256,
23+
network_actor_ref: ActorRef<NetworkActorMessage>,
24+
}
25+
26+
#[async_trait::async_trait]
27+
impl ActionExecutor for SettleFiberIncomingInvoiceExecutor {
28+
async fn execute(self: Box<Self>) -> Result<()> {
29+
let payment_hash = self.payment_hash;
30+
let payment_preimage = self.payment_preimage;
31+
let command = move |rpc_reply| -> NetworkActorMessage {
32+
NetworkActorMessage::Command(NetworkActorCommand::SettleInvoice(
33+
payment_hash,
34+
payment_preimage,
35+
rpc_reply,
36+
))
37+
};
38+
call!(self.network_actor_ref, command).expect(ASSUME_NETWORK_ACTOR_ALIVE)?;
39+
Ok(())
40+
}
41+
}
42+
43+
pub struct SettleLightningIncomingInvoiceExecutor {
44+
payment_preimage: Hash256,
45+
lnd_connection: LndConnectionInfo,
46+
}
47+
48+
#[async_trait::async_trait]
49+
impl ActionExecutor for SettleLightningIncomingInvoiceExecutor {
50+
async fn execute(self: Box<Self>) -> Result<()> {
51+
// settle the lnd invoice
52+
let req = invoicesrpc::SettleInvoiceMsg {
53+
preimage: self.payment_preimage.into(),
54+
};
55+
tracing::debug!("[settled tlc] SettleInvoiceMsg: {:?}", req);
56+
57+
let mut client = self.lnd_connection.create_invoices_client().await?;
58+
// TODO: set a fee
59+
let resp = client.settle_invoice(req).await?.into_inner();
60+
tracing::debug!("[settled tlc] SettleInvoiceResp: {:?}", resp);
61+
Ok(())
62+
}
63+
}
64+
65+
impl SettleIncomingInvoiceDispatcher {
66+
pub fn should_dispatch(order: &CchOrder) -> bool {
67+
order.status == CchOrderStatus::OutgoingSucceeded
68+
}
69+
70+
pub fn dispatch(
71+
state: &mut CchState,
72+
_cch_actor_ref: &ActorRef<CchMessage>,
73+
order: &CchOrder,
74+
) -> Option<Box<dyn ActionExecutor>> {
75+
if !Self::should_dispatch(order) {
76+
return None;
77+
}
78+
let payment_preimage = order.payment_preimage?;
79+
80+
match dispatch_invoice_handler(order) {
81+
// `CchActor` will track all fiber invoices, so there's nothing to do here to track a single invoice.
82+
InvoiceHandlerType::Fiber => Some(Box::new(SettleFiberIncomingInvoiceExecutor {
83+
payment_hash: order.payment_hash,
84+
payment_preimage,
85+
network_actor_ref: state.network_actor.clone(),
86+
})),
87+
InvoiceHandlerType::Lightning => {
88+
Some(Box::new(SettleLightningIncomingInvoiceExecutor {
89+
payment_preimage,
90+
lnd_connection: state.lnd_connection.clone(),
91+
}))
92+
}
93+
}
94+
}
95+
}

0 commit comments

Comments
 (0)