@@ -474,37 +474,63 @@ void Countly::setUpdateInterval(size_t milliseconds) {
474474void Countly::addEvent (const cly::Event &event) {
475475 mutex->lock ();
476476#ifndef COUNTLY_USE_SQLITE
477- if (event_queue.size () == configuration->eventQueueThreshold ) {
478- log (LogLevel::WARNING, " Event queue is full, dropping the oldest event to insert a new one" );
479- event_queue.pop_front ();
480- }
481477 event_queue.push_back (event.serialize ());
482478#else
483479 addEventToSqlite (event);
480+ #endif
481+ checkAndSendEventToRQ ();
482+ mutex->unlock ();
483+ }
484+
485+ void Countly::checkAndSendEventToRQ () {
486+ nlohmann::json events = nlohmann::json::array ();
484487 mutex->unlock ();
485- int queueSize = checkPersistentEQSize ();
488+ int queueSize = checkEQSize ();
486489 mutex->lock ();
490+ #ifdef COUNTLY_USE_SQLITE
487491 if (queueSize >= configuration->eventQueueThreshold ) {
488492 log (LogLevel::DEBUG, " Event queue threshold is reached" );
493+ std::string event_ids;
494+
495+ // fetch events up to the threshold from the database
496+ fillEventsIntoJson (events, event_ids);
497+ // send them to request queue
498+ sendEventsToRQ (events);
499+ // remove them from database
500+ removeEventWithId (event_ids);
501+ }
502+ #else
503+ if (queueSize >= configuration->eventQueueThreshold ) {
504+ log (LogLevel::WARNING, " Event queue is full, dropping the oldest event to insert a new one" );
505+ for (const auto &event_json : event_queue) {
506+ events.push_back (nlohmann::json::parse (event_json));
507+ }
508+ sendEventsToRQ (events);
509+ event_queue.clear ();
489510 }
490511#endif
491- mutex->unlock ();
492512}
493513
494514void Countly::setMaxEvents (size_t value) {
495- if (is_sdk_initialized) {
496- log (LogLevel::WARNING, " [Countly][setMaxEvents] You can not set the event queue size after SDK initialization." );
497- return ;
498- }
515+ log (LogLevel::WARNING, " [Countly][setMaxEvents/SetMaxEventsPerMessage] These calls are deprecated. Use 'setEventsToRQThreshold' instead." );
516+ setEventsToRQThreshold (value);
517+ }
499518
519+ void Countly::setEventsToRQThreshold (int value) {
520+ log (LogLevel::DEBUG, " [Countly][setEventsToRQThreshold] Given threshold:[" + std::to_string (value) + " ]" );
500521 mutex->lock ();
501- configuration->eventQueueThreshold = value;
502- #ifndef COUNTLY_USE_SQLITE
503- if (event_queue.size () > configuration->eventQueueThreshold ) {
504- log (LogLevel::WARNING, " New event queue size is smaller than the old one, dropping the oldest events to fit" );
505- event_queue.resize (configuration->eventQueueThreshold );
522+ if (value < 1 ) {
523+ log (LogLevel::WARNING, " [Countly][setEventsToRQThreshold] Threshold can not be less than 1. Setting it to 1 instead of:[" + std::to_string (value) + " ]" );
524+ value = 1 ;
525+ } else if (value > 10000 ) {
526+ log (LogLevel::WARNING, " [Countly][setEventsToRQThreshold] Threshold can not be greater than 10000. Setting it to 10000 instead of:[" + std::to_string (value) + " ]" );
527+ value = 10000 ;
506528 }
507- #endif
529+
530+ // set the value
531+ configuration->eventQueueThreshold = value;
532+ // if current queue size is greater than the new threshold, send events to RQ
533+ checkAndSendEventToRQ ();
508534 mutex->unlock ();
509535}
510536
@@ -555,14 +581,13 @@ bool Countly::attemptSessionUpdateEQ() {
555581 }
556582 mutex->unlock ();
557583#else
558- int event_count = checkPersistentEQSize ();
584+ int event_count = checkEQSize ();
559585 if (event_count <= 0 ) {
560586 return false ;
561587 }
562588#endif
563589
564- bool update_failed = !updateSession ();
565- return update_failed;
590+ return !updateSession ();
566591}
567592
568593void Countly::clearEQInternal () {
@@ -689,30 +714,21 @@ bool Countly::updateSession() {
689714
690715 // events array
691716 nlohmann::json events = nlohmann::json::array ();
692- bool no_events;
717+ std::string event_ids;
718+ bool no_events = isEQEmpty ();
693719
694- #ifndef COUNTLY_USE_SQLITE
695- no_events = event_queue.empty ();
696720 if (!no_events) {
721+ #ifndef COUNTLY_USE_SQLITE
697722 for (const auto &event_json : event_queue) {
698723 events.push_back (nlohmann::json::parse (event_json));
699724 }
700- } else {
701- log (LogLevel::DEBUG, " [Countly][updateSession] EQ empty." );
702- }
703725#else
704- mutex->unlock ();
705- no_events = checkPersistentEQSize () > 0 ? false : true ;
706- mutex->lock ();
707-
708- std::string event_ids;
709- if (!no_events) {
710726 // TODO: If database_path was empty there was return false here
711727 fillEventsIntoJson (events, event_ids);
728+ #endif
712729 } else {
713730 log (LogLevel::DEBUG, " [Countly][updateSession] EQ empty." );
714731 }
715- #endif
716732 mutex->unlock ();
717733 auto duration = std::chrono::duration_cast<std::chrono::seconds>(getSessionDuration ());
718734 mutex->lock ();
@@ -728,10 +744,7 @@ bool Countly::updateSession() {
728744
729745 // report events if there are any to request queue
730746 if (!no_events) {
731- log (LogLevel::DEBUG, " [Countly][updateSession] sending event." );
732- std::map<std::string, std::string> data = {{" app_key" , session_params[" app_key" ].get <std::string>()}, {" device_id" , session_params[" device_id" ].get <std::string>()}, {" events" , events.dump ()}};
733-
734- requestModule->addRequestToQueue (data);
747+ sendEventsToRQ (events);
735748 }
736749
737750// clear event queue
@@ -740,6 +753,7 @@ bool Countly::updateSession() {
740753 event_queue.clear ();
741754#else
742755 if (!event_ids.empty ()) {
756+ // this is a partial clearance, we only remove the events that were sent
743757 removeEventWithId (event_ids);
744758 }
745759#endif
@@ -752,6 +766,23 @@ bool Countly::updateSession() {
752766 return true ;
753767}
754768
769+ void Countly::sendEventsToRQ (const nlohmann::json &events) {
770+ log (LogLevel::DEBUG, " [Countly][sendEventsToRQ] Sending events to RQ." );
771+ std::map<std::string, std::string> data = {{" app_key" , session_params[" app_key" ].get <std::string>()}, {" device_id" , session_params[" device_id" ].get <std::string>()}, {" events" , events.dump ()}};
772+ requestModule->addRequestToQueue (data);
773+ }
774+
775+ bool Countly::isEQEmpty () {
776+ log (LogLevel::DEBUG, " [Countly][isEQEmpty] Checking if the event queue is empty." );
777+ #ifdef COUNTLY_USE_SQLITE
778+ mutex->unlock ();
779+ return checkEQSize () > 0 ? false : true ;
780+ mutex->lock ();
781+ #else
782+ return event_queue.empty ();
783+ #endif
784+ }
785+
755786bool Countly::endSession () {
756787 log (LogLevel::INFO, " [Countly][endSession]" );
757788 const std::chrono::system_clock::time_point now = Countly::getTimestamp ();
@@ -777,6 +808,29 @@ bool Countly::endSession() {
777808
778809std::chrono::system_clock::time_point Countly::getTimestamp () { return std::chrono::system_clock::now (); }
779810
811+ int Countly::checkEQSize () {
812+ log (LogLevel::DEBUG, " [Countly][checkEQSize]" );
813+ int event_count = -1 ;
814+ if (!is_sdk_initialized) {
815+ log (LogLevel::DEBUG, " [Countly][checkEQSize] SDK is not initialized." );
816+ return event_count;
817+ }
818+
819+ #ifdef COUNTLY_USE_SQLITE
820+ event_count = checkPersistentEQSize ();
821+ #else
822+ event_count = checkMemoryEQSize ();
823+ #endif
824+ return event_count;
825+ }
826+
827+ #ifndef COUNTLY_USE_SQLITE
828+ int Countly::checkMemoryEQSize () {
829+ log (LogLevel::DEBUG, " [Countly][checkMemoryEQSize] Checking event queue size in memory" );
830+ return event_queue.size ();
831+ }
832+ #endif
833+
780834// Standalone Sqlite functions
781835#ifdef COUNTLY_USE_SQLITE
782836void Countly::removeEventWithId (const std::string &event_ids) {
@@ -827,9 +881,7 @@ void Countly::fillEventsIntoJson(nlohmann::json &events, std::string &event_ids)
827881
828882 // create sql statement to fetch events as much as the event queue threshold
829883 // TODO: check if this is something we want to do
830- std::ostringstream sql_statement_stream;
831- sql_statement_stream << " SELECT evtid, event FROM events LIMIT " << std::dec << configuration->eventQueueThreshold << ' ;' ;
832- std::string sql_statement = sql_statement_stream.str ();
884+ std::string sql_statement = " SELECT evtid, event FROM events;" ;
833885
834886 // execute sql statement
835887 return_value = sqlite3_get_table (database, sql_statement.c_str (), &table, &row_count, &column_count, &error_message);
@@ -862,13 +914,12 @@ void Countly::fillEventsIntoJson(nlohmann::json &events, std::string &event_ids)
862914}
863915
864916int Countly::checkPersistentEQSize () {
865- log (LogLevel::DEBUG, " [Countly][checkEQSize]" );
866- int event_count = -1 ;
917+ int result = -1 ;
867918 mutex->lock ();
868919 if (database_path.empty ()) {
869920 mutex->unlock ();
870921 log (LogLevel::FATAL, " [Countly][checkEQSize] Sqlite database path is not set" );
871- return event_count ;
922+ return result ;
872923 }
873924
874925 sqlite3 *database;
@@ -881,8 +932,8 @@ int Countly::checkPersistentEQSize() {
881932 char **table;
882933 return_value = sqlite3_get_table (database, " SELECT COUNT(*) FROM events;" , &table, &row_count, &column_count, &error_message);
883934 if (return_value == SQLITE_OK) {
884- event_count = atoi (table[1 ]);
885- log (LogLevel::DEBUG, " [Countly][checkEQSize] Fetched event count from database: " + std::to_string (event_count ));
935+ result = atoi (table[1 ]);
936+ log (LogLevel::DEBUG, " [Countly][checkEQSize] Fetched event count from database: " + std::to_string (result ));
886937 } else {
887938 log (LogLevel::ERROR, error_message);
888939 sqlite3_free (error_message);
@@ -891,9 +942,8 @@ int Countly::checkPersistentEQSize() {
891942 } else {
892943 log (LogLevel::WARNING, " [Countly][checkEQSize] Could not open database" );
893944 }
894-
895945 sqlite3_close (database);
896- return event_count ;
946+ return result ;
897947}
898948
899949void Countly::addEventToSqlite (const cly::Event &event) {
0 commit comments