Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions merlin/systems/triton/models/executor_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,19 @@ def execute(self, request):
A list of pb_utils.InferenceResponse. The length of this list must
be the same as `requests`
"""
inputs = triton_request_to_tensor_table(request, self.ensemble.input_schema)
inputs = triton_request_to_tensor_table(request, self.ensemble.input_schema.column_names)

try:
outputs = self.ensemble.transform(inputs, runtime=TritonExecutorRuntime())
except Exception as exc:
raise pb_utils.TritonModelException(str(exc)) from exc
return tensor_table_to_triton_response(outputs, self.ensemble.output_schema)
import traceback

raise pb_utils.TritonModelException(
f"Error: {type(exc)} - {str(exc)}, "
f"Traceback: {traceback.format_tb(exc.__traceback__)}"
) from exc

return tensor_table_to_triton_response(outputs, self.ensemble.output_schema.column_names)


def _parse_model_repository(model_repository: str) -> str:
Expand Down
47 changes: 29 additions & 18 deletions merlin/systems/triton/models/workflow_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,24 +97,35 @@ def execute(self, request):
"""Transforms the input batches by running through a NVTabular workflow.transform
function.
"""
# transform the triton tensors to a dict of name:numpy tensor
input_tensors = {
name: _convert_tensor(pb_utils.get_input_tensor_by_name(request, name))
for name in self.input_dtypes
}

# multihots are represented as a tuple of (values, offsets)
for name, dtype in self.input_multihots.items():
values = _convert_tensor(pb_utils.get_input_tensor_by_name(request, name + "__values"))
offsets = _convert_tensor(
pb_utils.get_input_tensor_by_name(request, name + "__offsets")
)
input_tensors[name] = (values, offsets)

transformed = self.runner.run_workflow(input_tensors)
result = [pb_utils.Tensor(name, data) for name, data in transformed.items()]

return pb_utils.InferenceResponse(result)
try:
# transform the triton tensors to a dict of name:numpy tensor
input_tensors = {
name: _convert_tensor(pb_utils.get_input_tensor_by_name(request, name))
for name in self.input_dtypes
}

# multihots are represented as a tuple of (values, offsets)
for name, dtype in self.input_multihots.items():
values = _convert_tensor(
pb_utils.get_input_tensor_by_name(request, name + "__values")
)
offsets = _convert_tensor(
pb_utils.get_input_tensor_by_name(request, name + "__offsets")
)
input_tensors[name] = (values, offsets)

transformed = self.runner.run_workflow(input_tensors)
result = [pb_utils.Tensor(name, data) for name, data in transformed.items()]

return pb_utils.InferenceResponse(result)

except Exception as exc:
import traceback

raise pb_utils.TritonModelException(
f"Error: {type(exc)} - {str(exc)}, "
f"Traceback: {traceback.format_tb(exc.__traceback__)}"
) from exc

def _is_list_dtype(self, column: str) -> bool:
"""Check if a column of a Workflow contains list elements"""
Expand Down