Skip to content

Commit db69533

Browse files
committed
transaction for bulk inserts
1 parent 51a292e commit db69533

File tree

1 file changed

+49
-5
lines changed

1 file changed

+49
-5
lines changed

src/InfoLoggerDispatchSQL.cxx

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "infoLoggerMessage.h"
1515
#include <unistd.h>
1616
#include <string.h>
17+
#include <Common/Timer.h>
1718

1819
#if LIBMYSQL_VERSION_ID >= 80000
1920
typedef bool my_bool;
@@ -51,6 +52,12 @@ class InfoLoggerDispatchSQLImpl
5152

5253
int connectDB(); // function to connect to database
5354
int disconnectDB(); // disconnect/cleanup DB connection
55+
56+
int commitEnabled = 1; // flag to enable transactions
57+
int commitDebug = 0; // log transactions
58+
int commitTimeout = 1000000; // time between commits
59+
Timer commitTimer; // timer for transaction
60+
int commitNumberOfMsg; // number of messages since last commit
5461
};
5562

5663
void InfoLoggerDispatchSQLImpl::start()
@@ -201,6 +208,9 @@ int InfoLoggerDispatchSQLImpl::connectDB()
201208
disconnectDB();
202209
return -1;
203210
}
211+
212+
// reset transactions
213+
commitNumberOfMsg = 0;
204214
}
205215

206216
return 0;
@@ -229,7 +239,23 @@ int InfoLoggerDispatchSQLImpl::customLoop()
229239
if (err) {
230240
// temporization to avoid immediate retry
231241
sleep(SQL_RETRY_CONNECT);
242+
} else if (commitEnabled) {
243+
// complete pending transactions
244+
if (commitNumberOfMsg) {
245+
if (commitTimer.isTimeout()) {
246+
if (mysql_query (db, "COMMIT")) {
247+
theLog->error("DB transaction commit failed: %s", mysql_error(db));
248+
commitEnabled = 0;
249+
} else {
250+
if (commitDebug) {
251+
theLog->info("DB commit - %d msgs", commitNumberOfMsg);
252+
}
253+
}
254+
commitNumberOfMsg = 0;
255+
}
256+
}
232257
}
258+
233259
return err;
234260
}
235261

@@ -274,6 +300,21 @@ int InfoLoggerDispatchSQLImpl::customMessageProcess(std::shared_ptr<InfoLoggerMe
274300
}
275301
}
276302

303+
if (commitEnabled) {
304+
if (commitNumberOfMsg == 0) {
305+
if (mysql_query (db, "START TRANSACTION")) {
306+
theLog->error("DB start transaction failed: %s", mysql_error(db));
307+
commitEnabled = 0;
308+
return -1;
309+
} else {
310+
if (commitDebug) {
311+
theLog->info("DB transaction started");
312+
}
313+
}
314+
commitTimer.reset(commitTimeout);
315+
}
316+
}
317+
277318
// re-format message with multiple line - assumes it is the LAST field in the protocol
278319
for (msg = (char*)m->values[nFields - 1].value.vString; msg != NULL; msg = nl) {
279320
nl = strchr(msg, '\f');
@@ -303,11 +344,14 @@ int InfoLoggerDispatchSQLImpl::customMessageProcess(std::shared_ptr<InfoLoggerMe
303344
}
304345

305346
insertCount++;
306-
/*
307-
if (insertCount%1000==0) {
308-
theLog->info("insert count = %llu",insertCount);
309-
}
310-
*/
347+
commitNumberOfMsg++;
348+
349+
if (commitDebug) {
350+
if (insertCount%1000==0) {
351+
theLog->info("insert count = %llu",insertCount);
352+
}
353+
}
354+
311355
}
312356
}
313357
return 0;

0 commit comments

Comments
 (0)