@@ -665,6 +665,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
   ScopedDefer _(
       [this, &execute_response] { SendIPCMessage(execute_response); });
   py::object execute_return;
+  py::object coroutine_return;
   try {
     if (!py::hasattr(model_instance_, "execute")) {
       std::string message = "Python model " + model_context_.PythonModelPath() +
@@ -685,7 +686,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
         // Do not wait for async decoupled execute to return.
         RunCoroutine(execute_return, true /* in_background */);
       } else {
-        py::object coroutine_return =
+        coroutine_return =
             RunCoroutine(execute_return, false /* in_background */);
         ProcessReturnedResponses(
             py_request_list, coroutine_return, response_batch);
@@ -733,6 +734,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
     }
   } else {
     if (!response_batch) {
+      std::cerr << "===== response_batch is not set" << std::endl;
       response_batch = shm_pool_->Construct<char>(
           sizeof(ResponseBatch) + sizeof(IPCMessageShm));
       ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
@@ -743,6 +745,8 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
           response_batch.value().data_.get() + sizeof(IPCMessageShm));
       response_batch_shm_ptr->has_error = false;
       response_batch_shm_ptr->is_error_set = false;
+      std::cerr << "===== response_batch_shm_ptr->batch_size: "
+                << response_batch_shm_ptr->batch_size << std::endl;
     }

     execute_response = IPCMessage::Create(
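Both branches above build the response batch as a single shared-memory allocation: `Construct<char>(sizeof(ResponseBatch) + sizeof(IPCMessageShm))` reserves room for an IPC message header followed by the batch, the `ResponseBatch*` is taken at offset `sizeof(IPCMessageShm)`, and `IPCMessage::Create` wraps the start of the same buffer. The self-contained sketch below mirrors only that layout; the structs are stand-ins, not the real python_backend shared-memory definitions.

#include <cstdint>
#include <vector>

// Stand-in structs; the real IPCMessageShm/ResponseBatch carry more fields.
struct IPCMessageShm { uint32_t command; };
struct ResponseBatch {
  uint32_t batch_size;
  bool has_error;
  bool is_error_set;
};

int main()
{
  // One allocation holds the IPC message header followed by the batch, so a
  // single shared-memory region can be handed back to the parent process.
  std::vector<char> shm(sizeof(IPCMessageShm) + sizeof(ResponseBatch));

  auto* ipc_header = reinterpret_cast<IPCMessageShm*>(shm.data());
  auto* batch =
      reinterpret_cast<ResponseBatch*>(shm.data() + sizeof(IPCMessageShm));

  ipc_header->command = 0;
  batch->batch_size = 0;
  batch->has_error = false;
  batch->is_error_set = false;
  return 0;
}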
@@ -779,6 +783,27 @@ Stub::ProcessReturnedResponses(
   }
   // Only non-decoupled may return responses.
   if (IsDecoupled()) {
+    // For decoupled mode, if a response was already sent from the response
+    // sender, along with the complete final flag, before returning from this
+    // error, then use the `is_response_factory_deleted` flag to notify the
+    // backend NOT to delete the response factory again during error
+    // handling.
+    for (py::handle py_request : py_requests) {
+      InferRequest* request = py_request.cast<InferRequest*>();
+      if (request->GetResponseSender()->IsClosed()) {
+        // Notify the backend NOT to delete the response factory again during
+        // error handling.
+        if (!response_batch) {
+          response_batch = std::move(shm_pool_->Construct<char>(
+              sizeof(ResponseBatch) + sizeof(IPCMessageShm)));
+        }
+        ResponseBatch* response_batch_shm_ptr =
+            reinterpret_cast<ResponseBatch*>(
+                response_batch.value().data_.get() + sizeof(IPCMessageShm));
+        response_batch_shm_ptr->is_response_factory_deleted = true;
+      }
+    }
+
     throw PythonBackendException(
         "Python model '" + name_ +
         "' is using the decoupled mode and the execute function must return "
@@ -821,8 +846,31 @@ Stub::ProcessReturnedResponses(
       }

       InferResponse* response = py_responses[i].cast<InferResponse*>();
-      request->GetResponseSender()->UpdateStateAndCounters(
-          response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
+
+      try {
+        request->GetResponseSender()->UpdateStateAndCounters(
+            response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
+      }
+      catch (const PythonBackendException& pb_exception) {
+        // Special case for default (non-decoupled) mode, where the response
+        // factory should already have been cleaned up with the previous
+        // response sent from the response sender, yet the model tries to
+        // return another response from `execute()`. Notify the backend NOT
+        // to delete the response factory again during error handling.
+        std::string error_string = pb_exception.what();
+        if (error_string.find(
+                "Non-decoupled model cannot send more than one response") !=
+            std::string::npos) {
+          response_batch = std::move(shm_pool_->Construct<char>(
+              sizeof(ResponseBatch) + sizeof(IPCMessageShm)));
+          ResponseBatch* response_batch_shm_ptr =
+              reinterpret_cast<ResponseBatch*>(
+                  response_batch.value().data_.get() + sizeof(IPCMessageShm));
+          response_batch_shm_ptr->is_response_factory_deleted = true;
+          LOG_ERROR << "=== caught error: " << pb_exception.what();
+        }
+        throw pb_exception;
+      }
     }
   }
   // Return all the created responses using response_batch. The reason
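The string matched in the catch block above originates from the response sender's state check: in default (non-decoupled) mode, a second response is rejected whether it is sent through the response sender or returned from `execute()`. The toy class below is a simplified, hypothetical model of that check (the real `UpdateStateAndCounters` lives in the stub's response sender and tracks more state); it only shows where the "Non-decoupled model cannot send more than one response" error comes from and why the factory is already gone when it fires.

#include <cstddef>
#include <stdexcept>

// Simplified, hypothetical model of the response sender's state check; the
// real implementation also manages the shared-memory response and flags.
class ToyResponseSender {
 public:
  explicit ToyResponseSender(bool decoupled) : decoupled_(decoupled) {}

  // Mirrors UpdateStateAndCounters: called once per response, with
  // `final_flag` standing in for TRITONSERVER_RESPONSE_COMPLETE_FINAL.
  void UpdateStateAndCounters(bool final_flag)
  {
    if (!decoupled_ && response_count_ > 0) {
      // This is the error text the catch block above searches for.
      throw std::runtime_error(
          "Non-decoupled model cannot send more than one response");
    }
    ++response_count_;
    closed_ = closed_ || final_flag;  // IsClosed() is true after the final response
  }

  bool IsClosed() const { return closed_; }

 private:
  bool decoupled_;
  bool closed_ = false;
  std::size_t response_count_ = 0;
};

int main()
{
  ToyResponseSender sender(/*decoupled=*/false);
  sender.UpdateStateAndCounters(true);    // model sends its one final response
  try {
    sender.UpdateStateAndCounters(true);  // then also returns one from execute()
  }
  catch (const std::runtime_error&) {
    // The factory tied to the first response is already gone here, which is
    // why the stub sets is_response_factory_deleted before rethrowing.
  }
  return 0;
}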