Skip to content

Commit 7287f9b

Browse files
authored
Update session and flush events refactoring (#119)
* 1 * lint * uncomment * 2
1 parent cbec460 commit 7287f9b

File tree

3 files changed

+176
-92
lines changed

3 files changed

+176
-92
lines changed

include/countly.hpp

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,35 @@ class Countly : public cly::CountlyDelegates {
101101
void addEvent(const cly::Event &event);
102102

103103
/*
104-
* Checks and returns the size of the event queue in persistent storage.
105-
*/
106-
int checkEQSize();
104+
* Checks and returns the size of the event queue in persistent storage.
105+
*/
106+
int checkPersistentEQSize();
107107

108108
/*
109-
* Erases/clears the event queue in persistent storage.
110-
*/
111-
void clearEQ();
109+
* Erases/clears the event queue in persistent storage.
110+
*/
111+
void clearPersistentEQ();
112+
113+
/*
114+
* Erases/clears the event queue in persistent storage for given ID(s) .
115+
*/
116+
void removeEventWithId(const std::string &event_ids);
117+
118+
/*
119+
* Clears the event queue (persistent or in memory depending on their availability) internally.
120+
*/
121+
void clearEQInternal();
122+
123+
/*
124+
* Attempts to update the session if the session and return true if the session was updated successfully.
125+
* It checks only does so if the event queue is not empty
126+
*/
127+
bool attemptSessionUpdateEQ();
128+
129+
/*
130+
* Fetches events from the persistent storage and writes them into given json array, also gives event arrays
131+
*/
132+
void fillEventsIntoJson(nlohmann::json &events, std::string &event_ids);
112133

113134
void addEventToSqlite(const cly::Event &event);
114135

include/countly/countly_configuration.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ struct CountlyConfiguration {
3636

3737
/**
3838
* Sets the interval for the automatic update calls
39-
* min value 1 (1 second), max value 600 (10 minutes)
4039
*/
4140
unsigned int sessionDuration = 60;
4241

src/countly.cpp

Lines changed: 149 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ void Countly::addEvent(const cly::Event &event) {
482482
#else
483483
addEventToSqlite(event);
484484
mutex->unlock();
485-
int queueSize = checkEQSize();
485+
int queueSize = checkPersistentEQSize();
486486
mutex->lock();
487487
if (queueSize >= configuration->eventQueueThreshold) {
488488
log(LogLevel::DEBUG, "Event queue threshold is reached");
@@ -514,43 +514,65 @@ void Countly::flushEvents(std::chrono::seconds timeout) {
514514
try {
515515
auto wait_duration = std::chrono::seconds(1);
516516
bool update_failed;
517+
518+
// Try to update session
517519
while (timeout.count() != 0) {
518-
#ifndef COUNTLY_USE_SQLITE
519-
mutex->lock();
520-
if (event_queue.empty()) {
521-
mutex->unlock();
520+
521+
// try to update session if event queue is not empty
522+
update_failed = attemptSessionUpdateEQ();
523+
524+
// if update is successful or EQ is empty, break the loop
525+
if (!update_failed) {
522526
break;
523527
}
524-
mutex->unlock();
525528

526-
update_failed = !updateSession();
527-
#else
528-
update_failed = true;
529-
int event_count = checkEQSize();
530-
if (event_count > 0) {
531-
update_failed = !updateSession();
532-
}
533-
#endif
534-
if (update_failed) {
535-
std::this_thread::sleep_for(wait_duration);
536-
wait_duration *= 2;
537-
timeout = (wait_duration > timeout) ? std::chrono::seconds(0) : (timeout - wait_duration);
538-
}
529+
// wait for a while
530+
std::this_thread::sleep_for(wait_duration);
531+
// increase wait/retry duration (exponential backoff: 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, ...)
532+
wait_duration *= 2;
533+
// if wait/retry time is bigger than timeout stop trying, else decrease timeout
534+
timeout = (wait_duration > timeout) ? std::chrono::seconds(0) : (timeout - wait_duration);
539535
}
540536

541-
#ifndef COUNTLY_USE_SQLITE
542-
event_queue.clear();
543-
#else
544-
update_failed = true;
545-
clearEQ();
546-
#endif
537+
// Clear the event queue
538+
clearEQInternal(); // TODO: Check if this is necessary
539+
540+
// TODO: Check if we capture anything other than a system_error
547541
} catch (const std::system_error &e) {
548542
std::ostringstream log_message;
549543
log_message << "flushEvents, error: " << e.what();
550544
log(LogLevel::FATAL, log_message.str());
551545
}
552546
}
553547

548+
bool Countly::attemptSessionUpdateEQ() {
549+
// return false if event queue is empty
550+
#ifndef COUNTLY_USE_SQLITE
551+
mutex->lock();
552+
if (event_queue.empty()) {
553+
mutex->unlock();
554+
return false;
555+
}
556+
mutex->unlock();
557+
#else
558+
int event_count = checkPersistentEQSize();
559+
if (event_count <= 0) {
560+
return false;
561+
}
562+
#endif
563+
564+
bool update_failed = !updateSession();
565+
return update_failed;
566+
}
567+
568+
void Countly::clearEQInternal() {
569+
#ifndef COUNTLY_USE_SQLITE
570+
event_queue.clear();
571+
#else
572+
clearPersistentEQ();
573+
#endif
574+
}
575+
554576
#ifdef COUNTLY_BUILD_TESTS
555577
std::vector<std::string> Countly::debugReturnStateOfEQ() {
556578
try {
@@ -653,17 +675,19 @@ bool Countly::beginSession() {
653675
*/
654676
bool Countly::updateSession() {
655677
try {
678+
// Check if there was a session, if not try to start one
656679
mutex->lock();
657680
if (!began_session) {
658681
mutex->unlock();
659682
if (!beginSession()) {
683+
// if beginSession fails, we should not try to update session
660684
return false;
661685
}
662-
663686
mutex->lock();
664687
began_session = true;
665688
}
666689

690+
// events array
667691
nlohmann::json events = nlohmann::json::array();
668692
bool no_events;
669693

@@ -673,58 +697,27 @@ bool Countly::updateSession() {
673697
for (const auto &event_json : event_queue) {
674698
events.push_back(nlohmann::json::parse(event_json));
675699
}
700+
} else {
701+
log(LogLevel::DEBUG, "[Countly][updateSession] EQ empty.");
676702
}
677703
#else
678-
if (database_path.empty()) {
679-
mutex->unlock();
680-
log(LogLevel::FATAL, "Cannot fetch events, sqlite database path is not set.");
681-
return false;
682-
}
704+
mutex->unlock();
705+
no_events = checkPersistentEQSize() > 0 ? false : true;
706+
mutex->lock();
683707

684-
log(LogLevel::DEBUG, "[Countly][updateSession] fetching events from storage.");
685-
sqlite3 *database;
686-
int return_value, row_count, column_count;
687-
char **table;
688-
char *error_message;
689708
std::string event_ids;
690-
691-
return_value = sqlite3_open(database_path.c_str(), &database);
692-
if (return_value == SQLITE_OK) {
693-
std::ostringstream sql_statement_stream;
694-
sql_statement_stream << "SELECT evtid, event FROM events LIMIT " << std::dec << configuration->eventQueueThreshold << ';';
695-
std::string sql_statement = sql_statement_stream.str();
696-
697-
return_value = sqlite3_get_table(database, sql_statement.c_str(), &table, &row_count, &column_count, &error_message);
698-
no_events = (row_count == 0);
699-
if (return_value == SQLITE_OK && !no_events) {
700-
std::ostringstream event_id_stream;
701-
event_id_stream << '(';
702-
703-
for (int event_index = 1; event_index < row_count + 1; event_index++) {
704-
event_id_stream << table[event_index * column_count] << ',';
705-
events.push_back(nlohmann::json::parse(table[(event_index * column_count) + 1]));
706-
}
707-
708-
log(LogLevel::DEBUG, "[Countly][updateSession] events count = " + std::to_string(events.size()));
709-
710-
event_id_stream.seekp(-1, event_id_stream.cur);
711-
event_id_stream << ')';
712-
event_ids = event_id_stream.str();
713-
} else if (return_value != SQLITE_OK) {
714-
log(LogLevel::ERROR, error_message);
715-
sqlite3_free(error_message);
716-
} else {
717-
log(LogLevel::DEBUG, "[Countly][updateSession] no events detected at the storage.");
718-
}
719-
sqlite3_free_table(table);
709+
if (!no_events) {
710+
// TODO: If database_path was empty there was return false here
711+
fillEventsIntoJson(events, event_ids);
712+
} else {
713+
log(LogLevel::DEBUG, "[Countly][updateSession] EQ empty.");
720714
}
721-
sqlite3_close(database);
722-
723715
#endif
724716
mutex->unlock();
725717
auto duration = std::chrono::duration_cast<std::chrono::seconds>(getSessionDuration());
726718
mutex->lock();
727719

720+
// report session duration if it is greater than the configured session duration value
728721
if (duration.count() >= configuration->sessionDuration) {
729722
log(LogLevel::DEBUG, "[Countly][updateSession] sending session update.");
730723
std::map<std::string, std::string> data = {{"app_key", session_params["app_key"].get<std::string>()}, {"device_id", session_params["device_id"].get<std::string>()}, {"session_duration", std::to_string(duration.count())}};
@@ -733,32 +726,21 @@ bool Countly::updateSession() {
733726
last_sent_session_request += duration;
734727
}
735728

729+
// report events if there are any to request queue
736730
if (!no_events) {
737731
log(LogLevel::DEBUG, "[Countly][updateSession] sending event.");
738732
std::map<std::string, std::string> data = {{"app_key", session_params["app_key"].get<std::string>()}, {"device_id", session_params["device_id"].get<std::string>()}, {"events", events.dump()}};
739733

740734
requestModule->addRequestToQueue(data);
741735
}
742736

737+
// clear event queue
738+
// TODO: check if we want to totally wipe the event queue in memory but not in database
743739
#ifndef COUNTLY_USE_SQLITE
744740
event_queue.clear();
745741
#else
746742
if (!event_ids.empty()) {
747-
log(LogLevel::DEBUG, "[Countly][updateSession] Removing events from storage: " + event_ids);
748-
// we attempt to clear the events in the database only if there were any events collected previously
749-
return_value = sqlite3_open(database_path.c_str(), &database);
750-
if (return_value == SQLITE_OK) {
751-
std::ostringstream sql_statement_stream;
752-
sql_statement_stream << "DELETE FROM events WHERE evtid IN " << event_ids << ';';
753-
std::string sql_statement = sql_statement_stream.str();
754-
755-
return_value = sqlite3_exec(database, sql_statement.c_str(), nullptr, nullptr, &error_message);
756-
if (return_value != SQLITE_OK) {
757-
log(LogLevel::ERROR, error_message);
758-
sqlite3_free(error_message);
759-
}
760-
}
761-
sqlite3_close(database);
743+
removeEventWithId(event_ids);
762744
}
763745
#endif
764746
} catch (const std::system_error &e) {
@@ -797,7 +779,89 @@ std::chrono::system_clock::time_point Countly::getTimestamp() { return std::chro
797779

798780
// Standalone Sqlite functions
799781
#ifdef COUNTLY_USE_SQLITE
800-
int Countly::checkEQSize() {
782+
void Countly::removeEventWithId(const std::string &event_ids) {
783+
// TODO: Check if we should check database_path set or not
784+
log(LogLevel::DEBUG, "[Countly][removeEventWithId] Removing events from storage: " + event_ids);
785+
sqlite3 *database;
786+
int return_value;
787+
char *error_message;
788+
789+
// we attempt to clear the events in the database only if there were any events collected previously
790+
return_value = sqlite3_open(database_path.c_str(), &database);
791+
if (return_value == SQLITE_OK) {
792+
std::ostringstream sql_statement_stream;
793+
sql_statement_stream << "DELETE FROM events WHERE evtid IN " << event_ids << ';';
794+
std::string sql_statement = sql_statement_stream.str();
795+
796+
return_value = sqlite3_exec(database, sql_statement.c_str(), nullptr, nullptr, &error_message);
797+
if (return_value != SQLITE_OK) {
798+
log(LogLevel::ERROR, error_message);
799+
sqlite3_free(error_message);
800+
} else {
801+
log(LogLevel::DEBUG, "[Countly][removeEventWithId] Removed events with the given ID(s).");
802+
}
803+
} else {
804+
log(LogLevel::ERROR, "[Countly][removeEventWithId] Could not open database.");
805+
}
806+
sqlite3_close(database);
807+
}
808+
809+
void Countly::fillEventsIntoJson(nlohmann::json &events, std::string &event_ids) {
810+
if (database_path.empty()) {
811+
mutex->unlock();
812+
log(LogLevel::FATAL, "[Countly][fillEventsIntoJson] Sqlite database path is not set.");
813+
event_ids = "";
814+
return;
815+
}
816+
817+
log(LogLevel::DEBUG, "[Countly][fillEventsIntoJson] Fetching events from storage.");
818+
sqlite3 *database;
819+
int return_value, row_count, column_count;
820+
char **table;
821+
char *error_message;
822+
823+
// open database
824+
return_value = sqlite3_open(database_path.c_str(), &database);
825+
// if database opened successfully
826+
if (return_value == SQLITE_OK) {
827+
828+
// create sql statement to fetch events as much as the event queue threshold
829+
// TODO: check if this is something we want to do
830+
std::ostringstream sql_statement_stream;
831+
sql_statement_stream << "SELECT evtid, event FROM events LIMIT " << std::dec << configuration->eventQueueThreshold << ';';
832+
std::string sql_statement = sql_statement_stream.str();
833+
834+
// execute sql statement
835+
return_value = sqlite3_get_table(database, sql_statement.c_str(), &table, &row_count, &column_count, &error_message);
836+
if (return_value == SQLITE_OK) {
837+
std::ostringstream event_id_stream;
838+
event_id_stream << '(';
839+
840+
for (int event_index = 1; event_index < row_count + 1; event_index++) {
841+
event_id_stream << table[event_index * column_count] << ',';
842+
// add event to the events array
843+
events.push_back(nlohmann::json::parse(table[(event_index * column_count) + 1]));
844+
}
845+
846+
log(LogLevel::DEBUG, "[Countly][fillEventsIntoJson] Events count = " + std::to_string(events.size()));
847+
848+
event_id_stream.seekp(-1, event_id_stream.cur);
849+
event_id_stream << ')';
850+
851+
// write event ids to a string stream (or more like copy out that stream here) to be used in the delete statement
852+
event_ids = event_id_stream.str();
853+
} else {
854+
log(LogLevel::ERROR, error_message);
855+
sqlite3_free(error_message);
856+
}
857+
sqlite3_free_table(table);
858+
} else {
859+
log(LogLevel::ERROR, "[Countly][fillEventsIntoJson] Could not open database.");
860+
}
861+
sqlite3_close(database);
862+
}
863+
864+
int Countly::checkPersistentEQSize() {
801865
log(LogLevel::DEBUG, "[Countly][checkEQSize]");
802866
int event_count = -1;
803867
mutex->lock();
@@ -865,7 +929,7 @@ void Countly::addEventToSqlite(const cly::Event &event) {
865929
}
866930
}
867931

868-
void Countly::clearEQ() {
932+
void Countly::clearPersistentEQ() {
869933
log(LogLevel::DEBUG, "[Countly][clearEQ]");
870934
sqlite3 *database;
871935
int return_value;

0 commit comments

Comments
 (0)