Skip to content

Commit 43ff9f9

Browse files
committed
Created the FramerateSynchronizer object
1 parent d1c4812 commit 43ff9f9

File tree

8 files changed

+215
-112
lines changed

8 files changed

+215
-112
lines changed

source/FAST/CMakeLists.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ fast_add_sources(
3131
Attribute.cpp
3232
Attribute.hpp
3333
ProcessObjectRegistry.hpp
34-
PipelineSynchronizer.cpp
35-
PipelineSynchronizer.hpp
34+
FramerateSynchronizer.cpp
35+
FramerateSynchronizer.hpp
3636
DataStream.cpp
3737
DataStream.hpp
3838
)
39+
fast_add_process_object(FramerateSynchronizer FramerateSynchronizer.hpp)
3940
if(FAST_MODULE_Visualization)
4041
fast_add_sources(
4142
Pipeline.hpp
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
#include "FramerateSynchronizer.hpp"
2+
3+
namespace fast {
4+
5+
FramerateSynchronizer::FramerateSynchronizer(int priorityPort) {
6+
m_priorityPort = priorityPort;
7+
}
8+
9+
/*
10+
uint FramerateSynchronizer::addInputConnection(DataChannel::pointer channel) {
11+
uint nr = getNrOfInputConnections();
12+
createInputPort<DataObject>(nr, false);
13+
createOutputPort<DataObject>(nr);
14+
setInputConnection(nr, channel);
15+
return nr;
16+
}*/
17+
18+
void FramerateSynchronizer::execute() {
19+
if(!mInputConnections.empty()) {
20+
// First time
21+
m_parents = mInputConnections;
22+
mInputConnections.clear(); // Severe the connections
23+
24+
// Start one thread per input
25+
for(const auto& parent : m_parents) {
26+
uint port = parent.first;
27+
DataChannel::pointer channel = parent.second;
28+
//channel->setMaximumNumberOfFrames(1);
29+
auto thread = new std::thread([port, channel, this]() {
30+
//std::cout << "Started thread" << std::endl;
31+
try {
32+
while(true) {
33+
channel->getProcessObject()->run();
34+
DataObject::pointer data = channel->getNextFrame();
35+
//std::cout << "Adding data to port " << port << std::endl;
36+
{
37+
std::lock_guard<std::mutex> lock(m_latestDataMutex);
38+
m_latestData[port] = data;
39+
m_newData = true;
40+
}
41+
m_dataCV.notify_one();
42+
if(data->isLastFrame())
43+
break;
44+
}
45+
} catch(ThreadStopped &e) {
46+
}
47+
//std::cout << "THREAD STOPPED" << std::endl;
48+
});
49+
m_threads.push_back(thread);
50+
}
51+
}
52+
53+
std::unique_lock<std::mutex> lock(m_latestDataMutex);
54+
if(m_priorityPort >= 0) {
55+
// Wait until priority port has new data
56+
while(m_latestData.size() != m_parents.size() || m_latestData.count(m_priorityPort) == 0) {
57+
//std::cout << "Waiting for new data on priority port" << std::endl;
58+
m_dataCV.wait(lock);
59+
}
60+
} else {
61+
while(!m_newData || m_latestData.size() != m_parents.size()) {
62+
//std::cout << "Waiting for new data" << std::endl;
63+
m_dataCV.wait(lock);
64+
}
65+
}
66+
auto dataToAdd = m_latestData;
67+
m_newData = false;
68+
lock.unlock();
69+
//std::cout << "Got data on priority port" << std::endl;
70+
for(const auto& parent : m_parents) {
71+
//std::cout << "adding output data to " << parent.first << ": " << dataToAdd[parent.first] << std::endl;
72+
addOutputData(parent.first, dataToAdd[parent.first]);
73+
}
74+
if(m_priorityPort >= 0) {
75+
lock.lock();
76+
m_latestData.erase(m_priorityPort);
77+
lock.unlock();
78+
}
79+
//std::cout << "Done adding data." << std::endl;
80+
mIsModified = true; // Always rerun
81+
}
82+
83+
FramerateSynchronizer::~FramerateSynchronizer() {
84+
//std::cout << "In destructor" << std::endl;
85+
for(auto parent : m_parents) {
86+
parent.second->stop();
87+
}
88+
for(auto thread : m_threads) {
89+
thread->join();
90+
delete thread;
91+
}
92+
}
93+
94+
void FramerateSynchronizer::setInputConnection(uint portID, DataChannel::pointer port) {
95+
createInputPort<DataObject>(portID, false);
96+
createOutputPort<DataObject>(portID);
97+
ProcessObject::setInputConnection(portID, port);
98+
}
99+
100+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#pragma once
2+
3+
#include <FAST/ProcessObject.hpp>
4+
#include <FAST/Streamers/Streamer.hpp>
5+
6+
namespace fast {
7+
8+
/**
9+
* @brief Synchronize multiple process objects
10+
* This PO takes in N input connections and creates N output connections.
11+
* It keeps the last frame of every connection, and every time a connection
12+
* has a new data frame, it send out the latest frame to all output connections.
13+
*/
14+
class FAST_EXPORT FramerateSynchronizer : public ProcessObject {
15+
FAST_PROCESS_OBJECT(FramerateSynchronizer)
16+
public:
17+
FAST_CONSTRUCTOR(FramerateSynchronizer, int, priorityPort, = -1);
18+
~FramerateSynchronizer();
19+
void setInputConnection(uint portID, DataChannel::pointer port) override;
20+
protected:
21+
void execute() override;
22+
23+
std::unordered_map<uint, std::shared_ptr<DataObject>> m_latestData;
24+
25+
int m_priorityPort = -1;
26+
std::map<uint, DataChannel::pointer> m_parents;
27+
std::mutex m_latestDataMutex;
28+
std::condition_variable m_dataCV;
29+
bool m_newData = false;
30+
std::vector<std::thread*> m_threads;
31+
};
32+
33+
}

source/FAST/PipelineSynchronizer.cpp

Lines changed: 0 additions & 39 deletions
This file was deleted.

source/FAST/PipelineSynchronizer.hpp

Lines changed: 0 additions & 29 deletions
This file was deleted.

source/FAST/Tests/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ fast_add_test_sources(
1111
Algorithms/DoubleFilterTests.cpp
1212
SceneGraphTests.cpp
1313
UtilityTests.cpp
14-
PipelineSynchronizerTests.cpp
14+
FramerateSynchronizerTests.cpp
1515
PipelineTests.cpp
1616
)
1717
if(FAST_MODULE_Visualization)
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#include <FAST/Testing.hpp>
2+
#include <FAST/FramerateSynchronizer.hpp>
3+
#include "DummyObjects.hpp"
4+
5+
using namespace fast;
6+
7+
TEST_CASE("Framerate synchronizer - two streams at very different rates", "[fast][FramerateSynchronizer]") {
8+
const int frames = 20;
9+
auto streamer1 = DummyStreamer::New();
10+
streamer1->setStreamingMode(StreamingMode::NewestFrameOnly);
11+
streamer1->setTotalFrames(frames);
12+
streamer1->setSleepTime(100);
13+
14+
auto streamer2 = DummyStreamer::New();
15+
streamer2->setStreamingMode(StreamingMode::NewestFrameOnly);
16+
streamer2->setTotalFrames(frames);
17+
streamer2->setSleepTime(200);
18+
19+
auto synchronizer = FramerateSynchronizer::create();
20+
synchronizer->connect(0, streamer1);
21+
synchronizer->connect(1, streamer2);
22+
23+
auto port1 = synchronizer->getOutputPort(0);
24+
auto port2 = synchronizer->getOutputPort(1);
25+
26+
int previousID1 = -1;
27+
int previousID2 = -1;
28+
for(int i = 0; i < 10; ++i) {
29+
std::cout << "Iteration: " << i << std::endl;
30+
synchronizer->run();
31+
auto data1 = port1->getNextFrame<DummyDataObject>();
32+
auto data2 = port2->getNextFrame<DummyDataObject>();
33+
//CHECK(data1->getID() == i);
34+
CHECK(((int)data1->getID() != previousID1 || (int)data2->getID() != previousID2));
35+
previousID1 = data1->getID();
36+
previousID2 = data2->getID();
37+
std::cout << previousID1 << " " << previousID2 << std::endl;
38+
if(data1->isLastFrame() || data2->isLastFrame())
39+
break;
40+
}
41+
}
42+
43+
TEST_CASE("Framerate synchronizer - two streams at very different rates and priority port", "[fast][FramerateSynchronizer]") {
44+
const int frames = 20;
45+
auto streamer1 = DummyStreamer::New();
46+
streamer1->setStreamingMode(StreamingMode::NewestFrameOnly);
47+
streamer1->setTotalFrames(frames);
48+
streamer1->setSleepTime(100);
49+
50+
auto streamer2 = DummyStreamer::New();
51+
streamer2->setStreamingMode(StreamingMode::NewestFrameOnly);
52+
streamer2->setTotalFrames(frames);
53+
streamer2->setSleepTime(200);
54+
55+
auto synchronizer = FramerateSynchronizer::create(0);
56+
synchronizer->connect(0, streamer1);
57+
synchronizer->connect(1, streamer2);
58+
59+
auto port1 = synchronizer->getOutputPort(0);
60+
auto port2 = synchronizer->getOutputPort(1);
61+
62+
int previousID1 = -1;
63+
int previousID2 = -1;
64+
for(int i = 0; i < 10; ++i) {
65+
std::cout << "Iteration: " << i << std::endl;
66+
synchronizer->run();
67+
auto data1 = port1->getNextFrame<DummyDataObject>();
68+
auto data2 = port2->getNextFrame<DummyDataObject>();
69+
//CHECK(data1->getID() == i);
70+
CHECK((int)data1->getID() > previousID1);
71+
CHECK((int)data2->getID() >= previousID2);
72+
previousID1 = data1->getID();
73+
previousID2 = data2->getID();
74+
std::cout << previousID1 << " " << previousID2 << std::endl;
75+
if(data1->isLastFrame() || data2->isLastFrame())
76+
break;
77+
}
78+
}

source/FAST/Tests/PipelineSynchronizerTests.cpp

Lines changed: 0 additions & 41 deletions
This file was deleted.

0 commit comments

Comments
 (0)