1010
1111#include " InfoLoggerDispatch.h"
1212#include < mysql.h>
13+ #include < mysqld_error.h>
1314#include " utility.h"
1415#include " infoLoggerMessage.h"
1516#include < unistd.h>
@@ -48,7 +49,8 @@ class InfoLoggerDispatchSQLImpl
4849 std::string sql_insert;
4950
5051 unsigned long long insertCount = 0 ; // counter for number of queries executed
51- unsigned long long msgDroppedCount = 0 ; // counter for number of messages dropped (DB unavailable, etc)
52+ unsigned long long msgDelayedCount = 0 ; // counter for number of messages delayed (insert failed, retry)
53+ unsigned long long msgDroppedCount = 0 ; // counter for number of messages dropped (insert failed, dropped)
5254
5355 int connectDB (); // function to connect to database
5456 int disconnectDB (); // disconnect/cleanup DB connection
@@ -121,7 +123,7 @@ InfoLoggerDispatchSQL::InfoLoggerDispatchSQL(ConfigInfoLoggerServer* config, Sim
121123void InfoLoggerDispatchSQLImpl::stop ()
122124{
123125 disconnectDB ();
124- theLog->info (" DB thread insert count = %llu, dropped msg count = %llu" , insertCount, msgDroppedCount);
126+ theLog->info (" DB thread insert count = %llu, delayed msg count = %llu, dropped msg count = %llu" , insertCount, msgDelayedCount , msgDroppedCount);
125127}
126128
127129InfoLoggerDispatchSQL::~InfoLoggerDispatchSQL ()
@@ -261,11 +263,24 @@ int InfoLoggerDispatchSQLImpl::customLoop()
261263
262264int InfoLoggerDispatchSQLImpl::customMessageProcess (std::shared_ptr<InfoLoggerMessageList> lmsg)
263265{
264- // todo: keep message in queue on error!
265-
266- if (!dbIsConnected) {
266+ // procedure for dropped messages and keep count of them
267+ auto returnDroppedMessage = [&](const char * message) {
268+ // log bad message content (truncated)
269+ const int maxLen = 200 ;
270+ int msgLen = (int )strlen (message);
271+ theLog->error (" Dropping message (%d bytes): %.*s%s" , msgLen, maxLen, message, (msgLen>maxLen) ? " ..." : " " );
267272 msgDroppedCount++;
268- return -1 ;
273+ return 0 ; // remove message from queue
274+ };
275+
276+ // procedure for delayed messages and keep count of them
277+ auto returnDelayedMessage = [&]() {
278+ msgDelayedCount++;
279+ return -1 ; // keep message in queue
280+ };
281+
282+ if (!dbIsConnected) {
283+ return returnDelayedMessage ();
269284 }
270285
271286 infoLog_msg_t* m;
@@ -305,7 +320,7 @@ int InfoLoggerDispatchSQLImpl::customMessageProcess(std::shared_ptr<InfoLoggerMe
305320 if (mysql_query (db, " START TRANSACTION" )) {
306321 theLog->error (" DB start transaction failed: %s" , mysql_error (db));
307322 commitEnabled = 0 ;
308- return - 1 ;
323+ return returnDelayedMessage () ;
309324 } else {
310325 if (commitDebug) {
311326 theLog->info (" DB transaction started" );
@@ -329,18 +344,21 @@ int InfoLoggerDispatchSQLImpl::customMessageProcess(std::shared_ptr<InfoLoggerMe
329344 // update bind variables
330345 if (mysql_stmt_bind_param (stmt, bind)) {
331346 theLog->error (" mysql_stmt_bind() failed: %s" , mysql_error (db));
332- disconnectDB ( );
333- msgDroppedCount++;
334- return - 1 ;
347+ theLog-> error ( " message: %s " , msg );
348+ // if can not bind, message malformed, drop it
349+ return returnDroppedMessage (msg) ;
335350 }
336351
337352 // Do the insertion
338353 if (mysql_stmt_execute (stmt)) {
339- theLog->error (" mysql_stmt_exec() failed: %s" , mysql_error (db));
354+ theLog->error (" mysql_stmt_exec() failed: (%d) %s" , mysql_errno (db), mysql_error (db));
355+ // column too long
356+ if (mysql_errno (db) == ER_DATA_TOO_LONG) {
357+ return returnDroppedMessage (msg);
358+ }
340359 // retry with new connection - usually it means server was down
341360 disconnectDB ();
342- msgDroppedCount++;
343- return -1 ;
361+ return returnDelayedMessage ();
344362 }
345363
346364 insertCount++;
@@ -353,5 +371,6 @@ int InfoLoggerDispatchSQLImpl::customMessageProcess(std::shared_ptr<InfoLoggerMe
353371 }
354372 }
355373 }
374+ // report message success, it will be removed from queue
356375 return 0 ;
357376}
0 commit comments