12
12
#![ allow( clippy:: arc_with_non_send_sync) ]
13
13
14
14
use std:: collections:: HashMap ;
15
+ use std:: future:: Future ;
15
16
use std:: sync:: atomic:: { AtomicBool , Ordering } ;
16
17
use std:: sync:: Arc ;
17
18
18
19
pub extern crate nostr;
19
20
20
21
use async_utility:: time;
21
- use nostr:: nips:: nip47:: { Request , Response } ;
22
+ use nostr:: nips:: nip47:: { Notification , Request , Response } ;
22
23
use nostr_relay_pool:: prelude:: * ;
23
24
24
25
pub mod error;
@@ -31,6 +32,7 @@ pub use self::error::Error;
31
32
pub use self :: options:: NostrWalletConnectOptions ;
32
33
33
34
const ID : & str = "nwc" ;
35
+ const NOTIFICATIONS_ID : & str = "nwc-notifications" ;
34
36
35
37
/// Nostr Wallet Connect client
36
38
#[ derive( Debug , Clone ) ]
@@ -39,6 +41,7 @@ pub struct NWC {
39
41
pool : RelayPool ,
40
42
opts : NostrWalletConnectOptions ,
41
43
bootstrapped : Arc < AtomicBool > ,
44
+ notifications_subscribed : Arc < AtomicBool > ,
42
45
}
43
46
44
47
impl NWC {
@@ -55,6 +58,7 @@ impl NWC {
55
58
pool : RelayPool :: default ( ) ,
56
59
opts,
57
60
bootstrapped : Arc :: new ( AtomicBool :: new ( false ) ) ,
61
+ notifications_subscribed : Arc :: new ( AtomicBool :: new ( false ) ) ,
58
62
}
59
63
}
60
64
@@ -191,6 +195,118 @@ impl NWC {
191
195
Ok ( res. to_get_info ( ) ?)
192
196
}
193
197
198
+ /// Subscribe to wallet notifications
199
+ pub async fn subscribe_to_notifications ( & self ) -> Result < ( ) , Error > {
200
+ if self . notifications_subscribed . load ( Ordering :: SeqCst ) {
201
+ tracing:: debug!( "Already subscribed to notifications" ) ;
202
+ return Ok ( ( ) ) ;
203
+ }
204
+
205
+ tracing:: info!( "Subscribing to wallet notifications..." ) ;
206
+
207
+ self . bootstrap ( ) . await ?;
208
+
209
+ let client_keys = Keys :: new ( self . uri . secret . clone ( ) ) ;
210
+ let client_pubkey = client_keys. public_key ( ) ;
211
+
212
+ tracing:: debug!( "Client pubkey: {}" , client_pubkey) ;
213
+ tracing:: debug!( "Wallet service pubkey: {}" , self . uri. public_key) ;
214
+
215
+ let notification_filter = Filter :: new ( )
216
+ . author ( self . uri . public_key )
217
+ . pubkey ( client_pubkey)
218
+ . kind ( Kind :: WalletConnectNotification )
219
+ . since ( Timestamp :: now ( ) ) ;
220
+
221
+ tracing:: debug!( "Notification filter: {:?}" , notification_filter) ;
222
+
223
+ self . pool
224
+ . subscribe_with_id (
225
+ SubscriptionId :: new ( NOTIFICATIONS_ID ) ,
226
+ notification_filter,
227
+ SubscribeOptions :: default ( ) ,
228
+ )
229
+ . await ?;
230
+
231
+ self . notifications_subscribed . store ( true , Ordering :: SeqCst ) ;
232
+
233
+ tracing:: info!( "Successfully subscribed to notifications" ) ;
234
+ Ok ( ( ) )
235
+ }
236
+
237
+ /// Handle incoming notifications with a callback function
238
+ pub async fn handle_notifications < F , Fut > ( & self , func : F ) -> Result < ( ) , Error >
239
+ where
240
+ F : Fn ( Notification ) -> Fut ,
241
+ Fut : Future < Output = Result < bool > > ,
242
+ {
243
+ let mut notifications = self . pool . notifications ( ) ;
244
+
245
+ while let Ok ( notification) = notifications. recv ( ) . await {
246
+ tracing:: trace!( "Received relay pool notification: {:?}" , notification) ;
247
+
248
+ match notification {
249
+ RelayPoolNotification :: Event {
250
+ subscription_id,
251
+ event,
252
+ ..
253
+ } => {
254
+ tracing:: debug!(
255
+ "Received event: kind={}, author={}, id={}" ,
256
+ event. kind,
257
+ event. pubkey,
258
+ event. id
259
+ ) ;
260
+
261
+ if subscription_id. as_str ( ) != NOTIFICATIONS_ID {
262
+ tracing:: trace!( "Ignoring event with subscription id: {}" , subscription_id) ;
263
+ continue ;
264
+ }
265
+
266
+ if event. kind != Kind :: WalletConnectNotification {
267
+ tracing:: trace!( "Ignoring event with kind: {}" , event. kind) ;
268
+ continue ;
269
+ }
270
+
271
+ tracing:: info!( "Processing wallet notification event" ) ;
272
+
273
+ match Notification :: from_event ( & self . uri , & event) {
274
+ Ok ( nip47_notification) => {
275
+ tracing:: info!(
276
+ "Successfully parsed notification: {:?}" ,
277
+ nip47_notification. notification_type
278
+ ) ;
279
+ let exit: bool = func ( nip47_notification)
280
+ . await
281
+ . map_err ( |e| Error :: Handler ( e. to_string ( ) ) ) ?;
282
+ if exit {
283
+ break ;
284
+ }
285
+ }
286
+ Err ( e) => {
287
+ tracing:: error!( "Failed to parse notification: {}" , e) ;
288
+ tracing:: debug!( "Event content: {}" , event. content) ;
289
+ return Err ( Error :: from ( e) ) ;
290
+ }
291
+ }
292
+ }
293
+ RelayPoolNotification :: Shutdown => break ,
294
+ _ => { }
295
+ }
296
+ }
297
+
298
+ Ok ( ( ) )
299
+ }
300
+
301
+ /// Unsubscribe from notifications
302
+ pub async fn unsubscribe_from_notifications ( & self ) -> Result < ( ) , Error > {
303
+ self . pool
304
+ . unsubscribe ( & SubscriptionId :: new ( NOTIFICATIONS_ID ) )
305
+ . await ;
306
+ self . notifications_subscribed . store ( false , Ordering :: SeqCst ) ;
307
+ Ok ( ( ) )
308
+ }
309
+
194
310
/// Completely shutdown [NWC] client
195
311
#[ inline]
196
312
pub async fn shutdown ( self ) {
0 commit comments