diff --git a/Cargo.toml b/Cargo.toml index 59ad2b767..0e463e16d 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,8 @@ log = { version = "0.4.22", default-features = false, features = ["std"]} vss-client = { package = "vss-client-ng", version = "0.4" } prost = { version = "0.11.6", default-features = false} +payjoin = { version = "0.24.0", default-features = false, features = ["v2", "io"] } + [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase"] } diff --git a/src/io/mod.rs b/src/io/mod.rs index 7afd5bd40..99aad8205 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -78,3 +78,7 @@ pub(crate) const BDK_WALLET_INDEXER_KEY: &str = "indexer"; /// /// [`StaticInvoice`]: lightning::offers::static_invoice::StaticInvoice pub(crate) const STATIC_INVOICE_STORE_PRIMARY_NAMESPACE: &str = "static_invoices"; + +/// The payjoin sessions will be persisted under this key. +pub(crate) const PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE: &str = "payjoin_sessions"; +pub(crate) const PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/payment/mod.rs b/src/payment/mod.rs index f629960e1..0b7e4d665 100644 --- a/src/payment/mod.rs +++ b/src/payment/mod.rs @@ -14,6 +14,7 @@ mod onchain; mod spontaneous; pub(crate) mod store; mod unified_qr; +pub(crate) mod payjoin_payment; pub use bolt11::Bolt11Payment; pub use bolt12::Bolt12Payment; diff --git a/src/payment/payjoin_payment/mod.rs b/src/payment/payjoin_payment/mod.rs new file mode 100644 index 000000000..9d6339991 --- /dev/null +++ b/src/payment/payjoin_payment/mod.rs @@ -0,0 +1,10 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + + +pub(crate) mod payjoin_session; +pub(crate) mod persist; diff --git a/src/payment/payjoin_payment/payjoin_session.rs b/src/payment/payjoin_payment/payjoin_session.rs new file mode 100644 index 000000000..34d703fe6 --- /dev/null +++ b/src/payment/payjoin_payment/payjoin_session.rs @@ -0,0 +1,249 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use lightning::ln::channelmanager::PaymentId; +use lightning::ln::msgs::DecodeError; +use lightning::util::ser::{Readable, Writeable}; +use lightning::{ + _init_and_read_len_prefixed_tlv_fields, impl_writeable_tlv_based, + impl_writeable_tlv_based_enum, write_tlv_fields, +}; + +use crate::data_store::{StorableObject, StorableObjectUpdate}; + +/// Represents a payjoin session with persisted events +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct PayjoinSession { + /// Session identifier (uses PaymentId from PaymentDetails) + pub session_id: PaymentId, + + /// Direction of the payjoin (Send or Receive) + pub direction: PayjoinDirection, + + /// HPKE public key of receiver (only for sender sessions) + pub receiver_pubkey: Option>, + + /// Serialized session events as JSON strings + pub events: Vec, + + /// Current status of the session + pub status: PayjoinStatus, + + /// Unix timestamp of session completion (if completed) + pub completed_at: Option, + + /// The timestamp, in seconds since start of the UNIX epoch, when this entry was last updated. + pub latest_update_timestamp: u64, +} + +impl PayjoinSession { + pub fn new( + session_id: PaymentId, direction: PayjoinDirection, receiver_pubkey: Option>, + ) -> Self { + let latest_update_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + Self { + session_id, + direction, + receiver_pubkey, + events: Vec::new(), + status: PayjoinStatus::Active, + completed_at: None, + latest_update_timestamp, + } + } +} + +impl Writeable for PayjoinSession { + fn write( + &self, writer: &mut W, + ) -> Result<(), lightning::io::Error> { + write_tlv_fields!(writer, { + (0, self.session_id, required), + (2, self.direction, required), + (4, self.receiver_pubkey, option), + (6, self.events, required_vec), + (8, self.status, required), + (10, self.completed_at, required), + (12, self.latest_update_timestamp, required), + }); + Ok(()) + } +} + +impl Readable for PayjoinSession { + fn read(reader: &mut R) -> Result { + let unix_time_secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + _init_and_read_len_prefixed_tlv_fields!(reader, { + (0, session_id, required), + (2, direction, required), + (4, receiver_pubkey, option), + (6, events, required_vec), + (8, status, required), + (10, completed_at, option), + (12, latest_update_timestamp, (default_value, unix_time_secs)) + }); + + let session_id: PaymentId = session_id.0.ok_or(DecodeError::InvalidValue)?; + let direction: PayjoinDirection = direction.0.ok_or(DecodeError::InvalidValue)?; + let status: PayjoinStatus = status.0.ok_or(DecodeError::InvalidValue)?; + let latest_update_timestamp: u64 = + latest_update_timestamp.0.ok_or(DecodeError::InvalidValue)?; + + Ok(PayjoinSession { + session_id, + direction, + receiver_pubkey, + events, + status, + completed_at, + latest_update_timestamp, + }) + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum PayjoinDirection { + /// The session is for sending a payment + Send, + /// The session is for receiving a payment + Receive, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum PayjoinStatus { + /// The session is active + Active, + /// The session has completed successfully + Completed, + /// The session has failed + Failed, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct SerializedSessionEvent { + /// JSON representation of the event + pub event_json: String, + /// Unix timestamp of when the event occurred + pub created_at: u64, +} + +impl_writeable_tlv_based!(SerializedSessionEvent, { + (0, event_json, required), + (2, created_at, required), +}); + +impl_writeable_tlv_based_enum!(PayjoinDirection, + (0, Send) => {}, + (2, Receive) => {} +); + +impl_writeable_tlv_based_enum!(PayjoinStatus, + (0, Active) => {}, + (2, Completed) => {}, + (4, Failed) => {} +); + +/// Represents a payjoin session with persisted events +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct PayjoinSessionUpdate { + pub session_id: PaymentId, + pub receiver_pubkey: Option>>, + pub events: Option>, + pub status: Option, + pub completed_at: Option>, +} + +impl PayjoinSessionUpdate { + pub fn new(id: PaymentId) -> Self { + Self { + session_id: id, + receiver_pubkey: None, + events: None, + status: None, + completed_at: None, + } + } +} + +impl From<&PayjoinSession> for PayjoinSessionUpdate { + fn from(value: &PayjoinSession) -> Self { + Self { + session_id: value.session_id, + receiver_pubkey: Some(value.receiver_pubkey.clone()), + events: Some(value.events.clone()), + status: Some(value.status), + completed_at: Some(value.completed_at), + } + } +} + +impl StorableObject for PayjoinSession { + type Id = PaymentId; + type Update = PayjoinSessionUpdate; + + fn id(&self) -> Self::Id { + self.session_id + } + + fn update(&mut self, update: &Self::Update) -> bool { + debug_assert_eq!( + self.session_id, update.session_id, + "We should only ever override data for the same id" + ); + + let mut updated = false; + + macro_rules! update_if_necessary { + ($val:expr, $update:expr) => { + if $val != $update { + $val = $update; + updated = true; + } + }; + } + + if let Some(receiver_pubkey_opt) = &update.receiver_pubkey { + update_if_necessary!(self.receiver_pubkey, receiver_pubkey_opt.clone()); + } + if let Some(events_opt) = &update.events { + update_if_necessary!(self.events, events_opt.clone()); + } + if let Some(status_opt) = update.status { + update_if_necessary!(self.status, status_opt); + } + if let Some(completed_at_opt) = update.completed_at { + update_if_necessary!(self.completed_at, completed_at_opt); + } + + if updated { + self.latest_update_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + } + + updated + } + + fn to_update(&self) -> Self::Update { + self.into() + } +} + +impl StorableObjectUpdate for PayjoinSessionUpdate { + fn id(&self) -> ::Id { + self.session_id + } +} diff --git a/src/payment/payjoin_payment/persist.rs b/src/payment/payjoin_payment/persist.rs new file mode 100644 index 000000000..9b28b17ce --- /dev/null +++ b/src/payment/payjoin_payment/persist.rs @@ -0,0 +1,216 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use crate::payment::payjoin_payment::payjoin_session::{PayjoinStatus, SerializedSessionEvent}; +use crate::Error; +use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use lightning::util::ser::Readable; +use lightning::{io::Cursor, ln::channelmanager::PaymentId, util::persist::KVStoreSync}; + +use crate::io::{ + PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE, PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE, +}; +use crate::logger::{log_error, LdkLogger, Logger}; +use crate::payment::payjoin_payment::payjoin_session::{PayjoinDirection, PayjoinSession}; +use crate::types::{DynStore, PayjoinSessionStore}; + +use payjoin::persist::SessionPersister; +use payjoin::receive::v2::SessionEvent as ReceiverSessionEvent; +use payjoin::send::v2::SessionEvent as SenderSessionEvent; + +pub(crate) struct KVStorePayjoinSenderPersister { + session_id: PaymentId, + kv_store: Arc, +} + +pub(crate) struct KVStorePayjoinReceiverPersister { + session_id: PaymentId, + kv_store: Arc, +} + +impl KVStorePayjoinReceiverPersister { + pub fn new( + session_id: PaymentId, kv_store: Arc, logger: Arc, + ) -> Result { + let sessions = Self::load_all_sessions(&kv_store, &logger)?; + let data_store = Arc::new(PayjoinSessionStore::new( + sessions, + PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE.to_string(), + PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE.to_string(), + kv_store, + logger, + )); + + let session = PayjoinSession::new(session_id, PayjoinDirection::Receive, None); + + data_store.insert(session)?; + + Ok(Self { session_id, kv_store: data_store }) + } + + /// Reconstruct persister from existing session + pub fn from_session( + &self, session_id: PaymentId, kv_store: Arc, logger: Arc, + ) -> Result { + let sessions = Self::load_all_sessions(&kv_store, &logger)?; + let data_store = Arc::new(PayjoinSessionStore::new( + sessions, + PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE.to_string(), + PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE.to_string(), + kv_store, + logger, + )); + + if data_store.get(&session_id).is_none() { + return Err(Error::InvalidPaymentId); + } + + Ok(Self { session_id, kv_store: data_store }) + } + + /// Load all sessions from KV store + fn load_all_sessions( + kv_store: &Arc, logger: &Arc, + ) -> Result, Error> { + let keys = KVStoreSync::list( + &**kv_store, + PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE, + PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE, + ) + .map_err(|e| { + log_error!(logger, "Failed to list payjoin sessions: {:?}", e); + Error::PersistenceFailed + })?; + + let mut sessions = Vec::new(); + for key in keys { + match KVStoreSync::read( + &**kv_store, + PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE, + PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE, + &key, + ) { + Ok(data) => { + let mut reader = Cursor::new(&data[..]); + match PayjoinSession::read(&mut reader) { + Ok(session) => sessions.push(session), + Err(e) => { + log_error!( + logger, + "Failed to deserialize PayjoinSession for key {}: {:?}. Skipping corrupted session.", + key, e + ); + continue; + }, + } + }, + Err(e) => { + log_error!( + logger, + "Failed to read PayjoinSession data for key {}: {:?}", + key, + e + ); + continue; + }, + } + } + + Ok(sessions) + } + + /// Get all active Receiver session IDs + pub fn get_active_session_ids( + kv_store: Arc, logger: Arc, + ) -> Result, Error> { + let sessions = Self::load_all_sessions(&kv_store, &logger)?; + Ok(sessions + .into_iter() + .filter(|s| { + s.direction == PayjoinDirection::Receive && s.status == PayjoinStatus::Active + }) + .map(|s| s.session_id) + .collect()) + } + + /// Get all inactive Receiver sessions (for cleanup) + pub fn get_inactive_sessions( + kv_store: Arc, logger: Arc, + ) -> Result, Error> { + let sessions = Self::load_all_sessions(&kv_store, &logger)?; + Ok(sessions + .into_iter() + .filter(|s| { + s.direction == PayjoinDirection::Receive + && s.status != PayjoinStatus::Active + && s.completed_at.is_some() + }) + .map(|s| (s.session_id, s.completed_at.unwrap())) + .collect()) + } +} + +impl SessionPersister for KVStorePayjoinReceiverPersister { + type SessionEvent = ReceiverSessionEvent; + type InternalStorageError = Error; + + fn save_event(&self, event: &Self::SessionEvent) -> Result<(), Self::InternalStorageError> { + let mut session = self.kv_store.get(&self.session_id).ok_or(Error::InvalidPaymentId)?; + + let event_json = serde_json::to_string(&event).map_err(|_| Error::PersistenceFailed)?; + + session.events.push(SerializedSessionEvent { + event_json, + created_at: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(), + }); + + self.kv_store.insert_or_update(session)?; + + Ok(()) + } + + fn load( + &self, + ) -> std::result::Result< + Box>, + Self::InternalStorageError, + > { + let session = self.kv_store.get(&self.session_id).ok_or(Error::InvalidPaymentId)?; + + let events: Vec = session + .events + .iter() + .map(|e| serde_json::from_str(&e.event_json)) + .collect::, _>>() + .map_err(|_| Error::PersistenceFailed)?; + + Ok(Box::new(events.into_iter())) + } + + fn close(&self) -> Result<(), Self::InternalStorageError> { + let mut session = self.kv_store.get(&self.session_id).ok_or(Error::InvalidPaymentId)?; + + session.completed_at = Some(now()); + session.status = PayjoinStatus::Completed; + + self.kv_store.insert_or_update(session)?; + + Ok(()) + } +} + +// Helper function for timestamp +fn now() -> u64 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::from_secs(0)).as_secs() +} diff --git a/src/types.rs b/src/types.rs index 7c0e1227a..2f1418293 100644 --- a/src/types.rs +++ b/src/types.rs @@ -37,6 +37,7 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::RuntimeSpawner; use crate::logger::Logger; use crate::message_handler::NodeCustomMessageHandler; +use crate::payment::payjoin_payment::payjoin_session::PayjoinSession; use crate::payment::PaymentDetails; /// A supertrait that requires that a type implements both [`KVStore`] and [`KVStoreSync`] at the @@ -306,6 +307,8 @@ pub(crate) type BumpTransactionEventHandler = pub(crate) type PaymentStore = DataStore>; +pub(crate) type PayjoinSessionStore = DataStore>; + /// A local, potentially user-provided, identifier of a channel. /// /// By default, this will be randomly generated for the user to ensure local uniqueness.