Skip to content

Commit 76e3d39

Browse files
authored
Fix a leak when cancelling scheduled tasks (#1022)
1 parent 179984f commit 76e3d39

File tree

9 files changed

+57
-43
lines changed

9 files changed

+57
-43
lines changed

.circleci/config.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ jobs:
88
working_directory: /tmp/licode
99

1010
steps:
11+
- run:
12+
name: Install Git client
13+
command: |
14+
set -x
15+
sudo apt-get update
16+
sudo apt-get install -y git
17+
1118
- checkout
1219

1320
- setup_remote_docker

erizo/src/erizo/DtlsTransport.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ static std::mutex dtls_mutex;
2626

2727
Resender::Resender(DtlsTransport* transport, dtls::DtlsSocketContext* ctx)
2828
: transport_(transport), socket_context_(ctx),
29-
resend_seconds_(kInitialSecsPerResend), max_resends_(kMaxResends) {
29+
resend_seconds_(kInitialSecsPerResend), max_resends_(kMaxResends),
30+
scheduled_task_{std::make_shared<ScheduledTaskReference>()} {
3031
}
3132

3233
Resender::~Resender() {

erizo/src/erizo/DtlsTransport.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class Resender {
7878
packetPtr packet_;
7979
unsigned int resend_seconds_;
8080
unsigned int max_resends_;
81-
int scheduled_task_ = -1;
81+
std::shared_ptr<ScheduledTaskReference> scheduled_task_;
8282
};
8383
} // namespace erizo
8484
#endif // ERIZO_SRC_ERIZO_DTLSTRANSPORT_H_

erizo/src/erizo/rtp/PliPacerHandler.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ constexpr duration PliPacerHandler::kKeyframeTimeout;
1313

1414
PliPacerHandler::PliPacerHandler(std::shared_ptr<erizo::Clock> the_clock)
1515
: enabled_{true}, connection_{nullptr}, clock_{the_clock}, time_last_keyframe_{clock_->now()},
16-
waiting_for_keyframe_{false}, scheduled_pli_{-1},
16+
waiting_for_keyframe_{false}, scheduled_pli_{std::make_shared<ScheduledTaskReference>()},
1717
video_sink_ssrc_{0}, video_source_ssrc_{0}, fir_seq_number_{0} {}
1818

1919
void PliPacerHandler::enable() {
@@ -38,7 +38,7 @@ void PliPacerHandler::read(Context *ctx, std::shared_ptr<dataPacket> packet) {
3838
time_last_keyframe_ = clock_->now();
3939
waiting_for_keyframe_ = false;
4040
connection_->getWorker()->unschedule(scheduled_pli_);
41-
scheduled_pli_ = -1;
41+
scheduled_pli_ = std::make_shared<ScheduledTaskReference>();
4242
}
4343
ctx->fireRead(std::move(packet));
4444
}
@@ -54,7 +54,7 @@ void PliPacerHandler::sendFIR() {
5454
getContext()->fireWrite(RtpUtils::createFIR(video_source_ssrc_, video_sink_ssrc_, fir_seq_number_++));
5555
getContext()->fireWrite(RtpUtils::createFIR(video_source_ssrc_, video_sink_ssrc_, fir_seq_number_++));
5656
waiting_for_keyframe_ = false;
57-
scheduled_pli_ = -1;
57+
scheduled_pli_ = std::make_shared<ScheduledTaskReference>();
5858
}
5959

6060
void PliPacerHandler::scheduleNextPLI() {

erizo/src/erizo/rtp/PliPacerHandler.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include "./logger.h"
77
#include "pipeline/Handler.h"
8+
#include "thread/Worker.h"
89
#include "lib/Clock.h"
910

1011
namespace erizo {
@@ -43,7 +44,7 @@ class PliPacerHandler: public Handler, public std::enable_shared_from_this<PliPa
4344
std::shared_ptr<erizo::Clock> clock_;
4445
time_point time_last_keyframe_;
4546
bool waiting_for_keyframe_;
46-
int scheduled_pli_;
47+
std::shared_ptr<ScheduledTaskReference> scheduled_pli_;
4748
uint32_t video_sink_ssrc_;
4849
uint32_t video_source_ssrc_;
4950
uint8_t fir_seq_number_;

erizo/src/erizo/rtp/RtpPaddingGeneratorHandler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ RtpPaddingGeneratorHandler::RtpPaddingGeneratorHandler(std::shared_ptr<erizo::Cl
2626
marker_rate_{std::chrono::milliseconds(100), 20, 1., clock_},
2727
rtp_header_length_{12},
2828
bucket_{kInitialBitrate, kPaddingBurstSize, clock_},
29-
scheduled_task_{-1} {}
29+
scheduled_task_{std::make_shared<ScheduledTaskReference>()} {}
3030

3131

3232

erizo/src/erizo/rtp/RtpPaddingGeneratorHandler.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "pipeline/Handler.h"
88
#include "lib/Clock.h"
99
#include "lib/TokenBucket.h"
10+
#include "thread/Worker.h"
1011
#include "rtp/SequenceNumberTranslator.h"
1112
#include "./Stats.h"
1213

@@ -64,7 +65,7 @@ class RtpPaddingGeneratorHandler: public Handler, public std::enable_shared_from
6465
MovingIntervalRateStat marker_rate_;
6566
uint32_t rtp_header_length_;
6667
TokenBucket bucket_;
67-
int scheduled_task_;
68+
std::shared_ptr<ScheduledTaskReference> scheduled_task_;
6869
};
6970

7071
} // namespace erizo

erizo/src/erizo/thread/Worker.cpp

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,17 @@
1010

1111
using erizo::Worker;
1212
using erizo::SimulatedWorker;
13+
using erizo::ScheduledTaskReference;
14+
15+
ScheduledTaskReference::ScheduledTaskReference() : cancelled{false} {
16+
}
17+
18+
bool ScheduledTaskReference::isCancelled() {
19+
return cancelled;
20+
}
21+
void ScheduledTaskReference::cancel() {
22+
cancelled = true;
23+
}
1324

1425
Worker::Worker(std::weak_ptr<Scheduler> scheduler, std::shared_ptr<Clock> the_clock)
1526
: scheduler_{scheduler},
@@ -50,21 +61,22 @@ void Worker::close() {
5061
service_.stop();
5162
}
5263

53-
int Worker::scheduleFromNow(Task f, duration delta) {
64+
std::shared_ptr<ScheduledTaskReference> Worker::scheduleFromNow(Task f, duration delta) {
5465
auto delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta);
55-
int uuid = next_scheduled_++;
66+
auto id = std::make_shared<ScheduledTaskReference>();
5667
if (auto scheduler = scheduler_.lock()) {
57-
scheduler->scheduleFromNow(safeTask([f, uuid](std::shared_ptr<Worker> this_ptr) {
58-
this_ptr->task(this_ptr->safeTask([f, uuid](std::shared_ptr<Worker> this_ptr) {
59-
std::unique_lock<std::mutex> lock(this_ptr->cancel_mutex_);
60-
if (this_ptr->isCancelled(uuid)) {
61-
return;
68+
scheduler->scheduleFromNow(safeTask([f, id](std::shared_ptr<Worker> this_ptr) {
69+
this_ptr->task(this_ptr->safeTask([f, id](std::shared_ptr<Worker> this_ptr) {
70+
{
71+
if (id->isCancelled()) {
72+
return;
73+
}
6274
}
6375
f();
6476
}));
6577
}), delta_ms);
6678
}
67-
return uuid;
79+
return id;
6880
}
6981

7082
void Worker::scheduleEvery(ScheduledTask f, duration period) {
@@ -84,20 +96,8 @@ void Worker::scheduleEvery(ScheduledTask f, duration period, duration next_delay
8496
}), next_delay);
8597
}
8698

87-
void Worker::unschedule(int uuid) {
88-
if (uuid < 0) {
89-
return;
90-
}
91-
std::unique_lock<std::mutex> lock(cancel_mutex_);
92-
cancelled_.push_back(uuid);
93-
}
94-
95-
bool Worker::isCancelled(int uuid) {
96-
if (std::find(cancelled_.begin(), cancelled_.end(), uuid) != cancelled_.end()) {
97-
cancelled_.erase(std::remove(cancelled_.begin(), cancelled_.end(), uuid), cancelled_.end());
98-
return true;
99-
}
100-
return false;
99+
void Worker::unschedule(std::shared_ptr<ScheduledTaskReference> id) {
100+
id->cancel();
101101
}
102102

103103
std::function<void()> Worker::safeTask(std::function<void(std::shared_ptr<Worker>)> f) {
@@ -128,15 +128,15 @@ void SimulatedWorker::close() {
128128
tasks_.clear();
129129
}
130130

131-
int SimulatedWorker::scheduleFromNow(Task f, duration delta) {
132-
int uuid = next_scheduled_++;
133-
scheduled_tasks_[clock_->now() + delta] = [this, f, uuid] {
134-
if (isCancelled(uuid)) {
131+
std::shared_ptr<ScheduledTaskReference> SimulatedWorker::scheduleFromNow(Task f, duration delta) {
132+
auto id = std::make_shared<ScheduledTaskReference>();
133+
scheduled_tasks_[clock_->now() + delta] = [this, f, id] {
134+
if (id->isCancelled()) {
135135
return;
136136
}
137137
f();
138138
};
139-
return uuid;
139+
return id;
140140
}
141141

142142
void SimulatedWorker::executeTasks() {

erizo/src/erizo/thread/Worker.h

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@
1717

1818
namespace erizo {
1919

20+
class ScheduledTaskReference {
21+
public:
22+
ScheduledTaskReference();
23+
bool isCancelled();
24+
void cancel();
25+
private:
26+
std::atomic<bool> cancelled;
27+
};
28+
2029
class Worker : public std::enable_shared_from_this<Worker> {
2130
public:
2231
typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
@@ -33,14 +42,11 @@ class Worker : public std::enable_shared_from_this<Worker> {
3342
virtual void start(std::shared_ptr<std::promise<void>> start_promise);
3443
virtual void close();
3544

36-
virtual int scheduleFromNow(Task f, duration delta);
37-
virtual void unschedule(int uuid);
45+
virtual std::shared_ptr<ScheduledTaskReference> scheduleFromNow(Task f, duration delta);
46+
virtual void unschedule(std::shared_ptr<ScheduledTaskReference> id);
3847

3948
virtual void scheduleEvery(ScheduledTask f, duration period);
4049

41-
protected:
42-
bool isCancelled(int uuid);
43-
4450
private:
4551
void scheduleEvery(ScheduledTask f, duration period, duration next_delay);
4652
std::function<void()> safeTask(std::function<void(std::shared_ptr<Worker>)> f);
@@ -55,8 +61,6 @@ class Worker : public std::enable_shared_from_this<Worker> {
5561
asio_worker service_worker_;
5662
boost::thread_group group_;
5763
std::atomic<bool> closed_;
58-
std::vector<int> cancelled_;
59-
mutable std::mutex cancel_mutex_;
6064
};
6165

6266
class SimulatedWorker : public Worker {
@@ -66,7 +70,7 @@ class SimulatedWorker : public Worker {
6670
void start() override;
6771
void start(std::shared_ptr<std::promise<void>> start_promise) override;
6872
void close() override;
69-
int scheduleFromNow(Task f, duration delta) override;
73+
std::shared_ptr<ScheduledTaskReference> scheduleFromNow(Task f, duration delta) override;
7074

7175
void executeTasks();
7276
void executePastScheduledTasks();

0 commit comments

Comments
 (0)