@@ -762,6 +762,274 @@ export class ClassifyLogEntry extends TransformStream<
762762 }
763763}
764764
765+ /** Merges GetBackgroundRSSI requests and responses into single entries */
766+ export class DetectBackgroundRSSICalls extends TransformStream <
767+ SemanticLogInfo ,
768+ SemanticLogInfo
769+ > {
770+ constructor ( ) {
771+ let pendingRequest : SemanticLogInfo | undefined ;
772+ const bufferedEntries : SemanticLogInfo [ ] = [ ] ;
773+
774+ function parseTimestamp ( timestamp : string ) : number {
775+ return new Date ( timestamp ) . getTime ( ) ;
776+ }
777+
778+ function flushBufferedEntries (
779+ controller : TransformStreamDefaultController < SemanticLogInfo > ,
780+ ) {
781+ for ( const entry of bufferedEntries ) {
782+ controller . enqueue ( entry ) ;
783+ }
784+ bufferedEntries . length = 0 ;
785+ }
786+
787+ const transformer : Transformer < SemanticLogInfo , SemanticLogInfo > = {
788+ transform ( chunk , controller ) {
789+ // Check if this is a GetBackgroundRSSI request
790+ if (
791+ chunk . kind === "REQUEST" &&
792+ chunk . direction === "outbound" &&
793+ chunk . message === "[GetBackgroundRSSI]"
794+ ) {
795+ // If we already have a pending request, flush it and start fresh
796+ this . flush ! ( controller ) ;
797+
798+ // Store the new request as pending
799+ pendingRequest = chunk ;
800+ return ;
801+ }
802+
803+ if ( ! pendingRequest ) {
804+ // No pending request, pass through immediately
805+ controller . enqueue ( chunk ) ;
806+ return ;
807+ }
808+
809+ const requestTime = parseTimestamp ( pendingRequest . timestamp ) ;
810+ const currentTime = parseTimestamp ( chunk . timestamp ) ;
811+ const timeDiff = currentTime - requestTime ;
812+
813+ // If more than 200ms have passed, flush everything
814+ if ( timeDiff > 200 ) {
815+ this . flush ! ( controller ) ;
816+ controller . enqueue ( chunk ) ;
817+ return ;
818+ }
819+
820+ // Check if this is the matching GetBackgroundRSSI response
821+ if (
822+ chunk . kind === "RESPONSE" &&
823+ chunk . direction === "inbound" &&
824+ typeof chunk . message === "object" &&
825+ chunk . message . message === "GetBackgroundRSSI" &&
826+ chunk . message . attributes
827+ ) {
828+ // Flush all buffered entries first
829+ flushBufferedEntries ( controller ) ;
830+
831+ // Create and emit the merged entry (do not emit the original request)
832+ const attributes = chunk . message . attributes ;
833+ const mergedEntry : SemanticLogInfo = {
834+ kind : "BACKGROUND_RSSI" ,
835+ timestamp : pendingRequest . timestamp ,
836+ "channel 0" : attributes [ "channel 0" ] as string ,
837+ "channel 1" : attributes [ "channel 1" ] as string ,
838+ ...( attributes [ "channel 2" ]
839+ ? { "channel 2" : attributes [ "channel 2" ] as string }
840+ : { } ) ,
841+ ...( attributes [ "channel 3" ]
842+ ? { "channel 3" : attributes [ "channel 3" ] as string }
843+ : { } ) ,
844+ } ;
845+
846+ controller . enqueue ( mergedEntry ) ;
847+ pendingRequest = undefined ;
848+ return ;
849+ }
850+
851+ // No the response we were looking for - buffer this entry while we wait for a response
852+ bufferedEntries . push ( chunk ) ;
853+ return ;
854+ } ,
855+
856+ flush ( controller ) {
857+ // Emit any remaining pending request and buffered entries
858+ if ( pendingRequest ) {
859+ controller . enqueue ( pendingRequest ) ;
860+ }
861+ pendingRequest = undefined ;
862+ flushBufferedEntries ( controller ) ;
863+ } ,
864+ } ;
865+
866+ super ( transformer ) ;
867+ }
868+ }
869+
870+ /** Aggregates consecutive BACKGROUND_RSSI entries into statistical summaries */
871+ export class AggregateBackgroundRSSI extends TransformStream <
872+ SemanticLogInfo ,
873+ SemanticLogInfo
874+ > {
875+ constructor ( ) {
876+ const bufferedRSSIEntries : SemanticLogInfo [ ] = [ ] ;
877+
878+ function parseRSSIValue ( rssiString : string ) : number {
879+ // Parse "-107 dBm" -> -107
880+ return parseInt ( rssiString , 10 ) ;
881+ }
882+
883+ function calculateMedian ( values : number [ ] ) : number {
884+ const sorted = [ ...values ] . sort ( ( a , b ) => a - b ) ;
885+ const mid = Math . floor ( sorted . length / 2 ) ;
886+ if ( sorted . length % 2 === 0 ) {
887+ return ( sorted [ mid - 1 ] + sorted [ mid ] ) / 2 ;
888+ }
889+ return sorted [ mid ] ;
890+ }
891+
892+ function calculateStdDev ( values : number [ ] ) : number {
893+ const mean =
894+ values . reduce ( ( sum , val ) => sum + val , 0 ) / values . length ;
895+ const variance =
896+ values . reduce ( ( sum , val ) => sum + Math . pow ( val - mean , 2 ) , 0 ) /
897+ values . length ;
898+ return Math . sqrt ( variance ) ;
899+ }
900+
901+ function findMinMaxWithTimestamp (
902+ values : number [ ] ,
903+ timestamps : string [ ] ,
904+ ) : {
905+ min : { value : number ; timestamp : string } ;
906+ max : { value : number ; timestamp : string } ;
907+ } {
908+ let minValue = values [ 0 ] ;
909+ let maxValue = values [ 0 ] ;
910+ let minTimestamp = timestamps [ 0 ] ;
911+ let maxTimestamp = timestamps [ 0 ] ;
912+
913+ for ( let i = 1 ; i < values . length ; i ++ ) {
914+ if ( values [ i ] < minValue ) {
915+ minValue = values [ i ] ;
916+ minTimestamp = timestamps [ i ] ;
917+ }
918+ if ( values [ i ] > maxValue ) {
919+ maxValue = values [ i ] ;
920+ maxTimestamp = timestamps [ i ] ;
921+ }
922+ }
923+
924+ return {
925+ min : { value : minValue , timestamp : minTimestamp } ,
926+ max : { value : maxValue , timestamp : maxTimestamp } ,
927+ } ;
928+ }
929+
930+ function aggregateRSSIEntries (
931+ entries : SemanticLogInfo [ ] ,
932+ ) : SemanticLogInfo {
933+ const channels : Record <
934+ string ,
935+ { values : number [ ] ; timestamps : string [ ] }
936+ > = { } ;
937+
938+ // Collect all channel data
939+ for ( const entry of entries ) {
940+ if ( entry . kind !== "BACKGROUND_RSSI" ) continue ;
941+
942+ for ( const [ channelKey , rssiString ] of Object . entries ( entry ) ) {
943+ if ( channelKey === "kind" || channelKey === "timestamp" )
944+ continue ;
945+ if ( typeof rssiString !== "string" ) continue ;
946+
947+ channels [ channelKey ] ??= { values : [ ] , timestamps : [ ] } ;
948+
949+ channels [ channelKey ] . values . push (
950+ parseRSSIValue ( rssiString ) ,
951+ ) ;
952+ channels [ channelKey ] . timestamps . push ( entry . timestamp ) ;
953+ }
954+ }
955+
956+ // Calculate statistics for each channel
957+ const channelStats : Record < string , any > = { } ;
958+ for ( const [ channelKey , data ] of Object . entries ( channels ) ) {
959+ const { min, max } = findMinMaxWithTimestamp (
960+ data . values ,
961+ data . timestamps ,
962+ ) ;
963+ const median = calculateMedian ( data . values ) ;
964+ const stddev =
965+ Math . round ( calculateStdDev ( data . values ) * 100 ) / 100 ; // Round to 2 decimal places
966+
967+ channelStats [ channelKey ] = {
968+ min,
969+ max,
970+ median,
971+ stddev,
972+ } ;
973+ }
974+
975+ const summary : SemanticLogInfo = {
976+ kind : "BACKGROUND_RSSI_SUMMARY" ,
977+ timestamp : entries [ 0 ] . timestamp ,
978+ samples : entries . length ,
979+ time_range : {
980+ start : entries [ 0 ] . timestamp ,
981+ end : entries . at ( - 1 ) ! . timestamp ,
982+ } ,
983+ ...channelStats ,
984+ } as any ;
985+
986+ return summary ;
987+ }
988+
989+ function flushBufferedEntries (
990+ controller : TransformStreamDefaultController < SemanticLogInfo > ,
991+ ) {
992+ if ( bufferedRSSIEntries . length === 0 ) return ;
993+
994+ if ( bufferedRSSIEntries . length <= 2 ) {
995+ // Not enough entries to aggregate, emit raw entries
996+ for ( const entry of bufferedRSSIEntries ) {
997+ controller . enqueue ( entry ) ;
998+ }
999+ } else {
1000+ // Aggregate the entries
1001+ const summary = aggregateRSSIEntries ( bufferedRSSIEntries ) ;
1002+ controller . enqueue ( summary ) ;
1003+ }
1004+
1005+ bufferedRSSIEntries . length = 0 ;
1006+ }
1007+
1008+ const transformer : Transformer < SemanticLogInfo , SemanticLogInfo > = {
1009+ transform ( chunk , controller ) {
1010+ if ( chunk . kind === "BACKGROUND_RSSI" ) {
1011+ // Buffer this RSSI entry
1012+ bufferedRSSIEntries . push ( chunk ) ;
1013+ return ;
1014+ }
1015+
1016+ // Different entry type found, flush any buffered RSSI entries
1017+ flushBufferedEntries ( controller ) ;
1018+
1019+ // Pass through the current entry
1020+ controller . enqueue ( chunk ) ;
1021+ } ,
1022+
1023+ flush ( controller ) {
1024+ // Flush any remaining buffered RSSI entries
1025+ flushBufferedEntries ( controller ) ;
1026+ } ,
1027+ } ;
1028+
1029+ super ( transformer ) ;
1030+ }
1031+ }
1032+
7651033/** Main pipeline class that processes log content through all transform stages */
7661034export class LogTransformPipeline {
7671035 async processLogContent ( logContent : string ) : Promise < SemanticLogInfo [ ] > {
@@ -773,6 +1041,8 @@ export class LogTransformPipeline {
7731041 const parseNestedStructures = new ParseNestedStructures ( ) ;
7741042 const filterLogEntries = new FilterLogEntries ( ) ;
7751043 const classifyLogEntry = new ClassifyLogEntry ( ) ;
1044+ const detectBackgroundRSSICalls = new DetectBackgroundRSSICalls ( ) ;
1045+ const aggregateBackgroundRSSI = new AggregateBackgroundRSSI ( ) ;
7761046
7771047 // Create a writable stream to collect results
7781048 const writableStream = new WritableStream < SemanticLogInfo > ( {
@@ -795,6 +1065,8 @@ export class LogTransformPipeline {
7951065 . pipeThrough ( parseNestedStructures )
7961066 . pipeThrough ( filterLogEntries )
7971067 . pipeThrough ( classifyLogEntry )
1068+ . pipeThrough ( detectBackgroundRSSICalls )
1069+ . pipeThrough ( aggregateBackgroundRSSI )
7981070 . pipeTo ( writableStream ) ;
7991071
8001072 return entries ;
0 commit comments