@@ -11,7 +11,6 @@ import (
1111 "github.com/btcsuite/btclog/v2"
1212 "github.com/lightninglabs/lightning-node-connect/hashmailrpc"
1313 "github.com/lightningnetwork/lnd/tlv"
14- "github.com/prometheus/client_golang/prometheus"
1514 "golang.org/x/time/rate"
1615 "google.golang.org/grpc/codes"
1716 "google.golang.org/grpc/status"
@@ -35,6 +34,11 @@ const (
3534 // DefaultBufSize is the default number of bytes that are read in a
3635 // single operation.
3736 DefaultBufSize = 4096
37+
38+ // streamTTL is the amount of time that a stream needs to be exist without
39+ // reads for it to be considered for pruning. Otherwise, memory will grow
40+ // unbounded.
41+ streamTTL = 24 * time .Hour
3842)
3943
4044// streamID is the identifier of a stream.
@@ -747,9 +751,7 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc,
747751 streamID := newStreamID (desc .StreamId )
748752 if streamID .isOdd () {
749753 baseID := streamID .baseID ()
750- mailboxReadCount .With (prometheus.Labels {
751- streamIDLabel : fmt .Sprintf ("%x" , baseID ),
752- }).Inc ()
754+ streamActivityTracker .Record (fmt .Sprintf ("%x" , baseID ))
753755 }
754756
755757 err = reader .Send (& hashmailrpc.CipherBox {
@@ -766,6 +768,91 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc,
766768
767769var _ hashmailrpc.HashMailServer = (* hashMailServer )(nil )
768770
771+ // streamActivity tracks per-session read activity for classifying mailbox
772+ // sessions as active, standby, or in-use. It maintains an in-memory map
773+ // of stream IDs to counters and timestamps.
774+ type streamActivity struct {
775+ sync.Mutex
776+ streams map [string ]* activityEntry
777+ }
778+
779+ // activityEntry holds the read count and last update time for a single mailbox
780+ // session.
781+ type activityEntry struct {
782+ count uint64
783+ lastUpdate time.Time
784+ }
785+
786+ // newStreamActivity creates a new streamActivity tracker used to monitor
787+ // mailbox read activity per stream ID.
788+ func newStreamActivity () * streamActivity {
789+ return & streamActivity {
790+ streams : make (map [string ]* activityEntry ),
791+ }
792+ }
793+
794+ // Record logs a read event for the given base stream ID.
795+ // It increments the read count and updates the last activity timestamp.
796+ func (sa * streamActivity ) Record (baseID string ) {
797+ sa .Lock ()
798+ defer sa .Unlock ()
799+
800+ entry , ok := sa .streams [baseID ]
801+ if ! ok {
802+ entry = & activityEntry {}
803+ sa .streams [baseID ] = entry
804+ }
805+ entry .count ++
806+ entry .lastUpdate = time .Now ()
807+ }
808+
809+ // ClassifyAndReset categorizes each tracked stream based on its recent read
810+ // rate and returns aggregate counts of active, standby, and in-use sessions.
811+ // A stream is classified as:
812+ // - In-use: if read rate ≥ 0.5 reads/sec.
813+ // - Standby: if 0 < read rate < 0.5 reads/sec.
814+ // - Active: if read rate > 0 (includes standby and in-use).
815+ func (sa * streamActivity ) ClassifyAndReset () (active , standby , inuse int ) {
816+ sa .Lock ()
817+ defer sa .Unlock ()
818+
819+ now := time .Now ()
820+
821+ for baseID , e := range sa .streams {
822+ inactiveDuration := now .Sub (e .lastUpdate )
823+
824+ // Prune if idle for >24h and no new reads.
825+ if e .count == 0 && inactiveDuration > streamTTL {
826+ delete (sa .streams , baseID )
827+ continue
828+ }
829+
830+ elapsed := inactiveDuration .Seconds ()
831+ if elapsed <= 0 {
832+ // Prevent divide-by-zero, treat as 1s interval.
833+ elapsed = 1
834+ }
835+
836+ rate := float64 (e .count ) / elapsed
837+
838+ switch {
839+ case rate >= 0.5 :
840+ inuse ++
841+ case rate > 0 :
842+ standby ++
843+ }
844+ if rate > 0 {
845+ active ++
846+ }
847+
848+ // Reset for next window.
849+ e .count = 0
850+ e .lastUpdate = now
851+ }
852+
853+ return active , standby , inuse
854+ }
855+
769856// streamStatus keeps track of the occupancy status of a stream's read and
770857// write sub-streams. It is initialised with callback functions to call on the
771858// event of the streams being occupied (either or both of the streams are
0 commit comments