Skip to content

Commit eb227ca

Browse files
Asure correct data & flush order during multiple flushes (#194)
Summary: Asure correct data & flush order during multiple flushes Type: Fix Test Plan: UT/CT, Fullstack Jira: NO-JIRA
1 parent 708c1df commit eb227ca

11 files changed

+436
-14
lines changed

source/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ add_library(gstrialtosinks
4848
Timer.cpp
4949
BufferParser.cpp
5050
LogToGstHandler.cpp
51+
FlushAndDataSynchronizer.cpp
5152
)
5253

5354
target_include_directories(gstrialtosinks
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright (C) 2026 Sky UK
3+
*
4+
* This library is free software; you can redistribute it and/or
5+
* modify it under the terms of the GNU Lesser General Public
6+
* License as published by the Free Software Foundation;
7+
* version 2.1 of the License.
8+
*
9+
* This library is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12+
* Lesser General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Lesser General Public
15+
* License along with this library; if not, write to the Free Software
16+
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17+
*/
18+
19+
#include "FlushAndDataSynchronizer.h"
20+
#include "GStreamerMSEMediaPlayerClient.h"
21+
#include "GstreamerCatLog.h"
22+
#include <algorithm>
23+
#include <gst/gst.h>
24+
25+
#define GST_CAT_DEFAULT rialtoGStreamerCat
26+
27+
void FlushAndDataSynchronizer::addSource(int32_t sourceId)
28+
{
29+
std::unique_lock lock(m_mutex);
30+
m_sourceStates[sourceId] = {FlushState::IDLE, DataState::NO_DATA};
31+
GST_INFO("Added source %d to FlushAndDataSynchronizer", sourceId);
32+
}
33+
34+
void FlushAndDataSynchronizer::removeSource(int32_t sourceId)
35+
{
36+
std::unique_lock lock(m_mutex);
37+
m_sourceStates.erase(sourceId);
38+
m_cv.notify_all();
39+
GST_INFO("Removed source %d from FlushAndDataSynchronizer", sourceId);
40+
}
41+
42+
void FlushAndDataSynchronizer::notifyFlushStarted(int32_t sourceId)
43+
{
44+
std::unique_lock lock(m_mutex);
45+
m_sourceStates[sourceId].flushState = FlushState::FLUSHING;
46+
m_sourceStates[sourceId].dataState = DataState::NO_DATA;
47+
GST_INFO("FlushAndDataSynchronizer: Flush started for source %d", sourceId);
48+
}
49+
50+
void FlushAndDataSynchronizer::notifyFlushCompleted(int32_t sourceId)
51+
{
52+
std::unique_lock lock(m_mutex);
53+
m_sourceStates[sourceId].flushState = FlushState::FLUSHED;
54+
m_cv.notify_all();
55+
GST_INFO("FlushAndDataSynchronizer: Flush completed for source %d", sourceId);
56+
}
57+
58+
void FlushAndDataSynchronizer::notifyDataReceived(int32_t sourceId)
59+
{
60+
std::unique_lock lock(m_mutex);
61+
if (m_sourceStates[sourceId].dataState == DataState::NO_DATA)
62+
{
63+
m_sourceStates[sourceId].dataState = DataState::DATA_RECEIVED;
64+
GST_INFO("FlushAndDataSynchronizer: Data received for source %d", sourceId);
65+
}
66+
}
67+
68+
void FlushAndDataSynchronizer::notifyDataPushed(int32_t sourceId)
69+
{
70+
std::unique_lock lock(m_mutex);
71+
m_sourceStates[sourceId].dataState = DataState::DATA_PUSHED;
72+
m_sourceStates[sourceId].flushState = FlushState::IDLE;
73+
m_cv.notify_all();
74+
GST_INFO("FlushAndDataSynchronizer: Data pushed for source %d", sourceId);
75+
}
76+
77+
void FlushAndDataSynchronizer::waitIfRequired(int32_t sourceId)
78+
{
79+
std::unique_lock lock(m_mutex);
80+
GST_INFO("FlushAndDataSynchronizer: waitIfRequired enter for source %d", sourceId);
81+
m_cv.wait(lock,
82+
[&]()
83+
{
84+
return std::none_of(m_sourceStates.begin(), m_sourceStates.end(),
85+
[&](const auto &state)
86+
{
87+
return (state.first == sourceId &&
88+
state.second.flushState == FlushState::FLUSHING) ||
89+
(state.second.flushState != FlushState::IDLE &&
90+
state.second.dataState == DataState::DATA_RECEIVED);
91+
});
92+
});
93+
GST_INFO("FlushAndDataSynchronizer: waitIfRequired exit for source %d", sourceId);
94+
}
95+
96+
bool FlushAndDataSynchronizer::isAnySourceFlushing() const
97+
{
98+
std::unique_lock lock(m_mutex);
99+
return std::any_of(m_sourceStates.begin(), m_sourceStates.end(),
100+
[](const auto &state) { return state.second.flushState == FlushState::FLUSHING; });
101+
}

source/FlushAndDataSynchronizer.h

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright (C) 2026 Sky UK
3+
*
4+
* This library is free software; you can redistribute it and/or
5+
* modify it under the terms of the GNU Lesser General Public
6+
* License as published by the Free Software Foundation;
7+
* version 2.1 of the License.
8+
*
9+
* This library is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12+
* Lesser General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Lesser General Public
15+
* License along with this library; if not, write to the Free Software
16+
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17+
*/
18+
19+
#ifndef FLUSH_AND_DATA_SYNCHRONIZER_H_
20+
#define FLUSH_AND_DATA_SYNCHRONIZER_H_
21+
22+
#include "IFlushAndDataSynchronizer.h"
23+
#include <condition_variable>
24+
#include <map>
25+
#include <mutex>
26+
#include <vector>
27+
28+
class FlushAndDataSynchronizer : public IFlushAndDataSynchronizer
29+
{
30+
enum class FlushState
31+
{
32+
IDLE,
33+
FLUSHING,
34+
FLUSHED
35+
};
36+
37+
enum class DataState
38+
{
39+
NO_DATA,
40+
DATA_RECEIVED,
41+
DATA_PUSHED
42+
};
43+
44+
struct SourceState
45+
{
46+
FlushState flushState;
47+
DataState dataState;
48+
};
49+
50+
struct FlushData
51+
{
52+
int32_t sourceId;
53+
bool resetTime;
54+
};
55+
56+
public:
57+
FlushAndDataSynchronizer() = default;
58+
~FlushAndDataSynchronizer() override = default;
59+
60+
void addSource(int32_t sourceId) override;
61+
void removeSource(int32_t sourceId) override;
62+
void notifyFlushStarted(int32_t sourceId) override;
63+
void notifyFlushCompleted(int32_t sourceId) override;
64+
void notifyDataReceived(int32_t sourceId) override;
65+
void notifyDataPushed(int32_t sourceId) override;
66+
void waitIfRequired(int32_t sourceId) override;
67+
bool isAnySourceFlushing() const override;
68+
69+
private:
70+
mutable std::mutex m_mutex;
71+
std::condition_variable m_cv;
72+
std::map<int32_t, SourceState> m_sourceStates;
73+
};
74+
75+
#endif // FLUSH_AND_DATA_SYNCHRONIZER_H_

source/GStreamerMSEMediaPlayerClient.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,8 @@ StateChangeResult GStreamerMSEMediaPlayerClient::play(int32_t sourceId)
254254
return;
255255
}
256256

257-
if (m_serverPlaybackState == firebolt::rialto::PlaybackState::PLAYING)
257+
if (m_serverPlaybackState == firebolt::rialto::PlaybackState::PLAYING ||
258+
(m_serverPlaybackState == firebolt::rialto::PlaybackState::END_OF_STREAM && wasPlayingBeforeEos))
258259
{
259260
GST_INFO("Server is already playing");
260261
sourceIt->second.m_state = ClientState::PLAYING;
@@ -296,7 +297,8 @@ StateChangeResult GStreamerMSEMediaPlayerClient::play(int32_t sourceId)
296297
}
297298
else
298299
{
299-
GST_WARNING("Not in PAUSED state in %u state", static_cast<uint32_t>(m_clientState));
300+
GST_WARNING("Not in PAUSED state in client state %u state; server playback state: %u",
301+
static_cast<uint32_t>(m_clientState), static_cast<uint32_t>(m_serverPlaybackState));
300302
}
301303

302304
result = StateChangeResult::SUCCESS_ASYNC;
@@ -387,9 +389,11 @@ void GStreamerMSEMediaPlayerClient::setPlaybackRate(double rate)
387389

388390
void GStreamerMSEMediaPlayerClient::flush(int32_t sourceId, bool resetTime)
389391
{
392+
m_flushAndDataSynchronizer.notifyFlushStarted(sourceId);
390393
m_backendQueue->callInEventLoop(
391394
[&]()
392395
{
396+
wasPlayingBeforeEos = false;
393397
bool async{true};
394398
auto sourceIt = m_attachedSources.find(sourceId);
395399
if (sourceIt == m_attachedSources.end())
@@ -519,6 +523,7 @@ bool GStreamerMSEMediaPlayerClient::attachSource(std::unique_ptr<firebolt::rialt
519523
m_attachedSources.emplace(source->getId(),
520524
AttachedSource(rialtoSink, bufferPuller, delegate, source->getType()));
521525
delegate->setSourceId(source->getId());
526+
m_flushAndDataSynchronizer.addSource(source->getId());
522527
bufferPuller->start();
523528
}
524529
}
@@ -566,6 +571,7 @@ void GStreamerMSEMediaPlayerClient::removeSource(int32_t sourceId)
566571
GST_WARNING("Remove source %d failed", sourceId);
567572
}
568573
m_attachedSources.erase(sourceId);
574+
m_flushAndDataSynchronizer.removeSource(sourceId);
569575
});
570576
}
571577

@@ -575,12 +581,14 @@ void GStreamerMSEMediaPlayerClient::handlePlaybackStateChange(firebolt::rialto::
575581
m_backendQueue->callInEventLoop(
576582
[&]()
577583
{
584+
const auto kPreviousState{m_serverPlaybackState};
578585
m_serverPlaybackState = state;
579586
switch (state)
580587
{
581588
case firebolt::rialto::PlaybackState::PAUSED:
582589
case firebolt::rialto::PlaybackState::PLAYING:
583590
{
591+
wasPlayingBeforeEos = false;
584592
if (state == firebolt::rialto::PlaybackState::PAUSED && m_clientState == ClientState::AWAITING_PAUSED)
585593
{
586594
m_clientState = ClientState::PAUSED;
@@ -616,6 +624,10 @@ void GStreamerMSEMediaPlayerClient::handlePlaybackStateChange(firebolt::rialto::
616624
}
617625
case firebolt::rialto::PlaybackState::END_OF_STREAM:
618626
{
627+
if (!wasPlayingBeforeEos && firebolt::rialto::PlaybackState::PLAYING == kPreviousState)
628+
{
629+
wasPlayingBeforeEos = true;
630+
}
619631
for (const auto &source : m_attachedSources)
620632
{
621633
source.second.m_delegate->handleEos();
@@ -629,6 +641,7 @@ void GStreamerMSEMediaPlayerClient::handlePlaybackStateChange(firebolt::rialto::
629641
}
630642
case firebolt::rialto::PlaybackState::FAILURE:
631643
{
644+
wasPlayingBeforeEos = false;
632645
for (const auto &source : m_attachedSources)
633646
{
634647
source.second.m_delegate->handleError("Rialto server playback failed");
@@ -669,6 +682,7 @@ void GStreamerMSEMediaPlayerClient::handleSourceFlushed(int32_t sourceId)
669682
}
670683
sourceIt->second.m_isFlushing = false;
671684
sourceIt->second.m_delegate->handleFlushCompleted();
685+
m_flushAndDataSynchronizer.notifyFlushCompleted(sourceId);
672686
});
673687
}
674688

@@ -925,6 +939,11 @@ bool GStreamerMSEMediaPlayerClient::switchSource(const std::unique_ptr<firebolt:
925939
return result;
926940
}
927941

942+
IFlushAndDataSynchronizer &GStreamerMSEMediaPlayerClient::getFlushAndDataSynchronizer()
943+
{
944+
return m_flushAndDataSynchronizer;
945+
}
946+
928947
bool GStreamerMSEMediaPlayerClient::checkIfAllAttachedSourcesInStates(const std::vector<ClientState> &states)
929948
{
930949
return std::all_of(m_attachedSources.begin(), m_attachedSources.end(), [states](const auto &source)
@@ -1177,6 +1196,11 @@ void PullBufferMessage::handle()
11771196
status = firebolt::rialto::MediaSourceStatus::NO_AVAILABLE_SAMPLES;
11781197
}
11791198

1199+
if (firebolt::rialto::MediaSourceStatus::OK == status || firebolt::rialto::MediaSourceStatus::EOS == status)
1200+
{
1201+
m_player->getFlushAndDataSynchronizer().notifyDataPushed(m_sourceId);
1202+
}
1203+
11801204
m_player->m_backendQueue->postMessage(
11811205
std::make_shared<HaveDataMessage>(status, m_sourceId, m_needDataRequestId, m_player));
11821206
}

source/GStreamerMSEMediaPlayerClient.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
#include "BufferParser.h"
3737
#include "Constants.h"
38+
#include "FlushAndDataSynchronizer.h"
3839
#include "IMediaPipeline.h"
3940
#include "IMessageQueue.h"
4041
#include "IPullModePlaybackDelegate.h"
@@ -322,6 +323,7 @@ class GStreamerMSEMediaPlayerClient : public firebolt::rialto::IMediaPipelineCli
322323
void setUseBuffering(bool useBuffering);
323324
bool getUseBuffering();
324325
bool switchSource(const std::unique_ptr<firebolt::rialto::IMediaPipeline::MediaSource> &source);
326+
IFlushAndDataSynchronizer &getFlushAndDataSynchronizer();
325327

326328
private:
327329
bool areAllStreamsAttached();
@@ -340,6 +342,8 @@ class GStreamerMSEMediaPlayerClient : public firebolt::rialto::IMediaPipelineCli
340342
int32_t m_videoStreams;
341343
int32_t m_subtitleStreams;
342344
firebolt::rialto::PlaybackInfo m_playbackInfo{-1, 1.0};
345+
FlushAndDataSynchronizer m_flushAndDataSynchronizer;
346+
bool wasPlayingBeforeEos{false};
343347

344348
struct Rectangle
345349
{

source/IFlushAndDataSynchronizer.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (C) 2026 Sky UK
3+
*
4+
* This library is free software; you can redistribute it and/or
5+
* modify it under the terms of the GNU Lesser General Public
6+
* License as published by the Free Software Foundation;
7+
* version 2.1 of the License.
8+
*
9+
* This library is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12+
* Lesser General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Lesser General Public
15+
* License along with this library; if not, write to the Free Software
16+
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17+
*/
18+
19+
#ifndef IFLUSH_AND_DATA_SYNCHRONIZER_H_
20+
#define IFLUSH_AND_DATA_SYNCHRONIZER_H_
21+
22+
#include <cstdint>
23+
24+
class IFlushAndDataSynchronizer
25+
{
26+
public:
27+
virtual ~IFlushAndDataSynchronizer() = default;
28+
29+
virtual void addSource(int32_t sourceId) = 0;
30+
virtual void removeSource(int32_t sourceId) = 0;
31+
virtual void notifyFlushStarted(int32_t sourceId) = 0;
32+
virtual void notifyFlushCompleted(int32_t sourceId) = 0;
33+
virtual void notifyDataReceived(int32_t sourceId) = 0;
34+
virtual void notifyDataPushed(int32_t sourceId) = 0;
35+
virtual void waitIfRequired(int32_t sourceId) = 0;
36+
virtual bool isAnySourceFlushing() const = 0;
37+
};
38+
39+
#endif // IFLUSH_AND_DATA_SYNCHRONIZER_H_

0 commit comments

Comments
 (0)