@@ -265,6 +265,7 @@ def __init__(self, client: Client, channel: abc.Connectable):
265
265
self .sink = None
266
266
self .starting_time = None
267
267
self .stopping_time = None
268
+ self .temp_queued_data : dict [int , list ] = {}
268
269
269
270
warn_nacl = not has_nacl
270
271
supported_modes : tuple [SupportedModes , ...] = (
@@ -890,7 +891,7 @@ def recv_audio(self, sink, callback, *args):
890
891
# it by user, handles pcm files and
891
892
# silence that should be added.
892
893
893
- self .user_timestamps : dict [int , tuple [int , float ]] = {}
894
+ self .user_timestamps : dict [int , tuple [int , int , float ]] = {}
894
895
self .starting_time = time .perf_counter ()
895
896
self .first_packet_timestamp : float
896
897
while self .recording :
@@ -918,7 +919,30 @@ def recv_audio(self, sink, callback, *args):
918
919
919
920
def recv_decoded_audio (self , data : RawData ):
920
921
# Add silence when they were not being recorded.
921
- if data .ssrc not in self .user_timestamps : # First packet from user
922
+ data .user_id = self .ws .ssrc_map .get (data .ssrc , {}).get ("user_id" )
923
+
924
+ if data .user_id is None :
925
+ _log .debug (
926
+ f"DEBUG: received packet with SSRC { data .ssrc } not linked to a user_id."
927
+ f"Queueing for later processing."
928
+ )
929
+ self .temp_queued_data .setdefault (data .ssrc , []).append (data )
930
+ return
931
+ elif data .ssrc in self .temp_queued_data :
932
+ _log .debug (
933
+ "DEBUG: We got %d packet(s) in queue for SSRC %d" ,
934
+ len (self .temp_queued_data [data .ssrc ]),
935
+ data .ssrc ,
936
+ )
937
+ queued_packets = self .temp_queued_data .pop (data .ssrc )
938
+ for q_packet in queued_packets :
939
+ q_packet .user_id = data .user_id
940
+ self ._process_audio_packet (q_packet )
941
+
942
+ self ._process_audio_packet (data )
943
+
944
+ def _process_audio_packet (self , data : RawData ):
945
+ if data .user_id not in self .user_timestamps : # First packet from user
922
946
if (
923
947
not self .user_timestamps or not self .sync_start
924
948
): # First packet from anyone
@@ -931,19 +955,33 @@ def recv_decoded_audio(self, data: RawData):
931
955
) - 960
932
956
933
957
else : # Previously received a packet from user
934
- dRT = (
935
- data .receive_time - self .user_timestamps [data .ssrc ][1 ]
936
- ) * 48000 # delta receive time
937
- dT = data .timestamp - self .user_timestamps [data .ssrc ][0 ] # delta timestamp
938
- diff = abs (100 - dT * 100 / dRT )
939
- if (
940
- diff > 60 and dT != 960
941
- ): # If the difference in change is more than 60% threshold
942
- silence = dRT - 960
958
+ prev_ssrc = self .user_timestamps [data .user_id ][0 ]
959
+ prev_timestamp = self .user_timestamps [data .user_id ][1 ]
960
+ prev_receive_time = self .user_timestamps [data .user_id ][2 ]
961
+
962
+ if data .ssrc != prev_ssrc :
963
+ _log .info (
964
+ f"Received audio data from USER_ID { data .user_id } with a previous SSRC { prev_ssrc } and new "
965
+ f"SSRC { data .ssrc } ."
966
+ )
967
+ dRT = (data .receive_time - prev_receive_time ) * 1000
968
+ silence = max (0 , int (dRT / (1000 / 48000 ))) - 960
943
969
else :
944
- silence = dT - 960
970
+ dRT = (
971
+ data .receive_time - prev_receive_time
972
+ ) * 48000 # delta receive time
973
+ dT = data .timestamp - prev_timestamp # delta timestamp
974
+ diff = abs (100 - dT * 100 / dRT )
975
+ if (
976
+ diff > 60 and dT != 960
977
+ ): # If the difference in change is more than 60% threshold
978
+ silence = dRT - 960
979
+ else :
980
+ silence = dT - 960
945
981
946
- self .user_timestamps .update ({data .ssrc : (data .timestamp , data .receive_time )})
982
+ self .user_timestamps .update (
983
+ {data .user_id : (data .ssrc , data .timestamp , data .receive_time )}
984
+ )
947
985
948
986
data .decoded_data = (
949
987
struct .pack ("<h" , 0 ) * max (0 , int (silence )) * opus ._OpusStruct .CHANNELS
0 commit comments