@@ -2,7 +2,6 @@ use crossbeam_channel::Sender;
22use dash_sdk:: dpp:: dashcore:: consensus:: Decodable ;
33use dash_sdk:: dpp:: dashcore:: { Block , InstantLock , Network , Transaction } ;
44use dash_sdk:: dpp:: prelude:: CoreBlockHeight ;
5- use image:: EncodableLayout ;
65use std:: error:: Error ;
76use std:: io:: Cursor ;
87use std:: sync:: {
@@ -11,7 +10,18 @@ use std::sync::{
1110} ;
1211use std:: thread;
1312use std:: time:: Duration ;
13+
14+ #[ cfg( not( target_os = "windows" ) ) ]
1415use zmq:: Context ;
16+ #[ cfg( not( target_os = "windows" ) ) ]
17+ use image:: EncodableLayout ;
18+
19+ #[ cfg( target_os = "windows" ) ]
20+ use futures:: StreamExt ;
21+ #[ cfg( target_os = "windows" ) ]
22+ use tokio:: runtime:: Runtime ;
23+ #[ cfg( target_os = "windows" ) ]
24+ use zeromq:: { Socket , SocketRecv , SubSocket } ;
1525
1626pub struct CoreZMQListener {
1727 should_stop : Arc < AtomicBool > ,
@@ -30,10 +40,18 @@ pub enum ZMQConnectionEvent {
3040 Disconnected ,
3141}
3242
43+ #[ cfg( not( target_os = "windows" ) ) ]
3344pub const IS_LOCK_SIG_MSG : & [ u8 ; 12 ] = b"rawtxlocksig" ;
45+ #[ cfg( not( target_os = "windows" ) ) ]
3446pub const CHAIN_LOCKED_BLOCK_MSG : & [ u8 ; 12 ] = b"rawchainlock" ;
3547
48+ #[ cfg( target_os = "windows" ) ]
49+ pub const IS_LOCK_SIG_MSG : & str = "rawtxlocksig" ;
50+ #[ cfg( target_os = "windows" ) ]
51+ pub const CHAIN_LOCKED_BLOCK_MSG : & str = "rawchainlock" ;
52+
3653impl CoreZMQListener {
54+ #[ cfg( not( target_os = "windows" ) ) ]
3755 pub fn spawn_listener (
3856 network : Network ,
3957 endpoint : & str ,
@@ -256,6 +274,142 @@ impl CoreZMQListener {
256274 } )
257275 }
258276
277+ #[ cfg( target_os = "windows" ) ]
278+ pub fn spawn_listener (
279+ network : Network ,
280+ endpoint : & str ,
281+ sender : mpsc:: Sender < ( ZMQMessage , Network ) > ,
282+ tx_zmq_status : Option < Sender < ZMQConnectionEvent > > ,
283+ ) -> Result < Self , Box < dyn Error > > {
284+ let should_stop = Arc :: new ( AtomicBool :: new ( false ) ) ;
285+ let endpoint = endpoint. to_string ( ) ;
286+ let should_stop_clone = Arc :: clone ( & should_stop) ;
287+ let sender_clone = sender. clone ( ) ;
288+
289+ let handle = thread:: spawn ( move || {
290+ // Create the runtime inside the thread.
291+ let rt = Runtime :: new ( ) . unwrap ( ) ;
292+ rt. block_on ( async move {
293+ // Create the socket inside the async context.
294+ let mut socket = SubSocket :: new ( ) ;
295+
296+ // Connect to the endpoint
297+ socket
298+ . connect ( & endpoint)
299+ . await
300+ . expect ( "Failed to connect" ) ;
301+
302+ // Subscribe to the "rawtxlocksig" events.
303+ socket
304+ . subscribe ( IS_LOCK_SIG_MSG )
305+ . await
306+ . expect ( "Failed to subscribe to rawtxlocksig" ) ;
307+
308+ // Subscribe to the "rawchainlock" events.
309+ socket
310+ . subscribe ( CHAIN_LOCKED_BLOCK_MSG )
311+ . await
312+ . expect ( "Failed to subscribe to rawchainlock" ) ;
313+
314+ println ! ( "Subscribed to ZMQ at {}" , endpoint) ;
315+
316+ while !should_stop_clone. load ( Ordering :: SeqCst ) {
317+ // Receive messages
318+ match socket. recv ( ) . await {
319+ Ok ( msg) => {
320+ // Access frames using msg.get(n)
321+ if let Some ( topic_frame) = msg. get ( 0 ) {
322+ let topic = String :: from_utf8_lossy ( topic_frame) . to_string ( ) ;
323+
324+ if let Some ( data_frame) = msg. get ( 1 ) {
325+ let data_bytes = data_frame;
326+
327+ match topic. as_str ( ) {
328+ "rawchainlock" => {
329+ // Deserialize the Block
330+ let mut cursor = Cursor :: new ( data_bytes) ;
331+ match Block :: consensus_decode ( & mut cursor) {
332+ Ok ( block) => {
333+ if let Err ( e) = sender_clone. send ( (
334+ ZMQMessage :: ChainLockedBlock ( block) ,
335+ network,
336+ ) ) {
337+ eprintln ! (
338+ "Error sending data to main thread: {}" ,
339+ e
340+ ) ;
341+ }
342+ }
343+ Err ( e) => {
344+ eprintln ! (
345+ "Error deserializing chain locked block: {}" ,
346+ e
347+ ) ;
348+ }
349+ }
350+ }
351+ "rawtxlocksig" => {
352+ // Deserialize the Transaction and InstantLock
353+ let mut cursor = Cursor :: new ( data_bytes) ;
354+ match Transaction :: consensus_decode ( & mut cursor) {
355+ Ok ( tx) => {
356+ match InstantLock :: consensus_decode ( & mut cursor)
357+ {
358+ Ok ( islock) => {
359+ if let Err ( e) = sender_clone. send ( (
360+ ZMQMessage :: ISLockedTransaction (
361+ tx, islock,
362+ ) ,
363+ network,
364+ ) ) {
365+ eprintln ! (
366+ "Error sending data to main thread: {}" ,
367+ e
368+ ) ;
369+ }
370+ }
371+ Err ( e) => {
372+ eprintln ! (
373+ "Error deserializing InstantLock: {}" ,
374+ e
375+ ) ;
376+ }
377+ }
378+ }
379+ Err ( e) => {
380+ eprintln ! (
381+ "Error deserializing transaction: {}" ,
382+ e
383+ ) ;
384+ }
385+ }
386+ }
387+ _ => {
388+ println ! ( "Received unknown topic: {}" , topic) ;
389+ }
390+ }
391+ }
392+ }
393+ }
394+ Err ( e) => {
395+ eprintln ! ( "Error receiving message: {}" , e) ;
396+ // Sleep briefly before retrying
397+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
398+ }
399+ }
400+ }
401+
402+ println ! ( "Listener is stopping." ) ;
403+ // The socket will be dropped here
404+ } ) ;
405+ } ) ;
406+
407+ Ok ( CoreZMQListener {
408+ should_stop,
409+ handle : Some ( handle) ,
410+ } )
411+ }
412+
259413 /// Stops the listener by signaling the thread and waiting for it to finish.
260414 pub fn stop ( & mut self ) {
261415 self . should_stop . store ( true , Ordering :: SeqCst ) ;
0 commit comments