From 3ac7505f1a5a8b2c5025508b499c1c2cdb39bbb9 Mon Sep 17 00:00:00 2001 From: richardhuo-nv Date: Wed, 19 Mar 2025 00:28:36 -0700 Subject: [PATCH 1/7] responses cancellation adjust the comments title add finished check rename function adjust comment comment comment fix fix fix readme fix comments fix leak --- .gitignore | 1 + CMakeLists.txt | 2 + README.md | 55 +++++++++++++++++++++- src/infer_payload.cc | 17 ++++++- src/infer_payload.h | 5 +- src/infer_response.cc | 2 +- src/ipc_message.h | 5 +- src/pb_bls_cancel.cc | 92 +++++++++++++++++++++++++++++++++++++ src/pb_bls_cancel.h | 63 +++++++++++++++++++++++++ src/pb_response_iterator.cc | 11 ++++- src/pb_response_iterator.h | 5 +- src/pb_stub.cc | 47 ++++++++++++++++++- src/pb_stub.h | 10 +++- src/pb_utils.h | 7 ++- src/python_be.cc | 40 +++++++++++++++- src/python_be.h | 5 +- src/request_executor.cc | 55 ++++++++++++++++++++-- src/request_executor.h | 7 ++- 18 files changed, 410 insertions(+), 19 deletions(-) create mode 100644 src/pb_bls_cancel.cc create mode 100644 src/pb_bls_cancel.h diff --git a/.gitignore b/.gitignore index bafd2974..419005f0 100644 --- a/.gitignore +++ b/.gitignore @@ -140,3 +140,4 @@ dmypy.json # vscode .vscode/settings.json +.vscode/c_cpp_properties.json diff --git a/CMakeLists.txt b/CMakeLists.txt index 69c7c698..0aaa95af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -241,6 +241,8 @@ set( src/pb_response_iterator.cc src/pb_cancel.cc src/pb_cancel.h + src/pb_bls_cancel.cc + src/pb_bls_cancel.h ) list(APPEND diff --git a/README.md b/README.md index b00dc0bf..fe1a2810 100644 --- a/README.md +++ b/README.md @@ -1409,14 +1409,65 @@ class TritonPythonModel: A complete example for sync and async BLS for decoupled models is included in the [Examples](#examples) section. +Note: Async BLS is not supported on Python 3.6 or lower due to the `async` +keyword and `asyncio.run` being introduced in Python 3.7. + Starting from the 22.04 release, the lifetime of the BLS output tensors have been improved such that if a tensor is no longer needed in your Python model it will be automatically deallocated. This can increase the number of BLS requests that you can execute in your model without running into the out of GPU or shared memory error. -Note: Async BLS is not supported on Python 3.6 or lower due to the `async` -keyword and `asyncio.run` being introduced in Python 3.7. +Starting from the 25.04 release, you can use the `infer_responses.cancel()` function +on a BLS decoupled response iterator to stop the response stream, which cancels +the request to the decoupled model. This is useful for stopping long inference +requests, such as those from auto-generative large language models, which may +run for an indeterminate amount of time and consume significant server resources. +The response iterator can be generated from `infer_request.exec(decoupled=True)` +and `infer_request.async_exec(decoupled=True)` functions: + +```python +import triton_python_backend_utils as pb_utils + +class TritonPythonModel: + ... + def execute(self, requests): + ... + inference_request = pb_utils.InferenceRequest( + model_name='model_name', + requested_output_names=['REQUESTED_OUTPUT'], + inputs=[]) + + # Execute the inference_request and wait for the response. Here we are + # running a BLS request on a decoupled model, hence setting the parameter + # 'decoupled' to 'True'. + infer_responses = infer_request.exec(decoupled=True) + + response_tensors_received = [] + for infer_response in infer_responses: + # Check if the inference response indicates an error. + # vLLM backend uses the CANCELLED error code when a request is cancelled. + # TensorRT-LLM backend does not use error codes; instead, it sends the + # TRITONSERVER_RESPONSE_COMPLETE_FINAL flag to the iterator. + if inference_response.has_error(): + if infer_response.error().code() == pb_utils.TritonError.CANCELLED: + print("request has been cancelled.") + break + + # Collect the output tensor from the model's response + output = pb_utils.get_output_tensor_by_name( + inference_response, 'REQUESTED_OUTPUT') + response_tensors_received.append(output) + + # Check if we have received enough inference output tensors + # and then cancel the response iterator + if has_enough_response(response_tensors_received): + infer_responses.cancel() +``` + +Note: Whether the decoupled model returns a cancellation error and stops executing +the request depends on the model's backend implementation. Please refer to the +documentation for more details [Handing in Backend](https://github.com/triton-inference-server/server/blob/main/docs/user_guide/request_cancellation.md#handling-in-backend) ## Model Loading API diff --git a/src/infer_payload.cc b/src/infer_payload.cc index 762201e8..091e8129 100644 --- a/src/infer_payload.cc +++ b/src/infer_payload.cc @@ -1,4 +1,4 @@ -// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -31,7 +31,8 @@ namespace triton { namespace backend { namespace python { InferPayload::InferPayload( const bool is_decoupled, std::function)> callback) - : is_decoupled_(is_decoupled), is_promise_set_(false), callback_(callback) + : is_decoupled_(is_decoupled), is_promise_set_(false), callback_(callback), + request_address_(reinterpret_cast(nullptr)) { promise_.reset(new std::promise>()); } @@ -91,4 +92,16 @@ InferPayload::ResponseAllocUserp() return response_alloc_userp_; } +void +InferPayload::SetRequestAddress(intptr_t request_address) +{ + request_address_ = request_address; +} + +intptr_t +InferPayload::GetRequestAddress() +{ + return request_address_; +} + }}} // namespace triton::backend::python diff --git a/src/infer_payload.h b/src/infer_payload.h index 662e8922..153b63e5 100644 --- a/src/infer_payload.h +++ b/src/infer_payload.h @@ -1,4 +1,4 @@ -// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -62,6 +62,8 @@ class InferPayload : public std::enable_shared_from_this { void SetResponseAllocUserp( const ResponseAllocatorUserp& response_alloc_userp); std::shared_ptr ResponseAllocUserp(); + void SetRequestAddress(intptr_t request_address); + intptr_t GetRequestAddress(); private: std::unique_ptr>> promise_; @@ -70,6 +72,7 @@ class InferPayload : public std::enable_shared_from_this { bool is_promise_set_; std::function)> callback_; std::shared_ptr response_alloc_userp_; + intptr_t request_address_; }; }}} // namespace triton::backend::python diff --git a/src/infer_response.cc b/src/infer_response.cc index a6b6847d..382756d4 100644 --- a/src/infer_response.cc +++ b/src/infer_response.cc @@ -91,6 +91,7 @@ InferResponse::SaveToSharedMemory( response_shm_ptr->is_error_set = false; shm_handle_ = response_shm_.handle_; response_shm_ptr->is_last_response = is_last_response_; + response_shm_ptr->id = id_; // Only save the output tensors to shared memory when the inference response // doesn't have error. @@ -113,7 +114,6 @@ InferResponse::SaveToSharedMemory( tensor_handle_shm_ptr[j] = output_tensor->ShmHandle(); j++; } - response_shm_ptr->id = id_; parameters_shm_ = PbString::Create(shm_pool, parameters_); response_shm_ptr->parameters = parameters_shm_->ShmHandle(); diff --git a/src/ipc_message.h b/src/ipc_message.h index c3d1472e..b3a6efa3 100644 --- a/src/ipc_message.h +++ b/src/ipc_message.h @@ -1,4 +1,4 @@ -// Copyright 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -67,7 +67,8 @@ typedef enum PYTHONSTUB_commandtype_enum { PYTHONSTUB_LoadModelRequest, PYTHONSTUB_UnloadModelRequest, PYTHONSTUB_ModelReadinessRequest, - PYTHONSTUB_IsRequestCancelled + PYTHONSTUB_IsRequestCancelled, + PYTHONSTUB_CancelBLSDecoupledInferRequest } PYTHONSTUB_CommandType; /// diff --git a/src/pb_bls_cancel.cc b/src/pb_bls_cancel.cc new file mode 100644 index 00000000..bae62320 --- /dev/null +++ b/src/pb_bls_cancel.cc @@ -0,0 +1,92 @@ +// Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of NVIDIA CORPORATION nor the names of its +// contributors may be used to endorse or promote products derived +// from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "pb_bls_cancel.h" + +#include "pb_stub.h" + +namespace triton { namespace backend { namespace python { + +void +PbBLSCancel::SaveToSharedMemory(std::unique_ptr& shm_pool) +{ + cancel_shm_ = shm_pool->Construct(); + new (&(cancel_shm_.data_->mu)) bi::interprocess_mutex; + new (&(cancel_shm_.data_->cv)) bi::interprocess_condition; + cancel_shm_.data_->waiting_on_stub = false; + cancel_shm_.data_->infer_payload_id = infer_playload_id_; + cancel_shm_.data_->is_cancelled = is_cancelled_; +} + +bi::managed_external_buffer::handle_t +PbBLSCancel::ShmHandle() +{ + return cancel_shm_.handle_; +} + +CancelBLSRequestMessage* +PbBLSCancel::ShmPayload() +{ + return cancel_shm_.data_.get(); +} + +void +PbBLSCancel::Cancel() +{ + // Release the GIL. Python objects are not accessed during the check. + py::gil_scoped_release gil_release; + + std::unique_lock lk(mu_); + // The cancelled flag can only move from false to true, not the other way, so + // it is checked on each query until cancelled and then implicitly cached. + if (is_cancelled_) { + return; + } + if (!updating_) { + std::unique_ptr& stub = Stub::GetOrCreateInstance(); + if (!stub->StubToParentServiceActive()) { + LOG_ERROR << "Cannot communicate with parent service"; + return; + } + + stub->EnqueueCancelBLSDecoupledRequest(this); + updating_ = true; + } + cv_.wait(lk, [this] { return !updating_; }); +} + +void +PbBLSCancel::ReportIsCancelled(bool is_cancelled) +{ + { + std::lock_guard lk(mu_); + is_cancelled_ = is_cancelled; + updating_ = false; + } + cv_.notify_all(); +} + +}}} // namespace triton::backend::python diff --git a/src/pb_bls_cancel.h b/src/pb_bls_cancel.h new file mode 100644 index 00000000..7fdd3fbf --- /dev/null +++ b/src/pb_bls_cancel.h @@ -0,0 +1,63 @@ +// Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of NVIDIA CORPORATION nor the names of its +// contributors may be used to endorse or promote products derived +// from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include + +#include "pb_utils.h" + +namespace triton { namespace backend { namespace python { + +class PbBLSCancel { + public: + PbBLSCancel(void* infer_playload_id) + : updating_(false), infer_playload_id_(infer_playload_id), + is_cancelled_(false) + { + } + DISALLOW_COPY_AND_ASSIGN(PbBLSCancel); + + void SaveToSharedMemory(std::unique_ptr& shm_pool); + bi::managed_external_buffer::handle_t ShmHandle(); + CancelBLSRequestMessage* ShmPayload(); + + void Cancel(); + void ReportIsCancelled(bool is_cancelled); + + private: + AllocatedSharedMemory cancel_shm_; + + std::mutex mu_; + std::condition_variable cv_; + bool updating_; + + void* infer_playload_id_; + bool is_cancelled_; +}; + +}}}; // namespace triton::backend::python diff --git a/src/pb_response_iterator.cc b/src/pb_response_iterator.cc index 9abf4997..536d4232 100644 --- a/src/pb_response_iterator.cc +++ b/src/pb_response_iterator.cc @@ -1,4 +1,4 @@ -// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -40,6 +40,7 @@ ResponseIterator::ResponseIterator( : id_(response->Id()), is_finished_(false), is_cleared_(false), idx_(0) { response_buffer_.push(response); + pb_bls_cancel_ = std::make_shared(response->Id()); } ResponseIterator::~ResponseIterator() @@ -159,4 +160,12 @@ ResponseIterator::GetExistingResponses() return responses; } +void +ResponseIterator::Cancel() +{ + if (!is_finished_) { + pb_bls_cancel_->Cancel(); + } +} + }}} // namespace triton::backend::python diff --git a/src/pb_response_iterator.h b/src/pb_response_iterator.h index cad5ff1f..cb26d6a3 100644 --- a/src/pb_response_iterator.h +++ b/src/pb_response_iterator.h @@ -1,4 +1,4 @@ -// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -29,6 +29,7 @@ #include #include "infer_response.h" +#include "pb_bls_cancel.h" namespace triton { namespace backend { namespace python { @@ -43,6 +44,7 @@ class ResponseIterator { void* Id(); void Clear(); std::vector> GetExistingResponses(); + void Cancel(); private: std::vector> responses_; @@ -53,6 +55,7 @@ class ResponseIterator { bool is_finished_; bool is_cleared_; size_t idx_; + std::shared_ptr pb_bls_cancel_; }; }}} // namespace triton::backend::python diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 4e09ea1d..40366969 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -1136,6 +1136,10 @@ Stub::ServiceStubToParentRequests() } else if ( utils_msg_payload->command_type == PYTHONSTUB_IsRequestCancelled) { SendIsCancelled(utils_msg_payload); + } else if ( + utils_msg_payload->command_type == + PYTHONSTUB_CancelBLSDecoupledInferRequest) { + SendCancelBLSDecoupledRequest(utils_msg_payload); } else { std::cerr << "Error when sending message via stub_to_parent message " "buffer - unknown command\n"; @@ -1221,6 +1225,46 @@ Stub::EnqueueCleanupId(void* id, const PYTHONSTUB_CommandType& command_type) } } +void +Stub::SendCancelBLSDecoupledRequest( + std::unique_ptr& utils_msg_payload) +{ + PbBLSCancel* pb_bls_cancel = + reinterpret_cast(utils_msg_payload->utils_message_ptr); + pb_bls_cancel->SaveToSharedMemory(shm_pool_); + + CancelBLSRequestMessage* message_payload = pb_bls_cancel->ShmPayload(); + std::unique_ptr ipc_message = + IPCMessage::Create(shm_pool_, false /* inline_response */); + ipc_message->Command() = utils_msg_payload->command_type; + ipc_message->Args() = pb_bls_cancel->ShmHandle(); + + bool is_cancelled = false; + { + bi::scoped_lock lk(message_payload->mu); + + SendIPCUtilsMessage(ipc_message); + while (!message_payload->waiting_on_stub) { + message_payload->cv.wait(lk); + } + + is_cancelled = message_payload->is_cancelled; + message_payload->waiting_on_stub = false; + message_payload->cv.notify_all(); + } + pb_bls_cancel->ReportIsCancelled(is_cancelled); +} + +void +Stub::EnqueueCancelBLSDecoupledRequest(PbBLSCancel* pb_bls_cancel) +{ + std::unique_ptr utils_msg_payload = + std::make_unique( + PYTHONSTUB_CancelBLSDecoupledInferRequest, + reinterpret_cast(pb_bls_cancel)); + EnqueueUtilsMessage(std::move(utils_msg_payload)); +} + void Stub::EnqueueIsCancelled(PbCancel* pb_cancel) { @@ -1909,7 +1953,8 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module) it.Iter(); return it; }) - .def("__next__", &ResponseIterator::Next); + .def("__next__", &ResponseIterator::Next) + .def("cancel", &ResponseIterator::Cancel); py::class_ logger(module, "Logger"); py::enum_(logger, "LogLevel") diff --git a/src/pb_stub.h b/src/pb_stub.h index 7d76ec9a..5ea637d1 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -1,4 +1,4 @@ -// Copyright 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -321,6 +321,14 @@ class Stub { /// and the response factory for BLS decoupled response. void EnqueueCleanupId(void* id, const PYTHONSTUB_CommandType& command_type); + /// Send the id to the python backend for object cleanup + void SendCancelBLSDecoupledRequest( + std::unique_ptr& utils_msg_payload); + + /// Add infer payload id to queue. This is used for retrieving the request + /// address from the infer_payload + void EnqueueCancelBLSDecoupledRequest(PbBLSCancel* pb_bls_cancel); + /// Add request cancellation query to queue void EnqueueIsCancelled(PbCancel* pb_cancel); diff --git a/src/pb_utils.h b/src/pb_utils.h index aacf6b49..1306f375 100644 --- a/src/pb_utils.h +++ b/src/pb_utils.h @@ -1,4 +1,4 @@ -// Copyright 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -187,6 +187,11 @@ struct CleanupMessage : SendMessageBase { void* id; }; +struct CancelBLSRequestMessage : SendMessageBase { + void* infer_payload_id; + bool is_cancelled; +}; + struct IsCancelledMessage : SendMessageBase { intptr_t response_factory_address; intptr_t request_address; diff --git a/src/python_be.cc b/src/python_be.cc index bdf7b95f..d001fad9 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1,4 +1,4 @@ -// Copyright 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2020-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -765,6 +765,10 @@ ModelInstanceState::StubToParentMQMonitor() boost::asio::post(*thread_pool_, std::move(task)); break; } + case PYTHONSTUB_CancelBLSDecoupledInferRequest: { + ProcessCancelBLSDecoupledRequest(message); + break; + } default: { LOG_MESSAGE( TRITONSERVER_LOG_ERROR, "Unexpected message type received."); @@ -855,6 +859,40 @@ ModelInstanceState::ProcessCleanupRequest( } } +void +ModelInstanceState::ProcessCancelBLSDecoupledRequest( + const std::unique_ptr& message) +{ + AllocatedSharedMemory message_shm = + Stub()->ShmPool()->Load(message->Args()); + CancelBLSRequestMessage* message_payload = + reinterpret_cast(message_shm.data_.get()); + + { + bi::scoped_lock lk{message_payload->mu}; + + intptr_t id = reinterpret_cast(message_payload->infer_payload_id); + try { + { + std::lock_guard lock(infer_payload_mu_); + if (infer_payload_.find(id) != infer_payload_.end()) { + request_executor_->Cancel(infer_payload_[id]); + } + } + message_payload->is_cancelled = true; + } + catch (const PythonBackendException& pb_exception) { + LOG_MESSAGE(TRITONSERVER_LOG_ERROR, pb_exception.what()); + } + + message_payload->waiting_on_stub = true; + message_payload->cv.notify_all(); + while (message_payload->waiting_on_stub) { + message_payload->cv.wait(lk); + } + } +} + void ModelInstanceState::ProcessIsRequestCancelled( const std::unique_ptr& message) diff --git a/src/python_be.h b/src/python_be.h index c98e1284..46e2db7e 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -1,4 +1,4 @@ -// Copyright 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -403,6 +403,9 @@ class ModelInstanceState : public BackendModelInstance { // Process the decoupled cleanup request for InferPayload and ResponseFactory void ProcessCleanupRequest(const std::unique_ptr& message); + // Process cancelling a BLS decoupled request + void ProcessCancelBLSDecoupledRequest(const std::unique_ptr& message); + // Process request cancellation query void ProcessIsRequestCancelled(const std::unique_ptr& message); diff --git a/src/request_executor.cc b/src/request_executor.cc index 3c51e626..1112c9e0 100644 --- a/src/request_executor.cc +++ b/src/request_executor.cc @@ -69,6 +69,9 @@ InferRequestComplete( TRITONSERVER_InferenceRequest* request, const uint32_t flags, void* userp) { if (request != nullptr) { + auto request_executor = reinterpret_cast(userp); + request_executor->EraseRequestAddress(reinterpret_cast(request)); + LOG_IF_ERROR( TRITONSERVER_InferenceRequestDelete(request), "Failed to delete inference request."); @@ -85,10 +88,16 @@ InferResponseComplete( std::vector> output_tensors; std::shared_ptr pb_error; std::string parameters_string; + TRITONSERVER_Error_Code error_code = TRITONSERVER_ERROR_INTERNAL; if (response != nullptr) { try { - THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceResponseError(response)); + TRITONSERVER_Error* server_error = + TRITONSERVER_InferenceResponseError(response); + if (server_error != nullptr) { + error_code = TRITONSERVER_ErrorCode(server_error); + } + THROW_IF_TRITON_ERROR(server_error); uint32_t output_count; THROW_IF_TRITON_ERROR( @@ -182,7 +191,7 @@ InferResponseComplete( response = nullptr; } - pb_error = std::make_shared(pb_exception.what()); + pb_error = std::make_shared(pb_exception.what(), error_code); output_tensors.clear(); } @@ -407,7 +416,7 @@ RequestExecutor::Infer( irequest, infer_request->Timeout())); THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestSetReleaseCallback( - irequest, InferRequestComplete, nullptr /* request_release_userp */)); + irequest, InferRequestComplete, reinterpret_cast(this))); TRITONSERVER_InferenceTrace* trace = nullptr; if (infer_request->GetTrace().TritonTrace() != nullptr) { @@ -476,11 +485,23 @@ RequestExecutor::Infer( reinterpret_cast(infer_payload->ResponseAllocUserp().get()), InferResponseComplete, reinterpret_cast(infer_payload.get()))); + { + std::lock_guard lk(on_going_request_addresses_mu_); + on_going_request_addresses_.insert( + reinterpret_cast(irequest)); + } + // Store the inference request address submitted to the Triton server for + // retrieval + infer_payload->SetRequestAddress(reinterpret_cast(irequest)); + THROW_IF_TRITON_ERROR( TRITONSERVER_ServerInferAsync(server_, irequest, trace)); } } catch (const PythonBackendException& pb_exception) { + EraseRequestAddress(reinterpret_cast(irequest)); + infer_payload->SetRequestAddress(0L); + LOG_IF_ERROR( TRITONSERVER_InferenceRequestDelete(irequest), "Failed to delete inference request."); @@ -493,6 +514,34 @@ RequestExecutor::Infer( return response_future; } +void +RequestExecutor::Cancel(std::shared_ptr& infer_payload) +{ + intptr_t request_address = infer_payload->GetRequestAddress(); + if (request_address == 0L) { + return; + } + + { + std::lock_guard lk(on_going_request_addresses_mu_); + if (on_going_request_addresses_.find(request_address) != + on_going_request_addresses_.end()) { + TRITONSERVER_InferenceRequest* irequest = + reinterpret_cast(request_address); + THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestCancel(irequest)); + } + } +} + +void +RequestExecutor::EraseRequestAddress(intptr_t request_address) +{ + if (request_address != 0L) { + std::unique_lock lk(on_going_request_addresses_mu_); + on_going_request_addresses_.erase(request_address); + } +} + RequestExecutor::~RequestExecutor() { if (response_allocator_ != nullptr) { diff --git a/src/request_executor.h b/src/request_executor.h index 1c5eb1fa..755e938b 100644 --- a/src/request_executor.h +++ b/src/request_executor.h @@ -1,4 +1,4 @@ -// Copyright 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -27,6 +27,7 @@ #pragma once #include +#include #include "infer_payload.h" #include "infer_request.h" @@ -41,11 +42,15 @@ class RequestExecutor { TRITONSERVER_ResponseAllocator* response_allocator_ = nullptr; TRITONSERVER_Server* server_; std::unique_ptr& shm_pool_; + std::mutex on_going_request_addresses_mu_; + std::unordered_set on_going_request_addresses_; public: std::future> Infer( std::shared_ptr& infer_request, std::shared_ptr& infer_payload); + void EraseRequestAddress(intptr_t request_address); + void Cancel(std::shared_ptr& infer_payload); RequestExecutor( std::unique_ptr& shm_pool, From a6a1a00c899fb408296a4dd9a633e5d2f58e42bd Mon Sep 17 00:00:00 2001 From: richardhuo-nv Date: Sun, 23 Mar 2025 22:46:25 -0700 Subject: [PATCH 2/7] fix commit hook --- src/python_be.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/python_be.h b/src/python_be.h index 46e2db7e..02dfc55a 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -404,7 +404,8 @@ class ModelInstanceState : public BackendModelInstance { void ProcessCleanupRequest(const std::unique_ptr& message); // Process cancelling a BLS decoupled request - void ProcessCancelBLSDecoupledRequest(const std::unique_ptr& message); + void ProcessCancelBLSDecoupledRequest( + const std::unique_ptr& message); // Process request cancellation query void ProcessIsRequestCancelled(const std::unique_ptr& message); From 02dcf1dae25f52b154296b4d9a83fb8d5fbefa63 Mon Sep 17 00:00:00 2001 From: richardhuo-nv Date: Tue, 25 Mar 2025 10:38:08 -0700 Subject: [PATCH 3/7] resolve comments --- README.md | 19 ++++++------- src/infer_payload.cc | 32 +++++++++++++++++++++ src/infer_payload.h | 7 +++++ src/ipc_message.h | 2 +- src/pb_bls_cancel.cc | 2 +- src/pb_stub.cc | 11 ++++---- src/pb_stub.h | 4 +-- src/python_be.cc | 8 +++--- src/python_be.h | 5 ++-- src/request_executor.cc | 62 +++++++++++++++++------------------------ src/request_executor.h | 10 ++++--- 11 files changed, 93 insertions(+), 69 deletions(-) diff --git a/README.md b/README.md index fe1a2810..10210124 100644 --- a/README.md +++ b/README.md @@ -1418,13 +1418,10 @@ will be automatically deallocated. This can increase the number of BLS requests that you can execute in your model without running into the out of GPU or shared memory error. -Starting from the 25.04 release, you can use the `infer_responses.cancel()` function -on a BLS decoupled response iterator to stop the response stream, which cancels -the request to the decoupled model. This is useful for stopping long inference -requests, such as those from auto-generative large language models, which may -run for an indeterminate amount of time and consume significant server resources. -The response iterator can be generated from `infer_request.exec(decoupled=True)` -and `infer_request.async_exec(decoupled=True)` functions: +### Cancelling decoupled BLS requests +A decoupled BLS inference request may be cancelled by calling the `cancel()` +method on the response iterator returned from the method executing the BLS +inference request. For example, ```python import triton_python_backend_utils as pb_utils @@ -1433,12 +1430,12 @@ class TritonPythonModel: ... def execute(self, requests): ... - inference_request = pb_utils.InferenceRequest( + infer_request = pb_utils.InferenceRequest( model_name='model_name', requested_output_names=['REQUESTED_OUTPUT'], inputs=[]) - # Execute the inference_request and wait for the response. Here we are + # Execute the infer_request and wait for the response. Here we are # running a BLS request on a decoupled model, hence setting the parameter # 'decoupled' to 'True'. infer_responses = infer_request.exec(decoupled=True) @@ -1449,14 +1446,14 @@ class TritonPythonModel: # vLLM backend uses the CANCELLED error code when a request is cancelled. # TensorRT-LLM backend does not use error codes; instead, it sends the # TRITONSERVER_RESPONSE_COMPLETE_FINAL flag to the iterator. - if inference_response.has_error(): + if infer_response.has_error(): if infer_response.error().code() == pb_utils.TritonError.CANCELLED: print("request has been cancelled.") break # Collect the output tensor from the model's response output = pb_utils.get_output_tensor_by_name( - inference_response, 'REQUESTED_OUTPUT') + infer_response, 'REQUESTED_OUTPUT') response_tensors_received.append(output) # Check if we have received enough inference output tensors diff --git a/src/infer_payload.cc b/src/infer_payload.cc index 091e8129..50e4fbd8 100644 --- a/src/infer_payload.cc +++ b/src/infer_payload.cc @@ -32,6 +32,7 @@ InferPayload::InferPayload( const bool is_decoupled, std::function)> callback) : is_decoupled_(is_decoupled), is_promise_set_(false), callback_(callback), + is_request_deleted_(false), request_address_(reinterpret_cast(nullptr)) { promise_.reset(new std::promise>()); @@ -104,4 +105,35 @@ InferPayload::GetRequestAddress() return request_address_; } +void +InferPayload::SetRequestDeleted() +{ + std::unique_lock lock(request_deletion_mutex_); + is_request_deleted_ = true; +} + +void +InferPayload::SetRequestCancellationFunc( + const std::function& request_cancel_func) +{ + request_cancel_func_ = request_cancel_func; +} + +void +InferPayload::SafeCancelRequest() +{ + std::unique_lock lock(request_deletion_mutex_); + if (is_request_deleted_) { + return; + } + + if (request_address_ == 0L) { + return; + } + + if (request_cancel_func_) { + request_cancel_func_(request_address_); + } +} + }}} // namespace triton::backend::python diff --git a/src/infer_payload.h b/src/infer_payload.h index 153b63e5..d86f8c79 100644 --- a/src/infer_payload.h +++ b/src/infer_payload.h @@ -62,8 +62,12 @@ class InferPayload : public std::enable_shared_from_this { void SetResponseAllocUserp( const ResponseAllocatorUserp& response_alloc_userp); std::shared_ptr ResponseAllocUserp(); + void SetRequestDeleted(); void SetRequestAddress(intptr_t request_address); intptr_t GetRequestAddress(); + void SetRequestCancellationFunc( + const std::function& request_cancel_func); + void SafeCancelRequest(); private: std::unique_ptr>> promise_; @@ -72,7 +76,10 @@ class InferPayload : public std::enable_shared_from_this { bool is_promise_set_; std::function)> callback_; std::shared_ptr response_alloc_userp_; + std::mutex request_deletion_mutex_; + bool is_request_deleted_; intptr_t request_address_; + std::function request_cancel_func_; }; }}} // namespace triton::backend::python diff --git a/src/ipc_message.h b/src/ipc_message.h index b3a6efa3..c0fab3a3 100644 --- a/src/ipc_message.h +++ b/src/ipc_message.h @@ -68,7 +68,7 @@ typedef enum PYTHONSTUB_commandtype_enum { PYTHONSTUB_UnloadModelRequest, PYTHONSTUB_ModelReadinessRequest, PYTHONSTUB_IsRequestCancelled, - PYTHONSTUB_CancelBLSDecoupledInferRequest + PYTHONSTUB_CancelBLSInferRequest } PYTHONSTUB_CommandType; /// diff --git a/src/pb_bls_cancel.cc b/src/pb_bls_cancel.cc index bae62320..0df4492b 100644 --- a/src/pb_bls_cancel.cc +++ b/src/pb_bls_cancel.cc @@ -72,7 +72,7 @@ PbBLSCancel::Cancel() return; } - stub->EnqueueCancelBLSDecoupledRequest(this); + stub->EnqueueCancelBLSRequest(this); updating_ = true; } cv_.wait(lk, [this] { return !updating_; }); diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 40366969..0a2279ec 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -1137,9 +1137,8 @@ Stub::ServiceStubToParentRequests() utils_msg_payload->command_type == PYTHONSTUB_IsRequestCancelled) { SendIsCancelled(utils_msg_payload); } else if ( - utils_msg_payload->command_type == - PYTHONSTUB_CancelBLSDecoupledInferRequest) { - SendCancelBLSDecoupledRequest(utils_msg_payload); + utils_msg_payload->command_type == PYTHONSTUB_CancelBLSInferRequest) { + SendCancelBLSRequest(utils_msg_payload); } else { std::cerr << "Error when sending message via stub_to_parent message " "buffer - unknown command\n"; @@ -1226,7 +1225,7 @@ Stub::EnqueueCleanupId(void* id, const PYTHONSTUB_CommandType& command_type) } void -Stub::SendCancelBLSDecoupledRequest( +Stub::SendCancelBLSRequest( std::unique_ptr& utils_msg_payload) { PbBLSCancel* pb_bls_cancel = @@ -1256,11 +1255,11 @@ Stub::SendCancelBLSDecoupledRequest( } void -Stub::EnqueueCancelBLSDecoupledRequest(PbBLSCancel* pb_bls_cancel) +Stub::EnqueueCancelBLSRequest(PbBLSCancel* pb_bls_cancel) { std::unique_ptr utils_msg_payload = std::make_unique( - PYTHONSTUB_CancelBLSDecoupledInferRequest, + PYTHONSTUB_CancelBLSInferRequest, reinterpret_cast(pb_bls_cancel)); EnqueueUtilsMessage(std::move(utils_msg_payload)); } diff --git a/src/pb_stub.h b/src/pb_stub.h index 5ea637d1..a901a6ce 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -322,12 +322,12 @@ class Stub { void EnqueueCleanupId(void* id, const PYTHONSTUB_CommandType& command_type); /// Send the id to the python backend for object cleanup - void SendCancelBLSDecoupledRequest( + void SendCancelBLSRequest( std::unique_ptr& utils_msg_payload); /// Add infer payload id to queue. This is used for retrieving the request /// address from the infer_payload - void EnqueueCancelBLSDecoupledRequest(PbBLSCancel* pb_bls_cancel); + void EnqueueCancelBLSRequest(PbBLSCancel* pb_bls_cancel); /// Add request cancellation query to queue void EnqueueIsCancelled(PbCancel* pb_cancel); diff --git a/src/python_be.cc b/src/python_be.cc index d001fad9..83e53852 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -765,8 +765,8 @@ ModelInstanceState::StubToParentMQMonitor() boost::asio::post(*thread_pool_, std::move(task)); break; } - case PYTHONSTUB_CancelBLSDecoupledInferRequest: { - ProcessCancelBLSDecoupledRequest(message); + case PYTHONSTUB_CancelBLSInferRequest: { + ProcessCancelBLSRequest(message); break; } default: { @@ -860,7 +860,7 @@ ModelInstanceState::ProcessCleanupRequest( } void -ModelInstanceState::ProcessCancelBLSDecoupledRequest( +ModelInstanceState::ProcessCancelBLSRequest( const std::unique_ptr& message) { AllocatedSharedMemory message_shm = @@ -876,7 +876,7 @@ ModelInstanceState::ProcessCancelBLSDecoupledRequest( { std::lock_guard lock(infer_payload_mu_); if (infer_payload_.find(id) != infer_payload_.end()) { - request_executor_->Cancel(infer_payload_[id]); + infer_payload_[id]->SafeCancelRequest(); } } message_payload->is_cancelled = true; diff --git a/src/python_be.h b/src/python_be.h index 02dfc55a..6082c50b 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -403,9 +403,8 @@ class ModelInstanceState : public BackendModelInstance { // Process the decoupled cleanup request for InferPayload and ResponseFactory void ProcessCleanupRequest(const std::unique_ptr& message); - // Process cancelling a BLS decoupled request - void ProcessCancelBLSDecoupledRequest( - const std::unique_ptr& message); + // Process cancelling a BLS request + void ProcessCancelBLSRequest(const std::unique_ptr& message); // Process request cancellation query void ProcessIsRequestCancelled(const std::unique_ptr& message); diff --git a/src/request_executor.cc b/src/request_executor.cc index 1112c9e0..e43ff926 100644 --- a/src/request_executor.cc +++ b/src/request_executor.cc @@ -69,12 +69,15 @@ InferRequestComplete( TRITONSERVER_InferenceRequest* request, const uint32_t flags, void* userp) { if (request != nullptr) { - auto request_executor = reinterpret_cast(userp); - request_executor->EraseRequestAddress(reinterpret_cast(request)); + RequestCompletionUserp* completion_userp = + reinterpret_cast(userp); + completion_userp->infer_payload->SetRequestDeleted(); LOG_IF_ERROR( TRITONSERVER_InferenceRequestDelete(request), "Failed to delete inference request."); + + delete completion_userp; } } @@ -322,6 +325,18 @@ ResponseAlloc( return nullptr; // Success } +void +InferRequestCancel(intptr_t request_address) +{ + if (request_address == 0L) { + return; + } + + TRITONSERVER_InferenceRequest* irequest = + reinterpret_cast(request_address); + THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestCancel(irequest)); +} + TRITONSERVER_Error* OutputBufferQuery( TRITONSERVER_ResponseAllocator* allocator, void* userp, @@ -364,6 +379,7 @@ RequestExecutor::Infer( bool is_ready = false; const char* model_name = infer_request->ModelName().c_str(); TRITONSERVER_InferenceRequest* irequest = nullptr; + RequestCompletionUserp* completion_userp = nullptr; try { int64_t model_version = infer_request->ModelVersion(); @@ -415,8 +431,10 @@ RequestExecutor::Infer( THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestSetTimeoutMicroseconds( irequest, infer_request->Timeout())); + completion_userp = new RequestCompletionUserp(infer_payload); THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestSetReleaseCallback( - irequest, InferRequestComplete, reinterpret_cast(this))); + irequest, InferRequestComplete, + reinterpret_cast(completion_userp))); TRITONSERVER_InferenceTrace* trace = nullptr; if (infer_request->GetTrace().TritonTrace() != nullptr) { @@ -485,22 +503,20 @@ RequestExecutor::Infer( reinterpret_cast(infer_payload->ResponseAllocUserp().get()), InferResponseComplete, reinterpret_cast(infer_payload.get()))); - { - std::lock_guard lk(on_going_request_addresses_mu_); - on_going_request_addresses_.insert( - reinterpret_cast(irequest)); - } // Store the inference request address submitted to the Triton server for // retrieval infer_payload->SetRequestAddress(reinterpret_cast(irequest)); + infer_payload->SetRequestCancellationFunc(InferRequestCancel); THROW_IF_TRITON_ERROR( TRITONSERVER_ServerInferAsync(server_, irequest, trace)); } } catch (const PythonBackendException& pb_exception) { - EraseRequestAddress(reinterpret_cast(irequest)); infer_payload->SetRequestAddress(0L); + if (completion_userp != nullptr) { + delete completion_userp; + } LOG_IF_ERROR( TRITONSERVER_InferenceRequestDelete(irequest), @@ -514,34 +530,6 @@ RequestExecutor::Infer( return response_future; } -void -RequestExecutor::Cancel(std::shared_ptr& infer_payload) -{ - intptr_t request_address = infer_payload->GetRequestAddress(); - if (request_address == 0L) { - return; - } - - { - std::lock_guard lk(on_going_request_addresses_mu_); - if (on_going_request_addresses_.find(request_address) != - on_going_request_addresses_.end()) { - TRITONSERVER_InferenceRequest* irequest = - reinterpret_cast(request_address); - THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestCancel(irequest)); - } - } -} - -void -RequestExecutor::EraseRequestAddress(intptr_t request_address) -{ - if (request_address != 0L) { - std::unique_lock lk(on_going_request_addresses_mu_); - on_going_request_addresses_.erase(request_address); - } -} - RequestExecutor::~RequestExecutor() { if (response_allocator_ != nullptr) { diff --git a/src/request_executor.h b/src/request_executor.h index 755e938b..db0d772f 100644 --- a/src/request_executor.h +++ b/src/request_executor.h @@ -38,19 +38,21 @@ namespace triton { namespace backend { namespace python { TRITONSERVER_Error* CreateTritonErrorFromException( const PythonBackendException& pb_exception); +struct RequestCompletionUserp { + std::shared_ptr infer_payload; + RequestCompletionUserp(std::shared_ptr& infer_payload) + : infer_payload(infer_payload){}; +}; + class RequestExecutor { TRITONSERVER_ResponseAllocator* response_allocator_ = nullptr; TRITONSERVER_Server* server_; std::unique_ptr& shm_pool_; - std::mutex on_going_request_addresses_mu_; - std::unordered_set on_going_request_addresses_; public: std::future> Infer( std::shared_ptr& infer_request, std::shared_ptr& infer_payload); - void EraseRequestAddress(intptr_t request_address); - void Cancel(std::shared_ptr& infer_payload); RequestExecutor( std::unique_ptr& shm_pool, From 6f47428e90322eed2502f429e602ee34e994bdbc Mon Sep 17 00:00:00 2001 From: richardhuo-nv Date: Tue, 25 Mar 2025 10:47:29 -0700 Subject: [PATCH 4/7] fix README --- README.md | 50 ++++++++++++++++++++------------------------------ 1 file changed, 20 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 10210124..dd5e877a 100644 --- a/README.md +++ b/README.md @@ -1427,39 +1427,29 @@ inference request. For example, import triton_python_backend_utils as pb_utils class TritonPythonModel: - ... + ... def execute(self, requests): - ... - infer_request = pb_utils.InferenceRequest( - model_name='model_name', - requested_output_names=['REQUESTED_OUTPUT'], - inputs=[]) + ... + bls_response_iterator = bls_request.exec(decoupled=True) + ... + bls_response_iterator.cancel() + ... +``` - # Execute the infer_request and wait for the response. Here we are - # running a BLS request on a decoupled model, hence setting the parameter - # 'decoupled' to 'True'. - infer_responses = infer_request.exec(decoupled=True) +You may also call the `cancel()` method on the response iterator returned from +the `async_exec()` method of the inference request. For example, - response_tensors_received = [] - for infer_response in infer_responses: - # Check if the inference response indicates an error. - # vLLM backend uses the CANCELLED error code when a request is cancelled. - # TensorRT-LLM backend does not use error codes; instead, it sends the - # TRITONSERVER_RESPONSE_COMPLETE_FINAL flag to the iterator. - if infer_response.has_error(): - if infer_response.error().code() == pb_utils.TritonError.CANCELLED: - print("request has been cancelled.") - break - - # Collect the output tensor from the model's response - output = pb_utils.get_output_tensor_by_name( - infer_response, 'REQUESTED_OUTPUT') - response_tensors_received.append(output) - - # Check if we have received enough inference output tensors - # and then cancel the response iterator - if has_enough_response(response_tensors_received): - infer_responses.cancel() +```python +import triton_python_backend_utils as pb_utils + +class TritonPythonModel: + ... + async def execute(self, requests): + ... + bls_response_iterator = await bls_request.async_exec(decoupled=True) + ... + bls_response_iterator.cancel() + ... ``` Note: Whether the decoupled model returns a cancellation error and stops executing From 37f3243ccb6b99fa3b443051b613249ec025192b Mon Sep 17 00:00:00 2001 From: richardhuo-nv Date: Tue, 25 Mar 2025 11:17:15 -0700 Subject: [PATCH 5/7] remove include --- src/request_executor.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/request_executor.h b/src/request_executor.h index db0d772f..07562d6a 100644 --- a/src/request_executor.h +++ b/src/request_executor.h @@ -27,7 +27,6 @@ #pragma once #include -#include #include "infer_payload.h" #include "infer_request.h" From 59420526c5f389bc812a7774e22175d330fdf711 Mon Sep 17 00:00:00 2001 From: richardhuo-nv Date: Wed, 26 Mar 2025 15:21:12 -0700 Subject: [PATCH 6/7] resolve comments --- src/infer_payload.cc | 21 ++------------------- src/infer_payload.h | 5 +---- src/request_executor.cc | 2 +- 3 files changed, 4 insertions(+), 24 deletions(-) diff --git a/src/infer_payload.cc b/src/infer_payload.cc index 50e4fbd8..6baad307 100644 --- a/src/infer_payload.cc +++ b/src/infer_payload.cc @@ -32,7 +32,6 @@ InferPayload::InferPayload( const bool is_decoupled, std::function)> callback) : is_decoupled_(is_decoupled), is_promise_set_(false), callback_(callback), - is_request_deleted_(false), request_address_(reinterpret_cast(nullptr)) { promise_.reset(new std::promise>()); @@ -96,22 +95,10 @@ InferPayload::ResponseAllocUserp() void InferPayload::SetRequestAddress(intptr_t request_address) { + std::unique_lock lock(request_address_mutex_); request_address_ = request_address; } -intptr_t -InferPayload::GetRequestAddress() -{ - return request_address_; -} - -void -InferPayload::SetRequestDeleted() -{ - std::unique_lock lock(request_deletion_mutex_); - is_request_deleted_ = true; -} - void InferPayload::SetRequestCancellationFunc( const std::function& request_cancel_func) @@ -122,11 +109,7 @@ InferPayload::SetRequestCancellationFunc( void InferPayload::SafeCancelRequest() { - std::unique_lock lock(request_deletion_mutex_); - if (is_request_deleted_) { - return; - } - + std::unique_lock lock(request_address_mutex_); if (request_address_ == 0L) { return; } diff --git a/src/infer_payload.h b/src/infer_payload.h index d86f8c79..8e4aa7d3 100644 --- a/src/infer_payload.h +++ b/src/infer_payload.h @@ -62,9 +62,7 @@ class InferPayload : public std::enable_shared_from_this { void SetResponseAllocUserp( const ResponseAllocatorUserp& response_alloc_userp); std::shared_ptr ResponseAllocUserp(); - void SetRequestDeleted(); void SetRequestAddress(intptr_t request_address); - intptr_t GetRequestAddress(); void SetRequestCancellationFunc( const std::function& request_cancel_func); void SafeCancelRequest(); @@ -76,8 +74,7 @@ class InferPayload : public std::enable_shared_from_this { bool is_promise_set_; std::function)> callback_; std::shared_ptr response_alloc_userp_; - std::mutex request_deletion_mutex_; - bool is_request_deleted_; + std::mutex request_address_mutex_; intptr_t request_address_; std::function request_cancel_func_; }; diff --git a/src/request_executor.cc b/src/request_executor.cc index e43ff926..716d3c56 100644 --- a/src/request_executor.cc +++ b/src/request_executor.cc @@ -71,7 +71,7 @@ InferRequestComplete( if (request != nullptr) { RequestCompletionUserp* completion_userp = reinterpret_cast(userp); - completion_userp->infer_payload->SetRequestDeleted(); + completion_userp->infer_payload->SetRequestAddress(0L); LOG_IF_ERROR( TRITONSERVER_InferenceRequestDelete(request), From 9482ee21b46bc2c82fec273476dcb7fb9599ea89 Mon Sep 17 00:00:00 2001 From: richardhuo-nv Date: Mon, 31 Mar 2025 13:13:19 -0700 Subject: [PATCH 7/7] fix comment --- src/pb_stub.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pb_stub.h b/src/pb_stub.h index a901a6ce..172c04a8 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -321,7 +321,8 @@ class Stub { /// and the response factory for BLS decoupled response. void EnqueueCleanupId(void* id, const PYTHONSTUB_CommandType& command_type); - /// Send the id to the python backend for object cleanup + /// Send the id to the python backend for request address retrieval and + /// cancellation void SendCancelBLSRequest( std::unique_ptr& utils_msg_payload);