@@ -24,7 +24,7 @@ def __init__(self, pts=None, buffer_duration_ms=5000):
2424 # A note on locking. The lock is principally to protect outputframe, which is called by
2525 # the background encoder thread. Applications are going to call things like open_output,
2626 # close_output, start and stop. These only grab that lock for a short period of time to
27- # manipulate _output_available , which controls whether outputframe will do anything.
27+ # manipulate _output , which controls whether outputframe will do anything.
2828 # THe application API does not have it's own lock, because there doesn't seem to be a
2929 # need to drive it from different threads (though we could add one if necessary).
3030 self ._lock = Lock ()
@@ -33,7 +33,6 @@ def __init__(self, pts=None, buffer_duration_ms=5000):
3333 self ._buffer_duration_ms = buffer_duration_ms
3434 self ._circular = collections .deque ()
3535 self ._output = None
36- self ._output_available = False
3736 self ._streams = []
3837
3938 @property
@@ -52,15 +51,14 @@ def open_output(self, output):
5251 if self ._output :
5352 raise RuntimeError ("Underlying output must be closed first" )
5453
55- self ._output = output
56- self ._output .start ()
54+ output .start ()
5755 # Some outputs (PyavOutput) may need to know about the encoder's streams.
5856 for encoder_stream , codec , kwargs in self ._streams :
5957 output ._add_stream (encoder_stream , codec , ** kwargs )
6058
6159 # Now it's OK for the background thread to output frames.
6260 with self ._lock :
63- self ._output_available = True
61+ self ._output = output
6462 self ._first_frame = True
6563
6664 def close_output (self ):
@@ -69,43 +67,39 @@ def close_output(self):
6967 raise RuntimeError ("No underlying output has been opened" )
7068
7169 # After this, we guarantee that the background thread will never use the output.
70+ output = self ._output
7271 with self ._lock :
73- self ._output_available = False
72+ self ._output = None
7473
75- self ._output .stop ()
76- self ._output = None
74+ output .stop ()
75+
76+ def _flush (self , timestamp_now , output ):
77+ # Flush out anything that is time-expired compared to timestamp_now.
78+ # If timestamp_now is None, flush everything.
79+ while self ._circular and (front := self ._circular [0 ]):
80+ _ , keyframe , timestamp , _ , audio = front
7781
78- def _get_frame (self ):
79- # Fetch the next frame to be saved to the underlying output.
80- if not self ._circular :
81- return
82- if not self ._first_frame :
83- return self ._circular .popleft ()
84- # Must skip ahead to the first I frame if we haven't seen one yet.
85- while self ._circular :
86- entry = self ._circular .popleft ()
87- _ , key_frame , _ , _ , audio = entry
88- # If there is audio, all audio frames are likely to be keyframes, so we must ignore them when
89- # deciding when the streams can resume - only the video counts.
90- if key_frame and not audio :
82+ if timestamp_now and timestamp_now - timestamp < self .buffer_duration_ms * 1000 :
83+ break
84+
85+ # We need to drop this entry, writing it out if we can.
86+ self ._circular .popleft ()
87+
88+ if keyframe and not audio :
9189 self ._first_frame = False
92- return entry
90+
91+ if not self ._first_frame and output :
92+ output .outputframe (* front )
9393
9494 def outputframe (self , frame , keyframe = True , timestamp = None , packet = None , audio = False ):
9595 """Write frame to circular buffer"""
9696 with self ._lock :
9797 if self ._buffer_duration_ms == 0 or not self .recording :
9898 return
99- self ._circular .append ((frame , keyframe , timestamp , packet , audio ))
100- # Discard any expired buffer entries.
101- while timestamp - self ._circular [0 ][2 ] > self ._buffer_duration_ms * 1000 :
102- self ._circular .popleft ()
10399
104- if self ._output_available :
105- # Actually write this to the underlying output.
106- entry = self ._get_frame ()
107- if entry :
108- self ._output .outputframe (* entry )
100+ # Add this new frame to the buffer and flush anything that is now expired.
101+ self ._circular .append ((frame , keyframe , timestamp , packet , audio ))
102+ self ._flush (timestamp , self ._output )
109103
110104 def start (self ):
111105 """Start recording in the circular buffer."""
@@ -116,20 +110,19 @@ def start(self):
116110
117111 def stop (self ):
118112 """Close file handle and stop recording"""
113+ output = self ._output
119114 with self ._lock :
120115 if not self .recording :
121116 raise RuntimeError ("Circular output was not started" )
122117 self ._recording = False
123- self ._output_available = False
124-
125- # Flush out anything remaining in the buffer if the underlying output is still going
126- # when we stop.
127- if self ._output :
128- while (entry := self ._get_frame ()):
129- self ._output .outputframe (* entry )
130- self ._output .stop ()
131118 self ._output = None
132119
120+ # At this point the background thread can't be using the circular buffer or the output,
121+ # so we can flush everything out.
122+ if output :
123+ self ._flush (None , output )
124+ output .stop ()
125+
133126 def _add_stream (self , encoder_stream , codec_name , ** kwargs ):
134127 # Notice the PyavOutput of a stream that will be sending it packets to write out. It will need
135128 # to forward these whenever a new underlying output is opened.
0 commit comments