Conversation
Add 5 new endpoint types for benchmarking KServe deployments:
- kserve_chat, kserve_completions, kserve_embeddings (reuse OpenAI
classes with /openai prefix)
- kserve_v2_infer (V2 Open Inference Protocol for Triton/TRT-LLM)
- kserve_v1_predict (V1 TensorFlow Serving style)
Includes {model_name} path template substitution in transport layer,
health_path metadata field, and comprehensive documentation.
Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
Add a gRPC transport that enables benchmarking KServe V2 inference servers over gRPC with HTTP/2 multiplexing and server-side streaming. Users switch from HTTP to gRPC by using --url grpc://host:port with the same kserve_v2_infer endpoint.

Transport implementation:
- GrpcTransport (BaseTransport) with @on_init/@on_stop lifecycle
- GrpcClient wrapping grpc.aio for ModelInfer and ModelStreamInfer RPCs
- Payload converter: dict <-> protobuf for transparent endpoint compat
- GrpcTraceData with gRPC status code/message fields
- Status code mapping (gRPC -> HTTP) for consistent metrics
- TLS support via grpcs:// scheme
- Request cancellation via asyncio.wait_for

Framework changes:
- Broaden FirstTokenCallback from SSEMessage to InferenceServerResponse
- Enable kserve_v2_infer supports_streaming for gRPC ModelStreamInfer
- Register grpc transport in plugins.yaml
- Add grpcio, protobuf deps; grpcio-tools dev dep
- Exclude generated proto stubs from ruff

Documentation:
- New grpc-transport.md tutorial (architecture, usage, trace data, troubleshooting)
- Updated kserve.md with Section 8 gRPC transport and streaming clarifications
- Updated patterns.md with Transport and Trace Data developer patterns

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
Decouple GrpcTransport and GenericGrpcClient from KServe V2 protobuf stubs. All proto knowledge is now isolated in pluggable serializer classes loaded dynamically from endpoint metadata in plugins.yaml.

- Add GrpcSerializerProtocol (runtime_checkable) for type-safe interface
- Add KServeV2GrpcSerializer wrapping existing payload_converter
- Add StreamChunk dataclass as protocol-agnostic streaming container
- Add GrpcEndpointConfig schema for plugins.yaml grpc metadata block
- Rewrite GenericGrpcClient to operate on raw bytes (identity passthrough)
- GrpcTransport loads serializer/method paths from endpoint metadata
- Update and rewrite gRPC documentation for new architecture

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
…gRPC transport

Replace the single GenericGrpcClient with per-target channel pooling and full connection reuse strategy support (POOLED, NEVER, STICKY_USER_SESSIONS), mirroring the aiohttp transport pattern.

Key changes:
- GrpcChannelLeaseManager for sticky-user-sessions strategy
- Per-target channel pool with lazy creation for POOLED
- Per-request channel create/close for NEVER
- GrpcUnaryResult/GrpcStreamCall for trailing metadata capture
- Two-stage cancellation (channel-ready + cancel timer)
- Mixed-scheme validation at init
- Trace data enrichment (request/response headers, status codes)
- Absorb payload_converter.py into kserve_v2_serializers.py
- raw_output_contents support for Triton responses
- Triton integration test marker and Makefile target

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
Source fixes:
- add type hint to _consume_stream closure
- wrap debug calls in lambdas
- fix timeout=0 treated as falsy in grpc_client
- capture lease_manager ref to prevent race
- cancel stream_call on timeout
- remove metadata() classmethod from both transports (belongs in plugins.yaml only)

Adds 75 new unit tests covering GenericGrpcClient, metadata/target parsing, error paths, streaming edge cases, additional payload datatypes, raw_output_contents, and trace data export registration.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
…ation

Add kserve_v2_embeddings and kserve_v2_rankings endpoints with plugin registration.

Fix review issues:
- falsy data[0] check in V2 infer parse_response
- inconsistent max_tokens None checks
- DRY violation in rankings format_payload (extract _extract_query_and_passages helper)
- incomplete gRPC status code mapping
- dead guard in gRPC client close

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
Add two new KServe V2 endpoints for vision-language models and diffusion image generation, both using the same V2 /infer tensor protocol with different tensor layouts. Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
Try out this PR

Quick install:

    pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@f9f922235e856dc3e46ad92e0197c6e16008dabe

Recommended with virtual environment (using uv):

    uv venv --python 3.12 && source .venv/bin/activate
    uv pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@f9f922235e856dc3e46ad92e0197c6e16008dabe
@coderabbitai review

✅ Actions performed: Review triggered.
Walkthrough

Adds end-to-end gRPC support for KServe (V1/V2): new endpoint implementations, a pluggable gRPC transport and client, KServe V2 protobuf/stubs and serializers, schema/plugin registry updates, docs/tutorials, Makefile/pytest wiring, many tests, mock-server and test-harness extensions, and tooling to generate stubs.
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)

No actionable comments were generated in the recent review. 🎉
Actionable comments posted: 7
🤖 Fix all issues with AI agents
In `@docs/dev/adding-grpc-endpoints.md`:
- Around line 455-464: The Related section contains a duplicate link to the same
source file (kserve_v2_serializers.py) with identical anchor text "Source:
KServeV2GrpcSerializer"; update docs/dev/adding-grpc-endpoints.md to either
remove the redundant entry or merge them into one entry and, if both references
are needed, give distinct anchor text (e.g., "Source: KServeV2GrpcSerializer —
Reference serializer implementation" and "Source: KServeV2GrpcSerializer — V2
dict/protobuf conversion") so the two links to kserve_v2_serializers.py are no
longer identical.
In `@docs/tutorials/grpc-transport.md`:
- Around line 328-329: Remove the duplicate link to kserve_v2_serializers.py in
the docs: keep one entry for the KServe V2 serializer and either delete the
second line or update it to point to the intended different source file
(whichever was meant to document "V2 dict/protobuf conversion"); locate the
duplicated entries referencing kserve_v2_serializers.py and ensure only the
correct link and description remain.
In `@docs/tutorials/kserve.md`:
- Around line 12-23: Update the KServe endpoint types table to include the two
missing V2 entries: add a row for kserve_v2_vlm with Protocol "V2 Open Inference
Protocol", URL Path "/v2/models/{model_name}/infer", Streaming "Yes (gRPC)",
Token Metrics "Yes", and Use Case "Vision-language models on Triton", and add a
row for kserve_v2_images with Protocol "V2 Open Inference Protocol", URL Path
"/v2/models/{model_name}/infer", Streaming "No", Token Metrics "No", and Use
Case "Image generation models (Stable Diffusion, SDXL, Flux) on Triton" so the
table includes kserve_v2_vlm and kserve_v2_images alongside the existing
endpoint rows.
In `@pyproject.toml`:
- Line 32: Update the protobuf dependency constraint in pyproject.toml to avoid
the yanked 5.29.0 release: locate the dependency entry "protobuf>=5.29.0,<6" and
change it to a non-yanked minimum (for example "protobuf>=5.30.0,<6"); keep the
existing "grpcio~=1.71.0" entry untouched but ensure the new protobuf range is
compatible with it.
In `@src/aiperf/transports/grpc/kserve_v2_serializers.py`:
- Around line 206-235: The code currently skips zero dimensions when computing
num_elements, causing shapes like [0] to yield a non-zero count and trigger
struct.error; change the computation in the block that builds num_elements
(which currently iterates "for dim in shape") to multiply all dimensions
directly (do not skip zeros) so zero in shape yields num_elements == 0, and then
add a guard: if num_elements == 0 return an empty list immediately before
calling struct.unpack_from; keep the existing fmt/elem_size logic and use the
same symbols (shape, num_elements, datatype, fmt_map, raw_bytes, elem_size,
struct.unpack_from) so the rest of the function remains unchanged.
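A standalone sketch of the fixed computation (the function name and FP32-only format are simplified assumptions, not the project's actual serializer):

```python
import math
import struct

def deserialize_fp32(raw_bytes: bytes, shape: list[int]) -> list[float]:
    # math.prod multiplies every dimension, so a 0 anywhere yields 0;
    # skipping zero dims would wrongly report a non-empty tensor.
    num_elements = math.prod(shape)
    if num_elements == 0:
        # Guard before unpacking: struct.unpack_from raises struct.error
        # when the buffer is shorter than the requested format.
        return []
    return list(struct.unpack_from(f"<{num_elements}f", raw_bytes, 0))

print(deserialize_fp32(b"", [0]))     # []
print(deserialize_fp32(b"", [4, 0]))  # []
print(deserialize_fp32(struct.pack("<3f", 1.0, 2.0, 3.0), [3]))  # [1.0, 2.0, 3.0]
```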
In `@tests/aiperf_mock_server/app.py`:
- Around line 812-814: The code can raise TypeError when JSON includes a "model"
key because KServeV2InferRequest is called with both model=model_name and **raw;
before constructing the request (around the body = await request.body(); raw =
orjson.loads(body) code), defensively remove any "model" key from raw (e.g.,
raw.pop("model", None)) or otherwise ensure raw doesn't contain "model" so that
KServeV2InferRequest(model=model_name, **raw) cannot receive duplicate values;
update the call site to use the cleaned raw when creating KServeV2InferRequest.
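The duplicate-kwarg failure and fix can be shown with a stand-in model class (the dataclass below is illustrative, not the real KServeV2InferRequest):

```python
from dataclasses import dataclass, field

@dataclass
class InferRequest:
    """Stand-in for KServeV2InferRequest; fields are illustrative."""
    model: str
    inputs: list = field(default_factory=list)

def build_request(model_name: str, raw: dict) -> InferRequest:
    raw = dict(raw)         # copy so the caller's payload is untouched
    raw.pop("model", None)  # the path parameter wins over a body-level "model"
    return InferRequest(model=model_name, **raw)

# Without the pop, a body containing "model" would raise TypeError
# ("got multiple values for argument 'model'").
req = build_request("resnet", {"model": "ignored", "inputs": [{"name": "x"}]})
print(req.model)  # resnet
```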
In `@tests/harness/fake_transport.py`:
- Around line 488-499: The current _do_v2_embedding reads inp.req.inputs[0]
unguarded which can raise IndexError if inputs is empty; update _do_v2_embedding
to defensively handle missing/empty inputs by checking inp.req.inputs (or using
getattr) and defaulting to an empty data list when no input tensor exists, e.g.
compute data = inp.req.inputs[0].get("data", []) if inp.req.inputs else []; then
build texts = [str(d) for d in data] and continue to call
_build_v2_embedding_response as before so the harness returns an empty-embedding
response instead of crashing.
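The defensive pattern in isolation (function name is hypothetical; the guard mirrors the suggested fix):

```python
def extract_embedding_texts(inputs: list[dict]) -> list[str]:
    # Default to an empty data list when no input tensor exists,
    # instead of letting inputs[0] raise IndexError.
    data = inputs[0].get("data", []) if inputs else []
    return [str(d) for d in data]

print(extract_embedding_texts([]))                                    # []
print(extract_embedding_texts([{"name": "text", "data": ["a", 1]}]))  # ['a', '1']
```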
🧹 Nitpick comments (16)
src/aiperf/transports/aiohttp_transport.py (1)
175-178: `str.format()` can raise on unexpected placeholders in user-supplied paths.

If a custom endpoint or metadata path contains brace expressions other than `{model_name}` (e.g., `{version}`, `{0}`, or even a stray `{`), `str.format(model_name=...)` will raise `KeyError` or `IndexError` at runtime. Consider using a safer substitution approach.

🛡️ Suggested safer substitution:

    - if "{" in path:
    -     path = path.format(
    -         model_name=request_info.model_endpoint.primary_model_name
    -     )
    + path = path.replace(
    +     "{model_name}",
    +     request_info.model_endpoint.primary_model_name,
    + )

Apply the same change in both branches (custom endpoint at line 175 and metadata path at line 197).

Also applies to: 197-200
tests/unit/transports/test_path_template.py (1)
41-51: Missing `endpoint_headers` and `endpoint_params` fields in helper.

`_create_request_info` omits `endpoint_headers` and `endpoint_params` that other test fixtures (e.g., in `test_base_transport.py`) include. If `RequestInfo` requires these fields or if future test extensions need headers, this will need updating. Not blocking since `get_url()` doesn't use them, but worth aligning with the existing fixture pattern for consistency.

src/aiperf/transports/grpc/status_mapping.py (1)
9-27: Consider using the enum member directly as a dictionary key instead of extracting `.value[0]`.

Accessing `.value[0]` relies on the internal tuple representation of `grpc.StatusCode`. If the grpc library ever changes the enum value format, this breaks silently. Since the function receives an `int`, you could simply use integer literal keys (which are stable per the gRPC spec) or document why `.value[0]` is used.

That said, this approach does self-document which gRPC code each entry corresponds to, which is a readability benefit. Acceptable as-is.
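A sketch of the integer-key alternative. The HTTP values below follow the common gRPC-to-HTTP convention and may differ from this project's exact table:

```python
# gRPC integer status codes are fixed by the spec, so plain int keys
# avoid depending on grpc.StatusCode's internal (code, message) tuples.
GRPC_TO_HTTP = {
    0: 200,   # OK
    3: 400,   # INVALID_ARGUMENT
    4: 504,   # DEADLINE_EXCEEDED
    5: 404,   # NOT_FOUND
    7: 403,   # PERMISSION_DENIED
    8: 429,   # RESOURCE_EXHAUSTED
    12: 501,  # UNIMPLEMENTED
    13: 500,  # INTERNAL
    14: 503,  # UNAVAILABLE
    16: 401,  # UNAUTHENTICATED
}

def grpc_to_http_status(code: int) -> int:
    # Unknown or unmapped codes fall back to a generic server error.
    return GRPC_TO_HTTP.get(code, 500)

print(grpc_to_http_status(5))   # 404
print(grpc_to_http_status(99))  # 500
```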
tests/aiperf_mock_server/app.py (1)
756-791: `_build_v2_embedding_response` and `_build_v2_ranking_response` are defined but unused.

These builders are not called from any endpoint in this file. Since the PR is WIP, I assume these will be wired up to additional V2 endpoints later. Just flagging for visibility.
Would you like me to open a tracking issue for wiring up the V2 embedding and ranking endpoints?
tests/unit/transports/grpc/test_grpc_client.py (1)
199-228: Unused `call_count` variable in state transition test.

`call_count` (line 211) is incremented but never asserted on. It appears to be leftover debug tracking. Consider removing it or adding an assertion to validate the expected number of state queries.

Remove unused counter:

      # First call with try_to_connect=True, then without
    - call_count = 0
      def get_state(try_to_connect: bool = False) -> grpc.ChannelConnectivity:
    -     nonlocal call_count
    -     call_count += 1
          return next(states)

src/aiperf/endpoints/kserve_v2_images.py (1)
84-98: Consider noting that `cast_fn(value)` can raise on bad user input.

If a user provides a non-numeric value for a numeric typed tensor (e.g., `--extra guidance_scale:abc`), `float("abc")` will raise a `ValueError` with a somewhat opaque traceback originating from the cast call. This is an edge case, and the current behavior (letting it raise) is acceptable, but a more user-friendly error message could be considered in a follow-up.

src/aiperf/endpoints/kserve_v2_vlm.py (2)
49-57: Empty prompt silently submitted when no text content is found.

If `turn.texts` is empty or all `text.contents` are falsy, `prompt` becomes `""` and is still sent as the text tensor. This could make debugging tricky — consider logging a warning or raising if the VLM endpoint requires non-empty text input.

💡 Optional: warn on empty prompt:

      prompt = " ".join(prompts) if prompts else ""
    + if not prompt:
    +     self.warning(lambda: "No text content found in turn; sending empty prompt")
68-70: Nit: comment describes "what" rather than "why".

As per coding guidelines, comments should explain why rather than restate the code. The code already makes it clear that image data is being extracted.

As per coding guidelines: "Comments should explain 'why?' not 'what'."

src/aiperf/transports/grpc/proto/grpc_predict_v2.proto (1)
15-19: Vendored proto — Buf PACKAGE_DIRECTORY_MATCH warning is expected and can be suppressed.

The static analysis flags that package `inference` doesn't match the file path `src/aiperf/transports/grpc/proto`. This is a standard consequence of vendoring upstream protos into a project-specific directory. Consider adding a `buf.yaml` lint exception or a comment noting this is intentionally vendored if the Buf linter runs in CI.

tests/component_integration/endpoints/test_kserve_v2_vlm_endpoint.py (1)
17-31: Test name could include expected outcome per naming convention.

The coding guidelines recommend `test_<function>_<scenario>_<expected>` naming. Consider renaming to something like `test_kserve_v2_vlm_synthetic_returns_expected_request_count` to capture the expected outcome.

That said, this is a minor nit — the test logic itself looks correct.
tests/unit/transports/grpc/test_grpc_trace_data.py (1)
82-90: Registration test accesses private `_model_lookup_table`.

This is fine for verifying discriminator registration, but it couples the test to the internal lookup mechanism. If the registration API changes, this test will break. Consider adding a comment noting this is intentionally testing internal registration behavior.
tests/unit/endpoints/test_kserve_v2_rankings_endpoint.py (1)
207-322: Consider adding a test for non-numeric score handling.

`extract_rankings` has a `try/except` path that warns on non-numeric scores (lines 131-134 in `kserve_v2_rankings.py`). There's no test exercising this branch. A quick parametrized test with a mixed `[0.9, "bad", 0.3]` data array would cover the warning + skip logic.

tests/unit/endpoints/test_kserve_v2_vlm_endpoint.py (1)
152-245: Consider adding an empty-data parse test for consistency.

The images endpoint tests include a `test_parse_response_empty_data` case (empty `data: []`). The VLM `parse_response` has similar logic (`isinstance(data, list) and len(data) > 0`) but no equivalent test here. Adding one would ensure parity and guard the empty-data branch.

src/aiperf/endpoints/kserve_v2_infer.py (1)
83-128: `_extract_text_from_output` returns `None` for empty-string data, causing `make_text_response_data` to also return `None`.

If a V2 output tensor contains `data: [""]`, `_extract_text_from_output` returns `""`, and `make_text_response_data("")` returns `None` (since empty string is falsy). The fallback loop then skips this output and tries the next one. This seems intentional for "no meaningful content," but it's worth calling out: a legitimate empty-string response (e.g., a model that returned `""`) would be silently dropped. If that's intended, no action needed.

src/aiperf/transports/grpc/grpc_transport.py (2)
114-119: `close_all` doesn't handle individual close failures.

If one `lease.close()` raises, remaining leases won't be closed. Consider suppressing per-lease errors during shutdown.

Proposed fix:

      async def close_all(self) -> None:
          """Close all active channel leases."""
          leases = list(self._leases.values())
          self._leases.clear()
          for lease in leases:
    -         await lease.close()
    +         with contextlib.suppress(Exception):
    +             await lease.close()

(Requires `import contextlib` at the top of the file.)
254-264: Same `close_all` concern applies to `_close_grpc_client` pool cleanup.

Similar to `GrpcChannelLeaseManager.close_all`, if one `client.close()` raises on line 264, remaining pool clients won't be closed.

Proposed fix:

      @on_stop
      async def _close_grpc_client(self) -> None:
          """Close all gRPC channels and the lease manager."""
          if self._lease_manager:
              lease_manager = self._lease_manager
              self._lease_manager = None
              await lease_manager.close_all()
          clients = list(self._channel_pool.values())
          self._channel_pool.clear()
          for client in clients:
    -         await client.close()
    +         with contextlib.suppress(Exception):
    +             await client.close()
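The suppress-during-shutdown pattern behind both comments can be demonstrated with a stand-in client (the class and names below are illustrative, not the project's API):

```python
import asyncio
import contextlib

class FakeClient:
    """Illustrative stand-in for a pooled gRPC client."""
    def __init__(self, name: str, fail: bool = False) -> None:
        self.name, self.fail, self.closed = name, fail, False

    async def close(self) -> None:
        if self.fail:
            raise RuntimeError(f"{self.name}: close failed")
        self.closed = True

async def close_all(clients: list[FakeClient]) -> None:
    for client in clients:
        # Suppress per-client errors so one bad close cannot
        # leave the remaining clients open during shutdown.
        with contextlib.suppress(Exception):
            await client.close()

pool = [FakeClient("a"), FakeClient("b", fail=True), FakeClient("c")]
asyncio.run(close_all(pool))
print([c.closed for c in pool])  # [True, False, True]
```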
… test coverage

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
47f02af to 8d70753 (Compare)
- Merge duplicate doc links to kserve_v2_serializers.py
- Add missing kserve_v2_vlm and kserve_v2_images to endpoint table
- Bump protobuf minimum to 5.29.1 to avoid yanked 5.29.0
- Fix zero-dimension shape producing wrong element count in serializer
- Prevent duplicate "model" kwarg TypeError in mock server
- Guard against empty inputs IndexError in fake transport
- Use safer .replace() instead of str.format() for path templates
- Suppress individual close() errors during gRPC shutdown

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>

# Conflicts:
#   docs/cli_options.md
#   src/aiperf/plugin/schema/plugins.schema.json
#   src/aiperf/plugin/schema/schemas.py
#   src/aiperf/transports/aiohttp_transport.py
- Remove unused call_count variable in test_grpc_client
- Remove "what" comment in kserve_v2_vlm (guidelines: comments explain "why")
- Add test for non-numeric score handling in rankings endpoint
- Add empty-data parse test for VLM endpoint

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
Nitpick Review Responses

Responding to the 16 nitpick comments from the automated review:

Addressed:
Not addressing (intentional):
9-16. Remaining nitpicks (proto file structure, proto vendoring approach, etc.): These were informational observations, not actionable suggestions.

@coderabbitai please review
Addressed items — Good fixes:
Intentionally not addressed — All reasonable.

I'll trigger a fresh review now.

✅ Actions performed: Review triggered.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/aiperf/transports/grpc/grpc_transport.py`:
- Around line 506-511: The current calculation of send_timeout uses a truthy
check (self.model_endpoint.endpoint.timeout or _CHANNEL_READY_TIMEOUT_S) which
treats 0/0.0 as falsy; change it to only fall back to _CHANNEL_READY_TIMEOUT_S
when timeout is None (e.g., use the endpoint.timeout if endpoint.timeout is not
None, otherwise the default), and keep the min(...) wrapping intact so
send_timeout = min((self.model_endpoint.endpoint.timeout if
self.model_endpoint.endpoint.timeout is not None else _CHANNEL_READY_TIMEOUT_S),
_CHANNEL_READY_TIMEOUT_S); reference the send_timeout variable, the
self.model_endpoint.endpoint.timeout attribute, and the _CHANNEL_READY_TIMEOUT_S
constant when making this change.
🧹 Nitpick comments (5)
tests/aiperf_mock_server/app.py (1)
723-725: Unparameterized `dict` in type hint.

`list[dict]` should be `list[dict[str, Any]]` for consistency with the rest of the file.

Proposed fix:

    -def _has_input_tensor(inputs: list[dict], name: str) -> bool:
    +def _has_input_tensor(inputs: list[dict[str, Any]], name: str) -> bool:

src/aiperf/transports/grpc/grpc_client.py (1)
108-134: `wait_for_ready` — good separation of connection setup from request timing.

The SHUTDOWN state check prevents infinite loops, and `asyncio.wait_for` wrapping ensures the caller can bound the connection wait. One subtle note: if the channel transitions to `TRANSIENT_FAILURE`, the loop will keep waiting (which is the correct behavior for retryable connection failures), but consider whether a max-retry or logging on transient failures would be useful for debugging slow startups.

src/aiperf/endpoints/kserve_v2_embeddings.py (1)
113-130: Consider validating shape consistency in `_reshape_embeddings`.

If `n * d != len(flat_data)`, the reshape silently produces truncated or padded embeddings. This could mask server-side issues. A debug/warning log when the product doesn't match the data length would help troubleshoot.

💡 Optional: add a length mismatch warning:

      @staticmethod
      def _reshape_embeddings(
          flat_data: list[float], shape: list[int]
      ) -> list[list[float]]:
          if len(shape) == 2:
              n, d = shape
    +         if n * d != len(flat_data):
    +             # Log would require instance method; consider validating in parse_response instead
    +             pass
              return [flat_data[i * d : (i + 1) * d] for i in range(n)]
          return [flat_data]

Since `_reshape_embeddings` is a `@staticmethod` and can't log, the validation could be done in `parse_response` before calling it.

src/aiperf/transports/base_transports.py (1)
21-34: Minor docstring inconsistency in `FirstTokenCallback`.

Line 27 documents the second parameter as `message` while the type now accepts `InferenceServerResponse`. Consider renaming it to `response` for consistency with the updated terminology used elsewhere in this file (e.g., line 166).

📝 Suggested docstring update:

      Args:
          ttft_ns: duration from request start
    -     message: the first inference server response (SSEMessage, TextResponse, etc.)
    +     response: the first inference server response (SSEMessage, TextResponse, etc.)

tests/unit/transports/grpc/test_grpc_transport.py (1)
150-151: Inaccurate return type annotation on `initial_metadata`.

`tuple[()]` means "a tuple containing one element which is an empty tuple." The actual return value `()` is an empty tuple, so the annotation should be `tuple[tuple[str, str], ...]` to match the real gRPC API, or simply `tuple[()]` could be replaced.

This is a minor mock inconsistency and won't break tests, but could confuse readers.

Suggested fix:

    - async def initial_metadata(self) -> tuple[()]:
    -     return ()
    + async def initial_metadata(self) -> tuple[tuple[str, str], ...]:
    +     return ()
Avoid treating timeout=0 as falsy when falling back to the default channel-ready timeout.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>