@@ -7,31 +7,62 @@ use anyhow::{anyhow, Context};
77use common:: { ln:: channel:: LxOutPoint , notify_once:: NotifyOnce , task:: LxTask } ;
88use lightning:: chain:: transaction:: OutPoint ;
99use tokio:: sync:: mpsc;
10- use tracing:: { debug, error, info, info_span} ;
10+ use tracing:: { debug, error, info, info_span, Instrument } ;
1111
1212use crate :: {
1313 alias:: LexeChainMonitorType , traits:: LexePersister , BoxedAnyhowFuture ,
1414} ;
1515
16+ // `api_call_fut` is a future which makes an api call (typically with
17+ // retries) to the backend to persist the channel monitor state, returning
18+ // an `anyhow::Result<()>` once either (1) persistence succeeds or (2)
19+ // there were too many failures to keep trying. We take this future as
20+ // input (instead of e.g. a `VfsFile`) because it is the cleanest and
21+ // easiest way to abstract over the user node and LSP's differing api
22+ // clients, vfs structures, and expected error types.
23+ //
24+ // TODO(max): Add a required `upsert_monitor` method to the `LexePersister`
25+ // trait to avoid this.
26+ pub type MonitorChannelItem = ( LxChannelMonitorUpdate , BoxedAnyhowFuture ) ;
27+
1628/// Represents a channel monitor update. See docs on each field for details.
1729pub struct LxChannelMonitorUpdate {
18- pub funding_txo : LxOutPoint ,
30+ kind : ChannelMonitorUpdateKind ,
31+ funding_txo : LxOutPoint ,
1932 /// The ID of the channel monitor update, given by
2033 /// [`ChannelMonitorUpdate::update_id`] or
2134 /// [`ChannelMonitor::get_latest_update_id`].
2235 ///
2336 /// [`ChannelMonitorUpdate::update_id`]: lightning::chain::channelmonitor::ChannelMonitorUpdate::update_id
2437 /// [`ChannelMonitor::get_latest_update_id`]: lightning::chain::channelmonitor::ChannelMonitor::get_latest_update_id
25- pub update_id : u64 ,
26- /// A future which makes an api call (typically with retries) to the
27- /// backend to persist the channel monitor state, returning an
28- /// `anyhow::Result<()>` once either (1) persistence succeeds or (2) there
29- /// were too many failures to keep trying. We take this future as input
30- /// (instead of e.g. a `VfsFile`) because it is the cleanest and easiest
31- /// way to abstract over the user node and LSP's differing api clients, vfs
32- /// structures, and expected error types.
33- pub api_call_fut : BoxedAnyhowFuture ,
34- pub kind : ChannelMonitorUpdateKind ,
38+ update_id : u64 ,
39+ span : tracing:: Span ,
40+ }
41+
42+ impl LxChannelMonitorUpdate {
43+ pub fn new (
44+ kind : ChannelMonitorUpdateKind ,
45+ funding_txo : LxOutPoint ,
46+ update_id : u64 ,
47+ ) -> Self {
48+ let span =
49+ info_span ! ( "(monitor-update)" , %kind, %funding_txo, %update_id) ;
50+
51+ Self {
52+ kind,
53+ funding_txo,
54+ update_id,
55+ span,
56+ }
57+ }
58+
59+ /// The span for this update which includes the full monitor update context.
60+ ///
61+ /// Logs related to this monitor update should be logged inside this span,
62+ /// to ensure the log information is associated with this update.
63+ pub fn span ( & self ) -> tracing:: Span {
64+ self . span . clone ( )
65+ }
3566}
3667
3768/// Whether the [`LxChannelMonitorUpdate`] represents a new or updated channel.
@@ -56,7 +87,7 @@ impl Display for ChannelMonitorUpdateKind {
5687/// channel monitor state.
5788pub fn spawn_channel_monitor_persister_task < PS > (
5889 chain_monitor : Arc < LexeChainMonitorType < PS > > ,
59- mut channel_monitor_persister_rx : mpsc:: Receiver < LxChannelMonitorUpdate > ,
90+ mut channel_monitor_persister_rx : mpsc:: Receiver < MonitorChannelItem > ,
6091 mut shutdown : NotifyOnce ,
6192) -> LxTask < ( ) >
6293where
@@ -67,15 +98,22 @@ where
6798 LxTask :: spawn_with_span ( SPAN_NAME , info_span ! ( SPAN_NAME ) , async move {
6899 loop {
69100 tokio:: select! {
70- Some ( update) = channel_monitor_persister_rx. recv( ) => {
101+ Some ( ( update, api_call_fut) )
102+ = channel_monitor_persister_rx. recv( ) => {
103+ let update_span = update. span( ) ;
71104
72105 let handle_result = handle_update(
73106 chain_monitor. as_ref( ) ,
74107 update,
75- ) . await ;
108+ api_call_fut,
109+ )
110+ . instrument( update_span. clone( ) )
111+ . await ;
76112
77113 if let Err ( e) = handle_result {
78- error!( "Monitor persist error: {e:#}" ) ;
114+ update_span. in_scope( || {
115+ error!( "Monitor persist error: {e:#}" ) ;
116+ } ) ;
79117
80118 // Channel monitor persistence errors are serious;
81119 // all errors are considered fatal.
@@ -101,20 +139,20 @@ where
101139async fn handle_update < PS : LexePersister > (
102140 chain_monitor : & LexeChainMonitorType < PS > ,
103141 update : LxChannelMonitorUpdate ,
142+ api_call_fut : BoxedAnyhowFuture ,
104143) -> anyhow:: Result < ( ) > {
105144 let LxChannelMonitorUpdate {
106145 funding_txo,
107146 update_id,
108- api_call_fut ,
109- kind ,
147+ kind : _ ,
148+ span : _ ,
110149 } = update;
111150
112- debug ! ( %kind , %funding_txo , %update_id , "Handling channel monitor update" ) ;
151+ debug ! ( "Handling channel monitor update" ) ;
113152
114153 // Run the persist future.
115154 api_call_fut
116155 . await
117- . with_context ( || format ! ( "{kind} {funding_txo} {update_id}" ) )
118156 . context ( "Channel monitor persist API call failed" ) ?;
119157
120158 // Notify the chain monitor that the monitor update has been persisted.
@@ -127,10 +165,9 @@ async fn handle_update<PS: LexePersister>(
127165 // monitor future.
128166 chain_monitor
129167 . channel_monitor_updated ( OutPoint :: from ( funding_txo) , update_id)
130- . map_err ( |e| anyhow ! ( "{kind} {funding_txo} {update_id}: {e:?}" ) )
131- . context ( "channel_monitor_updated returned Err" ) ?;
168+ . map_err ( |e| anyhow ! ( "channel_monitor_updated returned Err: {e:?}" ) ) ?;
132169
133- info ! ( %kind , %funding_txo , %update_id , "Success: persisted monitor" ) ;
170+ info ! ( "Success: persisted monitor" ) ;
134171
135172 Ok ( ( ) )
136173}
0 commit comments