@@ -1182,6 +1182,10 @@ gst_kvs_sink_handle_sink_event (GstCollectPads *pads,
11821182 event = NULL ;
11831183 break ;
11841184 }
1185+ case GST_EVENT_EOS: {
1186+ LOG_INFO (" EOS Event received in sink for " << kvssink->stream_name );
1187+ break ;
1188+ }
11851189 default :
11861190 break ;
11871191 }
@@ -1245,12 +1249,19 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads,
12451249 GstMapInfo info;
12461250
12471251 info.data = NULL ;
1248-
12491252 // eos reached
12501253 if (buf == NULL && track_data == NULL ) {
12511254 LOG_INFO (" Received event for " << kvssink->stream_name );
1252- data->kinesis_video_stream ->stopSync ();
1253- LOG_INFO (" Sending eos for " << kvssink->stream_name );
1255+ // Need this check in case pipeline is already being set to NULL and
1256+ // stream is being or/already stopped. Although stopSync() is an idempotent call,
1257+ // we want to avoid an extra call. It is not possible for this callback to be invoked
1258+ // after stopSync() since we stop collecting on pads before invoking. But having this
1259+ // check anyways in case it happens
1260+ if (!data->streamingStopped .load ()) {
1261+ data->kinesis_video_stream ->stopSync ();
1262+ data->streamingStopped .store (true );
1263+ LOG_INFO (" Sending eos for " << kvssink->stream_name );
1264+ }
12541265
12551266 // send out eos message to gstreamer bus
12561267 message = gst_message_new_eos (GST_OBJECT_CAST (kvssink));
@@ -1599,7 +1610,22 @@ gst_kvs_sink_change_state(GstElement *element, GstStateChange transition) {
15991610 gst_collect_pads_start (kvssink->collect );
16001611 break ;
16011612 case GST_STATE_CHANGE_PAUSED_TO_READY:
1613+ LOG_INFO (" Stopping kvssink for " << kvssink->stream_name );
16021614 gst_collect_pads_stop (kvssink->collect );
1615+
1616+ // Need this check in case an EOS was received in the buffer handler and
1617+ // stream was already stopped. Although stopSync() is an idempotent call,
1618+ // we want to avoid an extra call
1619+ if (!data->streamingStopped .load ()) {
1620+ data->kinesis_video_stream ->stopSync ();
1621+ data->streamingStopped .store (true );
1622+ } else {
1623+ LOG_INFO (" Streaming already stopped for " << kvssink->stream_name );
1624+ }
1625+ LOG_INFO (" Stopped kvssink for " << kvssink->stream_name );
1626+ break ;
1627+ case GST_STATE_CHANGE_READY_TO_NULL:
1628+ LOG_INFO (" Pipeline state changed to NULL in kvssink" );
16031629 break ;
16041630 default :
16051631 break ;
0 commit comments