Skip to content

Commit 1fdab71

Browse files
committed
Fixed bug with streamers never being destroyed
1 parent bae5f10 commit 1fdab71

File tree

5 files changed

+35
-13
lines changed

5 files changed

+35
-13
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ project(FAST)
77

88
set(VERSION_MAJOR 4)
99
set(VERSION_MINOR 11)
10-
set(VERSION_PATCH 0)
10+
set(VERSION_PATCH 1)
1111
set(VERSION_SO 4) # SO version, should be incremented by 1 every time the existing API changes
1212
set(FAST_VERSION "${VERSION_MAJOR}.${VERSION_MINOR}.${VERSION_PATCH}")
1313

source/FAST/ProcessObject.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -364,9 +364,6 @@ cl::Program ProcessObject::getOpenCLProgram(
364364
return program->build(device, buildOptions);
365365
}
366366

367-
ProcessObject::~ProcessObject() {
368-
}
369-
370367
int ProcessObject::getNrOfInputConnections() const {
371368
return mInputConnections.size();
372369
}

source/FAST/ProcessObject.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ class ProcessObject;
6565
*/
6666
class FAST_EXPORT ProcessObject : public AttributeObject {
6767
public:
68-
virtual ~ProcessObject();
6968
/**
7069
* @brief Update/Run the pipeline up to this process object.
7170
*
@@ -128,7 +127,7 @@ class FAST_EXPORT ProcessObject : public AttributeObject {
128127
/**
129128
* @brief Stop a pipeline.
130129
*/
131-
void stopPipeline();
130+
virtual void stopPipeline();
132131

133132
/**
134133
* @brief Mark this process object as modified or not.

source/FAST/Streamers/Streamer.cpp

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,44 @@ void Streamer::setStreamingMode(StreamingMode mode) {
6161
}
6262

6363
DataChannel::pointer Streamer::getOutputPort(uint portID) {
64-
if(m_outputPOs.count(portID) == 0) {
64+
if(m_outputPOs.count(portID) == 0) { // Doesn't exist
6565
auto channel = ProcessObject::getOutputPort(portID);
66-
m_outputPOs[portID] = RunLambda::create([](DataObject::pointer data) -> DataList {
66+
auto PO = RunLambda::create([](DataObject::pointer data) -> DataList {
6767
return DataList(data);
6868
});
69-
m_outputPOs[portID]->setInputConnection(channel);
70-
}
69+
PO->setInputConnection(channel);
70+
m_outputPOs[portID] = PO;
71+
return PO->getOutputPort();
72+
} else {
73+
auto PO = m_outputPOs[portID].lock();
74+
if(!PO) { // Expired, recreate
75+
// TODO duplicate code:
76+
auto channel = ProcessObject::getOutputPort(portID);
77+
auto PO = RunLambda::create([](DataObject::pointer data) -> DataList {
78+
return DataList(data);
79+
});
80+
PO->setInputConnection(channel);
81+
m_outputPOs[portID] = PO;
82+
}
7183

72-
return m_outputPOs[portID]->getOutputPort();
84+
return PO->getOutputPort();
85+
}
7386
}
7487

7588
bool Streamer::isStopped() {
7689
std::lock_guard<std::mutex> lock(m_stopMutex);
7790
return m_stop;
7891
}
7992

93+
Streamer::~Streamer() noexcept {
94+
reportInfo() << "Destroying streamer.." << reportEnd();
95+
stop();
96+
reportInfo() << "Streamer DESTROYED." << reportEnd();
97+
}
98+
99+
void Streamer::stopPipeline() {
100+
stop();
101+
ProcessObject::stopPipeline();
102+
}
103+
80104
}

source/FAST/Streamers/Streamer.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class FAST_EXPORT Streamer : public ProcessObject {
3535
public:
3636
typedef std::shared_ptr<Streamer> pointer;
3737
Streamer();
38-
virtual ~Streamer() { stop(); };
38+
virtual ~Streamer();
3939
static std::string getStaticNameOfClass() {
4040
return "Streamer";
4141
}
@@ -56,6 +56,7 @@ class FAST_EXPORT Streamer : public ProcessObject {
5656
StreamingMode getStreamingMode() const;
5757

5858
virtual DataChannel::pointer getOutputPort(uint portID = 0) override;
59+
void stopPipeline() override;
5960
protected:
6061
/**
6162
* Block until the first data frame has been sent using a condition variable
@@ -87,7 +88,8 @@ class FAST_EXPORT Streamer : public ProcessObject {
8788
std::unique_ptr<std::thread> m_thread;
8889
std::condition_variable m_firstFrameCondition;
8990

90-
std::map<uint, std::shared_ptr<ProcessObject>> m_outputPOs;
91+
// Must be weak_ptr or be we get a circular dependency and streamers are never destroyed here
92+
std::map<uint, std::weak_ptr<ProcessObject>> m_outputPOs;
9193

9294

9395
};

0 commit comments

Comments
 (0)