88 "sync"
99 "time"
1010
11+ "github.com/btcsuite/btclog/v2"
1112 "github.com/lightninglabs/lightning-node-connect/hashmailrpc"
1213 "github.com/lightningnetwork/lnd/tlv"
1314 "github.com/prometheus/client_golang/prometheus"
@@ -104,8 +105,8 @@ func (r *readStream) ReadNextMsg(ctx context.Context) ([]byte, error) {
104105
105106// ReturnStream gives up the read stream by passing it back up through the
106107// payment stream.
107- func (r * readStream ) ReturnStream () {
108- log .Debugf ( "Returning read stream %x" , r . parentStream . id [:] )
108+ func (r * readStream ) ReturnStream (ctx context. Context ) {
109+ log .DebugS ( ctx , "Returning read stream" )
109110 r .parentStream .ReturnReadStream (r )
110111}
111112
@@ -193,7 +194,7 @@ type stream struct {
193194}
194195
195196// newStream creates a new stream independent of any given stream ID.
196- func newStream (id streamID , limiter * rate.Limiter ,
197+ func newStream (ctx context. Context , id streamID , limiter * rate.Limiter ,
197198 equivAuth func (auth * hashmailrpc.CipherBoxAuth ) error ,
198199 onStale func () error , staleTimeout time.Duration ) * stream {
199200
@@ -210,7 +211,7 @@ func newStream(id streamID, limiter *rate.Limiter,
210211 id : id ,
211212 equivAuth : equivAuth ,
212213 limiter : limiter ,
213- status : newStreamStatus (onStale , staleTimeout ),
214+ status : newStreamStatus (ctx , onStale , staleTimeout ),
214215 readBytesChan : make (chan []byte ),
215216 readErrChan : make (chan error , 1 ),
216217 quit : make (chan struct {}),
@@ -305,8 +306,8 @@ func (s *stream) ReturnWriteStream(w *writeStream) {
305306// RequestReadStream attempts to request the read stream from the main backing
306307// stream. If we're unable to obtain it before the timeout, then an error is
307308// returned.
308- func (s * stream ) RequestReadStream () (* readStream , error ) {
309- log .Tracef ( "HashMailStream(%x): requesting read stream", s . id [:] )
309+ func (s * stream ) RequestReadStream (ctx context. Context ) (* readStream , error ) {
310+ log .TraceS ( ctx , "Requested read stream" )
310311
311312 select {
312313 case r := <- s .readStreamChan :
@@ -320,8 +321,8 @@ func (s *stream) RequestReadStream() (*readStream, error) {
320321// RequestWriteStream attempts to request the read stream from the main backing
321322// stream. If we're unable to obtain it before the timeout, then an error is
322323// returned.
323- func (s * stream ) RequestWriteStream () (* writeStream , error ) {
324- log .Tracef ( "HashMailStream(%x): requesting write stream", s . id [:] )
324+ func (s * stream ) RequestWriteStream (ctx context. Context ) (* writeStream , error ) {
325+ log .TraceS ( ctx , "Requesting write stream" )
325326
326327 select {
327328 case w := <- s .writeStreamChan :
@@ -389,8 +390,10 @@ func (h *hashMailServer) Stop() {
389390}
390391
391392// tearDownStaleStream can be used to tear down a stale mailbox stream.
392- func (h * hashMailServer ) tearDownStaleStream (id streamID ) error {
393- log .Debugf ("Tearing down stale HashMail stream: id=%x" , id )
393+ func (h * hashMailServer ) tearDownStaleStream (ctx context.Context ,
394+ id streamID ) error {
395+
396+ log .DebugS (ctx , "Tearing down stale HashMail stream" )
394397
395398 h .Lock ()
396399 defer h .Unlock ()
@@ -428,15 +431,15 @@ func (h *hashMailServer) ValidateStreamAuth(ctx context.Context,
428431}
429432
430433// InitStream attempts to initialize a new stream given a valid descriptor.
431- func (h * hashMailServer ) InitStream (
434+ func (h * hashMailServer ) InitStream (ctx context. Context ,
432435 init * hashmailrpc.CipherBoxAuth ) (* hashmailrpc.CipherInitResp , error ) {
433436
434437 h .Lock ()
435438 defer h .Unlock ()
436439
437440 streamID := newStreamID (init .Desc .StreamId )
438441
439- log .Debugf ( "Creating new HashMail Stream: %x" , streamID )
442+ log .DebugS ( ctx , "Creating new HashMail Stream" )
440443
441444 // The stream is already active, and we only allow a single session for
442445 // a given stream to exist.
@@ -452,10 +455,11 @@ func (h *hashMailServer) InitStream(
452455 rate .Every (h .cfg .msgRate ), h .cfg .msgBurstAllowance ,
453456 )
454457 freshStream := newStream (
455- streamID , limiter , func (auth * hashmailrpc.CipherBoxAuth ) error {
458+ ctx , streamID , limiter ,
459+ func (auth * hashmailrpc.CipherBoxAuth ) error {
456460 return nil
457461 }, func () error {
458- return h .tearDownStaleStream (streamID )
462+ return h .tearDownStaleStream (ctx , streamID )
459463 }, h .cfg .staleTimeout ,
460464 )
461465
@@ -470,7 +474,9 @@ func (h *hashMailServer) InitStream(
470474
471475// LookUpReadStream attempts to loop up a new stream. If the stream is found, then
472476// the stream is marked as being active. Otherwise, an error is returned.
473- func (h * hashMailServer ) LookUpReadStream (streamID []byte ) (* readStream , error ) {
477+ func (h * hashMailServer ) LookUpReadStream (ctx context.Context ,
478+ streamID []byte ) (* readStream , error ) {
479+
474480 h .RLock ()
475481 defer h .RUnlock ()
476482
@@ -479,12 +485,13 @@ func (h *hashMailServer) LookUpReadStream(streamID []byte) (*readStream, error)
479485 return nil , fmt .Errorf ("stream not found" )
480486 }
481487
482- return stream .RequestReadStream ()
488+ return stream .RequestReadStream (ctx )
483489}
484490
485491// LookUpWriteStream attempts to loop up a new stream. If the stream is found,
486492// then the stream is marked as being active. Otherwise, an error is returned.
487- func (h * hashMailServer ) LookUpWriteStream (streamID []byte ) (* writeStream , error ) {
493+ func (h * hashMailServer ) LookUpWriteStream (ctx context.Context ,
494+ streamID []byte ) (* writeStream , error ) {
488495
489496 h .RLock ()
490497 defer h .RUnlock ()
@@ -494,7 +501,7 @@ func (h *hashMailServer) LookUpWriteStream(streamID []byte) (*writeStream, error
494501 return nil , fmt .Errorf ("stream not found" )
495502 }
496503
497- return stream .RequestWriteStream ()
504+ return stream .RequestWriteStream (ctx )
498505}
499506
500507// TearDownStream attempts to tear down a stream which renders both sides of
@@ -523,8 +530,7 @@ func (h *hashMailServer) TearDownStream(ctx context.Context, streamID []byte,
523530 return err
524531 }
525532
526- log .Debugf ("Tearing down HashMail stream: id=%x, auth=%v" ,
527- auth .Desc .StreamId , auth .Auth )
533+ log .DebugS (ctx , "Tearing down HashMail stream" , "auth" , auth .Auth )
528534
529535 // At this point we know the auth was valid, so we'll tear down the
530536 // stream.
@@ -568,16 +574,17 @@ func (h *hashMailServer) NewCipherBox(ctx context.Context,
568574 return nil , err
569575 }
570576
571- log .Debugf ("New HashMail stream init: id=%x, auth=%v" ,
572- init .Desc .StreamId , init .Auth )
577+ ctxl := btclog .WithCtx (ctx , btclog .Hex ("stream_id" , init .Desc .StreamId ))
578+
579+ log .DebugS (ctxl , "New HashMail stream init" , "auth" , init .Auth )
573580
574- if err := h .ValidateStreamAuth (ctx , init ); err != nil {
575- log .Debugf ( "Stream creation validation failed (id=%x): %v " ,
576- init . Desc . StreamId , err )
581+ if err := h .ValidateStreamAuth (ctxl , init ); err != nil {
582+ log .DebugS ( ctxl , "Stream creation validation failed" ,
583+ "err" , err )
577584 return nil , err
578585 }
579586
580- resp , err := h .InitStream (init )
587+ resp , err := h .InitStream (ctxl , init )
581588 if err != nil {
582589 return nil , err
583590 }
@@ -597,8 +604,9 @@ func (h *hashMailServer) DelCipherBox(ctx context.Context,
597604 return nil , err
598605 }
599606
600- log .Debugf ("New HashMail stream deletion: id=%x, auth=%v" ,
601- auth .Desc .StreamId , auth .Auth )
607+ ctxl := btclog .WithCtx (ctx , btclog .Hex ("stream_id" , auth .Desc .StreamId ))
608+
609+ log .DebugS (ctxl , "New HashMail stream deletion" , "auth" , auth .Auth )
602610
603611 if err := h .TearDownStream (ctx , auth .Desc .StreamId , auth ); err != nil {
604612 return nil , err
@@ -610,7 +618,7 @@ func (h *hashMailServer) DelCipherBox(ctx context.Context,
610618// SendStream implements the client streaming call to utilize the write end of
611619// a stream to send a message to the read end.
612620func (h * hashMailServer ) SendStream (readStream hashmailrpc.HashMail_SendStreamServer ) error {
613- log .Debugf ("New HashMail write stream pending..." )
621+ log .Debug ("New HashMail write stream pending..." )
614622
615623 // We'll need to receive the first message in order to determine if
616624 // this stream exists or not
@@ -621,6 +629,11 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
621629 return err
622630 }
623631
632+ ctx := btclog .WithCtx (
633+ readStream .Context (),
634+ btclog .Hex ("stream_id" , cipherBox .Desc .StreamId ),
635+ )
636+
624637 switch {
625638 case cipherBox .Desc == nil :
626639 return fmt .Errorf ("cipher box descriptor required" )
@@ -629,12 +642,11 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
629642 return fmt .Errorf ("stream_id required" )
630643 }
631644
632- log .Debugf ("New HashMail write stream: id=%x" ,
633- cipherBox .Desc .StreamId )
645+ log .DebugS (ctx , "New HashMail write stream" )
634646
635647 // Now that we have the first message, we can attempt to look up the
636648 // given stream.
637- writeStream , err := h .LookUpWriteStream (cipherBox .Desc .StreamId )
649+ writeStream , err := h .LookUpWriteStream (ctx , cipherBox .Desc .StreamId )
638650 if err != nil {
639651 return err
640652 }
@@ -643,13 +655,12 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
643655 // write inactive if the client hangs up on their end.
644656 defer writeStream .ReturnStream ()
645657
646- log .Tracef ( "Sending msg_len=%v to stream_id=%x" , len ( cipherBox . Msg ) ,
647- cipherBox .Desc . StreamId )
658+ log .TraceS ( ctx , "Sending message to stream" ,
659+ "msg_len" , len ( cipherBox .Msg ) )
648660
649661 // We'll send the first message into the stream, then enter our loop
650662 // below to continue to read from the stream and send it to the read
651663 // end.
652- ctx := readStream .Context ()
653664 if err := writeStream .WriteMsg (ctx , cipherBox .Msg ); err != nil {
654665 return err
655666 }
@@ -659,7 +670,7 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
659670 // exit before shutting down.
660671 select {
661672 case <- ctx .Done ():
662- log .Debugf ( "SendStream: Context done, exiting" )
673+ log .DebugS ( ctx , "SendStream: Context done, exiting" )
663674 return nil
664675 case <- h .quit :
665676 return fmt .Errorf ("server shutting down" )
@@ -669,13 +680,13 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
669680
670681 cipherBox , err := readStream .Recv ()
671682 if err != nil {
672- log .Debugf ( "SendStream: Exiting write stream RPC " +
673- "stream read: %v " , err )
683+ log .DebugS ( ctx , "SendStream: Exiting write stream RPC " +
684+ "stream read" , err )
674685 return err
675686 }
676687
677- log .Tracef ( "Sending msg_len=%v to stream_id=%x " ,
678- len (cipherBox .Msg ), cipherBox . Desc . StreamId )
688+ log .TraceS ( ctx , "Sending message to stream " ,
689+ "msg_len" , len (cipherBox .Msg ))
679690
680691 if err := writeStream .WriteMsg (ctx , cipherBox .Msg ); err != nil {
681692 return err
@@ -689,25 +700,30 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
689700func (h * hashMailServer ) RecvStream (desc * hashmailrpc.CipherBoxDesc ,
690701 reader hashmailrpc.HashMail_RecvStreamServer ) error {
691702
703+ ctx := btclog .WithCtx (
704+ reader .Context (),
705+ btclog .Hex ("stream_id" , desc .StreamId ),
706+ )
707+
692708 // First, we'll attempt to locate the stream. We allow any single
693709 // entity that knows of the full stream ID to access the read end.
694- readStream , err := h .LookUpReadStream (desc .StreamId )
710+ readStream , err := h .LookUpReadStream (ctx , desc .StreamId )
695711 if err != nil {
696712 return err
697713 }
698714
699- log .Debugf ( "New HashMail read stream: id=%x" , desc . StreamId )
715+ log .DebugS ( ctx , "New HashMail read stream" )
700716
701717 // If the reader hangs up, then we'll mark the stream as inactive so
702718 // another can take its place.
703- defer readStream .ReturnStream ()
719+ defer readStream .ReturnStream (ctx )
704720
705721 for {
706722 // Check to see if the stream has been closed or if we need to
707- // exit before shutting down .
723+ // exit before shutting d[own .
708724 select {
709725 case <- reader .Context ().Done ():
710- log .Debugf ( "Read stream context done." )
726+ log .DebugS ( ctx , "Read stream context done." )
711727 return nil
712728 case <- h .quit :
713729 return fmt .Errorf ("server shutting down" )
@@ -717,12 +733,11 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc,
717733
718734 nextMsg , err := readStream .ReadNextMsg (reader .Context ())
719735 if err != nil {
720- log .Debugf ( "Got error an read stream read: %v " , err )
736+ log .ErrorS ( ctx , "Got error on read stream read" , err )
721737 return err
722738 }
723739
724- log .Tracef ("Read %v bytes for HashMail stream_id=%x" ,
725- len (nextMsg ), desc .StreamId )
740+ log .TraceS (ctx , "Read bytes" , "msg_len" , len (nextMsg ))
726741
727742 // In order not to duplicate metric data, we only record this
728743 // read if its streamID is odd. We use the base stream ID as the
@@ -742,8 +757,8 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc,
742757 Msg : nextMsg ,
743758 })
744759 if err != nil {
745- log .Debugf ( "Got error when sending on read stream: %v " ,
746- err )
760+ log .DebugS ( ctx , "Got error when sending on read stream" ,
761+ "err" , err )
747762 return err
748763 }
749764 }
@@ -767,7 +782,7 @@ type streamStatus struct {
767782}
768783
769784// newStreamStatus constructs a new streamStatus instance.
770- func newStreamStatus (onStale func () error ,
785+ func newStreamStatus (ctx context. Context , onStale func () error ,
771786 staleTimeout time.Duration ) * streamStatus {
772787
773788 if staleTimeout < 0 {
@@ -778,7 +793,7 @@ func newStreamStatus(onStale func() error,
778793
779794 staleTimer := time .AfterFunc (staleTimeout , func () {
780795 if err := onStale (); err != nil {
781- log .Errorf ( "error in onStale callback: %v " , err )
796+ log .ErrorS ( ctx , "Error from onStale callback" , err )
782797 }
783798 })
784799
0 commit comments