1010
1111#include " InfoLoggerDispatch.h"
1212#include < mysql.h>
13+ #include < mysqld_error.h>
1314#include " utility.h"
1415#include " infoLoggerMessage.h"
1516#include < unistd.h>
1617#include < string.h>
18+ #include < Common/Timer.h>
1719
1820#if LIBMYSQL_VERSION_ID >= 80000
1921typedef bool my_bool;
@@ -47,10 +49,17 @@ class InfoLoggerDispatchSQLImpl
4749 std::string sql_insert;
4850
4951 unsigned long long insertCount = 0 ; // counter for number of queries executed
50- unsigned long long msgDroppedCount = 0 ; // counter for number of messages dropped (DB unavailable, etc)
52+ unsigned long long msgDelayedCount = 0 ; // counter for number of messages delayed (insert failed, retry)
53+ unsigned long long msgDroppedCount = 0 ; // counter for number of messages dropped (insert failed, dropped)
5154
5255 int connectDB (); // function to connect to database
5356 int disconnectDB (); // disconnect/cleanup DB connection
57+
58+ int commitEnabled = 1 ; // flag to enable transactions
59+ int commitDebug = 0 ; // log transactions
60+ int commitTimeout = 1000000 ; // time between commits
61+ Timer commitTimer; // timer for transaction
62+ int commitNumberOfMsg; // number of messages since last commit
5463};
5564
5665void InfoLoggerDispatchSQLImpl::start ()
@@ -114,7 +123,7 @@ InfoLoggerDispatchSQL::InfoLoggerDispatchSQL(ConfigInfoLoggerServer* config, Sim
114123void InfoLoggerDispatchSQLImpl::stop ()
115124{
116125 disconnectDB ();
117- theLog->info (" DB thread insert count = %llu, dropped msg count = %llu" , insertCount, msgDroppedCount);
126+ theLog->info (" DB thread insert count = %llu, delayed msg count = %llu, dropped msg count = %llu" , insertCount, msgDelayedCount , msgDroppedCount);
118127}
119128
120129InfoLoggerDispatchSQL::~InfoLoggerDispatchSQL ()
@@ -201,6 +210,9 @@ int InfoLoggerDispatchSQLImpl::connectDB()
201210 disconnectDB ();
202211 return -1 ;
203212 }
213+
214+ // reset transactions
215+ commitNumberOfMsg = 0 ;
204216 }
205217
206218 return 0 ;
@@ -229,17 +241,46 @@ int InfoLoggerDispatchSQLImpl::customLoop()
229241 if (err) {
230242 // temporization to avoid immediate retry
231243 sleep (SQL_RETRY_CONNECT);
244+ } else if (commitEnabled) {
245+ // complete pending transactions
246+ if (commitNumberOfMsg) {
247+ if (commitTimer.isTimeout ()) {
248+ if (mysql_query (db, " COMMIT" )) {
249+ theLog->error (" DB transaction commit failed: %s" , mysql_error (db));
250+ commitEnabled = 0 ;
251+ } else {
252+ if (commitDebug) {
253+ theLog->info (" DB commit - %d msgs" , commitNumberOfMsg);
254+ }
255+ }
256+ commitNumberOfMsg = 0 ;
257+ }
258+ }
232259 }
260+
233261 return err;
234262}
235263
236264int InfoLoggerDispatchSQLImpl::customMessageProcess (std::shared_ptr<InfoLoggerMessageList> lmsg)
237265{
238- // todo: keep message in queue on error!
266+ // procedure for dropped messages and keep count of them
267+ auto returnDroppedMessage = [&](const char * message) {
268+ // log bad message content (truncated)
269+ const int maxLen = 200 ;
270+ int msgLen = (int )strlen (message);
271+ theLog->error (" Dropping message (%d bytes): %.*s%s" , msgLen, maxLen, message, (msgLen > maxLen) ? " ..." : " " );
272+ msgDroppedCount++;
273+ return 0 ; // remove message from queue
274+ };
275+
276+ // procedure for delayed messages and keep count of them
277+ auto returnDelayedMessage = [&]() {
278+ msgDelayedCount++;
279+ return -1 ; // keep message in queue
280+ };
239281
240282 if (!dbIsConnected) {
241- msgDroppedCount++;
242- return -1 ;
283+ return returnDelayedMessage ();
243284 }
244285
245286 infoLog_msg_t* m;
@@ -274,6 +315,21 @@ int InfoLoggerDispatchSQLImpl::customMessageProcess(std::shared_ptr<InfoLoggerMe
274315 }
275316 }
276317
318+ if (commitEnabled) {
319+ if (commitNumberOfMsg == 0 ) {
320+ if (mysql_query (db, " START TRANSACTION" )) {
321+ theLog->error (" DB start transaction failed: %s" , mysql_error (db));
322+ commitEnabled = 0 ;
323+ return returnDelayedMessage ();
324+ } else {
325+ if (commitDebug) {
326+ theLog->info (" DB transaction started" );
327+ }
328+ }
329+ commitTimer.reset (commitTimeout);
330+ }
331+ }
332+
277333 // re-format message with multiple line - assumes it is the LAST field in the protocol
278334 for (msg = (char *)m->values [nFields - 1 ].value .vString ; msg != NULL ; msg = nl) {
279335 nl = strchr (msg, ' \f ' );
@@ -288,27 +344,33 @@ int InfoLoggerDispatchSQLImpl::customMessageProcess(std::shared_ptr<InfoLoggerMe
288344 // update bind variables
289345 if (mysql_stmt_bind_param (stmt, bind)) {
290346 theLog->error (" mysql_stmt_bind() failed: %s" , mysql_error (db));
291- disconnectDB ( );
292- msgDroppedCount++;
293- return - 1 ;
347+ theLog-> error ( " message: %s " , msg );
348+ // if can not bind, message malformed, drop it
349+ return returnDroppedMessage (msg) ;
294350 }
295351
296352 // Do the insertion
297353 if (mysql_stmt_execute (stmt)) {
298- theLog->error (" mysql_stmt_exec() failed: %s" , mysql_error (db));
354+ theLog->error (" mysql_stmt_exec() failed: (%d) %s" , mysql_errno (db), mysql_error (db));
355+ // column too long
356+ if (mysql_errno (db) == ER_DATA_TOO_LONG) {
357+ return returnDroppedMessage (msg);
358+ }
299359 // retry with new connection - usually it means server was down
300360 disconnectDB ();
301- msgDroppedCount++;
302- return -1 ;
361+ return returnDelayedMessage ();
303362 }
304363
305364 insertCount++;
306- /*
307- if (insertCount%1000==0) {
308- theLog->info("insert count = %llu",insertCount);
309- }
310- */
365+ commitNumberOfMsg++;
366+
367+ if (commitDebug) {
368+ if (insertCount % 1000 == 0 ) {
369+ theLog->info (" insert count = %llu" , insertCount);
370+ }
371+ }
311372 }
312373 }
374+ // report message success, it will be removed from queue
313375 return 0 ;
314376}
0 commit comments