3535#include " ../jrd/tra_proto.h"
3636
3737#include < atomic>
38+ #include < mutex>
39+ #include < vector>
40+ #include " boost/interprocess/sync/interprocess_mutex.hpp"
3841
3942#ifdef WIN_NT
4043#include < process.h>
@@ -100,11 +103,13 @@ namespace
100103 event_t clientEvent;
101104 USHORT bufferSize;
102105 std::atomic<Tag> tag;
106+ unsigned seq;
107+ boost::interprocess::interprocess_mutex bufferMutex;
103108 char userName[USERNAME_LENGTH + 1 ]; // \0 if has PROFILE_ANY_ATTACHMENT
104109 alignas (FB_ALIGNMENT) UCHAR buffer[4096 ];
105110 };
106111
107- static const USHORT VERSION = 2 ;
112+ static const USHORT VERSION = 3 ;
108113
109114 public:
110115 ProfilerIpc (thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmentId, bool server = false );
@@ -179,7 +184,7 @@ class Jrd::ProfilerListener final
179184 listener->watcherThread ();
180185 }
181186
182- void processCommand (thread_db* tdbb);
187+ void processCommand (thread_db* tdbb, ProfilerIpc::Tag tag, std::vector<UCHAR>& buffer );
183188
184189private:
185190 Attachment* const attachment;
@@ -736,6 +741,8 @@ ProfilerIpc::ProfilerIpc(thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmen
736741 {
737742 Guard guard (this );
738743
744+ header->seq = 0 ;
745+
739746 if (sharedMemory->eventInit (&header->serverEvent ) != FB_SUCCESS)
740747 (Arg::Gds (isc_random) << " ProfilerIpc eventInit(serverEvent) failed" ).raise ();
741748 }
@@ -817,18 +824,17 @@ void ProfilerIpc::internalSendAndReceive(thread_db* tdbb, Tag tag,
817824 }
818825 });
819826
820- const SLONG value = sharedMemory->eventClear (&header->clientEvent );
827+ const SLONG clientEventCounter = sharedMemory->eventClear (&header->clientEvent );
828+
829+ std::unique_lock bufferMutexLock (header->bufferMutex );
821830
822- const Tag oldTag = header->tag .exchange (tag);
823- switch (oldTag)
831+ switch (header->tag )
824832 {
825833 case Tag::NOP:
826- header->tag = oldTag;
827834 (Arg::Gds (isc_random) << " Remote attachment failed to start listener thread" ).raise ();
828835 break ;
829836
830837 case Tag::SERVER_EXITED:
831- header->tag = oldTag;
832838 (Arg::Gds (isc_random) << " Cannot start remote profile session - attachment exited" ).raise ();
833839 break ;
834840
@@ -846,41 +852,49 @@ void ProfilerIpc::internalSendAndReceive(thread_db* tdbb, Tag tag,
846852 fb_assert (inSize <= sizeof (header->buffer ));
847853 memcpy (header->buffer , in, inSize);
848854
855+ header->tag = tag;
856+ const auto seq = ++header->seq ;
857+
858+ bufferMutexLock.unlock ();
859+
849860 if (sharedMemory->eventPost (&header->serverEvent ) != FB_SUCCESS)
850861 (Arg::Gds (isc_random) << " Cannot start remote profile session - attachment exited" ).raise ();
851862
863+ const SLONG TIMEOUT = 500 * 1000 ; // 0.5 sec
864+ const int serverPID = header->serverEvent .event_pid ;
865+
866+ while (true )
852867 {
853- const SLONG TIMEOUT = 500 * 1000 ; // 0.5 sec
868+ { // scope
869+ EngineCheckout cout (tdbb, FB_FUNCTION);
854870
855- const int serverPID = header->serverEvent .event_pid ;
856- while (true )
857- {
871+ if (sharedMemory->eventWait (&header->clientEvent , clientEventCounter, TIMEOUT) == FB_SUCCESS)
872+ break ;
873+
874+ if (serverPID != getpid () && !ISC_check_process_existence (serverPID))
858875 {
859- EngineCheckout cout (tdbb, FB_FUNCTION);
860- if (sharedMemory->eventWait (&header->clientEvent , value, TIMEOUT) == FB_SUCCESS)
861- break ;
876+ // Server process was died or exited
877+ fb_assert ((header->tag == tag) || header->tag == Tag::SERVER_EXITED);
862878
863- if (serverPID != getpid () && ! ISC_check_process_existence (serverPID) )
879+ if (header-> tag == tag )
864880 {
865- // Server process was died or exited
866- fb_assert ((header->tag == tag) || header->tag == Tag::SERVER_EXITED);
867-
868- if (header->tag == tag)
881+ header->tag = Tag::SERVER_EXITED;
882+ if (header->serverEvent .event_pid )
869883 {
870- header->tag = Tag::SERVER_EXITED;
871- if (header->serverEvent .event_pid )
872- {
873- sharedMemory->eventFini (&header->serverEvent );
874- header->serverEvent .event_pid = 0 ;
875- }
884+ sharedMemory->eventFini (&header->serverEvent );
885+ header->serverEvent .event_pid = 0 ;
876886 }
877- break ;
878887 }
888+
889+ break ;
879890 }
880- JRD_reschedule (tdbb, true );
881891 }
892+
893+ JRD_reschedule (tdbb, true );
882894 }
883895
896+ bufferMutexLock.lock ();
897+
884898 switch (header->tag )
885899 {
886900 case Tag::SERVER_EXITED:
@@ -977,7 +991,7 @@ void ProfilerListener::watcherThread()
977991 {
978992 while (!exiting)
979993 {
980- const SLONG value = sharedMemory->eventClear (&header->serverEvent );
994+ const SLONG serverEventCounter = sharedMemory->eventClear (&header->serverEvent );
981995
982996 if (startup)
983997 {
@@ -986,18 +1000,39 @@ void ProfilerListener::watcherThread()
9861000 }
9871001 else
9881002 {
1003+ ProfilerIpc::Tag tag;
1004+ unsigned seq;
1005+ std::vector<UCHAR> buffer;
1006+
9891007 fb_assert (header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP);
9901008
9911009 try
9921010 {
9931011 FbLocalStatus statusVector;
9941012 EngineContextHolder tdbb (&statusVector, attachment->getInterface (), FB_FUNCTION);
9951013
996- processCommand (tdbb);
997- header->tag = ProfilerIpc::Tag::RESPONSE;
1014+ { // scope
1015+ std::unique_lock bufferMutexLock (header->bufferMutex );
1016+
1017+ if (header->userName [0 ] && attachment->getUserName () != header->userName )
1018+ status_exception::raise (Arg::Gds (isc_miss_prvlg) << " PROFILE_ANY_ATTACHMENT" );
1019+
1020+ fb_assert (header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP);
1021+
1022+ tag = header->tag ;
1023+ seq = header->seq ;
1024+ buffer.resize (header->bufferSize );
1025+ memcpy (buffer.data (), header->buffer , header->bufferSize );
1026+ }
1027+
1028+ processCommand (tdbb, tag, buffer);
1029+
1030+ tag = ProfilerIpc::Tag::RESPONSE;
9981031 }
9991032 catch (const status_exception& e)
10001033 {
1034+ tag = ProfilerIpc::Tag::EXCEPTION;
1035+
10011036 // // TODO: Serialize status vector instead of formated message.
10021037
10031038 const ISC_STATUS* status = e.value ();
@@ -1012,32 +1047,51 @@ void ProfilerListener::watcherThread()
10121047 errorMsg += temp;
10131048 }
10141049
1015- header->bufferSize = MIN (errorMsg.length (), sizeof (header->buffer ) - 1 );
1016- strncpy ((char *) header->buffer , errorMsg.c_str (), sizeof (header->buffer ));
1017- header->buffer [header->bufferSize ] = ' \0 ' ;
1018-
1019- header->tag = ProfilerIpc::Tag::EXCEPTION;
1050+ header->bufferSize = MIN (errorMsg.length (), sizeof (header->buffer ));
1051+ memcpy (header->buffer , errorMsg.c_str (), header->bufferSize );
10201052 }
10211053
1022- sharedMemory->eventPost (&header->clientEvent );
1054+ fb_assert (buffer.size () <= sizeof (header->buffer ));
1055+
1056+ { // scope
1057+ std::unique_lock bufferMutexLock (header->bufferMutex , std::try_to_lock);
1058+
1059+ // Otherwise a client lost interest in the response.
1060+ if (bufferMutexLock.owns_lock () && header->seq == seq)
1061+ {
1062+ if (header->seq == seq)
1063+ {
1064+ header->tag = tag;
1065+ header->bufferSize = buffer.size ();
1066+ memcpy (header->buffer , buffer.data (), buffer.size ());
1067+
1068+ sharedMemory->eventPost (&header->clientEvent );
1069+ }
1070+ }
1071+ }
10231072 }
10241073
10251074 if (exiting)
10261075 break ;
10271076
1028- sharedMemory->eventWait (&header->serverEvent , value , 0 );
1077+ sharedMemory->eventWait (&header->serverEvent , serverEventCounter , 0 );
10291078 }
10301079 }
10311080 catch (const Exception& ex)
10321081 {
10331082 iscLogException (" Error in profiler watcher thread\n " , ex);
10341083 }
10351084
1036- const ProfilerIpc::Tag oldTag = header->tag .exchange (ProfilerIpc::Tag::SERVER_EXITED);
1037- if (oldTag >= ProfilerIpc::Tag::FIRST_CLIENT_OP)
1038- {
1039- fb_assert (header->clientEvent .event_pid );
1040- sharedMemory->eventPost (&header->clientEvent );
1085+ { // scope
1086+ std::unique_lock bufferMutexLock (header->bufferMutex );
1087+
1088+ if (header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP)
1089+ {
1090+ fb_assert (header->clientEvent .event_pid );
1091+ sharedMemory->eventPost (&header->clientEvent );
1092+ }
1093+
1094+ header->tag = ProfilerIpc::Tag::SERVER_EXITED;
10411095 }
10421096
10431097 try
@@ -1051,70 +1105,75 @@ void ProfilerListener::watcherThread()
10511105 }
10521106}
10531107
1054- void ProfilerListener::processCommand (thread_db* tdbb)
1108+ void ProfilerListener::processCommand (thread_db* tdbb, ProfilerIpc::Tag tag, std::vector<UCHAR>& buffer )
10551109{
1056- const auto header = ipc->sharedMemory ->getHeader ();
10571110 const auto profilerManager = attachment->getProfilerManager (tdbb);
10581111
1059- if (header->userName [0 ] && attachment->getUserName () != header->userName )
1060- status_exception::raise (Arg::Gds (isc_miss_prvlg) << " PROFILE_ANY_ATTACHMENT" );
1061-
10621112 using Tag = ProfilerIpc::Tag;
10631113
1064- switch (header-> tag )
1114+ switch (tag)
10651115 {
10661116 case Tag::CANCEL_SESSION:
1117+ fb_assert (buffer.empty ());
10671118 profilerManager->cancelSession ();
1068- header-> bufferSize = 0 ;
1119+ buffer. resize ( 0 ) ;
10691120 break ;
10701121
10711122 case Tag::DISCARD:
1123+ fb_assert (buffer.empty ());
10721124 profilerManager->discard ();
1073- header-> bufferSize = 0 ;
1125+ buffer. resize ( 0 ) ;
10741126 break ;
10751127
10761128 case Tag::FINISH_SESSION:
10771129 {
1078- const auto in = reinterpret_cast <const ProfilerPackage::FinishSessionInput::Type*>(header->buffer );
1079- fb_assert (sizeof (*in) == header->bufferSize );
1130+ const auto in = reinterpret_cast <const ProfilerPackage::FinishSessionInput::Type*>(buffer.data ());
1131+ fb_assert (sizeof (*in) == buffer.size ());
1132+
10801133 profilerManager->finishSession (tdbb, in->flush );
1081- header->bufferSize = 0 ;
1134+
1135+ buffer.resize (0 );
10821136 break ;
10831137 }
10841138
10851139 case Tag::FLUSH:
1140+ fb_assert (buffer.empty ());
10861141 profilerManager->flush ();
1087- header-> bufferSize = 0 ;
1142+ buffer. resize ( 0 ) ;
10881143 break ;
10891144
10901145 case Tag::PAUSE_SESSION:
10911146 {
1092- const auto in = reinterpret_cast <const ProfilerPackage::PauseSessionInput::Type*>(header->buffer );
1093- fb_assert (sizeof (*in) == header->bufferSize );
1147+ const auto in = reinterpret_cast <const ProfilerPackage::PauseSessionInput::Type*>(buffer.data ());
1148+ fb_assert (sizeof (*in) == buffer.size ());
1149+
10941150 profilerManager->pauseSession (in->flush );
1095- header->bufferSize = 0 ;
1151+
1152+ buffer.resize (0 );
10961153 break ;
10971154 }
10981155
10991156 case Tag::RESUME_SESSION:
1157+ fb_assert (buffer.empty ());
11001158 profilerManager->resumeSession ();
1101- header-> bufferSize = 0 ;
1159+ buffer. resize ( 0 ) ;
11021160 break ;
11031161
11041162 case Tag::SET_FLUSH_INTERVAL:
11051163 {
1106- const auto in = reinterpret_cast <const ProfilerPackage::SetFlushIntervalInput::Type*>(header-> buffer );
1107- fb_assert (sizeof (*in) == header-> bufferSize );
1164+ const auto in = reinterpret_cast <const ProfilerPackage::SetFlushIntervalInput::Type*>(buffer. data () );
1165+ fb_assert (sizeof (*in) == buffer. size () );
11081166
11091167 profilerManager->setFlushInterval (in->flushInterval );
1110- header->bufferSize = 0 ;
1168+
1169+ buffer.resize (0 );
11111170 break ;
11121171 }
11131172
11141173 case Tag::START_SESSION:
11151174 {
1116- const auto in = reinterpret_cast <const ProfilerPackage::StartSessionInput::Type*>(header-> buffer );
1117- fb_assert (sizeof (*in) == header-> bufferSize );
1175+ const auto in = reinterpret_cast <const ProfilerPackage::StartSessionInput::Type*>(buffer. data () );
1176+ fb_assert (sizeof (*in) == buffer. size () );
11181177
11191178 const string description (in->description .str ,
11201179 in->descriptionNull ? 0 : in->description .length );
@@ -1125,14 +1184,12 @@ void ProfilerListener::processCommand(thread_db* tdbb)
11251184 const string pluginOptions (in->pluginOptions .str ,
11261185 in->pluginOptionsNull ? 0 : in->pluginOptions .length );
11271186
1128- const auto out = reinterpret_cast <ProfilerPackage::StartSessionOutput::Type*>(header->buffer );
1129- static_assert (sizeof (*out) <= sizeof (header->buffer ), " Buffer size too small" );
1130- header->bufferSize = sizeof (*out);
1187+ const auto out = reinterpret_cast <ProfilerPackage::StartSessionOutput::Type*>(buffer.data ());
1188+ buffer.resize (sizeof (*out));
11311189
11321190 out->sessionIdNull = FB_FALSE;
11331191 out->sessionId = profilerManager->startSession (tdbb, flushInterval,
11341192 pluginName, description, pluginOptions);
1135-
11361193 break ;
11371194 }
11381195
0 commit comments