Skip to content

Commit 909073a

Browse files
authored
Merge pull request #57 from sy-c/master
v1.3.19
2 parents 81eaea1 + 17ce96b commit 909073a

File tree

9 files changed

+101
-9
lines changed

9 files changed

+101
-9
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ endif()
5757
include(ExternalProject)
5858
externalproject_add (Common-standalone
5959
GIT_REPOSITORY "https://github.com/AliceO2Group/Common.git"
60-
GIT_TAG "v1.4.9"
60+
GIT_TAG "v1.5.2"
6161
LOG_DOWNLOAD 1
6262
UPDATE_COMMAND ""
6363
PATCH_COMMAND ""

doc/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,3 +276,13 @@ achieved on CentOS 7 with e.g. (as root):
276276
This can be done by specifying in infoLoggerD configuration the rxSocketPath parameter, and name it with a path starting with '/', e.g. '/tmp/infoLoggerD.socket'.
277277
The same value has to be set for the client configuration, in the txSocketPath key.
278278
User is responsible to ensure that file access permissions are configured properly.
279+
280+
* infoLoggerD local cache
281+
282+
infoLoggerD stores messages in a persistent local file until messages are successfully transmitted and acknowledged by infoLoggerServer.
283+
When infoLoggerServer is unavailable for a long time, messages may accumulate locally. They will all be transmitted to infoLoggerServer when available again.
284+
In some cases, one may want to delete this local cache.
285+
This can be configured on startup of infoLoggerD by one of the following ways:
286+
- in the infoLoggerD configuration section, set: `msgQueueReset=1` (this is permanent, done on each startup of infoLoggerD, which might not be what you want)
287+
- when starting infoLoggerD process from the command line (not with the systemctl service), add option: `-o msgQueueReset=1`
288+
- create a file named [msgQueuePath].reset (by default, msgQueuePath=/tmp/infoLoggerD/infoLoggerD.queue), e.g. `touch /tmp/infoLoggerD/infoLoggerD.queue.reset`. This will reset the queue on next startup (by hand or with e.g. service infoLoggerD restart), and the reset file will also be deleted (which ensures cleanup is done once only).

doc/releaseNotes.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,7 @@ This file describes the main feature changes for each InfoLogger released versio
6868

6969
## v1.3.18 - 08/01/2021
7070
- improved DB connection failure recovery
71+
72+
## v1.3.19 - 22/01/2021
73+
- added infoLoggerServer debug options
74+
- added option to reset local message queue on infoLoggerD startup

src/ConfigInfoLoggerServer.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ void ConfigInfoLoggerServer::readFromConfigFile(ConfigFile& config)
2323
config.getOptionalValue<int>(INFOLOGGER_CONFIG_SECTION_NAME_SERVER ".serverPortRx", serverPortRx);
2424
config.getOptionalValue<int>(INFOLOGGER_CONFIG_SECTION_NAME_SERVER ".maxClientsRx", maxClientsRx);
2525
config.getOptionalValue<int>(INFOLOGGER_CONFIG_SECTION_NAME_SERVER ".msgQueueLengthRx", msgQueueLengthRx);
26-
26+
config.getOptionalValue<std::string>(INFOLOGGER_CONFIG_SECTION_NAME_SERVER ".msgDumpFile", msgDumpFile);
27+
2728
config.getOptionalValue<std::string>(INFOLOGGER_CONFIG_SECTION_NAME_SERVER ".dbHost", dbHost);
2829
config.getOptionalValue<std::string>(INFOLOGGER_CONFIG_SECTION_NAME_SERVER ".dbUser", dbUser);
2930
config.getOptionalValue<std::string>(INFOLOGGER_CONFIG_SECTION_NAME_SERVER ".dbPassword", dbPassword);

src/ConfigInfoLoggerServer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ class ConfigInfoLoggerServer
3333
int serverPortRx = INFOLOGGER_DEFAULT_SERVER_RX_PORT; // IP port number to receive incoming messages, where infoLoggerD clients connect
3434
int maxClientsRx = 3000; // maximum number of connected infoLoggerD clients
3535
int msgQueueLengthRx = 10000; // reception queue size
36-
36+
std::string msgDumpFile = ""; // a file to dump copy of all incoming messages
37+
3738
// settings for database connection
3839
std::string dbHost = "localhost"; // database host name
3940
std::string dbUser = "o2"; // database user name

src/InfoLoggerClient.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,9 @@ int InfoLoggerClient::send(const char* message, unsigned int messageSize)
242242
// launch a thread for automatic reconnect, and buffer (reasonably) messages until then
243243
reconnectThreadStart();
244244
}
245-
if (messageBuffer.size()<cfg.maxMessagesBuffered) {
245+
if ((int)messageBuffer.size()<cfg.maxMessagesBuffered) {
246246
messageBuffer.push(message);
247-
if (messageBuffer.size()==cfg.maxMessagesBuffered) {
247+
if ((int)messageBuffer.size()==cfg.maxMessagesBuffered) {
248248
log.warning("Max buffer size reached, next messages will be lost until reconnect");
249249
}
250250
}

src/infoLoggerAdminDB.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ int main(int argc, char* argv[])
221221
unsigned, username varchar(32), `system` varchar(32), facility varchar(32), detector varchar(32), `partition` varchar(32), run int unsigned, errcode int unsigned, \
222222
errline smallint unsigned, errsource varchar(32), message text, index ix_severity(severity), index ix_level(level), index ix_timestamp(timestamp), index \
223223
ix_hostname(hostname(14)), index ix_rolename(rolename(20)), index ix_system(`system`(3)), index ix_facility(facility(20)), index ix_detector(detector(8)), index \
224-
ix_partition(`partition`(10)), index ix_run(run), index ix_errcode(errcode), index ix_errline(errline), index ix_errsource(errsource(20))) ENGINE=MyISAM";
224+
ix_partition(`partition`(10)), index ix_run(run), index ix_errcode(errcode), index ix_errline(errline), index ix_errsource(errsource(20)))";
225225

226226
if (optPartitioning) {
227227
log.info("Using partitioning");

src/infoLoggerD.cxx

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ class ConfigInfoLoggerD
7171
// settings for remote infoLoggerServer access
7272
std::string serverHost = "localhost"; // IP name to connect infoLoggerServer
7373
int serverPort = INFOLOGGER_DEFAULT_SERVER_RX_PORT; // IP port number to connect infoLoggerServer
74-
int msgQueueLength = 1000; // transmission queue size
75-
std::string msgQueuePath = "/tmp/infoLoggerD.queue"; // path to temp file storing messages
74+
int msgQueueLength = 10000; // transmission queue size
75+
std::string msgQueuePath = localLogDirectory + "/infoLoggerD.queue"; // path to temp file storing messages
76+
int msgQueueReset = 0; // when set, existing temp file is cleared (and pending messages lost)
7677
std::string clientName = "infoLoggerD"; // name identifying client to infoLoggerServer
7778
int isProxy = 0; // flag set to allow infoLoggerD to be a transport proxy to infoLoggerServer
7879

@@ -101,6 +102,7 @@ void ConfigInfoLoggerD::readFromConfigFile(ConfigFile& config)
101102
config.getOptionalValue<int>(INFOLOGGER_CONFIG_SECTION_NAME_INFOLOGGERD ".serverPort", serverPort);
102103
config.getOptionalValue<int>(INFOLOGGER_CONFIG_SECTION_NAME_INFOLOGGERD ".msgQueueLength", msgQueueLength);
103104
config.getOptionalValue<std::string>(INFOLOGGER_CONFIG_SECTION_NAME_INFOLOGGERD ".msgQueuePath", msgQueuePath);
105+
config.getOptionalValue<int>(INFOLOGGER_CONFIG_SECTION_NAME_INFOLOGGERD ".msgQueueReset", msgQueueReset);
104106
config.getOptionalValue<std::string>(INFOLOGGER_CONFIG_SECTION_NAME_INFOLOGGERD ".clientName", clientName);
105107
config.getOptionalValue<int>(INFOLOGGER_CONFIG_SECTION_NAME_INFOLOGGERD ".isProxy", isProxy);
106108

@@ -213,7 +215,13 @@ class InfoLoggerD : public Daemon
213215
FILE* logOutput = nullptr; // handle to local log file where to copy incoming messages, if configured to do so
214216
};
215217

216-
InfoLoggerD::InfoLoggerD(int argc, char* argv[]) : Daemon(argc, argv)
218+
// list of extra keys accepted on the command line (-o key=value entries)
219+
#define EXTRA_OPTIONS \
220+
{ \
221+
"msgQueueReset" \
222+
}
223+
224+
InfoLoggerD::InfoLoggerD(int argc, char* argv[]) : Daemon(argc, argv, nullptr, EXTRA_OPTIONS)
217225
{
218226
if (isOk()) { // proceed only if base daemon init was a success
219227

@@ -227,6 +235,13 @@ InfoLoggerD::InfoLoggerD(int argc, char* argv[]) : Daemon(argc, argv)
227235
// redirect legacy simplelog interface to SimpleLog
228236
setSimpleLog(&log);
229237

238+
// retrieve configuration parameters from command line
239+
for (auto const& o : execOptions) {
240+
if (o.key == "msgQueueReset") {
241+
configInfoLoggerD.msgQueueReset = atoi(o.value.c_str());
242+
}
243+
}
244+
230245
// retrieve configuration parameters from config file
231246
configInfoLoggerD.readFromConfigFile(config);
232247

@@ -309,6 +324,38 @@ InfoLoggerD::InfoLoggerD(int argc, char* argv[]) : Daemon(argc, argv)
309324
cfgCx.proxy_state = TR_PROXY_CAN_BE_PROXY;
310325
}
311326

327+
// check message queue
328+
struct stat queueInfo;
329+
std::string msgQueuePathFifo = configInfoLoggerD.msgQueuePath + ".fifo";
330+
log.info("Checking message queue file %s", msgQueuePathFifo.c_str());
331+
if (lstat(msgQueuePathFifo.c_str(), &queueInfo) == 0) {
332+
if (S_ISREG(queueInfo.st_mode)) {
333+
log.info("Pending messages = %ld bytes", (long)queueInfo.st_size);
334+
} else {
335+
log.error("This is not a regular file");
336+
}
337+
} else {
338+
log.info("No pending messages");
339+
}
340+
341+
// if the queue.reset file exists, remove it and reset queue
342+
// (so it's done once only, on this startup)
343+
std::string msgQueuePathReset = configInfoLoggerD.msgQueuePath + ".reset";
344+
if (lstat(msgQueuePathReset.c_str(), &queueInfo) == 0) {
345+
if (S_ISREG(queueInfo.st_mode)) {
346+
log.info("Detected file %s, forcing queue reset", msgQueuePathReset.c_str());
347+
remove(msgQueuePathReset.c_str());
348+
configInfoLoggerD.msgQueueReset = 1;
349+
}
350+
}
351+
352+
if (configInfoLoggerD.msgQueueReset) {
353+
log.info("Clearing queue");
354+
if (remove(msgQueuePathFifo.c_str())) {
355+
log.info("Failed to delete %s", msgQueuePathFifo.c_str());
356+
}
357+
}
358+
312359
hCx = TR_client_start(&cfgCx);
313360
if (hCx == nullptr) {
314361
throw __LINE__;

src/infoLoggerServer.cxx

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ class InfoLoggerServer : public Daemon
8787
unsigned int dbRoundRobinIx = 0;
8888

8989
unsigned long long msgCount = 0;
90+
91+
FILE *msgDump = nullptr;
9092
};
9193

9294
InfoLoggerServer::InfoLoggerServer(int argc, char* argv[]) : Daemon(argc, argv)
@@ -111,6 +113,19 @@ InfoLoggerServer::InfoLoggerServer(int argc, char* argv[]) : Daemon(argc, argv)
111113
throw __LINE__;
112114
}
113115

116+
// create message dump (copy of all incoming messages)
117+
if (configInfoLoggerServer.msgDumpFile.size()) {
118+
time_t t = time(NULL);
119+
std::string f = configInfoLoggerServer.msgDumpFile.c_str();
120+
f += std::to_string(t) + ".log";
121+
msgDump = fopen(f.c_str(),"wb");
122+
if (msgDump != nullptr) {
123+
log.info("Dumping copy of incoming messages to %s", f.c_str());
124+
} else {
125+
log.error("Failed to create message dump file %s", f.c_str());
126+
}
127+
}
128+
114129
// create dispatch engines
115130
try {
116131
//dispatchEngines.push_back(std::make_unique<InfoLoggerDispatchPrint>(&log));
@@ -147,6 +162,9 @@ InfoLoggerServer::~InfoLoggerServer()
147162
}
148163

149164
log.info("Received %llu messages", msgCount);
165+
if (msgDump != nullptr) {
166+
fclose(msgDump);
167+
}
150168
}
151169
}
152170

@@ -164,6 +182,17 @@ Daemon::LoopStatus InfoLoggerServer::doLoop()
164182
//fflush(stdout);
165183
//TR_file_dump(newFile);
166184

185+
// dump new message
186+
if (msgDump != nullptr) {
187+
TR_blob* b;
188+
for (b = newFile->first; b != NULL; b = b->next) {
189+
fprintf(msgDump, "*** begin: %d bytes\n", (int)b->size);
190+
fwrite(b->value, b->size, 1, msgDump);
191+
fprintf(msgDump, "\n*** end\n");
192+
fflush(msgDump);
193+
}
194+
}
195+
167196
// decode raw message
168197
std::shared_ptr<InfoLoggerMessageList> msgList = nullptr;
169198
try {

0 commit comments

Comments
 (0)