@@ -2463,74 +2463,110 @@ namespace glz
24632463 namespace streaming_utils
24642464 {
24652465 // Create a periodic data sender (works with both HTTP and HTTPS)
2466+ // Uses enable_shared_from_this to avoid circular reference memory leaks
24662467 template <typename T>
24672468 void send_periodic_data (std::shared_ptr<streaming_connection_interface> conn, std::function<T()> data_generator,
24682469 std::chrono::milliseconds interval, size_t max_events = 0)
24692470 {
2470- auto counter = std::make_shared<size_t >(0 );
24712471 if (!conn || !conn->is_open ()) return ;
2472- auto timer = std::make_shared<asio::steady_timer>(conn->get_executor ());
24732472
2474- // Use shared_ptr to safely handle recursive lambda calls and avoid compiler-specific segfaults
2475- auto send_next = std::make_shared<std::function<void ()>>();
2476- *send_next = [conn, timer, counter, data_generator, interval, max_events, send_next]() mutable {
2477- if (!conn->is_open () || (max_events > 0 && *counter >= max_events)) {
2478- conn->close ();
2479- return ;
2480- }
2473+ struct periodic_sender : std::enable_shared_from_this<periodic_sender>
2474+ {
2475+ std::shared_ptr<streaming_connection_interface> conn;
2476+ std::shared_ptr<asio::steady_timer> timer;
2477+ std::function<T()> data_generator;
2478+ std::chrono::milliseconds interval;
2479+ size_t max_events;
2480+ size_t counter = 0 ;
2481+
2482+ periodic_sender (std::shared_ptr<streaming_connection_interface> c, std::function<T()> gen,
2483+ std::chrono::milliseconds intv, size_t max_ev)
2484+ : conn(std::move(c)),
2485+ timer (std::make_shared<asio::steady_timer>(conn->get_executor ())),
2486+ data_generator(std::move(gen)),
2487+ interval(intv),
2488+ max_events(max_ev)
2489+ {}
2490+
2491+ void send_next ()
2492+ {
2493+ if (!conn->is_open () || (max_events > 0 && counter >= max_events)) {
2494+ conn->close ();
2495+ return ;
2496+ }
24812497
2482- try {
2483- T data = data_generator ();
2484- conn->send_json_event (data, " data" , std::to_string (*counter), [=](std::error_code ec) mutable {
2485- if (!ec) {
2486- (*counter)++;
2487- timer->expires_after (interval);
2488- timer->async_wait ([send_next, timer, counter](std::error_code) { (*send_next)(); });
2489- }
2490- else {
2491- conn->close ();
2492- }
2493- });
2494- }
2495- catch (const std::exception&) {
2496- conn->close ();
2498+ try {
2499+ T data = data_generator ();
2500+ conn->send_json_event (
2501+ data, " data" , std::to_string (counter), [self = this ->shared_from_this ()](std::error_code ec) {
2502+ if (!ec) {
2503+ self->counter ++;
2504+ self->timer ->expires_after (self->interval );
2505+ self->timer ->async_wait (
2506+ [self](std::error_code) { self->send_next (); });
2507+ }
2508+ else {
2509+ self->conn ->close ();
2510+ }
2511+ });
2512+ }
2513+ catch (const std::exception&) {
2514+ conn->close ();
2515+ }
24972516 }
24982517 };
24992518
2500- (*send_next)();
2519+ auto sender = std::make_shared<periodic_sender>(conn, std::move(data_generator), interval, max_events);
2520+ sender->send_next ();
25012521 }
25022522
25032523 // Create a data stream from a collection (works with both HTTP and HTTPS)
2524+ // Uses enable_shared_from_this to avoid circular reference memory leaks
2525+ // Note: Makes a copy of the container to ensure data outlives the async operation
25042526 template <typename Container>
25052527 void stream_collection (std::shared_ptr<streaming_connection_interface> conn, const Container& data,
25062528 std::chrono::milliseconds delay_between_items = std::chrono::milliseconds(10 ))
25072529 {
2508- auto it = std::make_shared<typename Container::const_iterator>(data.begin ());
2509- auto end_it = data.end ();
25102530 if (!conn || !conn->is_open ()) return ;
2511- auto timer = std::make_shared<asio::steady_timer>(conn->get_executor ());
2512-
2513- // Use shared_ptr to safely handle recursive lambda calls and avoid compiler-specific segfaults
2514- auto send_next = std::make_shared<std::function<void ()>>();
2515- *send_next = [conn, timer, it, end_it, delay_between_items, send_next]() mutable {
2516- if (!conn->is_open () || *it == end_it) {
2517- conn->close ();
2518- return ;
2519- }
25202531
2521- conn->send_json_event (**it, " item" , " " , [=](std::error_code ec) mutable {
2522- if (!ec) {
2523- ++(*it);
2524- timer->expires_after (delay_between_items);
2525- timer->async_wait ([send_next, timer, it](std::error_code) { (*send_next)(); });
2526- }
2527- else {
2532+ struct collection_sender : std::enable_shared_from_this<collection_sender>
2533+ {
2534+ std::shared_ptr<streaming_connection_interface> conn;
2535+ std::shared_ptr<asio::steady_timer> timer;
2536+ Container data;
2537+ typename Container::const_iterator it;
2538+ std::chrono::milliseconds delay;
2539+
2540+ collection_sender (std::shared_ptr<streaming_connection_interface> c, Container d, std::chrono::milliseconds dl)
2541+ : conn(std::move(c)),
2542+ timer (std::make_shared<asio::steady_timer>(conn->get_executor ())),
2543+ data(std::move(d)),
2544+ it(data.begin()),
2545+ delay(dl)
2546+ {}
2547+
2548+ void send_next ()
2549+ {
2550+ if (!conn->is_open () || it == data.end ()) {
25282551 conn->close ();
2552+ return ;
25292553 }
2530- });
2554+
2555+ conn->send_json_event (*it, " item" , " " , [self = this ->shared_from_this ()](std::error_code ec) {
2556+ if (!ec) {
2557+ ++self->it ;
2558+ self->timer ->expires_after (self->delay );
2559+ self->timer ->async_wait ([self](std::error_code) { self->send_next (); });
2560+ }
2561+ else {
2562+ self->conn ->close ();
2563+ }
2564+ });
2565+ }
25312566 };
25322567
2533- (*send_next)();
2568+ auto sender = std::make_shared<collection_sender>(conn, data, delay_between_items);
2569+ sender->send_next ();
25342570 }
25352571 } // namespace streaming_utils
25362572
0 commit comments