Skip to content

Commit ff8de21

Browse files
authored
Merge pull request #40 from sy-c/master
v1.3.10
2 parents 5edc7db + 4f9efeb commit ff8de21

File tree

5 files changed

+164
-28
lines changed

5 files changed

+164
-28
lines changed

CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ set(INFOLOGGER_LIB_OBJECTS
119119
$<TARGET_OBJECTS:objCommonConfiguration>
120120
$<TARGET_OBJECTS:objCommonSimpleLog>
121121
$<TARGET_OBJECTS:objCommonLineBuffer>
122+
$<TARGET_OBJECTS:objCommonTimer>
122123
)
123124

124125
# shared library
@@ -380,7 +381,7 @@ foreach (f ${TEST_SRCS})
380381
string(REGEX REPLACE ".cxx" "" test_name ${test_name})
381382
set(exe "${test_name}.exe")
382383
message ("${exe}")
383-
add_executable(${exe} ${f} ${INFOLOGGER_LIB_OBJECTS} $<TARGET_OBJECTS:objCommonTimer>)
384+
add_executable(${exe} ${f} ${INFOLOGGER_LIB_OBJECTS})
384385
target_link_libraries(${exe} InfoLogger)
385386
target_include_directories(${exe} PRIVATE ${COMMON_STANDALONE_INCLUDE_DIRS} src)
386387
add_test(NAME ${test_name} COMMAND ${exe})

doc/releaseNotes.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,6 @@ This file describes the main feature changes for each InfoLogger released versio
3636

3737
## v1.3.9 - 09/03/2020
3838
- upgrading to Common v1.4.9, providing support for log rotate daemon logs.
39+
40+
## v1.3.10 - 04/08/2020
41+
- added automatic reconnection from clients to infoLoggerD

src/InfoLoggerClient.cxx

Lines changed: 125 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ ConfigInfoLoggerClient::ConfigInfoLoggerClient(const char* configPath)
6060
config.getOptionalValue<std::string>(INFOLOGGER_CONFIG_SECTION_NAME_CLIENT ".txSocketPath", txSocketPath);
6161
config.getOptionalValue<int>(INFOLOGGER_CONFIG_SECTION_NAME_CLIENT ".txSocketOutBufferSize", txSocketOutBufferSize);
6262
config.getOptionalValue<std::string>(INFOLOGGER_CONFIG_SECTION_NAME_CLIENT ".logFile", logFile);
63+
config.getOptionalValue<double>(INFOLOGGER_CONFIG_SECTION_NAME_CLIENT ".reconnectTimeout", reconnectTimeout);
64+
config.getOptionalValue<int>(INFOLOGGER_CONFIG_SECTION_NAME_CLIENT ".maxMessagesBuffered", maxMessagesBuffered);
6365
}
6466
}
6567

@@ -73,6 +75,8 @@ void ConfigInfoLoggerClient::resetConfig()
7375
txSocketPath = INFOLOGGER_DEFAULT_LOCAL_SOCKET;
7476
txSocketOutBufferSize = -1;
7577
logFile = "/dev/null";
78+
reconnectTimeout = 5.0;
79+
maxMessagesBuffered = 1000;
7680
}
7781

7882
//////////////////////////////////////////////////////
@@ -83,51 +87,49 @@ void ConfigInfoLoggerClient::resetConfig()
8387
// class InfoLoggerClient
8488
// handles connection from a process to local infoLoggerD
8589
/////////////////////////////////////////////////////////
86-
87-
InfoLoggerClient::InfoLoggerClient()
88-
{
90+
int InfoLoggerClient::connect() {
8991
txSocket = -1;
90-
9192
isInitialized = 0;
92-
log.setLogFile(cfg.logFile.c_str());
9393

9494
try {
9595

9696
// create receiving socket for incoming messages
97-
log.info("Creating transmission socket named %s", cfg.txSocketPath.c_str());
97+
if (isVerbose) log.info("Creating transmission socket named %s", cfg.txSocketPath.c_str());
9898
txSocket = socket(PF_LOCAL, SOCK_STREAM, 0);
9999
if (txSocket == -1) {
100-
log.error("Could not create socket: %s", strerror(errno));
100+
if (isVerbose) log.error("Could not create socket: %s", strerror(errno));
101101
throw __LINE__;
102102
}
103103

104+
/*
104105
// configure socket mode
105106
int socketMode = fcntl(txSocket, F_GETFL);
106107
if (socketMode == -1) {
107-
log.error("fcntl(F_GETFL) failed: %s", strerror(errno));
108+
if (isVerbose) log.error("fcntl(F_GETFL) failed: %s", strerror(errno));
108109
throw __LINE__;
109110
}
110111
//socketMode=(socketMode | O_NONBLOCK); // non-blocking
111112
if (fcntl(txSocket, F_SETFL, socketMode) == -1) {
112-
log.error("fcntl(F_SETFL) failed: %s", strerror(errno));
113+
if (isVerbose) log.error("fcntl(F_SETFL) failed: %s", strerror(errno));
113114
throw __LINE__;
114115
}
115-
116+
*/
117+
116118
// configure socket TX buffer size
117119
if (cfg.txSocketOutBufferSize != -1) {
118120
int txBufSize = cfg.txSocketOutBufferSize;
119121
socklen_t optLen = sizeof(txBufSize);
120122
if (setsockopt(txSocket, SOL_SOCKET, SO_RCVBUF, &txBufSize, optLen) == -1) {
121-
log.error("setsockopt() failed: %s", strerror(errno));
123+
if (isVerbose) log.error("setsockopt() failed: %s", strerror(errno));
122124
throw __LINE__;
123125
}
124126
if (getsockopt(txSocket, SOL_SOCKET, SO_RCVBUF, &txBufSize, &optLen) == -1) {
125-
log.error("getsockopt() failed: %s", strerror(errno));
127+
if (isVerbose) log.error("getsockopt() failed: %s", strerror(errno));
126128
throw __LINE__;
127129
}
128130
txBufSize /= 2; // SO_SNDBUF Linux doubles the value requested.
129131
if (txBufSize != cfg.txSocketOutBufferSize) {
130-
log.warning("Could not set desired buffer size: got %d bytes instead of %d", txBufSize, cfg.txSocketOutBufferSize);
132+
if (isVerbose) log.warning("Could not set desired buffer size: got %d bytes instead of %d", txBufSize, cfg.txSocketOutBufferSize);
131133
throw __LINE__;
132134
}
133135
}
@@ -137,7 +139,7 @@ InfoLoggerClient::InfoLoggerClient()
137139
bzero(&socketAddrr, sizeof(socketAddrr));
138140
socketAddrr.sun_family = PF_LOCAL;
139141
if (cfg.txSocketPath.length() + 2 > sizeof(socketAddrr.sun_path)) {
140-
log.error("Socket name too long: max allowed is %d", (int)sizeof(socketAddrr.sun_path) - 2);
142+
if (isVerbose) log.error("Socket name too long: max allowed is %d", (int)sizeof(socketAddrr.sun_path) - 2);
141143
throw __LINE__;
142144
}
143145
// if name starts with '/', use normal socket name. if not, use an abstract socket name
@@ -148,17 +150,43 @@ InfoLoggerClient::InfoLoggerClient()
148150
// leave first char 0, to get abstract socket name - see man 7 unix
149151
strncpy(&socketAddrr.sun_path[1], cfg.txSocketPath.c_str(), cfg.txSocketPath.length());
150152
}
151-
if (connect(txSocket, (struct sockaddr*)&socketAddrr, sizeof(socketAddrr))) {
152-
log.error("Failed to connect to infoLoggerD: %s", strerror(errno));
153+
if (::connect(txSocket, (struct sockaddr*)&socketAddrr, sizeof(socketAddrr))) {
154+
if (isVerbose) log.error("Failed to connect to infoLoggerD: %s", strerror(errno));
153155
throw __LINE__;
154156
}
155157

156158
isInitialized = 1;
157-
log.info("Connected to infoLoggerD");
159+
if (isVerbose) log.info("Connected to infoLoggerD");
158160
}
159161

160162
catch (int errLine) {
161-
log.error("infoLoggerClient failed to initialize - error %d", errLine);
163+
if (!isInitialized) {
164+
disconnect();
165+
}
166+
return errLine;
167+
}
168+
return 0;
169+
}
170+
171+
void InfoLoggerClient::disconnect() {
172+
if (txSocket >= 0) {
173+
if (isVerbose) log.info("Closing transmission socket");
174+
close(txSocket);
175+
}
176+
txSocket = -1;
177+
isInitialized = 0;
178+
}
179+
180+
InfoLoggerClient::InfoLoggerClient()
181+
{
182+
log.setLogFile(cfg.logFile.c_str());
183+
reconnectThreadCleanup();
184+
int errLine = connect();
185+
// shutdown detail messages after first connect attempt
186+
isVerbose = 0;
187+
if (errLine) {
188+
log.error("infoLoggerClient failed to initialize - error %d", errLine);
189+
reconnectThreadStart();
162190
}
163191
}
164192

@@ -169,23 +197,98 @@ int InfoLoggerClient::isOk()
169197

170198
InfoLoggerClient::~InfoLoggerClient()
171199
{
172-
if (txSocket >= 0) {
173-
log.info("Closing transmission socket");
174-
close(txSocket);
200+
reconnectThreadCleanup();
201+
disconnect();
202+
if (messageBuffer.size()) {
203+
log.error("Some messages still in buffer, %d will be lost",(int)messageBuffer.size());
175204
}
176205
}
177206

178207
int InfoLoggerClient::send(const char* message, unsigned int messageSize)
179208
{
209+
mutex.lock();
180210
if (txSocket >= 0) {
181-
int bytesWritten = write(txSocket, message, messageSize);
211+
// if there was a previous reconnection thread, it has now finished
212+
// otherwise there would not be a socket to write to
213+
// so, clean it up if needed
214+
reconnectThreadCleanup();
215+
216+
int flags = MSG_NOSIGNAL;
217+
int bytesWritten = ::send(txSocket, message, messageSize, MSG_NOSIGNAL);
182218
if (bytesWritten == (int)messageSize) {
219+
mutex.unlock();
183220
return 0;
184221
}
222+
log.error("Failed to send message");
223+
// launch a thread for automatic reconnect, and buffer (reasonably) messages until then
224+
reconnectThreadStart();
225+
}
226+
if (messageBuffer.size()<cfg.maxMessagesBuffered) {
227+
messageBuffer.push(message);
228+
if (messageBuffer.size()==cfg.maxMessagesBuffered) {
229+
log.warning("Max buffer size reached, next messages will be lost until reconnect");
230+
}
185231
}
232+
mutex.unlock();
186233
return -1;
187234
}
188235

236+
void InfoLoggerClient::reconnect() {
237+
for (;!reconnectAbort;) {
238+
bool isOk = 0;
239+
if (reconnectTimer.isTimeout()) {
240+
if (mutex.try_lock()) {
241+
if (connect() == 0) {
242+
isOk = 1;
243+
log.info("Reconnection successful");
244+
int nFlushed = 0;
245+
while(!messageBuffer.empty()) {
246+
int flags = MSG_NOSIGNAL;
247+
size_t messageSize = messageBuffer.front().size();
248+
int bytesWritten = ::send(txSocket, messageBuffer.front().c_str(), messageSize, MSG_NOSIGNAL);
249+
if (bytesWritten == (int)messageSize) {
250+
messageBuffer.pop();
251+
nFlushed++;
252+
} else {
253+
log.info("Failed to flush buffer, will reconnect");
254+
disconnect();
255+
isOk = 0;
256+
}
257+
}
258+
if (nFlushed) {
259+
log.info("%d messages flushed from buffer", nFlushed);
260+
}
261+
}
262+
mutex.unlock();
263+
if (isOk) {
264+
break;
265+
} else {
266+
if (isVerbose) log.info("reconnect failed");
267+
}
268+
reconnectTimer.reset(cfg.reconnectTimeout);
269+
}
270+
}
271+
usleep(200000);
272+
}
273+
}
274+
275+
void InfoLoggerClient::reconnectThreadCleanup() {
276+
if (reconnectThread != nullptr) {
277+
reconnectAbort = 1;
278+
reconnectThread->join();
279+
reconnectThread = nullptr;
280+
}
281+
}
282+
void InfoLoggerClient::reconnectThreadStart() {
283+
disconnect();
284+
reconnectAbort = 0;
285+
std::function<void(void)> loop = std::bind(&InfoLoggerClient::reconnect, this);
286+
log.info("Will try reconnecting at %.2lfs interval", cfg.reconnectTimeout);
287+
isVerbose = 0;
288+
reconnectTimer.reset(cfg.reconnectTimeout*1000000.0);
289+
reconnectThread = std::make_unique<std::thread>(loop);
290+
}
291+
189292
/////////////////////////////////////////////////////////
190293
// end of class InfoLoggerClient
191294
/////////////////////////////////////////////////////////

src/InfoLoggerClient.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99
// or submit itself to any jurisdiction.
1010

1111
#include <Common/SimpleLog.h>
12+
#include <Common/Timer.h>
13+
#include <queue>
14+
#include <mutex>
15+
#include <thread>
1216

1317
// class to communicate with local infoLoggerD process
1418

@@ -21,8 +25,10 @@ class ConfigInfoLoggerClient
2125
void resetConfig(); // set default configuration parameters
2226

2327
std::string txSocketPath; // name of socket used to receive log messages from clients
24-
int txSocketOutBufferSize;
28+
int txSocketOutBufferSize; // buffer size of outgoing socket. -1 for system default.
2529
std::string logFile; // log file for internal library logs
30+
double reconnectTimeout; // retry timeout for infoLoggerD reconnect (seconds)
31+
int maxMessagesBuffered; // max messages in buffer when reconnect pending
2632
};
2733

2834
class InfoLoggerClient
@@ -42,7 +48,20 @@ class InfoLoggerClient
4248
private:
4349
ConfigInfoLoggerClient cfg;
4450

51+
int connect(); // connect to infoLoggerD. returns 0 on success, or an error code.
52+
void disconnect(); // disconnect infoLoggerD.
4553
int isInitialized; // set to 1 when object initialized with success, 0 otherwise
4654
SimpleLog log; // object for daemon logging, as defined in config
4755
int txSocket; // socket to infoLoggerD. >=0 if set.
56+
57+
AliceO2::Common::Timer reconnectTimer; // try to reconnect
58+
bool reconnectNeeded; // set when need to reconnect after timeout
59+
std::queue<std::string> messageBuffer; // pending messages
60+
std::mutex mutex; // lock for exclusive access to buffer
61+
std::unique_ptr<std::thread> reconnectThread; // thread trying to reconnect to infoLoggerD
62+
int reconnectAbort; // flag set to stop thread
63+
void reconnect(); // thread loop
64+
void reconnectThreadCleanup(); // cleanup thread resources
65+
void reconnectThreadStart(); // start reconnection thread
66+
bool isVerbose = true;
4867
};

test/testInfoLoggerPerf.cxx

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ int main(int argc, char* argv[])
2828
int maxMsgSize = 100; // max size of message to send
2929
int sizeIsRandom = 0; // flag set to get a random message size (up to maxMsgSize)
3030
int noOutput = 0; // when set, messages are created but not sent
31-
31+
int delay = 0; // delay in microseconds between messages
32+
3233
// parse command line parameters
3334
int option;
34-
while ((option = getopt(argc, argv, "c:s:rn")) != -1) {
35+
while ((option = getopt(argc, argv, "c:s:rnd:")) != -1) {
3536
switch (option) {
3637
case 'c':
3738
maxMsgCount = atoi(optarg);
@@ -45,10 +46,13 @@ int main(int argc, char* argv[])
4546
case 'n':
4647
noOutput = 1;
4748
break;
49+
case 'd':
50+
delay = atoi(optarg);
51+
break;
4852
}
4953
}
5054

51-
printf("Generating log messages: maxMsgCount=%d maxMsgSize=%d sizeIsRandom=%d\n", maxMsgCount, maxMsgSize, sizeIsRandom);
55+
printf("Generating log messages: maxMsgCount=%d maxMsgSize=%d sizeIsRandom=%d delay=%d\n", maxMsgCount, maxMsgSize, sizeIsRandom, delay);
5256

5357
char* msgBuffer = (char*)malloc(maxMsgSize + 1);
5458
if (msgBuffer == nullptr)
@@ -78,10 +82,16 @@ int main(int argc, char* argv[])
7882
theLog.log("%s", msgBuffer);
7983
}
8084
msgBuffer[sz] = cBak;
85+
if (delay > 0) {
86+
usleep(delay);
87+
}
8188
}
8289
double t = theTimer.getTime();
8390
printf("Done in %lf seconds\n", t);
8491
printf("%.2lf msg/s\n", maxMsgCount / t);
85-
92+
if (msgBuffer != nullptr) {
93+
free(msgBuffer);
94+
}
95+
8696
return 0;
8797
}

0 commit comments

Comments
 (0)