Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/infer_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ InferRequest::Exec(const bool is_decoupled)
{
bi::scoped_lock<bi::interprocess_mutex> lock{
*(ipc_message->ResponseMutex())};
stub->SendIPCMessage(ipc_message);
stub->SendIPCUtilsMessage(ipc_message);
ipc_message->ResponseCondition()->wait(lock);
}

Expand Down
19 changes: 19 additions & 0 deletions src/ipc_message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ IPCMessage::Create(
new IPCMessage(ipc_message_shm, response_mutex_shm, response_cond_shm));
}

// Wrap an already-allocated shm region (and its existing handle) in an
// IPCMessage. Unlike the pool-based Create() overload, this performs no
// allocation of its own; the caller retains ownership of the underlying
// shared-memory block.
std::unique_ptr<IPCMessage>
IPCMessage::Create(
    IPCMessageShm* ipc_message_shm,
    bi::managed_external_buffer::handle_t& message_handle)
{
  std::unique_ptr<IPCMessage> message{
      new IPCMessage(ipc_message_shm, message_handle)};
  return message;
}

// Accessor for the AllocatedSharedMemory object backing this message.
// NOTE(review): the IPCMessageShm*-based constructor never populates this
// member (it only sets ipc_message_shm_ptr_ and the handle), so for messages
// created that way this returns a default-constructed object — confirm that
// callers only invoke this on pool-allocated messages.
AllocatedSharedMemory<IPCMessageShm>&
IPCMessage::GetAllocatedSharedMemory()
{
return ipc_message_shm_;
}

std::unique_ptr<IPCMessage>
IPCMessage::LoadFromSharedMemory(
std::unique_ptr<SharedMemoryManager>& shm_pool,
Expand Down Expand Up @@ -133,4 +146,10 @@ IPCMessage::IPCMessage(
ipc_message_handle_ = ipc_message_shm_.handle_;
}

// Construct an IPCMessage that views a caller-owned IPCMessageShm region.
// Only the raw pointer and its shm handle are recorded; no shared memory is
// allocated or released by this object.
IPCMessage::IPCMessage(
    IPCMessageShm* ipc_message_shm,
    bi::managed_external_buffer::handle_t& handle)
{
  ipc_message_shm_ptr_ = ipc_message_shm;
  ipc_message_handle_ = handle;
}

}}}; // namespace triton::backend::python
7 changes: 7 additions & 0 deletions src/ipc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ class IPCMessage {
static std::unique_ptr<IPCMessage> Create(
const std::unique_ptr<SharedMemoryManager>& shm_pool,
bool inline_response);

static std::unique_ptr<IPCMessage>
Create(IPCMessageShm* ipc_message_shm,
bi::managed_external_buffer::handle_t& message_handle);
static std::unique_ptr<IPCMessage> LoadFromSharedMemory(
std::unique_ptr<SharedMemoryManager>& shm_pool,
bi::managed_external_buffer::handle_t message_handle);
Expand All @@ -108,6 +112,7 @@ class IPCMessage {
bi::interprocess_mutex* ResponseMutex();
bi::managed_external_buffer::handle_t& Args();
bi::managed_external_buffer::handle_t ShmHandle();
AllocatedSharedMemory<IPCMessageShm>& GetAllocatedSharedMemory();

private:
AllocatedSharedMemory<IPCMessageShm> ipc_message_shm_;
Expand All @@ -129,6 +134,8 @@ class IPCMessage {
AllocatedSharedMemory<IPCMessageShm>& ipc_message_shm,
AllocatedSharedMemory<bi::interprocess_mutex>& response_mutex_shm,
AllocatedSharedMemory<bi::interprocess_condition>& response_cond_shm);

IPCMessage(IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& handle);
};

}}}; // namespace triton::backend::python
81 changes: 63 additions & 18 deletions src/pb_stub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -653,27 +653,19 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
{
py::list py_request_list =
LoadRequestsFromSharedMemory(request_batch_shm_ptr);
std::unique_ptr<IPCMessage> execute_response =
IPCMessage::Create(shm_pool_, false /* Inline response */);
execute_response->Command() = PYTHONSTUB_ExecuteResponse;
std::unique_ptr<IPCMessage> execute_response;
// IPCMessage::Create(shm_pool_, false /* Inline response */);

AllocatedSharedMemory<ResponseBatch> response_batch =
shm_pool_->Construct<ResponseBatch>();
ResponseBatch* response_batch_shm_ptr =
reinterpret_cast<ResponseBatch*>(response_batch.data_.get());
execute_response->Args() = response_batch.handle_;
std::optional<AllocatedSharedMemory<char>> response_batch;
bool has_exception = false;
std::string error_string;
std::unique_ptr<PbString> error_string_shm;

ScopedDefer execute_finalize([this] { stub_message_queue_->Pop(); });
ScopedDefer _(
[this, &execute_response] { SendIPCMessage(execute_response); });

py::object execute_return;
try {
response_batch_shm_ptr->has_error = false;
response_batch_shm_ptr->is_error_set = false;

if (!py::hasattr(model_instance_, "execute")) {
std::string message = "Python model " + model_context_.PythonModelPath() +
" does not implement `execute` method.";
Expand All @@ -683,7 +675,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
{
NVTX_RANGE(nvtx_, "PyExecute " + name_);

py::object execute_return =
execute_return =
model_instance_.attr("execute")(py_request_list);

bool is_coroutine = py::module::import("asyncio")
Expand All @@ -696,10 +688,10 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
} else {
py::object coroutine_return =
RunCoroutine(execute_return, false /* in_background */);
ProcessReturnedResponses(py_request_list, coroutine_return);
ProcessReturnedResponses(py_request_list, coroutine_return, response_batch);
}
} else {
ProcessReturnedResponses(py_request_list, execute_return);
ProcessReturnedResponses(py_request_list, execute_return, response_batch);
}
}
}
Expand All @@ -719,6 +711,12 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
"', message: ") +
error_string;
LOG_ERROR << err_message.c_str();
if (!response_batch) {
response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch) + sizeof(IPCMessageShm));
}
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));

response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
response_batch_shm_ptr->has_error = true;
error_string_shm = PbString::Create(shm_pool_, err_message);
response_batch_shm_ptr->error = error_string_shm->ShmHandle();
Expand All @@ -732,11 +730,38 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
request->GetResponseSender()->Close();
}
}

if (!response_batch) {
response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch) + sizeof(IPCMessageShm));
ResponseBatch* response_batch_shm_ptr =reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));
response_batch_shm_ptr->batch_size = 0;
}
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));
response_batch_shm_ptr->has_error = false;
response_batch_shm_ptr->is_error_set = false;
execute_response = IPCMessage::Create(reinterpret_cast<IPCMessageShm*>(response_batch.value().data_.get()), response_batch.value().handle_);
execute_response->Args() = response_batch.value().handle_;
execute_response->InlineResponse() = false;
execute_response->Command() = PYTHONSTUB_ExecuteResponse;
_.Complete();
execute_finalize.Complete();
}

// Serialize one inference response into shared memory and record its
// non-CPU output tensors in gpu_tensors_ for later handling.
void
Stub::ProcessResponse(InferResponse* response)
{
  // Write the response into the shm pool; GPU buffers are not copied here.
  response->SaveToSharedMemory(shm_pool_, false /* copy_gpu */);

  // Remember every output tensor that does not reside in CPU memory.
  for (const auto& tensor : response->OutputTensors()) {
    if (tensor->IsCPU()) {
      continue;
    }
    gpu_tensors_.push_back(tensor);
  }
}

void
Stub::ProcessReturnedResponses(
py::list py_requests, py::object py_responses_obj)
py::list py_requests, py::object py_responses_obj, std::optional<AllocatedSharedMemory<char>>& response_batch)
{
// Return if there is nothing to process.
if (py::isinstance<py::none>(py_responses_obj)) {
Expand Down Expand Up @@ -784,12 +809,32 @@ Stub::ProcessReturnedResponses(
"return list, found type '" +
std::string(py::str(py_responses[i].get_type())) + "'.");
}

std::shared_ptr<InferResponse> response =
py_responses[i].cast<std::shared_ptr<InferResponse>>();
request->GetResponseSender()->Send(
response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
request->GetResponseSender()->UpdateStateAndCounters(response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
}
}
response_batch = std::move(shm_pool_->Construct<char>(sizeof(IPCMessageShm) +
requests_size * sizeof(bi::managed_external_buffer::handle_t) +
sizeof(ResponseBatch)));
ResponseBatch* response_batch_shm_ptr =
reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));

bi::managed_external_buffer::handle_t* responses_shm_handle =
reinterpret_cast<bi::managed_external_buffer::handle_t*>(
response_batch.value().data_.get() + sizeof(ResponseBatch) + sizeof(IPCMessageShm));

for (size_t i = 0; i < responses_size; i++) {
// Check the return type of execute function.
InferRequest* infer_request = py_requests[i].cast<InferRequest*>();
InferResponse* infer_response = py_responses[i].cast<InferResponse*>();
infer_response->PruneOutputTensors(
infer_request->RequestedOutputNames());
ProcessResponse(infer_response);
responses_shm_handle[i] = infer_response->ShmHandle();
}
response_batch_shm_ptr->batch_size = requests_size;
}

py::object
Expand Down
4 changes: 3 additions & 1 deletion src/pb_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ class Stub {
void ProcessRequests(RequestBatch* request_batch_shm_ptr);

void ProcessReturnedResponses(
py::list py_requests, py::object py_responses_obj);
py::list py_requests, py::object py_responses_obj, std::optional<AllocatedSharedMemory<char>>& response_batch);

void ProcessResponse(InferResponse* response);

py::object GetAsyncEventLoop();

Expand Down
Loading
Loading