@@ -153,6 +153,23 @@ ModelInstanceState::SetErrorForResponseSendMessage(
   }
 }
 
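+// Liveness probe for the stub process. Acquiring HealthMutex() is bounded
+// by a one-second timeout so that a stub that crashed or hung while holding
+// the mutex is reported as dead instead of blocking the parent forever.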
+bool
+ModelInstanceState::IsStubProcessAlive()
+{
+  boost::posix_time::ptime timeout =
+      boost::get_system_time() + boost::posix_time::seconds(1);
+  bi::scoped_lock<bi::interprocess_mutex> lock(*Stub()->HealthMutex(), timeout);
+
+  // Check if the lock has been acquired.
+  if (lock) {
+    return Stub()->IpcControl()->stub_health;
+  } else {
+    // If it failed to obtain the lock, it means that the stub has been
+    // stuck or exited while holding the health mutex lock.
+    return false;
+  }
+}
+
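+// For context, the stub side is expected to keep this flag fresh with a
+// watchdog along the lines of the sketch below (a hedged sketch only: the
+// stub code is not part of this diff, and the names are illustrative):
+//
+//   while (stub_running) {
+//     {
+//       bi::scoped_lock<bi::interprocess_mutex> lock(*health_mutex);
+//       ipc_control->stub_health = true;  // parent clears it before Push()
+//     }
+//     std::this_thread::sleep_for(std::chrono::milliseconds(300));
+//   }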
 TRITONSERVER_Error*
 ModelInstanceState::SaveRequestsToSharedMemory(
     TRITONBACKEND_Request** requests, const uint32_t request_count,
@@ -1011,11 +1028,43 @@ ModelInstanceState::ProcessModelControlRequest(
   });
 }
 
-void
+TRITONSERVER_Error*
 ModelInstanceState::SendMessageToStub(
     bi::managed_external_buffer::handle_t message)
 {
-  Stub()->StubMessageQueue()->Push(message);
+  bool success = false;
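+  // Retry until the push succeeds: each attempt clears the shared health
+  // flag under HealthMutex(), then pushes with a bounded wait; if the push
+  // times out and the stub no longer looks alive, give up with an error
+  // instead of blocking forever.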
+  while (!success) {
+    uint64_t timeout_milliseconds = 1000;
+    {
+      boost::posix_time::ptime timeout =
+          boost::get_system_time() +
+          boost::posix_time::milliseconds(timeout_milliseconds);
+
+      bi::scoped_lock<bi::interprocess_mutex> lock(
+          *(Stub()->HealthMutex()), timeout);
+
+      // Check if the lock has been acquired.
+      if (lock) {
+        Stub()->IpcControl()->stub_health = false;
+      } else {
+        // If it failed to obtain the lock, it means that the stub has been
+        // stuck or exited while holding the health mutex lock.
+        return TRITONSERVER_ErrorNew(
+            TRITONSERVER_ERROR_INTERNAL, "Failed to obtain the health mutex.");
+      }
+    }
+
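+    // Push() takes an explicit duration and an out-parameter: on timeout it
+    // returns with `success` still false rather than blocking indefinitely.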
+    Stub()->StubMessageQueue()->Push(
+        message, timeout_milliseconds /* duration ms */, success);
+
+    if (!success && !IsStubProcessAlive()) {
+      return TRITONSERVER_ErrorNew(
+          TRITONSERVER_ERROR_INTERNAL, "Stub process is not healthy.");
+    }
+  }
+
+  return nullptr;  // success
 }
 
 void
@@ -1025,10 +1074,29 @@ ModelInstanceState::SendMessageAndReceiveResponse(
     std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
     TRITONBACKEND_Request** requests, const uint32_t request_count)
 {
-  SendMessageToStub(message);
+  auto error = SendMessageToStub(message);
+  if (error != nullptr) {
+    RespondErrorToAllRequests(
+        TRITONSERVER_ErrorMessage(error), responses, requests, request_count);
+    // Release the error object once its message has been propagated.
+    TRITONSERVER_ErrorDelete(error);
+    return;
+  }
 
   bi::managed_external_buffer::handle_t response_message;
-  Stub()->ReceiveMessageFromStub(response_message);
+  error = Stub()->ReceiveMessageFromStub(response_message);
+  if (error != nullptr) {
+    RespondErrorToAllRequests(
+        TRITONSERVER_ErrorMessage(error), responses, requests, request_count);
+    TRITONSERVER_ErrorDelete(error);
+    return;
+  }
 
   response = response_message;
 }
@@ -1061,6 +1129,7 @@ ModelInstanceState::RespondErrorToAllRequests(
   }
 }
 
+
 void
 ModelInstanceState::StartMonitor()
 {
@@ -1282,7 +1351,7 @@ ModelInstanceState::ProcessRequests(
   {
     Stub()->StubMessageQueue()->Push(ipc_message->ShmHandle());
     bi::managed_external_buffer::handle_t response_message;
-    Stub()->ReceiveMessageFromStub(response_message);
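+    // Propagate a failed read from the stub immediately rather than using
+    // an uninitialized message handle below.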
+    RETURN_IF_ERROR(Stub()->ReceiveMessageFromStub(response_message));
     response =
         IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), response_message);
   }
@@ -1329,26 +1398,34 @@ ModelInstanceState::ProcessRequests(
   }
 
   if (response_batch_shm_ptr->batch_size > 0) {
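+    // The response batch is laid out in shared memory as
+    // [IPCMessageShm][ResponseBatch][handle_t x batch_size], so the handle
+    // array begins immediately after the two fixed-size headers.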
+    bi::managed_external_buffer::handle_t* response_shm_handle =
+        reinterpret_cast<bi::managed_external_buffer::handle_t*>(
+            ipc_message_shm + sizeof(ResponseBatch) + sizeof(IPCMessageShm));
+
     std::shared_ptr<std::vector<TRITONBACKEND_Response*>> responses(
         new std::vector<TRITONBACKEND_Response*>());
     responses->reserve(request_count);
     for (size_t i = 0; i < request_count; i++) {
-      TRITONBACKEND_Response* response;
-      auto err = TRITONBACKEND_ResponseNew(&response, requests[i]);
-      if (err == nullptr) {
-        responses->emplace_back(response);
-      } else {
+      // A single response batch may contain None entries for requests whose
+      // outputs are returned through the response sender instead, so only
+      // create a TRITONBACKEND_Response object for the valid responses and
+      // skip the None responses later.
+      if (response_shm_handle[i] == 0) {
+        std::cerr << "=== PYBE response_shm_handle is 0 ===" << std::endl;
         responses->emplace_back(nullptr);
-        LOG_MESSAGE(TRITONSERVER_LOG_ERROR, "Fail to create response");
-        TRITONSERVER_ErrorDelete(err);
+      } else {
+        TRITONBACKEND_Response* response;
+        auto err = TRITONBACKEND_ResponseNew(&response, requests[i]);
+        if (err == nullptr) {
+          responses->emplace_back(response);
+        } else {
+          responses->emplace_back(nullptr);
+          LOG_MESSAGE(TRITONSERVER_LOG_ERROR, "Failed to create response");
+          TRITONSERVER_ErrorDelete(err);
+        }
       }
     }
-    bi::managed_external_buffer::handle_t* response_shm_handle =
-        reinterpret_cast<bi::managed_external_buffer::handle_t*>(
-            ipc_message_shm + sizeof(ResponseBatch) + sizeof(IPCMessageShm));
 
-    // If the output provided by the model is in GPU, we will pass the list of
-    // buffers provided by Triton to the stub process.
     std::vector<bool> requires_deferred_callback;
 
     bool has_gpu_output = false;
@@ -1360,6 +1437,11 @@ ModelInstanceState::ProcessRequests(
     std::cerr << "=== PYBE request_count: " << request_count << std::endl;
     for (uint32_t r = 0; r < request_count; ++r) {
       NVTX_RANGE(nvtx_, "LoadingResponse " + Name());
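+      // Skip None responses (handle == 0): they were flagged above and their
+      // outputs are delivered via the response sender, not through this batch.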
+      if (response_shm_handle[r] == 0) {
+        std::cerr << "=== PYBE skip the response_shm_handle is 0 ==="
+                  << std::endl;
+        continue;
+      }
       TRITONBACKEND_Response* response = (*responses)[r];
       TRITONBACKEND_Request* request = requests[r];
       uint32_t requested_output_count = 0;
@@ -1378,13 +1460,14 @@ ModelInstanceState::ProcessRequests(
       continue;
     }
 
-      if (response_shm_handle[r] == 0) {
-        LOG_IF_ERROR(
-            TRITONBACKEND_ResponseDelete((*responses)[r]),
-            "failed to delete response");
-        (*responses)[r] = nullptr;
-        continue;
-      }
       {
         TRITONBACKEND_ResponseFactory* response_factory =
             reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
@@ -1448,6 +1531,8 @@ ModelInstanceState::ProcessRequests(
             responses, r,
             TRITONBACKEND_RequestOutputName(request, j, &output_name));
         requested_output_names.insert(output_name);
+        std::cerr << "=== PYBE requested_output_name: " << output_name
+                  << std::endl;
       }
 
       bool require_deferred_callback = false;