Skip to content
This repository was archived by the owner on Apr 8, 2025. It is now read-only.

Commit 3fe4f06

Browse files
Jinming-Huvinjiang
authored andcommitted
Fix timer_handler memory leak
1 parent 246dc84 commit 3fe4f06

File tree

3 files changed

+67
-60
lines changed

3 files changed

+67
-60
lines changed

Microsoft.WindowsAzure.Storage/includes/wascore/timer_handler.h

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,38 +45,43 @@ namespace azure { namespace storage { namespace core {
4545

4646
WASTORAGE_API void stop_timer();
4747

48-
pplx::cancellation_token get_cancellation_token()
48+
bool timer_started() const
4949
{
50-
return m_worker_cancellation_token_source->get_token();
50+
return m_timer_started.load(std::memory_order_acquire);
5151
}
5252

53-
bool is_canceled()
53+
pplx::cancellation_token get_cancellation_token() const
5454
{
55-
return m_worker_cancellation_token_source->get_token().is_canceled();
55+
return m_worker_cancellation_token_source.get_token();
5656
}
5757

58-
bool is_canceled_by_timeout()
58+
bool is_canceled() const
5959
{
60-
return m_is_canceled_by_timeout;
60+
return m_worker_cancellation_token_source.get_token().is_canceled();
61+
}
62+
63+
bool is_canceled_by_timeout() const
64+
{
65+
return m_is_canceled_by_timeout.load(std::memory_order_acquire);
6166
}
6267

6368
private:
64-
std::shared_ptr<pplx::cancellation_token_source> m_worker_cancellation_token_source;
69+
pplx::cancellation_token_source m_worker_cancellation_token_source;
6570
pplx::cancellation_token_registration m_cancellation_token_registration;
6671
pplx::cancellation_token m_cancellation_token;
6772
pplx::task<void> m_timeout_task;
68-
bool m_is_canceled_by_timeout;
73+
std::atomic<bool> m_is_canceled_by_timeout;
6974
pplx::task_completion_event<void> m_tce;
7075

71-
std::shared_ptr<std::mutex> m_mutex;
76+
std::mutex m_mutex;
7277

7378
WASTORAGE_API pplx::task<void> timeout_after(const std::chrono::milliseconds& time);
7479

7580
#ifndef _WIN32
76-
typedef std::chrono::steady_clock std_clock;
77-
std::shared_ptr<boost::asio::basic_waitable_timer<std_clock>> m_timer;
81+
std::shared_ptr<boost::asio::basic_waitable_timer<std::chrono::steady_clock>> m_timer;
7882
#else
7983
std::shared_ptr<concurrency::timer<int>> m_timer;
8084
#endif
85+
std::atomic<bool> m_timer_started;
8186
};
8287
}}}

Microsoft.WindowsAzure.Storage/src/executor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ namespace azure { namespace storage { namespace core {
3232
auto instance = std::make_shared<executor_impl>(command, options, context);
3333
return pplx::details::_do_while([instance]() -> pplx::task<bool>
3434
{
35-
//Start the timer to track timeout.
36-
if (instance->m_command->m_use_timeout)
35+
// Start the timer to track timeout.
36+
if (instance->m_command->m_use_timeout && !instance->m_command->m_timer_handler->timer_started())
3737
{
3838
// Timer will be stopped when instance is out of scope, so no need to stop here.
3939
instance->m_command->m_timer_handler->start_timer(instance->m_request_options.maximum_execution_time());

Microsoft.WindowsAzure.Storage/src/timer_handler.cpp

Lines changed: 49 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,14 @@
2121
namespace azure { namespace storage { namespace core {
2222

2323
timer_handler::timer_handler(const pplx::cancellation_token& token) :
24-
m_cancellation_token(token), m_is_canceled_by_timeout(false)
24+
m_cancellation_token(token), m_is_canceled_by_timeout(false), m_timer_started(false)
2525
{
26-
m_worker_cancellation_token_source = std::make_shared<pplx::cancellation_token_source>();
2726
if (m_cancellation_token != pplx::cancellation_token::none())
2827
{
2928
m_cancellation_token_registration = m_cancellation_token.register_callback([this]()
3029
{
31-
this->m_worker_cancellation_token_source->cancel();
32-
this->stop_timer();
30+
m_worker_cancellation_token_source.cancel();
31+
stop_timer();
3332
});
3433
}
3534
}
@@ -41,67 +40,66 @@ namespace azure { namespace storage { namespace core {
4140
m_cancellation_token.deregister_callback(m_cancellation_token_registration);
4241
}
4342

44-
#ifdef _WIN32
4543
stop_timer();
46-
#else // LINUX
47-
try
48-
{
49-
stop_timer();
50-
}
51-
catch (boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> > &e)
52-
{
53-
}
54-
#endif
55-
5644
}
5745

5846
void timer_handler::start_timer(const std::chrono::milliseconds& time)
5947
{
60-
m_mutex = std::make_shared<std::mutex>();
61-
auto this_pointer = std::dynamic_pointer_cast<timer_handler>(shared_from_this());
62-
m_timeout_task = timeout_after(time).then([this_pointer]()
48+
std::lock_guard<std::mutex> guard(m_mutex);
49+
if (m_timer_started.load(std::memory_order_acquire))
50+
{
51+
return;
52+
}
53+
m_timer_started.store(true, std::memory_order_release);
54+
std::weak_ptr<timer_handler> weak_this_pointer = shared_from_this();
55+
m_timeout_task = timeout_after(time).then([weak_this_pointer]()
6356
{
64-
this_pointer->m_is_canceled_by_timeout = true;
65-
this_pointer->m_worker_cancellation_token_source->cancel();
57+
auto this_pointer = weak_this_pointer.lock();
58+
if (this_pointer)
59+
{
60+
this_pointer->m_is_canceled_by_timeout.store(true, std::memory_order_release);
61+
this_pointer->m_worker_cancellation_token_source.cancel();
62+
}
6663
});
6764
}
6865

6966
void timer_handler::stop_timer()
7067
{
71-
if (m_timer != nullptr)
68+
std::lock_guard<std::mutex> guard(m_mutex);
69+
if (m_timer_started.load(std::memory_order_acquire) && m_timer)
7270
{
73-
std::lock_guard<std::mutex> guard(*m_mutex);
74-
if (m_timer != nullptr)
75-
{
7671
#ifndef _WIN32
77-
m_timer->cancel();
72+
m_timer->cancel();
7873
#else
79-
m_timer->stop();
74+
m_timer->stop();
8075
#endif
81-
if (!m_tce._IsTriggered())
82-
{
83-
// if task_completion_event is not yet triggered, it means timeout has not been triggered.
84-
m_tce._Cancel();
85-
}
86-
m_timer.reset();
76+
if (!m_tce._IsTriggered())
77+
{
78+
// If task_completion_event is not yet triggered, it means timeout has not been triggered.
79+
m_tce._Cancel();
8780
}
81+
m_timer.reset();
8882
}
8983
}
9084

9185
#ifndef _WIN32
9286
pplx::task<void> timer_handler::timeout_after(const std::chrono::milliseconds& time)
9387
{
94-
m_timer = std::make_shared<boost::asio::basic_waitable_timer<std_clock>>(crossplat::threadpool::shared_instance().service());
95-
m_timer->expires_from_now(std::chrono::duration_cast<std_clock::duration>(time));
96-
auto this_pointer = std::dynamic_pointer_cast<timer_handler>(shared_from_this());
97-
auto callback = [this_pointer](const boost::system::error_code& ec)
88+
m_timer = std::make_shared<boost::asio::basic_waitable_timer<std::chrono::steady_clock>>(crossplat::threadpool::shared_instance().service());
89+
m_timer->expires_from_now(std::chrono::duration_cast<std::chrono::steady_clock::duration>(time));
90+
std::weak_ptr<timer_handler> weak_this_pointer = shared_from_this();
91+
auto callback = [weak_this_pointer](const boost::system::error_code& ec)
9892
{
9993
if (ec != boost::asio::error::operation_aborted)
10094
{
101-
std::lock_guard<std::mutex> guard(*(this_pointer->m_mutex));
102-
if (!this_pointer->m_tce._IsTriggered())
95+
auto this_pointer = weak_this_pointer.lock();
96+
if (this_pointer)
10397
{
104-
this_pointer->m_tce.set();
98+
std::lock_guard<std::mutex> guard(this_pointer->m_mutex);
99+
if (!this_pointer->m_tce._IsTriggered())
100+
{
101+
this_pointer->m_tce.set();
102+
}
105103
}
106104
}
107105
};
@@ -114,23 +112,27 @@ namespace azure { namespace storage { namespace core {
114112
#else
115113
pplx::task<void> timer_handler::timeout_after(const std::chrono::milliseconds& time)
116114
{
117-
// initialize the timer and connect the callback with completion event.
115+
// Initialize the timer and connect the callback with completion event.
118116
m_timer = std::make_shared<concurrency::timer<int>>(static_cast<unsigned int>(time.count()), 0);
119-
auto this_pointer = std::dynamic_pointer_cast<timer_handler>(shared_from_this());
120-
auto callback = std::make_shared<concurrency::call<int>>([this_pointer](int)
117+
std::weak_ptr<timer_handler> weak_this_pointer = shared_from_this();
118+
auto callback = std::make_shared<concurrency::call<int>>([weak_this_pointer](int)
121119
{
122-
std::lock_guard<std::mutex> guard(*(this_pointer->m_mutex));
123-
if (!this_pointer->m_tce._IsTriggered())
120+
auto this_pointer = weak_this_pointer.lock();
121+
if (this_pointer)
124122
{
125-
this_pointer->m_tce.set();
123+
std::lock_guard<std::mutex> guard(this_pointer->m_mutex);
124+
if (!this_pointer->m_tce._IsTriggered())
125+
{
126+
this_pointer->m_tce.set();
127+
}
126128
}
127129
});
128-
m_timer->link_target(callback.get());//When timer stops, tce will trigger cancellation.
130+
m_timer->link_target(callback.get()); // When timer stops, tce will trigger cancellation.
129131
m_timer->start();
130132

131133
auto event_set = pplx::create_task(m_tce);
132134

133-
//timer and callback should be preserved before event set has been triggered.
135+
// Timer and callback should be preserved before event set has been triggered.
134136
return event_set.then([callback]() {});
135137
}
136138
#endif

0 commit comments

Comments
 (0)