@@ -49,7 +49,9 @@ namespace client {
49
49
* process m1, m2, m3...mn in order.
50
50
*
51
51
*/
52
- class HAZELCAST_API reliable_topic : public proxy::ProxyImpl
52
+ class HAZELCAST_API reliable_topic
53
+ : public proxy::ProxyImpl
54
+ , public std::enable_shared_from_this<reliable_topic>
53
55
{
54
56
friend class spi ::ProxyManager;
55
57
friend class hazelcast_client ;
@@ -106,7 +108,7 @@ class HAZELCAST_API reliable_topic : public proxy::ProxyImpl
106
108
logger_,
107
109
execution_service_,
108
110
executor_,
109
- runners_map_ ));
111
+ shared_from_this () ));
110
112
runners_map_.put (id, runner);
111
113
runner->next ();
112
114
return std::to_string (id);
@@ -149,10 +151,9 @@ class HAZELCAST_API reliable_topic : public proxy::ProxyImpl
149
151
int batch_size,
150
152
logger& lg,
151
153
std::shared_ptr<spi::impl::ClientExecutionServiceImpl>
152
- execution_service,
154
+ execution_service,
153
155
util::hz_thread_pool& executor,
154
- util::SynchronizedMap<int , util::concurrent::Cancellable>&
155
- runners_map)
156
+ std::weak_ptr<reliable_topic> topic)
156
157
: listener_(listener)
157
158
, id_(id)
158
159
, ringbuffer_(rb)
@@ -163,7 +164,7 @@ class HAZELCAST_API reliable_topic : public proxy::ProxyImpl
163
164
, executor_(executor)
164
165
, serialization_service_(service)
165
166
, batch_size_(batch_size)
166
- , runners_map_(runners_map )
167
+ , topic_(std::move(topic) )
167
168
{
168
169
// we are going to listen to next publication. We don't care about
169
170
// what already has been published.
@@ -334,7 +335,10 @@ class HAZELCAST_API reliable_topic : public proxy::ProxyImpl
334
335
bool cancel () override
335
336
{
336
337
cancelled_.store (true );
337
- runners_map_.remove (id_);
338
+ auto topic_ptr = topic_.lock ();
339
+ if (topic_ptr) {
340
+ topic_ptr->runners_map_ .remove (id_);
341
+ }
338
342
return true ;
339
343
}
340
344
@@ -414,7 +418,7 @@ class HAZELCAST_API reliable_topic : public proxy::ProxyImpl
414
418
util::hz_thread_pool& executor_;
415
419
serialization::pimpl::SerializationService& serialization_service_;
416
420
int batch_size_;
417
- util::SynchronizedMap< int , util::concurrent::Cancellable>& runners_map_ ;
421
+ std::weak_ptr<reliable_topic> topic_ ;
418
422
};
419
423
420
424
util::SynchronizedMap<int , util::concurrent::Cancellable> runners_map_;
0 commit comments