Skip to content

Commit 3e157e9

Browse files
committed
feat: add rpc to subscribe store changes
1 parent ac35142 commit 3e157e9

File tree

10 files changed

+134
-11
lines changed

10 files changed

+134
-11
lines changed

crates/fiber-bin/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@ pub async fn main() -> Result<(), ExitMessage> {
354354
cch_actor,
355355
store,
356356
network_graph,
357+
root_actor.get_cell(),
357358
#[cfg(debug_assertions)] ckb_chain_actor,
358359
#[cfg(debug_assertions)] rpc_dev_module_commitment_txs,
359360
)

crates/fiber-lib/src/rpc/biscuit.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ fn build_rules() -> HashMap<&'static str, AuthRule> {
7878
b.rule("send_btc", r#"allow if write("cch");"#);
7979
b.rule("receive_btc", r#"allow if read("cch");"#);
8080
b.rule("get_cch_order", r#"allow if read("cch");"#);
81+
b.rule("subscribe_store_changes", r#"allow if read("cch");"#);
8182
// channels
8283
b.rule("open_channel", r#"allow if write("channels");"#);
8384
b.rule("accept_channel", r#"allow if write("channels");"#);

crates/fiber-lib/src/rpc/config.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use clap_serde_derive::ClapSerde;
22

33
#[cfg(not(feature = "watchtower"))]
4-
const DEFAULT_ENABLED_MODULES: &str = "cch,channel,graph,payment,info,invoice,peer";
4+
const DEFAULT_ENABLED_MODULES: &str = "cch,channel,graph,payment,info,invoice,peer,pubsub";
55
#[cfg(feature = "watchtower")]
6-
const DEFAULT_ENABLED_MODULES: &str = "cch,channel,graph,payment,info,invoice,peer,watchtower";
6+
const DEFAULT_ENABLED_MODULES: &str =
7+
"cch,channel,graph,payment,info,invoice,peer,pubsub,watchtower";
78

89
#[derive(ClapSerde, Debug, Clone)]
910
pub struct RpcConfig {

crates/fiber-lib/src/rpc/mod.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub mod server {
3535
use crate::rpc::payment::PaymentRpcServer;
3636
use crate::rpc::payment::PaymentRpcServerImpl;
3737
use crate::rpc::peer::{PeerRpcServer, PeerRpcServerImpl};
38+
use crate::store::pub_sub::{register_pub_sub_rpc, Subscribe};
3839
use crate::{
3940
cch::CchMessage,
4041
fiber::{
@@ -60,7 +61,7 @@ pub mod server {
6061
};
6162
use jsonrpsee::ws_client::RpcServiceBuilder;
6263
use jsonrpsee::{Methods, RpcModule};
63-
use ractor::ActorRef;
64+
use ractor::{ActorCell, ActorRef};
6465
#[cfg(debug_assertions)]
6566
use std::collections::HashMap;
6667
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
@@ -80,6 +81,7 @@ pub mod server {
8081
+ GossipMessageStore
8182
+ WatchtowerStore
8283
+ PreimageStore
84+
+ Subscribe
8385
{
8486
}
8587
#[cfg(feature = "watchtower")]
@@ -90,16 +92,25 @@ pub mod server {
9092
+ GossipMessageStore
9193
+ WatchtowerStore
9294
+ PreimageStore
95+
+ Subscribe
9396
{
9497
}
9598
#[cfg(not(feature = "watchtower"))]
9699
pub trait RpcServerStore:
97-
ChannelActorStateStore + InvoiceStore + NetworkGraphStateStore + GossipMessageStore
100+
ChannelActorStateStore
101+
+ InvoiceStore
102+
+ NetworkGraphStateStore
103+
+ GossipMessageStore
104+
+ Subscribe
98105
{
99106
}
100107
#[cfg(not(feature = "watchtower"))]
101108
impl<T> RpcServerStore for T where
102-
T: ChannelActorStateStore + InvoiceStore + NetworkGraphStateStore + GossipMessageStore
109+
T: ChannelActorStateStore
110+
+ InvoiceStore
111+
+ NetworkGraphStateStore
112+
+ GossipMessageStore
113+
+ Subscribe
103114
{
104115
}
105116

@@ -222,6 +233,7 @@ pub mod server {
222233
cch_actor: Option<ActorRef<CchMessage>>,
223234
store: S,
224235
network_graph: Arc<RwLock<NetworkGraph<S>>>,
236+
supervisor: ActorCell,
225237
#[cfg(debug_assertions)] ckb_chain_actor: Option<ActorRef<CkbChainMessage>>,
226238
#[cfg(debug_assertions)] rpc_dev_module_commitment_txs: Option<
227239
Arc<RwLock<HashMap<(Hash256, u64), TransactionView>>>,
@@ -258,6 +270,9 @@ pub mod server {
258270
.merge(GraphRpcServerImpl::new(network_graph, store.clone()).into_rpc())
259271
.unwrap();
260272
}
273+
if config.is_module_enabled("pubsub") {
274+
register_pub_sub_rpc(&mut modules, &store, supervisor).await?;
275+
}
261276
if let Some(network_actor) = network_actor {
262277
if config.is_module_enabled("info") {
263278
modules

crates/fiber-lib/src/store/pub_sub/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
mod pub_sub_rpc;
12
mod store_with_pub_sub;
23
mod subscription;
34

4-
pub use store_with_pub_sub::StoreWithPubSub;
5+
pub use pub_sub_rpc::register_pub_sub_rpc;
6+
pub use store_with_pub_sub::{StoreWithPubSub, Subscribe};
57
pub use subscription::{
68
InvoiceUpdatedEvent, InvoiceUpdatedPayload, PaymentUpdatedEvent, PaymentUpdatedPayload,
79
StorePublisher, StoreUpdatedEvent,
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
use crate::store::pub_sub::Subscribe;
2+
3+
/// Store Pub/Sub via RPC
4+
use super::StoreUpdatedEvent;
5+
6+
use jsonrpsee::{RpcModule, SubscriptionSink};
7+
use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef};
8+
9+
pub struct PubSubServerActor;
10+
11+
#[derive(Default)]
12+
pub struct PubSubServerState {
13+
sinks: Vec<SubscriptionSink>,
14+
}
15+
16+
pub enum PubSubServerMessage {
17+
Publish(StoreUpdatedEvent),
18+
AddSink(SubscriptionSink),
19+
}
20+
21+
impl From<StoreUpdatedEvent> for PubSubServerMessage {
22+
fn from(event: StoreUpdatedEvent) -> Self {
23+
PubSubServerMessage::Publish(event)
24+
}
25+
}
26+
27+
#[async_trait::async_trait]
28+
impl Actor for PubSubServerActor {
29+
type State = PubSubServerState;
30+
type Msg = PubSubServerMessage;
31+
type Arguments = ();
32+
33+
async fn pre_start(
34+
&self,
35+
_myself: ActorRef<Self::Msg>,
36+
_args: Self::Arguments,
37+
) -> Result<Self::State, ActorProcessingErr> {
38+
Ok(PubSubServerState::default())
39+
}
40+
41+
async fn handle(
42+
&self,
43+
_myself: ActorRef<Self::Msg>,
44+
message: Self::Msg,
45+
state: &mut Self::State,
46+
) -> Result<(), ActorProcessingErr> {
47+
match message {
48+
PubSubServerMessage::AddSink(sink) => state.sinks.push(sink),
49+
PubSubServerMessage::Publish(event) => {
50+
let subscription_message =
51+
serde_json::value::to_raw_value(&event).expect("serialize to JSON");
52+
let sinks = std::mem::take(&mut state.sinks);
53+
for sink in sinks {
54+
if sink.send(subscription_message.clone()).await.is_ok() {
55+
state.sinks.push(sink);
56+
}
57+
}
58+
}
59+
}
60+
Ok(())
61+
}
62+
}
63+
64+
const SUBSCRIBE_STORE_CHANGES_NAME: &str = "subscribe_store_changes";
65+
const SUBSCRIBE_STORE_CHANGES_NOTIF_NAME: &str = "store_changes";
66+
const UNSUBSCRIBE_STORE_CHANGES_NAME: &str = "unsubscribe_store_changes";
67+
68+
pub async fn register_pub_sub_rpc<S: Subscribe>(
69+
modules: &mut RpcModule<()>,
70+
publisher: &S,
71+
supervisor: ActorCell,
72+
) -> anyhow::Result<()> {
73+
let (pub_sub_actor, _) =
74+
ractor::Actor::spawn_linked(None, PubSubServerActor, (), supervisor).await?;
75+
publisher.subscribe(Box::new(pub_sub_actor.clone()));
76+
modules.register_subscription(
77+
SUBSCRIBE_STORE_CHANGES_NAME,
78+
SUBSCRIBE_STORE_CHANGES_NOTIF_NAME,
79+
UNSUBSCRIBE_STORE_CHANGES_NAME,
80+
move |_, pending, _, _| {
81+
let pub_sub_actor = pub_sub_actor.clone();
82+
async move {
83+
let sink = pending.accept().await?;
84+
let _ = pub_sub_actor.send_message(PubSubServerMessage::AddSink(sink));
85+
Ok(())
86+
}
87+
},
88+
)?;
89+
Ok(())
90+
}

crates/fiber-lib/src/store/pub_sub/store_with_pub_sub.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,16 @@ impl<S> StoreWithPubSub<S> {
4747
}
4848
}
4949

50+
pub trait Subscribe {
51+
fn subscribe(&self, subscriber: OutputPortSubscriber<StoreUpdatedEvent>);
52+
}
53+
54+
impl<S> Subscribe for StoreWithPubSub<S> {
55+
fn subscribe(&self, subscriber: OutputPortSubscriber<StoreUpdatedEvent>) {
56+
self.subscribe(subscriber);
57+
}
58+
}
59+
5060
impl<T: NetworkActorStateStore> NetworkActorStateStoreDeref for StoreWithPubSub<T> {
5161
type Target = T;
5262

crates/fiber-lib/src/store/pub_sub/subscription.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@
33
use std::sync::Arc;
44

55
use ractor::{port::OutputPortSubscriber, OutputPort};
6+
use serde::{Deserialize, Serialize};
67

78
use crate::{
89
fiber::{payment::PaymentStatus, types::Hash256},
910
invoice::CkbInvoiceStatus,
1011
};
1112

12-
#[derive(Clone, Debug, Eq, PartialEq)]
13+
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1314
pub enum InvoiceUpdatedPayload {
1415
/// The invoice is open and can be paid.
1516
Open,
@@ -28,7 +29,7 @@ pub enum InvoiceUpdatedPayload {
2829
/// The invoice is paid.
2930
Paid,
3031
}
31-
#[derive(Clone, Debug, Eq, PartialEq)]
32+
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
3233
pub struct InvoiceUpdatedEvent {
3334
pub invoice_hash: Hash256,
3435
pub payload: InvoiceUpdatedPayload,
@@ -61,7 +62,7 @@ impl InvoiceUpdatedEvent {
6162
}
6263

6364
// but with additional information for downstream services.
64-
#[derive(Clone, Debug, Eq, PartialEq)]
65+
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
6566
pub enum PaymentUpdatedPayload {
6667
/// initial status, payment session is created, no HTLC is sent
6768
Created,
@@ -72,7 +73,7 @@ pub enum PaymentUpdatedPayload {
7273
/// related HTLC is failed
7374
Failed,
7475
}
75-
#[derive(Clone, Debug, Eq, PartialEq)]
76+
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
7677
pub struct PaymentUpdatedEvent {
7778
pub payment_hash: Hash256,
7879
pub payload: PaymentUpdatedPayload,
@@ -104,7 +105,7 @@ impl PaymentUpdatedEvent {
104105
}
105106

106107
/// Message sent from Store to publisher.
107-
#[derive(Clone, Debug, Eq, PartialEq)]
108+
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
108109
pub enum StoreUpdatedEvent {
109110
InvoiceUpdated(InvoiceUpdatedEvent),
110111
PaymentUpdated(PaymentUpdatedEvent),

crates/fiber-lib/src/tests/test_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1563,6 +1563,7 @@ impl NetworkNode {
15631563
None,
15641564
store.clone(),
15651565
network_graph.clone(),
1566+
root_actor.get_cell(),
15661567
#[cfg(debug_assertions)]
15671568
None,
15681569
#[cfg(debug_assertions)]

tests/nodes/deployer/config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ rpc:
2020
- invoice
2121
- peer
2222
- watchtower
23+
- pubsub
2324
- dev
2425

2526
cch:

0 commit comments

Comments
 (0)