@@ -1229,7 +1229,7 @@ ModelInstanceState::ProcessRequests(
   ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_ExecuteRequest;
   ipc_message->Args() = request_batch.handle_;
   received_message_ = nullptr;
-  ScopedDefer _([this] {
+  ScopedDefer execute_finalize([this] {
     // Push a dummy message to signal the thread to terminate.
     Stub()->StubMessageQueue()->Push(DUMMY_MESSAGE);
   });
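Note on the change above: giving the deferred task a name (`execute_finalize` instead of the throwaway `_`) lets the response-handling code added later in this commit run it explicitly via `execute_finalize.Complete()` once the responses have been loaded, instead of relying only on scope exit. A minimal sketch of such a defer helper, assuming a run-once `Complete()` interface (the backend's actual `ScopedDefer` is defined elsewhere and may differ):

#include <functional>
#include <utility>

// Hypothetical stand-in for ScopedDefer: runs `task` at scope exit unless
// Complete() has already run it.
class ScopedDeferSketch {
 public:
  explicit ScopedDeferSketch(std::function<void()> task)
      : task_(std::move(task)) {}

  // Run the deferred task now; the destructor then becomes a no-op.
  void Complete()
  {
    if (!done_) {
      task_();
      done_ = true;
    }
  }

  ~ScopedDeferSketch() { Complete(); }

 private:
  std::function<void()> task_;
  bool done_ = false;
};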
@@ -1240,19 +1240,23 @@ ModelInstanceState::ProcessRequests(
     cv_.wait(guard, [this] { return received_message_ != nullptr; });
   }

-  AllocatedSharedMemory<ResponseBatch> response_batch =
-      Stub()->ShmPool()->Load<ResponseBatch>(received_message_->Args());
+
+  AllocatedSharedMemory<char> response_batch = Stub()->ShmPool()->Load<char>(received_message_->Args());
+
+  ResponseBatch* response_batch_shm_ptr =
+      reinterpret_cast<ResponseBatch*>(response_batch.data_.get());
+
   received_message_.reset();

   uint64_t compute_end_ns = 0;
   SET_TIMESTAMP(compute_end_ns);
   reporter.SetComputeEndNs(compute_end_ns);
   reporter.SetBatchStatistics(total_batch_size);

-  if (response_batch.data_->has_error) {
-    if (response_batch.data_->is_error_set) {
+  if (response_batch_shm_ptr->has_error) {
+    if (response_batch_shm_ptr->is_error_set) {
       auto error = PbString::LoadFromSharedMemory(
-          Stub()->ShmPool(), response_batch.data_->error);
+          Stub()->ShmPool(), response_batch_shm_ptr->error);
       return TRITONSERVER_ErrorNew(
           TRITONSERVER_ERROR_INTERNAL, error->String().c_str());
     }
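The switch from `Load<ResponseBatch>` to `Load<char>` plus the `reinterpret_cast` matches the layout used by the code added below: the region referenced by `received_message_->Args()` starts with a `ResponseBatch` header followed immediately by one shared-memory handle per response. A short sketch of that access pattern, reusing the names from the diff (the layout itself is inferred from the pointer arithmetic in the added code, not stated elsewhere):

// Assumed layout of the loaded region:
//   [ ResponseBatch ][ handle_t #0 ][ handle_t #1 ] ... [ handle_t #N-1 ]
char* base = response_batch.data_.get();
ResponseBatch* header = reinterpret_cast<ResponseBatch*>(base);
bi::managed_external_buffer::handle_t* handles =
    reinterpret_cast<bi::managed_external_buffer::handle_t*>(
        base + sizeof(ResponseBatch));
// handles[r] is what the new per-request loop passes to
// InferResponse::LoadFromSharedMemory for request r.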
@@ -1261,6 +1265,202 @@ ModelInstanceState::ProcessRequests(
         TRITONSERVER_ERROR_INTERNAL, "Failed to process the requests.");
   }

+  if (response_batch_shm_ptr->batch_size > 0) {
+    std::shared_ptr<std::vector<TRITONBACKEND_Response*>> responses(
+        new std::vector<TRITONBACKEND_Response*>());
+    responses->reserve(request_count);
+    for (size_t i = 0; i < request_count; i++) {
+      TRITONBACKEND_Response* response;
+      auto err = TRITONBACKEND_ResponseNew(&response, requests[i]);
+      if (err == nullptr) {
+        responses->emplace_back(response);
+      } else {
+        responses->emplace_back(nullptr);
+        LOG_MESSAGE(TRITONSERVER_LOG_ERROR, "Fail to create response");
+        TRITONSERVER_ErrorDelete(err);
+      }
+    }
+    bi::managed_external_buffer::handle_t* response_shm_handle =
+        reinterpret_cast<bi::managed_external_buffer::handle_t*>(
+            response_batch.data_.get() + sizeof(ResponseBatch));
+
+    // If the output provided by the model is in GPU, we will pass the list of
+    // buffers provided by Triton to the stub process.
+    // bool has_gpu_output = false;
+    std::vector<bool> requires_deferred_callback;
+
+    std::vector<std::unique_ptr<InferResponse>> shm_responses;
+    std::vector<std::vector<std::pair<std::unique_ptr<PbMemory>, void*>>>
+        gpu_output_buffers(request_count);
+    GPUBuffersHelper gpu_buffer_helper;
+
+    for (uint32_t r = 0; r < request_count; ++r) {
+      NVTX_RANGE(nvtx_, "LoadingResponse " + Name());
+      TRITONBACKEND_Response* response = (*responses)[r];
+      TRITONBACKEND_Request* request = requests[r];
+      uint32_t requested_output_count = 0;
+      requires_deferred_callback.push_back(false);
+
+      shm_responses.emplace_back(nullptr);
+      std::unique_ptr<InferResponse>& infer_response = shm_responses.back();
+      try {
+        if (pb_infer_requests[r]->ReleaseFlags() ==
+            TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) {
+          // For rescheduled requests, we do not need to send a response.
+          LOG_IF_ERROR(
+              TRITONBACKEND_ResponseDelete((*responses)[r]),
+              "failed to delete response");
+          (*responses)[r] = nullptr;
+          continue;
+        }
+        infer_response = InferResponse::LoadFromSharedMemory(
+            Stub()->ShmPool(), response_shm_handle[r],
+            false /* open_cuda_handle */);
+        if (infer_response->HasError()) {
+          TRITONSERVER_Error* err = TRITONSERVER_ErrorNew(
+              infer_response->Error()->Code(),
+              infer_response->Error()->Message().c_str());
+
+          LOG_IF_ERROR(
+              TRITONBACKEND_ResponseSend(
+                  (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err),
+              "failed sending response");
+          TRITONSERVER_ErrorDelete(err);
+          (*responses)[r] = nullptr;
+
+          // Reset the release flags for the request.
+          pb_infer_requests[r]->SetReleaseFlags(
+              TRITONSERVER_REQUEST_RELEASE_ALL);
+
+          // If has_error is true, we do not look at the response tensors.
+          continue;
+        }
+      }
+      catch (const PythonBackendException& pb_exception) {
+        TRITONSERVER_Error* err = TRITONSERVER_ErrorNew(
+            TRITONSERVER_ERROR_INTERNAL, pb_exception.what());
+        LOG_IF_ERROR(
+            TRITONBACKEND_ResponseSend(
+                (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err),
+            "failed sending response");
+        TRITONSERVER_ErrorDelete(err);
+        (*responses)[r] = nullptr;
+
+        // Reset the release flags for the request.
+        pb_infer_requests[r]->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL);
+
+        continue;
+      }
+
+      GUARDED_RESPOND_IF_ERROR(
+          responses, r,
+          TRITONBACKEND_RequestOutputCount(request, &requested_output_count));
+
+      std::set<std::string> requested_output_names;
+      for (size_t j = 0; j < requested_output_count; ++j) {
+        const char* output_name;
+        GUARDED_RESPOND_IF_ERROR(
+            responses, r,
+            TRITONBACKEND_RequestOutputName(request, j, &output_name));
+        requested_output_names.insert(output_name);
+      }
+
+      bool require_deferred_callback = false;
+
+#ifdef TRITON_ENABLE_GPU
+      for (auto& output_tensor : infer_response->OutputTensors()) {
+        if (output_tensor->MemoryType() == TRITONSERVER_MEMORY_GPU) {
+          // Attempt to use the cuda shared memory pool for GPU tensor.
+          ShareCUDAMemoryPool(output_tensor->MemoryTypeId());
+        }
+      }
+#endif  // TRITON_ENABLE_GPU
+
+      gpu_output_buffers[r] =
+          std::vector<std::pair<std::unique_ptr<PbMemory>, void*>>{};
+      infer_response->Send(
+          response, CudaStream(), require_deferred_callback,
+          TRITONSERVER_RESPONSE_COMPLETE_FINAL, Stub()->ShmPool(),
+          gpu_buffer_helper, gpu_output_buffers[r], requested_output_names);
+
+      requires_deferred_callback[r] = require_deferred_callback;
+
+      if (requires_deferred_callback[r]) {
+        // has_gpu_output = true;
+      }
+    }
+
+    // Finalize the execute.
+    execute_finalize.Complete();
+  }
+
+  // If the output tensor is in GPU, there will be a second round trip
+  // required for filling the GPU buffers provided by the main process.
+  // if (has_gpu_output) {
+  //   ipc_message->Command() =
+  //       PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers;
+  //   gpu_buffer_helper.Complete(Stub()->ShmPool());
+  //   ipc_message->Args() = gpu_buffer_helper.ShmHandle();
+  //   SendMessageAndReceiveResponse(
+  //       ipc_message->ShmHandle(), response_message, restart, responses,
+  //       requests, 0);
+
+  //   bool cuda_copy = false;
+
+  //   uint32_t response_index = 0;
+  //   for (auto& gpu_output_buffer : gpu_output_buffers) {
+  //     for (auto& buffer_memory_pair : gpu_output_buffer) {
+  //       auto& pb_memory = buffer_memory_pair.first;
+  //       void* pointer = buffer_memory_pair.second;
+  //       bool cuda_used = false;
+
+  //       if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) {
+  //         GUARDED_RESPOND_IF_ERROR(
+  //             responses, response_index,
+  //             CopyBuffer(
+  //                 "Failed to copy the output tensor to buffer.",
+  //                 TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0,
+  //                 pb_memory->ByteSize(), pb_memory->DataPtr(), pointer,
+  //                 CudaStream(), &cuda_used));
+  //         cuda_copy |= cuda_used;
+  //       } else if (
+  //           (pb_memory->MemoryType() == TRITONSERVER_MEMORY_GPU) &&
+  //           pb_memory->UseCUDASharedPool() &&
+  //           (pb_memory->DataPtr() != pointer)) {
+  //         // If the data pointer from pb_memory is not the same as the
+  //         // pointer, it means that the Triton-provided buffer is not used
+  //         // during tensor transfer. Instead, an intermediate buffer that uses
+  //         // CUDA shared memory pool is used. In this case, we need to copy
+  //         // the data from the intermediate buffer back to the Triton-provided
+  //         // buffer.
+  //         GUARDED_RESPOND_IF_ERROR(
+  //             responses, response_index,
+  //             CopyBuffer(
+  //                 "Failed to copy the output tensor to buffer.",
+  //                 TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(),
+  //                 TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(),
+  //                 pb_memory->ByteSize(), pb_memory->DataPtr(), pointer,
+  //                 CudaStream(), &cuda_used));
+  //         cuda_copy |= cuda_used;
+  //       }
+  //     }
+  //     response_index++;
+  // #ifdef TRITON_ENABLE_GPU
+  //     if (cuda_copy) {
+  //       cudaStreamSynchronize(stream_);
+  //     }
+  // #endif  // TRITON_ENABLE_GPU
+  //   }
+  // }
+
+  // bls_defer.Complete();
+  // for (uint32_t r = 0; r < request_count; ++r) {
+  //   if (requires_deferred_callback[r]) {
+  //     shm_responses[r]->DeferredSendCallback();
+  //   }
+  // }
+  // }
+
   return nullptr;  // success
 }

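A note on the error handling in the new per-request loop: `GUARDED_RESPOND_IF_ERROR` is one of the backend's convenience macros. The sketch below shows the behavior the loop relies on (if the call fails, send the error as the final response and null out that response slot so later guarded calls for the same request become no-ops); it is an illustrative approximation, not the macro's actual definition:

#define GUARDED_RESPOND_IF_ERROR_SKETCH(RESPONSES, IDX, EXPR)            \
  do {                                                                   \
    if ((*RESPONSES)[IDX] != nullptr) {                                  \
      TRITONSERVER_Error* guard_err__ = (EXPR);                          \
      if (guard_err__ != nullptr) {                                      \
        LOG_IF_ERROR(                                                    \
            TRITONBACKEND_ResponseSend(                                  \
                (*RESPONSES)[IDX], TRITONSERVER_RESPONSE_COMPLETE_FINAL, \
                guard_err__),                                            \
            "failed to send error response");                            \
        (*RESPONSES)[IDX] = nullptr;                                     \
        TRITONSERVER_ErrorDelete(guard_err__);                           \
      }                                                                  \
    }                                                                    \
  } while (false)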