Skip to content

Commit 3f9d3f7

Browse files
committed
Rework IPC mechanism.
1 parent cbdd8ea commit 3f9d3f7

File tree

3 files changed

+246
-141
lines changed

3 files changed

+246
-141
lines changed

src/common/ipc/IpcMessage.h

Lines changed: 112 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "../StatusArg.h"
3030
#include "../isc_s_proto.h"
3131
#include "../isc_proto.h"
32+
#include "../utils_proto.h"
3233
#include <atomic>
3334
#include <chrono>
3435
#include <functional>
@@ -39,11 +40,28 @@
3940
#include <string>
4041
#include <utility>
4142
#include <cstdint>
43+
#ifdef WIN_NT
44+
#include <process.h>
45+
#else
46+
#include <unistd.h>
47+
#endif
48+
49+
#ifndef WIN_NT
50+
#define IPC_MESSAGE_USE_SHARED_SIGNAL
51+
#endif
52+
53+
#ifdef IPC_MESSAGE_USE_SHARED_SIGNAL
54+
#include "../ipc/IpcSharedSignal.h"
55+
#else
56+
#include "../ipc/IpcNamedSignal.h"
57+
#endif
4258

4359
namespace Firebird {
4460

4561

46-
inline constexpr SLONG IPC_MESSAGE_TIMEOUT_MICROSECONDS = 500'000; // 0.5s
62+
inline constexpr auto IPC_MESSAGE_TIMEOUT = std::chrono::milliseconds(500); // 0.5s
63+
inline constexpr auto IPC_MESSAGE_SIGNAL_FORMAT = "ipc_message_%d_%d";
64+
inline std::atomic_uint32_t IPC_MESSAGE_COUNTER = 0;
4765

4866

4967
struct IpcMessageParameters final
@@ -96,26 +114,54 @@ class IpcMessageObjectImpl final : public IpcObject
96114
public:
97115
struct Header : public MemoryHeader
98116
{
99-
event_t receiverEvent;
100-
event_t senderEvent;
117+
int32_t ownerPid;
118+
int32_t ownerId;
119+
#ifdef IPC_MESSAGE_USE_SHARED_SIGNAL
120+
IpcSharedSignal receiverSignal;
121+
IpcSharedSignal senderSignal;
122+
#endif
123+
std::atomic_uint8_t receiverFlag;
124+
std::atomic_uint8_t senderFlag;
101125
uint16_t messageLen;
102126
uint8_t messageIndex;
103127
uint8_t messageBuffer[getMaxSize()];
104128
};
105129

106130
public:
107-
explicit IpcMessageObjectImpl(const IpcMessageParameters& aParameters)
131+
explicit IpcMessageObjectImpl(const IpcMessageParameters& aParameters, bool aIsOwner)
108132
: parameters(aParameters),
109-
sharedMemory(parameters.physicalName.c_str(), sizeof(Header), this)
133+
sharedMemory(parameters.physicalName.c_str(), sizeof(Header), this),
134+
isOwner(aIsOwner)
110135
{
111-
checkHeader(sharedMemory.getHeader());
136+
const auto header = sharedMemory.getHeader();
137+
checkHeader(header);
138+
139+
if (isOwner)
140+
{
141+
header->ownerPid = (int) getpid();
142+
header->ownerId = ++IPC_MESSAGE_COUNTER;
143+
144+
#ifdef IPC_MESSAGE_USE_SHARED_SIGNAL
145+
new (&header->receiverSignal) IpcSharedSignal();
146+
new (&header->senderSignal) IpcSharedSignal();
147+
#endif
148+
}
149+
150+
string signalPrefix;
151+
signalPrefix.printf(IPC_MESSAGE_SIGNAL_FORMAT, (int) header->ownerPid, (int) header->ownerId);
152+
153+
#ifdef IPC_MESSAGE_USE_SHARED_SIGNAL
154+
receiverSignal = &header->receiverSignal;
155+
senderSignal = &header->senderSignal;
156+
#else
157+
receiverSignal.emplace(signalPrefix + "_r");
158+
senderSignal.emplace(signalPrefix + "_s");
159+
#endif
112160
}
113161

114162
~IpcMessageObjectImpl()
115163
{
116-
const auto header = sharedMemory.getHeader();
117-
118-
if (header->receiverEvent.event_pid == 0 && header->senderEvent.event_pid == 0)
164+
if (isOwner)
119165
sharedMemory.removeMapFile();
120166
}
121167

@@ -160,6 +206,14 @@ class IpcMessageObjectImpl final : public IpcObject
160206
public:
161207
IpcMessageParameters parameters;
162208
SharedMemory<Header> sharedMemory;
209+
#ifdef IPC_MESSAGE_USE_SHARED_SIGNAL
210+
IpcSharedSignal* receiverSignal = nullptr;
211+
IpcSharedSignal* senderSignal = nullptr;
212+
#else
213+
std::optional<IpcNamedSignal> receiverSignal;
214+
std::optional<IpcNamedSignal> senderSignal;
215+
#endif
216+
const bool isOwner;
163217
};
164218

165219

@@ -192,7 +246,6 @@ class IpcMessageReceiver final
192246

193247
private:
194248
IpcMessageObjectImpl<Message> ipc;
195-
SLONG eventCounter = 1;
196249
std::atomic_bool disconnected = false;
197250
std::mutex mutex;
198251
};
@@ -237,30 +290,27 @@ class IpcMessageSender final
237290

238291
template <MessageConcept Message>
239292
inline IpcMessageReceiver<Message>::IpcMessageReceiver(const IpcMessageParameters& parameters)
240-
: ipc(parameters)
293+
: ipc(parameters, true)
241294
{
242-
const auto sharedMemory = &ipc.sharedMemory;
243-
const auto header = sharedMemory->getHeader();
244-
245-
SharedMutexGuard guard(sharedMemory);
246-
247-
if (sharedMemory->eventInit(&header->receiverEvent) != FB_SUCCESS)
248-
(Arg::Gds(isc_random) << (ipc.parameters.logicalName + " eventInit(receiverEvent) failed").c_str()).raise();
249295
}
250296

251297
template <MessageConcept Message>
252298
inline IpcMessageReceiver<Message>::~IpcMessageReceiver()
253299
{
300+
const auto header = ipc.sharedMemory.getHeader();
301+
254302
disconnect();
255303

256-
const auto sharedMemory = &ipc.sharedMemory;
257-
const auto header = sharedMemory->getHeader();
304+
#ifdef IPC_MESSAGE_USE_SHARED_SIGNAL
305+
header->receiverSignal.~IpcSharedSignal();
306+
header->senderSignal.~IpcSharedSignal();
307+
#elif !defined(WIN_NT)
308+
string signalPrefix;
309+
signalPrefix.printf(IPC_MESSAGE_SIGNAL_FORMAT, (int) header->ownerPid, (int) header->ownerId);
258310

259-
if (header->receiverEvent.event_pid)
260-
{
261-
sharedMemory->eventFini(&header->receiverEvent);
262-
header->receiverEvent.event_pid = 0;
263-
}
311+
IpcNamedSignal::remove(signalPrefix + "_r");
312+
IpcNamedSignal::remove(signalPrefix + "_s");
313+
#endif
264314
}
265315

266316
template <MessageConcept Message>
@@ -284,23 +334,27 @@ inline std::optional<Message> IpcMessageReceiver<Message>::receive(std::function
284334
const auto sharedMemory = &ipc.sharedMemory;
285335
const auto header = sharedMemory->getHeader();
286336

287-
while (sharedMemory->eventWait(&header->receiverEvent, eventCounter, IPC_MESSAGE_TIMEOUT_MICROSECONDS) !=
288-
FB_SUCCESS)
337+
while (header->receiverFlag.load(std::memory_order_acquire) == 0)
289338
{
290-
if (disconnected)
291-
return std::nullopt;
339+
if (!ipc.receiverSignal->wait(IPC_MESSAGE_TIMEOUT))
340+
{
341+
if (disconnected)
342+
return std::nullopt;
292343

293-
if (idleFunc)
294-
idleFunc();
344+
if (idleFunc)
345+
idleFunc();
346+
}
295347
}
296348

297-
eventCounter = sharedMemory->eventClear(&header->receiverEvent);
349+
ipc.receiverSignal->reset();
350+
351+
std::optional<Message> messageOpt;
298352

299353
if constexpr (IpcMessageObjectImpl<Message>::isMessagePair)
300354
{
301-
std::optional<Message> messageOpt(std::make_pair(
355+
messageOpt.emplace(
302356
createVariantByIndex<typename Message::first_type>(header->messageIndex),
303-
typename Message::second_type{}));
357+
typename Message::second_type{});
304358
auto& varMessage = messageOpt->first;
305359
auto& fixedMessage = messageOpt->second;
306360

@@ -309,49 +363,38 @@ inline std::optional<Message> IpcMessageReceiver<Message>::receive(std::function
309363

310364
memcpy(&fixedMessage, header->messageBuffer, sizeof(fixedMessage));
311365
memcpy(span.data(), header->messageBuffer + sizeof(fixedMessage), span.size());
312-
313-
if (sharedMemory->eventPost(&header->senderEvent) != FB_SUCCESS)
314-
(Arg::Gds(isc_random) << (ipc.parameters.logicalName + " eventPost(senderEvent) failed").c_str()).raise();
315-
316-
return messageOpt;
317366
}
318367
else
319368
{
320-
std::optional<Message> messageOpt(createVariantByIndex<Message>(header->messageIndex));
369+
messageOpt.emplace(createVariantByIndex<Message>(header->messageIndex));
321370
auto& varMessage = messageOpt.value();
322371

323372
const auto span = getVariantIndexAndSpan(varMessage).second;
324373
fb_assert(span.size() == header->messageLen);
325374

326375
memcpy(span.data(), header->messageBuffer, span.size());
376+
}
327377

328-
if (sharedMemory->eventPost(&header->senderEvent) != FB_SUCCESS)
329-
(Arg::Gds(isc_random) << (ipc.parameters.logicalName + " eventPost(senderEvent) failed").c_str()).raise();
378+
header->receiverFlag.store(0, std::memory_order_release);
330379

331-
return messageOpt;
332-
}
380+
ipc.senderSignal->signal();
381+
382+
header->senderFlag.store(1, std::memory_order_release);
383+
384+
return messageOpt;
333385
}
334386

335387

336388
template <MessageConcept Message>
337389
inline IpcMessageSender<Message>::IpcMessageSender(const IpcMessageParameters& parameters)
338-
: ipc(parameters)
390+
: ipc(parameters, false)
339391
{
340392
}
341393

342394
template <MessageConcept Message>
343395
inline IpcMessageSender<Message>::~IpcMessageSender()
344396
{
345397
disconnect();
346-
347-
const auto sharedMemory = &ipc.sharedMemory;
348-
const auto header = sharedMemory->getHeader();
349-
350-
if (header->senderEvent.event_pid)
351-
{
352-
sharedMemory->eventFini(&header->senderEvent);
353-
header->senderEvent.event_pid = 0;
354-
}
355398
}
356399

357400
template <MessageConcept Message>
@@ -385,7 +428,7 @@ inline bool IpcMessageSender<Message>::send(const Message& message, std::functio
385428

386429
SharedMutexGuard guard(sharedMemory, false);
387430

388-
while (!guard.tryLock(std::chrono::milliseconds(IPC_MESSAGE_TIMEOUT_MICROSECONDS / 1000)))
431+
while (!guard.tryLock(IPC_MESSAGE_TIMEOUT))
389432
{
390433
if (disconnected)
391434
return false;
@@ -415,31 +458,26 @@ inline bool IpcMessageSender<Message>::send(const Message& message, std::functio
415458
memcpy(header->messageBuffer, span.data(), span.size());
416459
}
417460

418-
if (sharedMemory->eventInit(&header->senderEvent) != FB_SUCCESS)
419-
(Arg::Gds(isc_random) << (ipc.parameters.logicalName + " eventInit(senderEvent) failed").c_str()).raise();
461+
header->receiverFlag.store(1, std::memory_order_release);
420462

421-
Cleanup senderEventCleanup([&] {
422-
if (header->senderEvent.event_pid)
423-
{
424-
sharedMemory->eventFini(&header->senderEvent);
425-
header->senderEvent.event_pid = 0;
426-
}
427-
});
428-
429-
const SLONG eventCounter = sharedMemory->eventClear(&header->senderEvent);
463+
ipc.receiverSignal->signal();
430464

431-
if (sharedMemory->eventPost(&header->receiverEvent) != FB_SUCCESS)
432-
(Arg::Gds(isc_random) << (ipc.parameters.logicalName + " eventPost(receiverEvent) failed").c_str()).raise();
433-
434-
while (sharedMemory->eventWait(&header->senderEvent, eventCounter, IPC_MESSAGE_TIMEOUT_MICROSECONDS) != FB_SUCCESS)
465+
while (header->senderFlag.load(std::memory_order_acquire) == 0)
435466
{
436-
if (disconnected)
437-
return false;
467+
if (!ipc.senderSignal->wait(IPC_MESSAGE_TIMEOUT))
468+
{
469+
if (disconnected)
470+
return false;
438471

439-
if (idleFunc)
440-
idleFunc();
472+
if (idleFunc)
473+
idleFunc();
474+
}
441475
}
442476

477+
ipc.senderSignal->reset();
478+
479+
header->senderFlag.store(0, std::memory_order_release);
480+
443481
return true;
444482
}
445483

0 commit comments

Comments
 (0)