2929#include " ../StatusArg.h"
3030#include " ../isc_s_proto.h"
3131#include " ../isc_proto.h"
32+ #include " ../utils_proto.h"
3233#include < atomic>
3334#include < chrono>
3435#include < functional>
3940#include < string>
4041#include < utility>
4142#include < cstdint>
43+ #ifdef WIN_NT
44+ #include < process.h>
45+ #else
46+ #include < unistd.h>
47+ #endif
48+
49+ #ifndef WIN_NT
50+ #define IPC_MESSAGE_USE_SHARED_SIGNAL
51+ #endif
52+
53+ #ifdef IPC_MESSAGE_USE_SHARED_SIGNAL
54+ #include " ../ipc/IpcSharedSignal.h"
55+ #else
56+ #include " ../ipc/IpcNamedSignal.h"
57+ #endif
4258
4359namespace Firebird {
4460
4561
46- inline constexpr SLONG IPC_MESSAGE_TIMEOUT_MICROSECONDS = 500'000 ; // 0.5s
62+ inline constexpr auto IPC_MESSAGE_TIMEOUT = std::chrono::milliseconds(500 ); // 0.5s
63+ inline constexpr auto IPC_MESSAGE_SIGNAL_FORMAT = " ipc_message_%d_%d" ;
64+ inline std::atomic_uint32_t IPC_MESSAGE_COUNTER = 0 ;
4765
4866
4967struct IpcMessageParameters final
@@ -96,26 +114,54 @@ class IpcMessageObjectImpl final : public IpcObject
96114public:
97115 struct Header : public MemoryHeader
98116 {
99- event_t receiverEvent;
100- event_t senderEvent;
117+ int32_t ownerPid;
118+ int32_t ownerId;
119+ #ifdef IPC_MESSAGE_USE_SHARED_SIGNAL
120+ IpcSharedSignal receiverSignal;
121+ IpcSharedSignal senderSignal;
122+ #endif
123+ std::atomic_uint8_t receiverFlag;
124+ std::atomic_uint8_t senderFlag;
101125 uint16_t messageLen;
102126 uint8_t messageIndex;
103127 uint8_t messageBuffer[getMaxSize()];
104128 };
105129
106130public:
107- explicit IpcMessageObjectImpl (const IpcMessageParameters& aParameters)
131+ explicit IpcMessageObjectImpl (const IpcMessageParameters& aParameters, bool aIsOwner )
108132 : parameters(aParameters),
109- sharedMemory(parameters.physicalName.c_str(), sizeof(Header), this)
133+ sharedMemory(parameters.physicalName.c_str(), sizeof(Header), this),
134+ isOwner(aIsOwner)
110135 {
111- checkHeader (sharedMemory.getHeader ());
136+ const auto header = sharedMemory.getHeader ();
137+ checkHeader (header);
138+
139+ if (isOwner)
140+ {
141+ header->ownerPid = (int ) getpid ();
142+ header->ownerId = ++IPC_MESSAGE_COUNTER;
143+
144+ #ifdef IPC_MESSAGE_USE_SHARED_SIGNAL
145+ new (&header->receiverSignal ) IpcSharedSignal ();
146+ new (&header->senderSignal ) IpcSharedSignal ();
147+ #endif
148+ }
149+
150+ string signalPrefix;
151+ signalPrefix.printf (IPC_MESSAGE_SIGNAL_FORMAT, (int ) header->ownerPid , (int ) header->ownerId );
152+
153+ #ifdef IPC_MESSAGE_USE_SHARED_SIGNAL
154+ receiverSignal = &header->receiverSignal ;
155+ senderSignal = &header->senderSignal ;
156+ #else
157+ receiverSignal.emplace (signalPrefix + " _r" );
158+ senderSignal.emplace (signalPrefix + " _s" );
159+ #endif
112160 }
113161
114162 ~IpcMessageObjectImpl ()
115163 {
116- const auto header = sharedMemory.getHeader ();
117-
118- if (header->receiverEvent .event_pid == 0 && header->senderEvent .event_pid == 0 )
164+ if (isOwner)
119165 sharedMemory.removeMapFile ();
120166 }
121167
@@ -160,6 +206,14 @@ class IpcMessageObjectImpl final : public IpcObject
160206public:
161207 IpcMessageParameters parameters;
162208 SharedMemory<Header> sharedMemory;
209+ #ifdef IPC_MESSAGE_USE_SHARED_SIGNAL
210+ IpcSharedSignal* receiverSignal = nullptr ;
211+ IpcSharedSignal* senderSignal = nullptr ;
212+ #else
213+ std::optional<IpcNamedSignal> receiverSignal;
214+ std::optional<IpcNamedSignal> senderSignal;
215+ #endif
216+ const bool isOwner;
163217};
164218
165219
@@ -192,7 +246,6 @@ class IpcMessageReceiver final
192246
193247private:
194248 IpcMessageObjectImpl<Message> ipc;
195- SLONG eventCounter = 1 ;
196249 std::atomic_bool disconnected = false ;
197250 std::mutex mutex;
198251};
@@ -237,30 +290,27 @@ class IpcMessageSender final
237290
238291template <MessageConcept Message>
239292inline IpcMessageReceiver<Message>::IpcMessageReceiver(const IpcMessageParameters& parameters)
240- : ipc(parameters)
293+ : ipc(parameters, true )
241294{
242- const auto sharedMemory = &ipc.sharedMemory ;
243- const auto header = sharedMemory->getHeader ();
244-
245- SharedMutexGuard guard (sharedMemory);
246-
247- if (sharedMemory->eventInit (&header->receiverEvent ) != FB_SUCCESS)
248- (Arg::Gds (isc_random) << (ipc.parameters .logicalName + " eventInit(receiverEvent) failed" ).c_str ()).raise ();
249295}
250296
251297template <MessageConcept Message>
252298inline IpcMessageReceiver<Message>::~IpcMessageReceiver ()
253299{
300+ const auto header = ipc.sharedMemory .getHeader ();
301+
254302 disconnect ();
255303
256- const auto sharedMemory = &ipc.sharedMemory ;
257- const auto header = sharedMemory->getHeader ();
304+ #ifdef IPC_MESSAGE_USE_SHARED_SIGNAL
305+ header->receiverSignal .~IpcSharedSignal ();
306+ header->senderSignal .~IpcSharedSignal ();
307+ #elif !defined(WIN_NT)
308+ string signalPrefix;
309+ signalPrefix.printf (IPC_MESSAGE_SIGNAL_FORMAT, (int ) header->ownerPid , (int ) header->ownerId );
258310
259- if (header->receiverEvent .event_pid )
260- {
261- sharedMemory->eventFini (&header->receiverEvent );
262- header->receiverEvent .event_pid = 0 ;
263- }
311+ IpcNamedSignal::remove (signalPrefix + " _r" );
312+ IpcNamedSignal::remove (signalPrefix + " _s" );
313+ #endif
264314}
265315
266316template <MessageConcept Message>
@@ -284,23 +334,27 @@ inline std::optional<Message> IpcMessageReceiver<Message>::receive(std::function
284334 const auto sharedMemory = &ipc.sharedMemory ;
285335 const auto header = sharedMemory->getHeader ();
286336
287- while (sharedMemory->eventWait (&header->receiverEvent , eventCounter, IPC_MESSAGE_TIMEOUT_MICROSECONDS) !=
288- FB_SUCCESS)
337+ while (header->receiverFlag .load (std::memory_order_acquire) == 0 )
289338 {
290- if (disconnected)
291- return std::nullopt ;
339+ if (!ipc.receiverSignal ->wait (IPC_MESSAGE_TIMEOUT))
340+ {
341+ if (disconnected)
342+ return std::nullopt ;
292343
293- if (idleFunc)
294- idleFunc ();
344+ if (idleFunc)
345+ idleFunc ();
346+ }
295347 }
296348
297- eventCounter = sharedMemory->eventClear (&header->receiverEvent );
349+ ipc.receiverSignal ->reset ();
350+
351+ std::optional<Message> messageOpt;
298352
299353 if constexpr (IpcMessageObjectImpl<Message>::isMessagePair)
300354 {
301- std::optional<Message> messageOpt ( std::make_pair (
355+ messageOpt. emplace (
302356 createVariantByIndex<typename Message::first_type>(header->messageIndex ),
303- typename Message::second_type{})) ;
357+ typename Message::second_type{});
304358 auto & varMessage = messageOpt->first ;
305359 auto & fixedMessage = messageOpt->second ;
306360
@@ -309,49 +363,38 @@ inline std::optional<Message> IpcMessageReceiver<Message>::receive(std::function
309363
310364 memcpy (&fixedMessage, header->messageBuffer , sizeof (fixedMessage));
311365 memcpy (span.data (), header->messageBuffer + sizeof (fixedMessage), span.size ());
312-
313- if (sharedMemory->eventPost (&header->senderEvent ) != FB_SUCCESS)
314- (Arg::Gds (isc_random) << (ipc.parameters .logicalName + " eventPost(senderEvent) failed" ).c_str ()).raise ();
315-
316- return messageOpt;
317366 }
318367 else
319368 {
320- std::optional<Message> messageOpt (createVariantByIndex<Message>(header->messageIndex ));
369+ messageOpt. emplace (createVariantByIndex<Message>(header->messageIndex ));
321370 auto & varMessage = messageOpt.value ();
322371
323372 const auto span = getVariantIndexAndSpan (varMessage).second ;
324373 fb_assert (span.size () == header->messageLen );
325374
326375 memcpy (span.data (), header->messageBuffer , span.size ());
376+ }
327377
328- if (sharedMemory->eventPost (&header->senderEvent ) != FB_SUCCESS)
329- (Arg::Gds (isc_random) << (ipc.parameters .logicalName + " eventPost(senderEvent) failed" ).c_str ()).raise ();
378+ header->receiverFlag .store (0 , std::memory_order_release);
330379
331- return messageOpt;
332- }
380+ ipc.senderSignal ->signal ();
381+
382+ header->senderFlag .store (1 , std::memory_order_release);
383+
384+ return messageOpt;
333385}
334386
335387
336388template <MessageConcept Message>
337389inline IpcMessageSender<Message>::IpcMessageSender(const IpcMessageParameters& parameters)
338- : ipc(parameters)
390+ : ipc(parameters, false )
339391{
340392}
341393
342394template <MessageConcept Message>
343395inline IpcMessageSender<Message>::~IpcMessageSender ()
344396{
345397 disconnect ();
346-
347- const auto sharedMemory = &ipc.sharedMemory ;
348- const auto header = sharedMemory->getHeader ();
349-
350- if (header->senderEvent .event_pid )
351- {
352- sharedMemory->eventFini (&header->senderEvent );
353- header->senderEvent .event_pid = 0 ;
354- }
355398}
356399
357400template <MessageConcept Message>
@@ -385,7 +428,7 @@ inline bool IpcMessageSender<Message>::send(const Message& message, std::functio
385428
386429 SharedMutexGuard guard (sharedMemory, false );
387430
388- while (!guard.tryLock (std::chrono::milliseconds (IPC_MESSAGE_TIMEOUT_MICROSECONDS / 1000 ) ))
431+ while (!guard.tryLock (IPC_MESSAGE_TIMEOUT ))
389432 {
390433 if (disconnected)
391434 return false ;
@@ -415,31 +458,26 @@ inline bool IpcMessageSender<Message>::send(const Message& message, std::functio
415458 memcpy (header->messageBuffer , span.data (), span.size ());
416459 }
417460
418- if (sharedMemory->eventInit (&header->senderEvent ) != FB_SUCCESS)
419- (Arg::Gds (isc_random) << (ipc.parameters .logicalName + " eventInit(senderEvent) failed" ).c_str ()).raise ();
461+ header->receiverFlag .store (1 , std::memory_order_release);
420462
421- Cleanup senderEventCleanup ([&] {
422- if (header->senderEvent .event_pid )
423- {
424- sharedMemory->eventFini (&header->senderEvent );
425- header->senderEvent .event_pid = 0 ;
426- }
427- });
428-
429- const SLONG eventCounter = sharedMemory->eventClear (&header->senderEvent );
463+ ipc.receiverSignal ->signal ();
430464
431- if (sharedMemory->eventPost (&header->receiverEvent ) != FB_SUCCESS)
432- (Arg::Gds (isc_random) << (ipc.parameters .logicalName + " eventPost(receiverEvent) failed" ).c_str ()).raise ();
433-
434- while (sharedMemory->eventWait (&header->senderEvent , eventCounter, IPC_MESSAGE_TIMEOUT_MICROSECONDS) != FB_SUCCESS)
465+ while (header->senderFlag .load (std::memory_order_acquire) == 0 )
435466 {
436- if (disconnected)
437- return false ;
467+ if (!ipc.senderSignal ->wait (IPC_MESSAGE_TIMEOUT))
468+ {
469+ if (disconnected)
470+ return false ;
438471
439- if (idleFunc)
440- idleFunc ();
472+ if (idleFunc)
473+ idleFunc ();
474+ }
441475 }
442476
477+ ipc.senderSignal ->reset ();
478+
479+ header->senderFlag .store (0 , std::memory_order_release);
480+
443481 return true ;
444482}
445483
0 commit comments