@@ -291,7 +291,7 @@ ModelInstanceState::SaveRequestsToSharedMemory(
 
     std::unique_ptr<InferRequest> infer_request;
     TRITONBACKEND_ResponseFactory* factory_ptr = nullptr;
-    // RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
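+    // Create a response factory for this request; RETURN_IF_ERROR propagates
+    // any failure to the caller.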
+    RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
 
     infer_request = std::make_unique<InferRequest>(
         id, correlation_id, pb_input_tensors, requested_output_names,
@@ -1009,6 +1009,62 @@ ModelInstanceState::ProcessModelControlRequest(
   });
 }
 
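+// Push a message handle onto the stub's message queue. This is fire-and-forget;
+// no reply from the stub is awaited here.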
+void
+ModelInstanceState::SendMessageToStub(
+    bi::managed_external_buffer::handle_t message)
+{
+  Stub()->StubMessageQueue()->Push(message);
+}
+
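+// Send a message to the stub and block until its reply arrives. If receiving
+// the reply fails, an INTERNAL error response is sent for every pending
+// request and 'response' is left unchanged.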
+void
+ModelInstanceState::SendMessageAndReceiveResponse(
+    bi::managed_external_buffer::handle_t message,
+    bi::managed_external_buffer::handle_t& response,
+    std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
+    TRITONBACKEND_Request** requests, const uint32_t request_count)
+{
+  SendMessageToStub(message);
+
+  bi::managed_external_buffer::handle_t response_message;
+  auto error = Stub()->ReceiveMessageFromStub(response_message);
+  if (error != nullptr) {
+    RespondErrorToAllRequests(
+        TRITONSERVER_ErrorMessage(error), responses, requests, request_count);
+
+    return;
+  }
+
+  response = response_message;
+}
+
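+// Send 'message' as an INTERNAL error response (with the COMPLETE_FINAL flag)
+// to every request that has not already been responded to.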
+void
+ModelInstanceState::RespondErrorToAllRequests(
+    const char* message,
+    std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
+    TRITONBACKEND_Request** requests, const uint32_t request_count)
+{
+  for (uint32_t r = 0; r < request_count; ++r) {
+    if ((*responses)[r] == nullptr)
+      continue;
+
+    std::string err_message =
+        std::string(
+            "Failed to process the request(s) for model instance '" + Name() +
+            "', message: ") +
+        message;
+
+    TRITONSERVER_Error* err =
+        TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_INTERNAL, err_message.c_str());
+    LOG_IF_ERROR(
+        TRITONBACKEND_ResponseSend(
+            (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err),
+        "failed sending response");
+
+    (*responses)[r] = nullptr;
+    TRITONSERVER_ErrorDelete(err);
+  }
+}
+
 void
 ModelInstanceState::StartMonitor()
 {
@@ -1164,6 +1220,12 @@ ModelInstanceState::ResponseSendDecoupled(
     SetErrorForResponseSendMessage(
         send_message_payload, WrapTritonErrorInSharedPtr(error), error_message);
   }
+
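+  // After the final response flag has been sent, take ownership of the
+  // response factory so ResponseFactoryDeleter releases it when this scope
+  // ends.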
+  if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
+    std::unique_ptr<
+        TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
+        lresponse_factory(
+            reinterpret_cast<TRITONBACKEND_ResponseFactory*>(response_factory));
+  }
 }
 
 TRITONSERVER_Error*
@@ -1265,6 +1327,7 @@ ModelInstanceState::ProcessRequests(
   // bool has_gpu_output = false;
   std::vector<bool> requires_deferred_callback;
 
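+  // Set to true when any response requires a deferred callback, i.e. when GPU
+  // output buffers must be filled in a second round trip to the stub.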
+  bool has_gpu_output = false;
   std::vector<std::unique_ptr<InferResponse>> shm_responses;
   std::vector<std::vector<std::pair<std::unique_ptr<PbMemory>, void*>>>
       gpu_output_buffers(request_count);
@@ -1362,78 +1425,75 @@ ModelInstanceState::ProcessRequests(
       requires_deferred_callback[r] = require_deferred_callback;
 
       if (requires_deferred_callback[r]) {
-        // has_gpu_output = true;
+        has_gpu_output = true;
       }
     }
 
-  }
-
     // If the output tensor is in GPU, there will be a second round trip
     // required for filling the GPU buffers provided by the main process.
-    // if (has_gpu_output) {
-    // ipc_message->Command() =
-    // PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers;
-    // gpu_buffer_helper.Complete(Stub()->ShmPool());
-    // ipc_message->Args() = gpu_buffer_helper.ShmHandle();
-    // SendMessageAndReceiveResponse(
-    // ipc_message->ShmHandle(), response_message, restart, responses,
-    // requests, 0);
-
-    // bool cuda_copy = false;
-
-    // uint32_t response_index = 0;
-    // for (auto& gpu_output_buffer : gpu_output_buffers) {
-    // for (auto& buffer_memory_pair : gpu_output_buffer) {
-    // auto& pb_memory = buffer_memory_pair.first;
-    // void* pointer = buffer_memory_pair.second;
-    // bool cuda_used = false;
-
-    // if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) {
-    // GUARDED_RESPOND_IF_ERROR(
-    // responses, response_index,
-    // CopyBuffer(
-    // "Failed to copy the output tensor to buffer.",
-    // TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0,
-    // pb_memory->ByteSize(), pb_memory->DataPtr(), pointer,
-    // CudaStream(), &cuda_used));
-    // cuda_copy |= cuda_used;
-    // } else if (
-    // (pb_memory->MemoryType() == TRITONSERVER_MEMORY_GPU) &&
-    // pb_memory->UseCUDASharedPool() &&
-    // (pb_memory->DataPtr() != pointer)) {
-    // // If the data pointer from pb_memory is not the same as the
-    // // pointer, it means that the Triton-provided buffer is not used
-    // // during tensor transfer. Instead, an intermediate buffer that uses
-    // // CUDA shared memory pool is used. In this case, we need to copy
-    // // the data from the intermediate buffer back to the Triton-provided
-    // // buffer.
-    // GUARDED_RESPOND_IF_ERROR(
-    // responses, response_index,
-    // CopyBuffer(
-    // "Failed to copy the output tensor to buffer.",
-    // TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(),
-    // TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(),
-    // pb_memory->ByteSize(), pb_memory->DataPtr(), pointer,
-    // CudaStream(), &cuda_used));
-    // cuda_copy |= cuda_used;
-    // }
-    // }
-    // response_index++;
-    // #ifdef TRITON_ENABLE_GPU
-    // if (cuda_copy) {
-    // cudaStreamSynchronize(stream_);
-    // }
-    // #endif // TRITON_ENABLE_GPU
-    // }
-    // }
-
-    // bls_defer.Complete();
-    // for (uint32_t r = 0; r < request_count; ++r) {
-    // if (requires_deferred_callback[r]) {
-    // shm_responses[r]->DeferredSendCallback();
-    // }
-    // }
-    // }
+    if (has_gpu_output) {
+      ipc_message->Command() =
+          PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers;
+      gpu_buffer_helper.Complete(Stub()->ShmPool());
+      ipc_message->Args() = gpu_buffer_helper.ShmHandle();
+      bi::managed_external_buffer::handle_t response_message;
+      SendMessageAndReceiveResponse(
+          ipc_message->ShmHandle(), response_message, responses, requests, 0);
+
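+      // Copy each output written by the stub back into the Triton-provided
+      // buffer where needed, tracking whether any CUDA copy was issued so the
+      // stream can be synchronized.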
+      bool cuda_copy = false;
+
+      uint32_t response_index = 0;
+      for (auto& gpu_output_buffer : gpu_output_buffers) {
+        for (auto& buffer_memory_pair : gpu_output_buffer) {
+          auto& pb_memory = buffer_memory_pair.first;
+          void* pointer = buffer_memory_pair.second;
+          bool cuda_used = false;
+
+          if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) {
+            GUARDED_RESPOND_IF_ERROR(
+                responses, response_index,
+                CopyBuffer(
+                    "Failed to copy the output tensor to buffer.",
+                    TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0,
+                    pb_memory->ByteSize(), pb_memory->DataPtr(), pointer,
+                    CudaStream(), &cuda_used));
+            cuda_copy |= cuda_used;
+          } else if (
+              (pb_memory->MemoryType() == TRITONSERVER_MEMORY_GPU) &&
+              pb_memory->UseCUDASharedPool() &&
+              (pb_memory->DataPtr() != pointer)) {
+            // If the data pointer from pb_memory is not the same as the
+            // pointer, it means that the Triton-provided buffer is not used
+            // during tensor transfer. Instead, an intermediate buffer that uses
+            // CUDA shared memory pool is used. In this case, we need to copy
+            // the data from the intermediate buffer back to the Triton-provided
+            // buffer.
+            GUARDED_RESPOND_IF_ERROR(
+                responses, response_index,
+                CopyBuffer(
+                    "Failed to copy the output tensor to buffer.",
+                    TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(),
+                    TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(),
+                    pb_memory->ByteSize(), pb_memory->DataPtr(), pointer,
+                    CudaStream(), &cuda_used));
+            cuda_copy |= cuda_used;
+          }
+        }
+        response_index++;
+#ifdef TRITON_ENABLE_GPU
+        if (cuda_copy) {
+          cudaStreamSynchronize(stream_);
+        }
+#endif  // TRITON_ENABLE_GPU
+      }
+    }
+
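+    // Complete the responses whose send callbacks were deferred until their
+    // output buffers were finalized.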
+    for (uint32_t r = 0; r < request_count; ++r) {
+      if (requires_deferred_callback[r]) {
+        shm_responses[r]->DeferredSendCallback();
+      }
+    }
+  }
 
   return nullptr;  // success
 }
@@ -1575,9 +1635,6 @@ ModelInstanceState::~ModelInstanceState()
   if (Stub()->IsHealthy()) {
     // Wait for all the pending tasks to finish.
     thread_pool_->wait();
-    // Push a dummy message to signal the thread to terminate.
-    Stub()->ParentMessageQueue()->Push(DUMMY_MESSAGE);
-    queue_monitor_.join();
   }
   // Terminate stub first to allow any last messages to be received by the back
   // end before deallocating the queue memory