Skip to content

Commit 1949e80

Browse files
committed
feat: allow running cch as a separate service
1 parent 3e157e9 commit 1949e80

37 files changed

+1143
-129
lines changed

crates/fiber-bin/src/main.rs

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,9 @@ pub async fn main() -> Result<(), ExitMessage> {
7878
let _span = info_span!("node", node = fnn::get_node_prefix()).entered();
7979

8080
let config = Config::parse();
81+
let fiber_fallback_config = config.fiber_fallback_config.clone();
8182

82-
let store_path = config
83-
.fiber
84-
.as_ref()
85-
.ok_or_else(|| ExitMessage("fiber config is required but absent".to_string()))?
86-
.store_path();
83+
let store_path = fiber_fallback_config.store_path();
8784

8885
let store = Store::new(store_path).map_err(|err| ExitMessage(err.to_string()))?;
8986
let store = StoreWithPubSub::new(store);
@@ -102,10 +99,7 @@ pub async fn main() -> Result<(), ExitMessage> {
10299
});
103100

104101
#[allow(unused_variables)]
105-
let (network_actor, ckb_chain_actor, network_graph, node_public_key) = match config
106-
.fiber
107-
.clone()
108-
{
102+
let (network_actor, ckb_chain_actor, network_graph) = match config.fiber.clone() {
109103
Some(fiber_config) => {
110104
// TODO: this is not a super user friendly error message which has actionable information
111105
// for the user to fix the error and start the node.
@@ -296,26 +290,31 @@ pub async fn main() -> Result<(), ExitMessage> {
296290
Some(network_actor),
297291
Some(ckb_chain_actor),
298292
Some(network_graph),
299-
Some(node_public_key),
300293
)
301294
}
302-
None => (None, None, None, None),
295+
None => (None, None, None),
303296
};
304297

305298
let cch_actor = match config.cch {
306299
Some(cch_config) => {
307300
info!("Starting cch");
308301
let ignore_startup_failure = cch_config.ignore_startup_failure;
302+
let node_keypair =
303+
if let Some(fiber) = config.fiber.as_ref() {
304+
Some(fiber.read_or_generate_secret_key().map_err(|err| {
305+
ExitMessage(format!("failed to read secret key: {}", err))
306+
})?)
307+
} else {
308+
None
309+
};
309310
match start_cch(
310311
CchArgs {
311312
config: cch_config,
312313
tracker: new_tokio_task_tracker(),
313314
token: new_tokio_cancellation_token(),
314-
network_actor: network_actor
315-
.clone()
316-
.expect("Cch service requires network actor"),
317-
pubkey: node_public_key.expect("Cch service requires node public key"),
315+
network_actor: network_actor.clone(),
318316
store: store.clone(),
317+
node_keypair,
319318
},
320319
root_actor.get_cell(),
321320
)
@@ -333,8 +332,6 @@ pub async fn main() -> Result<(), ExitMessage> {
333332
}
334333
}
335334
Ok(actor) => {
336-
// Subscribe the actor to the store so it can receive updates
337-
store.subscribe(Box::new(actor.clone()));
338335
info!("cch started successfully ...");
339336
Some(actor)
340337
}
@@ -344,8 +341,8 @@ pub async fn main() -> Result<(), ExitMessage> {
344341
};
345342

346343
// Start rpc service
347-
let rpc_server_handle = match (config.rpc, network_graph) {
348-
(Some(rpc_config), Some(network_graph)) => {
344+
let rpc_server_handle = match config.rpc {
345+
Some(rpc_config) => {
349346
match start_rpc(
350347
rpc_config,
351348
config.ckb,
@@ -355,20 +352,19 @@ pub async fn main() -> Result<(), ExitMessage> {
355352
store,
356353
network_graph,
357354
root_actor.get_cell(),
358-
#[cfg(debug_assertions)] ckb_chain_actor,
359-
#[cfg(debug_assertions)] rpc_dev_module_commitment_txs,
355+
#[cfg(debug_assertions)]
356+
ckb_chain_actor,
357+
#[cfg(debug_assertions)]
358+
rpc_dev_module_commitment_txs,
360359
)
361-
.await {
360+
.await
361+
{
362362
Ok(handle) => Some(handle),
363363
Err(err) => {
364364
return ExitMessage::err(format!("rpc server failed to start: {}", err));
365365
}
366366
}
367-
},
368-
(Some(_), None) => return ExitMessage::err(
369-
"RPC requires network graph in the fiber service which is not enabled in the config file"
370-
.to_string()
371-
),
367+
}
372368
_ => None,
373369
};
374370

crates/fiber-lib/src/cch/actor.rs

Lines changed: 98 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@ use lnd_grpc_tonic_client::{
77
RouterClient, Uri,
88
};
99

10-
use ractor::{call, RpcReplyPort};
10+
use ractor::RpcReplyPort;
1111
use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef};
12+
use secp256k1::{PublicKey, Secp256k1, SecretKey};
1213
use serde::Deserialize;
14+
use tentacle::secio::SecioKeyPair;
1315

16+
use crate::cch::cch_fiber_agent::CchFiberAgent;
17+
use crate::cch::order_guard::{CchOrderGuardActor, CchOrderGuardArgs};
1418
use crate::fiber::hash_algorithm::HashAlgorithm;
1519
use crate::fiber::payment::PaymentStatus;
1620
use crate::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -19,37 +23,45 @@ use std::str::FromStr;
1923
use tokio::{select, time::sleep};
2024
use tokio_util::{sync::CancellationToken, task::TaskTracker};
2125

22-
use crate::fiber::network::SendPaymentCommand;
23-
use crate::fiber::types::{Hash256, Pubkey};
24-
use crate::fiber::{NetworkActorCommand, NetworkActorMessage};
26+
use crate::fiber::types::{Hash256, Privkey};
27+
use crate::fiber::NetworkActorMessage;
2528
use crate::invoice::{CkbInvoice, Currency, InvoiceBuilder};
2629
use crate::store::pub_sub::{
2730
InvoiceUpdatedEvent, InvoiceUpdatedPayload, PaymentUpdatedEvent, PaymentUpdatedPayload,
28-
StoreUpdatedEvent,
31+
PubSubClient, StoreUpdatedEvent, Subscribe,
2932
};
3033

3134
use super::{
32-
order_guard::{
33-
CchOrderGuardActor, CchOrderGuardArgs, CchOrderGuardEvent, CchOrderGuardMessage,
34-
},
35+
order_guard::{CchOrderGuardEvent, CchOrderGuardMessage},
3536
CchConfig, CchError, CchInvoice, CchOrder, CchOrderStatus, CchOrderStore, CchStoreError,
3637
};
3738

3839
pub const BTC_PAYMENT_TIMEOUT_SECONDS: i32 = 60;
3940
pub const DEFAULT_ORDER_EXPIRY_SECONDS: u64 = 86400; // 24 hours
4041
pub const ORDER_PURGE_TTL: u64 = 86400 * 14; // 14 days
4142

42-
pub async fn start_cch<S: CchOrderStore + Clone + Send + Sync + 'static>(
43+
pub async fn start_cch<S: CchOrderStore + Subscribe + Clone + Send + Sync + 'static>(
4344
args: CchArgs<S>,
4445
root_actor: ActorCell,
4546
) -> Result<ActorRef<CchMessage>> {
47+
if args.network_actor.is_none() {
48+
if args.config.fiber_rpc_url.is_none() {
49+
return Err(anyhow!(
50+
"Cch requires either in process network actor or configured fiber RPC URL"
51+
));
52+
} else {
53+
ensure_fiber_http_url(args.config.fiber_rpc_url.clone())?;
54+
}
55+
}
56+
4657
let (actor, _handle) = Actor::spawn_linked(
4758
Some("cch actor".to_string()),
4859
CchActor::default(),
4960
args,
5061
root_actor,
5162
)
5263
.await?;
64+
5365
Ok(actor)
5466
}
5567

@@ -159,24 +171,24 @@ pub struct CchArgs<S> {
159171
pub config: CchConfig,
160172
pub tracker: TaskTracker,
161173
pub token: CancellationToken,
162-
pub network_actor: ActorRef<NetworkActorMessage>,
163-
pub pubkey: Pubkey,
174+
pub network_actor: Option<ActorRef<NetworkActorMessage>>,
175+
pub node_keypair: Option<crate::fiber::KeyPair>,
164176
pub store: S,
165177
}
166178

167179
pub struct CchState<S> {
168180
config: CchConfig,
169181
tracker: TaskTracker,
170182
token: CancellationToken,
171-
network_actor: ActorRef<NetworkActorMessage>,
172-
pubkey: Pubkey,
183+
fiber_agent: CchFiberAgent,
184+
node_keypair: Option<(PublicKey, SecretKey)>,
173185
store: S,
174186
lnd_connection: LndConnectionInfo,
175187
order_guard: ActorRef<CchOrderGuardMessage>,
176188
}
177189

178190
#[async_trait::async_trait]
179-
impl<S: CchOrderStore + Clone + Send + Sync + 'static> Actor for CchActor<S> {
191+
impl<S: CchOrderStore + Subscribe + Clone + Send + Sync + 'static> Actor for CchActor<S> {
180192
type Msg = CchMessage;
181193
type State = CchState<S>;
182194
type Arguments = CchArgs<S>;
@@ -198,13 +210,40 @@ impl<S: CchOrderStore + Clone + Send + Sync + 'static> Actor for CchActor<S> {
198210
myself.get_cell(),
199211
)
200212
.await?;
213+
214+
if args.network_actor.is_some() {
215+
// in process
216+
args.store.subscribe(Box::new(myself.clone()));
217+
} else {
218+
let pub_sub_client =
219+
PubSubClient::new(ensure_fiber_ws_url(args.config.fiber_rpc_url.clone())?);
220+
pub_sub_client.subscribe(Box::new(myself.clone()));
221+
let token = args.token.clone();
222+
args.tracker
223+
.spawn(async move { pub_sub_client.run(token).await });
224+
};
225+
let fiber_agent =
226+
CchFiberAgent::try_new(args.network_actor, args.config.fiber_rpc_url.as_deref())?;
227+
228+
let node_keypair = args.node_keypair.map(|kp| {
229+
let private_key: Privkey = <[u8; 32]>::try_from(kp.as_ref())
230+
.expect("valid length for key")
231+
.into();
232+
let secio_kp = SecioKeyPair::from(kp);
233+
234+
(
235+
PublicKey::from_slice(secio_kp.public_key().inner_ref()).expect("valid public key"),
236+
private_key.into(),
237+
)
238+
});
239+
201240
let state = CchState {
202241
config: args.config,
203242
tracker: args.tracker,
204243
token: args.token,
205-
network_actor: args.network_actor,
206-
pubkey: args.pubkey,
207244
store: args.store,
245+
node_keypair,
246+
fiber_agent,
208247
lnd_connection,
209248
order_guard,
210249
};
@@ -318,15 +357,26 @@ impl<S: CchOrderStore + Send + Sync + 'static> CchState<S> {
318357
let wrapped_btc_type_script: ckb_jsonrpc_types::Script =
319358
self.config.get_wrapped_btc_script().into();
320359
let invoice_amount_sats = amount_msat.div_ceil(1_000u128) + fee_sats;
321-
let invoice = InvoiceBuilder::new(send_btc.currency)
322-
.payee_pub_key(self.pubkey.into())
360+
361+
let invoice_builder = InvoiceBuilder::new(send_btc.currency)
323362
.amount(Some(invoice_amount_sats))
324363
.payment_hash(payment_hash)
325364
.hash_algorithm(HashAlgorithm::Sha256)
326365
.expiry_time(Duration::from_secs(expiry))
327366
.final_expiry_delta(self.config.ckb_final_tlc_expiry_delta)
328-
.udt_type_script(wrapped_btc_type_script.clone().into())
329-
.build()?;
367+
.udt_type_script(wrapped_btc_type_script.clone().into());
368+
let invoice = if let Some((public_key, secret_key)) = &self.node_keypair {
369+
invoice_builder
370+
.payee_pub_key(*public_key)
371+
.build_with_sign(|hash| Secp256k1::new().sign_ecdsa_recoverable(hash, secret_key))
372+
} else {
373+
invoice_builder.build()
374+
}?;
375+
let invoice = self
376+
.fiber_agent
377+
.add_invoice(invoice)
378+
.await
379+
.map_err(CchError::FiberNodeError)?;
330380
let order = CchOrder {
331381
wrapped_btc_type_script,
332382
fee_sats,
@@ -335,18 +385,12 @@ impl<S: CchOrderStore + Send + Sync + 'static> CchState<S> {
335385
created_at: duration_since_epoch.as_secs(),
336386
ckb_final_tlc_expiry_delta: self.config.ckb_final_tlc_expiry_delta,
337387
outgoing_pay_req: send_btc.btc_pay_req,
338-
incoming_invoice: CchInvoice::Fiber(invoice.clone()),
388+
incoming_invoice: CchInvoice::Fiber(invoice),
339389
payment_preimage: None,
340390
amount_sats: invoice_amount_sats,
341391
status: CchOrderStatus::Pending,
342392
};
343393

344-
let message = move |rpc_reply| -> NetworkActorMessage {
345-
NetworkActorMessage::Command(NetworkActorCommand::AddInvoice(invoice, None, rpc_reply))
346-
};
347-
348-
call!(&self.network_actor, message).expect("call actor")?;
349-
350394
self.store.insert_cch_order(order.clone())?;
351395
self.step_order(myself, &order).await?;
352396
Ok(order)
@@ -601,17 +645,11 @@ impl<S: CchOrderStore + Send + Sync + 'static> CchState<S> {
601645
order.payment_hash,
602646
order.status
603647
);
604-
let command = move |rpc_reply| -> NetworkActorMessage {
605-
NetworkActorMessage::Command(NetworkActorCommand::SettleInvoice(
606-
order.payment_hash,
607-
preimage,
608-
rpc_reply,
609-
))
610-
};
611-
call!(&self.network_actor, command)
612-
.expect("call actor")
613-
.map_err(CchError::CKBSettleInvoiceError)?;
614648

649+
self.fiber_agent
650+
.settle_invoice(order.payment_hash, preimage)
651+
.await
652+
.map_err(CchError::FiberNodeError)?;
615653
let mut order = order.clone();
616654
order.status = CchOrderStatus::Succeeded;
617655
self.update_cch_order(order);
@@ -638,23 +676,15 @@ impl<S: CchOrderStore + Send + Sync + 'static> CchState<S> {
638676
payment_hash = ?order.payment_hash,
639677
"Sending payment to fiber node because we received payment from LND",
640678
);
641-
let message = |rpc_reply| -> NetworkActorMessage {
642-
NetworkActorMessage::Command(NetworkActorCommand::SendPayment(
643-
SendPaymentCommand {
644-
invoice: Some(order.outgoing_pay_req.clone()),
645-
..Default::default()
646-
},
647-
rpc_reply,
648-
))
649-
};
679+
680+
let payment_status = self
681+
.fiber_agent
682+
.send_payment(order.outgoing_pay_req.clone())
683+
.await
684+
.map_err(CchError::FiberNodeError)?;
650685

651686
let mut order = order.clone();
652-
// TODO: handle payment failure here.
653-
let tlc_response = call!(self.network_actor, message)
654-
.expect("call actor")
655-
.map_err(CchError::CKBSendPaymentError)?;
656-
// TODO: handle payment failure here.
657-
if tlc_response.status == PaymentStatus::Failed {
687+
if payment_status == PaymentStatus::Failed {
658688
order.status = CchOrderStatus::Failed;
659689
} else {
660690
order.status = CchOrderStatus::OutgoingInFlight;
@@ -930,3 +960,19 @@ impl LndInvoiceTracker {
930960
Ok(status == CchOrderStatus::Succeeded || status == CchOrderStatus::Failed)
931961
}
932962
}
963+
964+
fn ensure_fiber_http_url(url_opt: Option<String>) -> Result<String> {
965+
if let Some(url) = url_opt {
966+
if url.starts_with("http://") || url.starts_with("https://") {
967+
return Ok(url);
968+
}
969+
}
970+
Err(anyhow!("fiber_rpc_url must start with http:// or https://"))
971+
}
972+
973+
fn ensure_fiber_ws_url(url_opt: Option<String>) -> Result<String> {
974+
let mut url = ensure_fiber_http_url(url_opt)?;
975+
// replace http with ws
976+
url.replace_range(..4, "ws");
977+
Ok(url)
978+
}

0 commit comments

Comments
 (0)