
Commit 6629deb

Modify L0_backend_python bls test for BLS decoupled support (#5455)
* Lower the tensor size to avoid intermittent issue
* Disable multiprocessing for decoupled case
* Fix indent
* Lower the tensor size to avoid intermittent issue
* Add unittest for response iterator
* Address comment
* Add checks for the last empty response
* Fix up for decoupled test
1 parent 1b0fe1f commit 6629deb

File tree

5 files changed: +150 -51 lines changed
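For orientation, the consumption pattern these diffs converge on is sketched below: in decoupled mode, infer_request.exec(decoupled=True) returns an iterator over responses, and the stream ends with an empty response that carries only the final flag, so callers see N+1 responses for N outputs and must guard tensor access. This is a minimal sketch assembled from the diffs that follow, not a file in the commit; the helper name consume_decoupled_bls is invented for illustration.

    import numpy as np
    import triton_python_backend_utils as pb_utils

    def consume_decoupled_bls(n):
        # Ask the QA model 'square_int32' for n responses; it streams n data
        # responses followed by an empty final response.
        input0 = pb_utils.Tensor('IN', np.array([n], dtype=np.int32))
        infer_request = pb_utils.InferenceRequest(
            model_name='square_int32',
            inputs=[input0],
            requested_output_names=['OUT'])
        infer_responses = infer_request.exec(decoupled=True)

        response_count = 0
        for infer_response in infer_responses:
            # The last response has no output tensors, so guard before reading.
            if len(infer_response.output_tensors()) > 0:
                output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
                assert output0 is not None
            response_count += 1

        # n data responses plus one empty final response.
        assert n == response_count - 1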

qa/L0_backend_python/decoupled/models/decoupled_bls_stream/1/model.py

Lines changed: 24 additions & 23 deletions
@@ -80,34 +80,35 @@ def response_thread(self, response_sender, in_value):

         response_count = 0
         for infer_response in infer_responses:
-            output0 = pb_utils.get_output_tensor_by_name(
-                infer_response, "OUT")
-            if infer_response.has_error():
-                response = pb_utils.InferenceResponse(
-                    error=infer_response.error().message())
-                response_sender.send(
-                    response,
-                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
-            elif np.any(in_value != output0.as_numpy()):
-                error_message = (
-                    "BLS Request input and BLS response output do not match."
-                    f" {in_value} != {output0.as_numpy()}")
-                response = pb_utils.InferenceResponse(error=error_message)
-                response_sender.send(
-                    response,
-                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
-            else:
-                output_tensors = [pb_utils.Tensor('OUT', output0.as_numpy())]
-                response = pb_utils.InferenceResponse(
-                    output_tensors=output_tensors)
-                response_sender.send(response)
+            if len(infer_response.output_tensors()) > 0:
+                output0 = pb_utils.get_output_tensor_by_name(
+                    infer_response, "OUT")
+                if infer_response.has_error():
+                    response = pb_utils.InferenceResponse(
+                        error=infer_response.error().message())
+                    response_sender.send(
+                        response,
+                        flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
+                elif np.any(in_value != output0.as_numpy()):
+                    error_message = (
+                        "BLS Request input and BLS response output do not match."
+                        f" {in_value} != {output0.as_numpy()}")
+                    response = pb_utils.InferenceResponse(error=error_message)
+                    response_sender.send(
+                        response,
+                        flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
+                else:
+                    output_tensors = [pb_utils.Tensor('OUT', output0.as_numpy())]
+                    response = pb_utils.InferenceResponse(
+                        output_tensors=output_tensors)
+                    response_sender.send(response)

             response_count += 1

-        if response_count != in_value:
+        if in_value != response_count-1:
             error_message = (
                 "Expected {} responses, got {}".format(
-                    in_value, len(infer_responses)))
+                    in_value, len(infer_responses)-1))
             response = pb_utils.InferenceResponse(
                 error=error_message)
             response_sender.send(
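The off-by-one in the rewritten check comes from counting the trailing empty response; a quick worked instance (values are illustrative only):

    in_value = 4               # responses requested from the decoupled model
    data_responses = 4         # responses that carry an OUT tensor
    final_empty_responses = 1  # flag-only response closing the stream
    response_count = data_responses + final_empty_responses  # loop counts 5
    assert in_value == response_count - 1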

qa/python_models/bls/model.py

Lines changed: 108 additions & 12 deletions
@@ -84,18 +84,19 @@ def bls_square(_=None):
         if infer_response.has_error():
             return False

-        output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
-        if output0 is None:
-            return False
+        if len(infer_response.output_tensors()) > 0:
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+            if output0 is None:
+                return False

-        expected_output = input0.as_numpy()
+            expected_output = input0.as_numpy()

-        if not np.all(expected_output == output0.as_numpy()):
-            return False
+            if not np.all(expected_output == output0.as_numpy()):
+                return False

         response_count += 1

-    if not np.all(response_count == input0.as_numpy()):
+    if not np.all(input0.as_numpy() == response_count-1):
         return False

     return True

@@ -459,14 +460,16 @@ def test_gpu_bls(self):
     def test_multiprocess(self):
         # Test multiprocess Pool with sync BLS
         if self._is_decoupled:
-            func_name = bls_square
+            # Fixme: DLIS-4630
+            # func_name = bls_square
+            pass
         else:
             func_name = bls_add_sub

-        pool = Pool(10)
-        pool.map(func_name, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
-        pool.close()
-        pool.join()
+            pool = Pool(10)
+            pool.map(func_name, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
+            pool.close()
+            pool.join()

     def test_bls_sync(self):
         infer_request = pb_utils.InferenceRequest(

@@ -553,6 +556,99 @@ def test_timeout(self):
                 infer_response.error().message())
             self.assertTrue(len(infer_response.output_tensors()) == 0)

+    def _test_response_iterator_square(self,
+                                       expected_output_cnt,
+                                       expected_output_value,
+                                       response_iterator):
+        response_count = 0
+        expected_output_cnt = np.array([expected_output_cnt], dtype=np.int32)
+
+        for infer_response in response_iterator:
+            self.assertFalse(infer_response.has_error())
+            if len(infer_response.output_tensors()) > 0:
+                output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+                self.assertIsNotNone(output0)
+                self.assertEqual(expected_output_value, output0.as_numpy())
+
+            response_count += 1
+
+        self.assertEqual(response_count, expected_output_cnt)
+
+        # Make sure the iterator is exhausted.
+        with self.assertRaises(StopIteration):
+            next(response_iterator)
+
+        return response_iterator
+
+    def test_response_iterator(self):
+        if self._is_decoupled:
+            # Test the response iterator for decoupled responses. The request
+            # has 4 decoupled responses followed by an empty response.
+            response_value = 4
+            input0_np = np.array([response_value], dtype=np.int32)
+            input0 = pb_utils.Tensor('IN', input0_np)
+            infer_request = pb_utils.InferenceRequest(
+                model_name='square_int32',
+                inputs=[input0],
+                requested_output_names=['OUT'])
+            infer_responses = infer_request.exec(decoupled=True)
+
+            # case 1. Use Next() to get the next response first, then use
+            # for-loop to get the remaining responses.
+            infer_response = next(infer_responses)
+            self.assertFalse(infer_response.has_error())
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+            self.assertIsNotNone(output0)
+            self.assertEqual(response_value, output0.as_numpy())
+            # The iterator now should only have 4 remaining responses.
+            infer_responses = self._test_response_iterator_square(
+                4, response_value, infer_responses)
+
+            # case 2. Call for-loop to get all the responses multiple times.
+            infer_responses = self._test_response_iterator_square(
+                5, response_value, infer_responses)
+            infer_responses = self._test_response_iterator_square(
+                5, response_value, infer_responses)
+            infer_responses = self._test_response_iterator_square(
+                5, response_value, infer_responses)
+
+            # case 3. Break from the iteration, then use Next() and for-loop to
+            # get the remaining responses.
+            response_count = 0
+            for infer_response in infer_responses:
+                self.assertFalse(infer_response.has_error())
+                output0 = pb_utils.get_output_tensor_by_name(infer_response,
+                                                             'OUT')
+                self.assertIsNotNone(output0)
+                self.assertEqual(response_value, output0.as_numpy())
+
+                response_count += 1
+                if response_count == 2:
+                    break
+
+            infer_response = next(infer_responses)
+            self.assertFalse(infer_response.has_error())
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+            self.assertIsNotNone(output0)
+            self.assertEqual(response_value, output0.as_numpy())
+
+            # The iterator now should only have 2 remaining responses.
+            infer_responses = self._test_response_iterator_square(
+                2, response_value, infer_responses)
+
+            # case 4. Delete the iterator before all the responses have been
+            # retrieved.
+            infer_responses = infer_request.exec(decoupled=True)
+
+            infer_response = next(infer_responses)
+            self.assertFalse(infer_response.has_error())
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+            self.assertIsNotNone(output0)
+            self.assertEqual(response_value, output0.as_numpy())
+
+            del infer_responses
+
+
 class TritonPythonModel:

     def execute(self, requests):
qa/python_models/bls_async/model.py

Lines changed: 16 additions & 14 deletions
@@ -97,27 +97,29 @@ def verify_square_results(input0, infer_responses):
                   flush=True)
             return False

-        output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+
+        if len(infer_response.output_tensors()) > 0:
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')

-        if (output0 is None):
-            return False
+            if (output0 is None):
+                return False

-        if not output0.is_cpu():
-            output0 = from_dlpack(
-                output0.to_dlpack()).to('cpu').cpu().detach().numpy()
-        else:
-            output0 = output0.as_numpy()
+            if not output0.is_cpu():
+                output0 = from_dlpack(
+                    output0.to_dlpack()).to('cpu').cpu().detach().numpy()
+            else:
+                output0 = output0.as_numpy()

-        expected_output = input0
+            expected_output = input0

-        if not np.all(expected_output == input0):
-            print(f'For OUT expected {expected_output} found {output0}')
-            return False
+            if not np.all(expected_output == input0):
+                print(f'For OUT expected {expected_output} found {output0}')
+                return False

         response_count += 1

-    if not np.all(response_count == input0):
-        print('Expected {} responses, got {}'.format(input0, response_count))
+    if not np.all(input0 == response_count-1):
+        print('Expected {} responses, got {}'.format(input0, response_count-1))
         return False

     return True
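The GPU branch above moves the output to host memory via DLPack before comparing; a standalone sketch of that conversion, assuming PyTorch's torch.utils.dlpack.from_dlpack is the from_dlpack in scope in this test file (the helper name output_to_numpy is invented for illustration):

    from torch.utils.dlpack import from_dlpack

    def output_to_numpy(output0):
        # GPU tensors are handed to PyTorch zero-copy via DLPack, then moved
        # to host memory; CPU tensors can be read directly as numpy.
        if not output0.is_cpu():
            return from_dlpack(output0.to_dlpack()).to('cpu').detach().numpy()
        return output0.as_numpy()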

qa/python_models/bls_memory/model.py

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ def _send_identity_tensor(self, size, is_decoupled):
         return input0_np, infer_response

     def test_bls_out_of_memory(self):
-        tensor_size = 1024 * 1024 * 1024
+        tensor_size = 256 * 1024 * 1024
         input0_np, infer_response = self._send_identity_tensor(
             tensor_size, self._is_decoupled)
         out_of_memory_message = "Failed to increase the shared memory pool size for key"

qa/python_models/bls_memory_async/model.py

Lines changed: 1 addition & 1 deletion
@@ -50,7 +50,7 @@ async def _send_identity_tensor(size, is_decoupled):
 async def test_bls_out_of_memory():
     is_decoupled = True if os.environ['BLS_KIND'] == "decoupled" else False

-    tensor_size = 1024 * 1024 * 1024
+    tensor_size = 256 * 1024 * 1024
     input0_np, infer_response = await _send_identity_tensor(
         tensor_size, is_decoupled)
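Both memory tests make the same change: the identity tensor shrinks from 1 GiB to 256 MiB of elements, which, assuming the unchanged assertion still holds, continues to overflow the BLS shared memory pool and trigger the "Failed to increase the shared memory pool size for key" error, while avoiding the intermittent failures the commit message mentions. The arithmetic, for reference:

    old_size = 1024 * 1024 * 1024  # 1 GiB: caused intermittent test failures
    new_size = 256 * 1024 * 1024   # 256 MiB: still exceeds the pool size
    assert new_size * 4 == old_size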

0 commit comments
