Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 194 additions & 1 deletion qa/L0_backend_python/decoupled/decoupled_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

# Copyright 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# Copyright 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
Expand Down Expand Up @@ -46,6 +46,34 @@
_tritonserver_ipaddr = os.environ.get("TRITONSERVER_IPADDR", "localhost")


def prepare_decoupled_bls_cancel_inputs(input_value, max_sum_value, ignore_cancel):
input_data = np.array([input_value], dtype=np.int32)
max_sum_data = np.array([max_sum_value], dtype=np.int32)
ignore_cancel_data = np.array([ignore_cancel], dtype=np.bool_)
inputs = [
grpcclient.InferInput(
"INPUT",
input_data.shape,
np_to_triton_dtype(input_data.dtype),
),
grpcclient.InferInput(
"MAX_SUM",
max_sum_data.shape,
np_to_triton_dtype(max_sum_data.dtype),
),
grpcclient.InferInput(
"IGNORE_CANCEL",
ignore_cancel_data.shape,
np_to_triton_dtype(ignore_cancel_data.dtype),
),
]
inputs[0].set_data_from_numpy(input_data)
inputs[1].set_data_from_numpy(max_sum_data)
inputs[2].set_data_from_numpy(ignore_cancel_data)

return inputs


class UserData:
def __init__(self):
self._completed_requests = queue.Queue()
Expand Down Expand Up @@ -324,6 +352,171 @@ def test_decoupled_execute_cancel(self):
self.assertIn("[execute_cancel] Request not cancelled at 1.0 s", log_text)
self.assertIn("[execute_cancel] Request cancelled at ", log_text)

def test_decoupled_bls_cancel(self):
model_names = ["decoupled_bls_cancel", "decoupled_bls_async_cancel"]
input_value = 1
max_sum_value = 10
ignore_cancel = False
user_data = UserData()
for model_name in model_names:
with self._shm_leak_detector.Probe() as shm_probe:
with grpcclient.InferenceServerClient(
f"{_tritonserver_ipaddr}:8001"
) as client:
client.start_stream(callback=partial(callback, user_data))
inputs = prepare_decoupled_bls_cancel_inputs(
input_value=input_value,
max_sum_value=max_sum_value,
ignore_cancel=ignore_cancel,
)
client.async_stream_infer(model_name, inputs)

# Check the results of the decoupled model using BLS
def check_result(result):
# Make sure the result is not an exception
self.assertIsNot(type(result), InferenceServerException)
is_cancelled = result.as_numpy("IS_CANCELLED")
self.assertTrue(
is_cancelled[0],
"error: expected the request to be cancelled",
)

max_sum_data = np.array([max_sum_value], dtype=np.int32)
sum_data = result.as_numpy("SUM")
self.assertIsNotNone(sum_data, "error: expected 'SUM'")
self.assertTrue(
np.array_equal(sum_data, max_sum_data),
"error: expected output {} to match input {}".format(
sum_data, max_sum_data
),
)

result = user_data._completed_requests.get()
check_result(result)

def test_decoupled_bls_ignore_cancel(self):
model_names = ["decoupled_bls_cancel", "decoupled_bls_async_cancel"]
input_value = 1
max_sum_value = 10
ignore_cancel = True
user_data = UserData()
for model_name in model_names:
with self._shm_leak_detector.Probe() as shm_probe:
with grpcclient.InferenceServerClient(
f"{_tritonserver_ipaddr}:8001"
) as client:
client.start_stream(callback=partial(callback, user_data))
inputs = prepare_decoupled_bls_cancel_inputs(
input_value=input_value,
max_sum_value=max_sum_value,
ignore_cancel=ignore_cancel,
)
client.async_stream_infer(model_name, inputs)

# Check the results of the decoupled model using BLS
def check_result(result):
# Make sure the result is not an exception
self.assertIsNot(type(result), InferenceServerException)
is_cancelled = result.as_numpy("IS_CANCELLED")
self.assertFalse(
is_cancelled[0],
"error: expected the request not being cancelled",
)

max_sum_data = np.array([max_sum_value], dtype=np.int32)
sum_data = result.as_numpy("SUM")
self.assertIsNotNone(sum_data, "error: expected 'SUM'")
self.assertTrue(
sum_data > max_sum_data,
"error: expected sum_data {} to be greater than max_sum_data {}".format(
sum_data, max_sum_data
),
)

result = user_data._completed_requests.get()
check_result(result)

def test_decoupled_bls_cancel_after_cancellation(self):
model_name = "decoupled_bls_cancel_after_complete"
input_value = 1
max_sum_value = 10
ignore_cancel = False
user_data = UserData()
with self._shm_leak_detector.Probe() as shm_probe:
with grpcclient.InferenceServerClient(
f"{_tritonserver_ipaddr}:8001"
) as client:
client.start_stream(callback=partial(callback, user_data))
inputs = prepare_decoupled_bls_cancel_inputs(
input_value=input_value,
max_sum_value=max_sum_value,
ignore_cancel=ignore_cancel,
)
client.async_stream_infer(model_name, inputs)

# Check the results of the decoupled model using BLS
def check_result(result):
# Make sure the result is not an exception
self.assertIsNot(type(result), InferenceServerException)
is_cancelled = result.as_numpy("IS_CANCELLED")
self.assertTrue(
is_cancelled[0], "error: expected the request to be cancelled"
)

max_sum_data = np.array([max_sum_value], dtype=np.int32)
sum_data = result.as_numpy("SUM")
self.assertIsNotNone(sum_data, "error: expected 'SUM'")
self.assertTrue(
np.array_equal(sum_data, max_sum_data),
"error: expected output {} to match input {}".format(
sum_data, max_sum_data
),
)

result = user_data._completed_requests.get()
check_result(result)

def test_decoupled_bls_cancel_after_completion(self):
model_name = "decoupled_bls_cancel_after_complete"
input_value = 1
max_sum_value = 25
ignore_cancel = False
user_data = UserData()
with self._shm_leak_detector.Probe() as shm_probe:
with grpcclient.InferenceServerClient(
f"{_tritonserver_ipaddr}:8001"
) as client:
client.start_stream(callback=partial(callback, user_data))
inputs = prepare_decoupled_bls_cancel_inputs(
input_value=input_value,
max_sum_value=max_sum_value,
ignore_cancel=ignore_cancel,
)
client.async_stream_infer(model_name, inputs)

# Check the results of the decoupled model using BLS
def check_result(result):
# Make sure the result is not an exception
self.assertIsNot(type(result), InferenceServerException)
is_cancelled = result.as_numpy("IS_CANCELLED")
self.assertFalse(
is_cancelled[0],
"error: expected the request not being cancelled",
)

max_sum_data = np.array([max_sum_value], dtype=np.int32)
sum_data = result.as_numpy("SUM")
self.assertIsNotNone(sum_data, "error: expected 'SUM'")
self.assertTrue(
sum_data < max_sum_data,
"error: expected sum_data {} to be lesser than max_sum_data {}".format(
sum_data, max_sum_data
),
)

result = user_data._completed_requests.get()
check_result(result)

def test_decoupled_raise_exception(self):
# The decoupled_raise_exception model raises an exception for the request.
# This test case is making sure that repeated exceptions are properly handled.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import asyncio

Check notice

Code scanning / CodeQL

Unused import Note

Import of 'asyncio' is not used.

Copilot Autofix

AI 6 months ago

To fix the problem, we need to remove the unused import statement for the asyncio module. This will eliminate the unnecessary dependency and make the code cleaner and easier to read. The change should be made in the file qa/L0_backend_python/decoupled/models/decoupled_bls_async_cancel/1/model.py by deleting the line that imports asyncio.

Suggested changeset 1
qa/L0_backend_python/decoupled/models/decoupled_bls_async_cancel/1/model.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/qa/L0_backend_python/decoupled/models/decoupled_bls_async_cancel/1/model.py b/qa/L0_backend_python/decoupled/models/decoupled_bls_async_cancel/1/model.py
--- a/qa/L0_backend_python/decoupled/models/decoupled_bls_async_cancel/1/model.py
+++ b/qa/L0_backend_python/decoupled/models/decoupled_bls_async_cancel/1/model.py
@@ -25,3 +25,2 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-import asyncio
 
EOF
@@ -25,3 +25,2 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import asyncio

Copilot is powered by AI and may make mistakes. Always verify output.
@richardhuo-nv richardhuo-nv committed this autofix suggestion 6 months ago.

import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
"""
This model sends a decoupled bls inference request to 'response_sender_until_cancelled'
model, and sums up its responses.
Once the MAX_SUM is reached, the model will call the response iterator's
cancel() method to cancel the response stream.
If the IGNORE_CANCEL is set to True, the 'response_sender_until_cancelled' model will not hornor
the request cancellation and keep sending the output to the model.
The number of total responses should not reach MAX_RESPONSE_COUNT.
"""

async def execute(self, requests):
max_sum = (
pb_utils.get_input_tensor_by_name(requests[0], "MAX_SUM").as_numpy().flat[0]
)
input = pb_utils.get_input_tensor_by_name(requests[0], "INPUT")
ignore_cancel = pb_utils.get_input_tensor_by_name(requests[0], "IGNORE_CANCEL")
delay = pb_utils.Tensor("DELAY", np.array([50], dtype=np.int32))
max_response_count = pb_utils.Tensor(
"MAX_RESPONSE_COUNT", np.array([20], dtype=np.int32)
)

infer_request = pb_utils.InferenceRequest(
model_name="response_sender_until_cancelled",
inputs=[input, max_response_count, delay, ignore_cancel],
requested_output_names=["OUTPUT"],
)

response_stream = await infer_request.async_exec(decoupled=True)

is_cancelled = False
error = None
response_sum = 0
for infer_response in response_stream:
if infer_response.has_error():
if infer_response.error().code() == pb_utils.TritonError.CANCELLED:
is_cancelled = True
else:
error = infer_response.error()
break

out = pb_utils.get_output_tensor_by_name(
infer_response, "OUTPUT"
).as_numpy()[0]

response_sum += out
if response_sum >= max_sum:
response_stream.cancel()

responses = [
pb_utils.InferenceResponse(
output_tensors=[
pb_utils.Tensor("SUM", np.array([response_sum], dtype=np.int32)),
pb_utils.Tensor(
"IS_CANCELLED", np.array([is_cancelled], dtype=np.bool_)
),
],
error=error,
)
]

return responses
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

name: "decoupled_bls_async_cancel"
backend: "python"

input [
{
name: "INPUT"
data_type: TYPE_INT32
dims: [ 1 ]
},
{
name: "MAX_SUM"
data_type: TYPE_INT32
dims: [ 1 ]
},
{
name: "IGNORE_CANCEL"
data_type: TYPE_BOOL
dims: [ 1 ]
}
]
output [
{
name: "SUM"
data_type: TYPE_INT32
dims: [ 1 ]
},
{
name: "IS_CANCELLED"
data_type: TYPE_BOOL
dims: [ 1 ]
}
]

instance_group [
{
count: 1
kind : KIND_CPU
}
]
Loading
Loading