Skip to content

Commit 9bff40e

Browse files
authored
feat(cch): implement order persistence and housekeeping (#1045)
* feat(cch): store orders in rocksdb * feat(cch): resume order processing from store Added `post_start` method to `CchActor` to load orders from store on startup and starting the state machines for active orderse. * cch order housekeeping Introduce a dedicated scheduler actor to handle time-based expiry of active CCH orders and delayed pruning of final orders (after 21 days). This ensures expired orders are marked as Failed and old final orders are cleaned up. - The CchActor now spawns a CchOrderSchedulerActor at startup and delegates scheduling responsibilities to it. - Orders are scheduled for expiry when created or resumed, and for pruning once they reach a final state. - The scheduler uses a binary heap to manage jobs and dynamically schedules the next processing tick based on the earliest due job. - Added necessary store trait method `delete_cch_order` - Updated state machine to prevent invalid transitions to Failed from Succeeded.
1 parent 7734bdd commit 9bff40e

File tree

20 files changed

+1281
-114
lines changed

20 files changed

+1281
-114
lines changed

crates/fiber-bin/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,13 +332,14 @@ pub async fn main() -> Result<(), ExitMessage> {
332332
.map_err(|err| ExitMessage(format!("failed to read secret key: {}", err)))?;
333333
match Actor::spawn_linked(
334334
Some("cch actor".to_string()),
335-
CchActor,
335+
CchActor::default(),
336336
CchArgs {
337337
config: cch_config,
338338
tracker: new_tokio_task_tracker(),
339339
token: new_tokio_cancellation_token(),
340340
network_actor: network_actor.clone(),
341341
node_keypair,
342+
store: store.clone(),
342343
},
343344
root_actor.get_cell(),
344345
)

crates/fiber-lib/src/cch/actions/mod.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use track_outgoing_payment::TrackOutgoingPaymentDispatcher;
1111
use anyhow::Result;
1212
use ractor::ActorRef;
1313

14-
use crate::cch::{actor::CchState, CchMessage, CchOrder, CchOrderStatus};
14+
use crate::cch::{actor::CchState, CchMessage, CchOrder, CchOrderStatus, CchOrderStore};
1515

1616
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1717
pub enum CchOrderAction {
@@ -29,8 +29,8 @@ pub trait ActionExecutor: Send + Sync {
2929
pub struct ActionDispatcher;
3030

3131
impl ActionDispatcher {
32-
fn dispatch(
33-
state: &mut CchState,
32+
fn dispatch<S: CchOrderStore>(
33+
state: &CchState<S>,
3434
cch_actor_ref: &ActorRef<CchMessage>,
3535
order: &CchOrder,
3636
action: CchOrderAction,
@@ -51,6 +51,21 @@ impl ActionDispatcher {
5151
}
5252
}
5353

54+
/// The actions to be taken when the state machine is started for the order.
55+
pub fn on_starting(order: &CchOrder) -> Vec<CchOrderAction> {
56+
let mut actions = Self::on_entering(order);
57+
match order.status {
58+
CchOrderStatus::IncomingAccepted
59+
| CchOrderStatus::OutgoingInFlight
60+
| CchOrderStatus::OutgoingSucceeded => {
61+
// Ensure start incoming invoice tracking.
62+
actions.push(CchOrderAction::TrackIncomingInvoice);
63+
}
64+
_ => {}
65+
}
66+
actions
67+
}
68+
5469
/// The actions to be taken when the order enters a new status.
5570
pub fn on_entering(order: &CchOrder) -> Vec<CchOrderAction> {
5671
match order.status {
@@ -69,8 +84,8 @@ impl ActionDispatcher {
6984
/// Execute an action.
7085
///
7186
/// Executor cannot modify the order directly, but can send events to the actor.
72-
pub async fn execute(
73-
state: &mut CchState,
87+
pub async fn execute<S: CchOrderStore>(
88+
state: &CchState<S>,
7489
cch_actor_ref: &ActorRef<CchMessage>,
7590
order: &CchOrder,
7691
action: CchOrderAction,

crates/fiber-lib/src/cch/actions/send_outgoing_payment.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
},
1212
actor::CchState,
1313
trackers::{map_lnd_payment_changed_event, CchTrackingEvent, LndConnectionInfo},
14-
CchMessage, CchOrder, CchOrderStatus,
14+
CchMessage, CchOrder, CchOrderStatus, CchOrderStore,
1515
},
1616
fiber::{
1717
payment::{PaymentStatus, SendPaymentCommand},
@@ -162,8 +162,8 @@ impl SendOutgoingPaymentDispatcher {
162162
order.status == CchOrderStatus::IncomingAccepted
163163
}
164164

165-
pub fn dispatch(
166-
state: &mut CchState,
165+
pub fn dispatch<S: CchOrderStore>(
166+
state: &CchState<S>,
167167
cch_actor_ref: &ActorRef<CchMessage>,
168168
order: &CchOrder,
169169
) -> Option<Box<dyn ActionExecutor>> {

crates/fiber-lib/src/cch/actions/settle_incoming_invoice.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
},
1111
actor::CchState,
1212
trackers::{CchTrackingEvent, LndConnectionInfo},
13-
CchMessage, CchOrder, CchOrderStatus,
13+
CchMessage, CchOrder, CchOrderStatus, CchOrderStore,
1414
},
1515
fiber::{types::Hash256, NetworkActorCommand, NetworkActorMessage, ASSUME_NETWORK_ACTOR_ALIVE},
1616
invoice::{CkbInvoiceStatus, SettleInvoiceError},
@@ -125,8 +125,8 @@ impl SettleIncomingInvoiceDispatcher {
125125
order.status == CchOrderStatus::OutgoingSucceeded
126126
}
127127

128-
pub fn dispatch(
129-
state: &mut CchState,
128+
pub fn dispatch<S: CchOrderStore>(
129+
state: &CchState<S>,
130130
cch_actor_ref: &ActorRef<CchMessage>,
131131
order: &CchOrder,
132132
) -> Option<Box<dyn ActionExecutor>> {

crates/fiber-lib/src/cch/actions/track_incoming_invoice.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::{
88
},
99
actor::CchState,
1010
trackers::LndTrackerMessage,
11-
CchMessage, CchOrder,
11+
CchMessage, CchOrder, CchOrderStore,
1212
},
1313
fiber::types::Hash256,
1414
};
@@ -36,8 +36,8 @@ impl TrackIncomingInvoiceDispatcher {
3636
!order.is_final()
3737
}
3838

39-
pub fn dispatch(
40-
state: &mut CchState,
39+
pub fn dispatch<S: CchOrderStore>(
40+
state: &CchState<S>,
4141
_cch_actor_ref: &ActorRef<CchMessage>,
4242
order: &CchOrder,
4343
) -> Option<Box<dyn ActionExecutor>> {

crates/fiber-lib/src/cch/actions/track_outgoing_payment.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use ractor::ActorRef;
22

3-
use crate::cch::{actions::ActionExecutor, actor::CchState, CchMessage, CchOrder};
3+
use crate::cch::{actions::ActionExecutor, actor::CchState, CchMessage, CchOrder, CchOrderStore};
44

55
pub struct TrackOutgoingPaymentDispatcher;
66

77
impl TrackOutgoingPaymentDispatcher {
8-
pub fn dispatch(
9-
_state: &mut CchState,
8+
pub fn dispatch<S: CchOrderStore>(
9+
_state: &CchState<S>,
1010
_cch_actor_ref: &ActorRef<CchMessage>,
1111
_order: &CchOrder,
1212
) -> Option<Box<dyn ActionExecutor>> {

0 commit comments

Comments
 (0)