@@ -15,8 +15,8 @@ use tokio::time::{timeout, Duration};
1515use tracing:: { debug, warn} ;
1616
1717use bacnet_encoding:: apdu:: {
18- self , encode_apdu, Apdu , ConfirmedRequest as ConfirmedRequestPdu , SegmentAck as SegmentAckPdu ,
19- SimpleAck ,
18+ self , encode_apdu, AbortPdu , Apdu , ConfirmedRequest as ConfirmedRequestPdu ,
19+ SegmentAck as SegmentAckPdu , SimpleAck ,
2020} ;
2121use bacnet_encoding:: npdu:: NpduAddress ;
2222use bacnet_network:: layer:: NetworkLayer ;
@@ -233,6 +233,10 @@ struct SegmentedReceiveState {
233233 expected_next_seq : u8 ,
234234 /// Timestamp of last received segment (for reaping stale sessions).
235235 last_activity : Instant ,
236+ /// Window position counter for per-window SegmentAck (Clause 5.2.2).
237+ window_position : u8 ,
238+ /// Proposed window size from the server.
239+ proposed_window_size : u8 ,
236240}
237241
238242/// Timeout for idle segmented reassembly sessions.
@@ -536,15 +540,47 @@ impl<T: TransportPort + 'static> BACnetClient<T> {
536540 let seg_ack_senders: Arc < Mutex < HashMap < SegKey , mpsc:: Sender < SegmentAckPdu > > > > =
537541 Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
538542 let seg_ack_senders_dispatch = Arc :: clone ( & seg_ack_senders) ;
543+ let segmented_response_accepted = config. segmented_response_accepted ;
539544
540545 let dispatch_task = tokio:: spawn ( async move {
541546 let mut seg_state: HashMap < SegKey , SegmentedReceiveState > = HashMap :: new ( ) ;
547+ let mut last_device_purge = Instant :: now ( ) ;
548+ const DEVICE_PURGE_INTERVAL : Duration = Duration :: from_secs ( 300 ) ;
549+ const DEVICE_MAX_AGE : Duration = Duration :: from_secs ( 600 ) ;
542550
543551 while let Some ( received) = apdu_rx. recv ( ) . await {
544552 let now = Instant :: now ( ) ;
545- seg_state. retain ( |_key, state| {
546- now. duration_since ( state. last_activity ) < SEG_RECEIVER_TIMEOUT
547- } ) ;
553+
554+ // Periodically purge stale device table entries
555+ if now. duration_since ( last_device_purge) >= DEVICE_PURGE_INTERVAL {
556+ device_table_dispatch
557+ . lock ( )
558+ . await
559+ . purge_stale ( DEVICE_MAX_AGE ) ;
560+ last_device_purge = now;
561+ }
562+ // Reap stale segmented sessions and send Abort to the server
563+ let stale_keys: Vec < SegKey > = seg_state
564+ . iter ( )
565+ . filter ( |( _, state) | {
566+ now. duration_since ( state. last_activity ) >= SEG_RECEIVER_TIMEOUT
567+ } )
568+ . map ( |( key, _) | key. clone ( ) )
569+ . collect ( ) ;
570+ for key in & stale_keys {
571+ if let Some ( _state) = seg_state. remove ( key) {
572+ let abort = Apdu :: Abort ( AbortPdu {
573+ sent_by_server : false ,
574+ invoke_id : key. 1 ,
575+ abort_reason : bacnet_types:: enums:: AbortReason :: TSM_TIMEOUT ,
576+ } ) ;
577+ let mut buf = BytesMut :: with_capacity ( 4 ) ;
578+ encode_apdu ( & mut buf, & abort) ;
579+ let _ = network_dispatch
580+ . send_apdu ( & buf, & key. 0 , false , NetworkPriority :: NORMAL )
581+ . await ;
582+ }
583+ }
548584
549585 match apdu:: decode_apdu ( received. apdu . clone ( ) ) {
550586 Ok ( decoded) => {
@@ -558,6 +594,7 @@ impl<T: TransportPort + 'static> BACnetClient<T> {
558594 & received. source_mac ,
559595 & received. source_network ,
560596 decoded,
597+ segmented_response_accepted,
561598 )
562599 . await ;
563600 }
@@ -592,6 +629,7 @@ impl<T: TransportPort + 'static> BACnetClient<T> {
592629 source_mac : & [ u8 ] ,
593630 source_network : & Option < NpduAddress > ,
594631 apdu : Apdu ,
632+ segmented_response_accepted : bool ,
595633 ) {
596634 match apdu {
597635 Apdu :: SimpleAck ( ack) => {
@@ -601,8 +639,15 @@ impl<T: TransportPort + 'static> BACnetClient<T> {
601639 }
602640 Apdu :: ComplexAck ( ack) => {
603641 if ack. segmented {
604- Self :: handle_segmented_complex_ack ( tsm, network, seg_state, source_mac, ack)
605- . await ;
642+ Self :: handle_segmented_complex_ack (
643+ tsm,
644+ network,
645+ seg_state,
646+ source_mac,
647+ ack,
648+ segmented_response_accepted,
649+ )
650+ . await ;
606651 } else {
607652 debug ! ( invoke_id = ack. invoke_id, "Received ComplexAck" ) ;
608653 let mut tsm = tsm. lock ( ) . await ;
@@ -760,6 +805,7 @@ impl<T: TransportPort + 'static> BACnetClient<T> {
760805 seg_state : & mut HashMap < SegKey , SegmentedReceiveState > ,
761806 source_mac : & [ u8 ] ,
762807 ack : bacnet_encoding:: apdu:: ComplexAck ,
808+ segmented_response_accepted : bool ,
763809 ) {
764810 let seq = ack. sequence_number . unwrap_or ( 0 ) ;
765811 let key = ( MacAddr :: from_slice ( source_mac) , ack. invoke_id ) ;
@@ -771,6 +817,21 @@ impl<T: TransportPort + 'static> BACnetClient<T> {
771817 "Received segmented ComplexAck"
772818 ) ;
773819
820+ // If client doesn't support segmented reception, send Abort per Clause 5.4.4.2
821+ if !segmented_response_accepted {
822+ let abort = Apdu :: Abort ( AbortPdu {
823+ sent_by_server : false ,
824+ invoke_id : ack. invoke_id ,
825+ abort_reason : bacnet_types:: enums:: AbortReason :: SEGMENTATION_NOT_SUPPORTED ,
826+ } ) ;
827+ let mut buf = BytesMut :: with_capacity ( 4 ) ;
828+ encode_apdu ( & mut buf, & abort) ;
829+ let _ = network
830+ . send_apdu ( & buf, source_mac, false , NetworkPriority :: NORMAL )
831+ . await ;
832+ return ;
833+ }
834+
774835 const MAX_CONCURRENT_SEG_SESSIONS : usize = 64 ;
775836 if !seg_state. contains_key ( & key) && seg_state. len ( ) >= MAX_CONCURRENT_SEG_SESSIONS {
776837 warn ! (
@@ -781,37 +842,55 @@ impl<T: TransportPort + 'static> BACnetClient<T> {
781842 return ;
782843 }
783844
845+ let proposed_ws = ack. proposed_window_size . unwrap_or ( 1 ) ;
784846 let state = seg_state
785847 . entry ( key. clone ( ) )
786848 . or_insert_with ( || SegmentedReceiveState {
787849 receiver : SegmentReceiver :: new ( ) ,
788850 expected_next_seq : 0 ,
789851 last_activity : Instant :: now ( ) ,
852+ window_position : 0 ,
853+ proposed_window_size : proposed_ws,
790854 } ) ;
791855
792856 state. last_activity = Instant :: now ( ) ;
793857
794858 if seq != state. expected_next_seq {
795- warn ! (
796- invoke_id = ack. invoke_id,
797- expected = state. expected_next_seq,
798- received = seq,
799- "Segment gap detected, sending negative SegmentAck"
800- ) ;
859+ // Check for duplicate (already received) vs true gap
860+ if seq < state. expected_next_seq {
861+ // Duplicate segment — discard silently and ack
862+ debug ! (
863+ invoke_id = ack. invoke_id,
864+ seq, "Discarding duplicate segment"
865+ ) ;
866+ } else {
867+ // True gap — send negative SegmentAck with last correctly received seq
868+ warn ! (
869+ invoke_id = ack. invoke_id,
870+ expected = state. expected_next_seq,
871+ received = seq,
872+ "Segment gap detected, sending negative SegmentAck"
873+ ) ;
874+ }
801875 let neg_ack = Apdu :: SegmentAck ( SegmentAckPdu {
802- negative_ack : true ,
876+ negative_ack : seq >= state . expected_next_seq ,
803877 sent_by_server : false ,
804878 invoke_id : ack. invoke_id ,
805- sequence_number : state. expected_next_seq ,
806- actual_window_size : ack. proposed_window_size . unwrap_or ( 1 ) ,
879+ // Spec: sequence_number = last correctly received sequence number
880+ sequence_number : if state. expected_next_seq > 0 {
881+ state. expected_next_seq . wrapping_sub ( 1 )
882+ } else {
883+ 0
884+ } ,
885+ actual_window_size : proposed_ws,
807886 } ) ;
808887 let mut buf = BytesMut :: with_capacity ( 4 ) ;
809888 encode_apdu ( & mut buf, & neg_ack) ;
810889 if let Err ( e) = network
811890 . send_apdu ( & buf, source_mac, false , NetworkPriority :: NORMAL )
812891 . await
813892 {
814- warn ! ( error = %e, "Failed to send negative SegmentAck" ) ;
893+ warn ! ( error = %e, "Failed to send SegmentAck" ) ;
815894 }
816895 return ;
817896 }
@@ -821,21 +900,28 @@ impl<T: TransportPort + 'static> BACnetClient<T> {
821900 return ;
822901 }
823902 state. expected_next_seq = seq. wrapping_add ( 1 ) ;
903+ state. window_position += 1 ;
824904
825- let seg_ack = Apdu :: SegmentAck ( SegmentAckPdu {
826- negative_ack : false ,
827- sent_by_server : false ,
828- invoke_id : ack. invoke_id ,
829- sequence_number : seq,
830- actual_window_size : ack. proposed_window_size . unwrap_or ( 1 ) ,
831- } ) ;
832- let mut buf = BytesMut :: with_capacity ( 4 ) ;
833- encode_apdu ( & mut buf, & seg_ack) ;
834- if let Err ( e) = network
835- . send_apdu ( & buf, source_mac, false , NetworkPriority :: NORMAL )
836- . await
837- {
838- warn ! ( error = %e, "Failed to send SegmentAck" ) ;
905+ // Per-window SegmentAck: only ack at window boundary or final segment (Clause 5.2.2)
906+ let should_ack = !ack. more_follows || state. window_position >= state. proposed_window_size ;
907+
908+ if should_ack {
909+ state. window_position = 0 ;
910+ let seg_ack = Apdu :: SegmentAck ( SegmentAckPdu {
911+ negative_ack : false ,
912+ sent_by_server : false ,
913+ invoke_id : ack. invoke_id ,
914+ sequence_number : seq,
915+ actual_window_size : proposed_ws,
916+ } ) ;
917+ let mut buf = BytesMut :: with_capacity ( 4 ) ;
918+ encode_apdu ( & mut buf, & seg_ack) ;
919+ if let Err ( e) = network
920+ . send_apdu ( & buf, source_mac, false , NetworkPriority :: NORMAL )
921+ . await
922+ {
923+ warn ! ( error = %e, "Failed to send SegmentAck" ) ;
924+ }
839925 }
840926
841927 if !ack. more_follows {
@@ -1100,6 +1186,7 @@ impl<T: TransportPort + 'static> BACnetClient<T> {
11001186 self . seg_ack_senders . lock ( ) . await . insert ( key, seg_ack_tx) ;
11011187 }
11021188
1189+ // Tseg: use APDU timeout for now (configurable via apdu_timeout_ms)
11031190 let timeout_duration = Duration :: from_millis ( self . config . apdu_timeout_ms ) ;
11041191 let max_ack_retries = self . config . apdu_retries ;
11051192 let mut window_size = self . config . proposed_window_size . max ( 1 ) as usize ;
0 commit comments