Skip to content
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d28dc58
Update docstrings and fix README
h3xxit Aug 26, 2025
018806c
Merge pull request #52 from universal-tool-calling-protocol/dev
h3xxit Aug 27, 2025
0f2af7e
Merge pull request #53 from universal-tool-calling-protocol/dev
h3xxit Aug 27, 2025
d28c0af
Update documentation and fix MCP plugin
h3xxit Sep 7, 2025
7ba8b3c
Merge pull request #61 "Update CLI" from universal-tool-calling-proto…
h3xxit Sep 7, 2025
908cd40
Merge pull request #63 from universal-tool-calling-protocol/dev
h3xxit Sep 8, 2025
74a11e2
Merge pull request #69 from universal-tool-calling-protocol/dev
h3xxit Sep 21, 2025
03a4b9f
Merge pull request #70 from universal-tool-calling-protocol/dev
h3xxit Oct 7, 2025
8443cda
Merge branch 'dev'
h3xxit Oct 7, 2025
0150a3b
Merge branch 'dev'
h3xxit Oct 7, 2025
6e2c671
socket protocol updated to be compatible with 1.0v utcp
Oct 25, 2025
9cea90f
cubic fixes done
Oct 26, 2025
7016987
pinned mcp-use to use langchain 0.3.27
Oct 26, 2025
718b668
removed mcp denpendency on langchain
Oct 27, 2025
ca252e5
adding the langchain dependency for testing (temporary)
Oct 27, 2025
45793cf
remove langchain-core pin to resolve dependency conflict
Oct 27, 2025
662d07d
feat: Updated Graphql implementation to be compatible with UTCP 1.0v
Nov 6, 2025
3aed349
Added gql 'how to use' guide in the README.md
Nov 6, 2025
dca4d26
updated cubic comments for GraphQl
Nov 12, 2025
4a2aea4
Update comment on delimeter handling
Thuraabtech Nov 17, 2025
21cbab7
Merge branch 'dev' into feature/graphql-1.0v
h3xxit Nov 29, 2025
700ec92
added gRPC-gnmi protocol to utcp
Dec 14, 2025
70ff230
Fix copilot comments to gRPC protocol implementation
Dec 14, 2025
9bbb819
fixed cubic comments for gRPC gnmi protocol for UTCP -2
Dec 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions plugins/communication_protocols/gnmi/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"

[project]
name = "utcp-gnmi"
version = "1.0.0"
authors = [
{ name = "UTCP Contributors" },
]
description = "UTCP gNMI communication protocol plugin over gRPC"
readme = "README.md"
Copy link

Copilot AI Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pyproject.toml references a README.md file that does not exist in the gnmi plugin directory. This will cause packaging issues. Either create a README.md file with appropriate documentation (similar to the GraphQL plugin's README) or remove the readme field from the project configuration.

Suggested change
readme = "README.md"

Copilot uses AI. Check for mistakes.
requires-python = ">=3.10"
dependencies = [
"pydantic>=2.0",
"protobuf>=4.21",
"grpcio>=1.60",
"utcp>=1.0",
"aiohttp>=3.8"
]
license = "MPL-2.0"

[project.optional-dependencies]
dev = [
"build",
"pytest",
"pytest-asyncio",
"pytest-cov",
]

[project.entry-points."utcp.plugins"]
gnmi = "utcp_gnmi:register"
12 changes: 12 additions & 0 deletions plugins/communication_protocols/gnmi/src/utcp_gnmi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from utcp.plugins.discovery import register_communication_protocol, register_call_template
from utcp_gnmi.gnmi_communication_protocol import GnmiCommunicationProtocol
from utcp_gnmi.gnmi_call_template import GnmiCallTemplateSerializer

def register():
register_communication_protocol("gnmi", GnmiCommunicationProtocol())
register_call_template("gnmi", GnmiCallTemplateSerializer())

__all__ = [
"GnmiCommunicationProtocol",
"GnmiCallTemplateSerializer",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from typing import Optional, Dict, List, Literal
from pydantic import Field
Copy link

Copilot AI Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'Field' is not used.

Suggested change
from pydantic import Field

Copilot uses AI. Check for mistakes.

from utcp.data.call_template import CallTemplate
from utcp.interfaces.serializer import Serializer
from utcp.exceptions import UtcpSerializerValidationError
import traceback

class GnmiCallTemplate(CallTemplate):
call_template_type: Literal["gnmi"] = "gnmi"
target: str
use_tls: bool = True
metadata: Optional[Dict[str, str]] = None
metadata_fields: Optional[List[str]] = None
operation: Literal["capabilities", "get", "set", "subscribe"] = "get"
stub_module: str = "gnmi_pb2_grpc"
message_module: str = "gnmi_pb2"

class GnmiCallTemplateSerializer(Serializer[GnmiCallTemplate]):
def to_dict(self, obj: GnmiCallTemplate) -> dict:
return obj.model_dump()

def validate_dict(self, obj: dict) -> GnmiCallTemplate:
try:
return GnmiCallTemplate.model_validate(obj)
except Exception as e:
raise UtcpSerializerValidationError("Invalid GnmiCallTemplate: " + traceback.format_exc()) from e
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
import importlib
from typing import Dict, Any, List, Optional, AsyncGenerator

from utcp.interfaces.communication_protocol import CommunicationProtocol
from utcp.data.call_template import CallTemplate
from utcp.data.tool import Tool, JsonSchema
from utcp.data.utcp_manual import UtcpManual
from utcp.data.register_manual_response import RegisterManualResult
from utcp_gnmi.gnmi_call_template import GnmiCallTemplate
from utcp.data.auth_implementations.api_key_auth import ApiKeyAuth
from utcp.data.auth_implementations.basic_auth import BasicAuth
from utcp.data.auth_implementations.oauth2_auth import OAuth2Auth

class GnmiCommunicationProtocol(CommunicationProtocol):
Copy link

Copilot AI Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class is missing an __init__ method to initialize the _oauth_tokens dictionary that would be used for OAuth2 token caching (referenced in comment ID 006). Without this initialization, the _handle_oauth2 method will fail with an AttributeError when it tries to access self._oauth_tokens. Add an __init__ method that initializes self._oauth_tokens = {}.

Suggested change
class GnmiCommunicationProtocol(CommunicationProtocol):
class GnmiCommunicationProtocol(CommunicationProtocol):
def __init__(self):
super().__init__()
self._oauth_tokens = {}

Copilot uses AI. Check for mistakes.
async def register_manual(self, caller, manual_call_template: CallTemplate) -> RegisterManualResult:
if not isinstance(manual_call_template, GnmiCallTemplate):
raise ValueError("GnmiCommunicationProtocol can only be used with GnmiCallTemplate")

target = manual_call_template.target
if manual_call_template.use_tls:
pass
else:
Copy link

Copilot AI Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The empty pass statement in the TLS branch serves no purpose and makes the code harder to read. Consider removing it or adding a comment explaining why TLS is always allowed.

Suggested change
if manual_call_template.use_tls:
pass
else:
if not manual_call_template.use_tls:

Copilot uses AI. Check for mistakes.
if not (target.startswith("localhost") or target.startswith("127.0.0.1")):
return RegisterManualResult(
success=False,
manual_call_template=manual_call_template,
manual=UtcpManual(manual_version="0.0.0", tools=[]),
errors=["Insecure channel only allowed for localhost or 127.0.0.1"]
)

tools: List[Tool] = []
ops = ["capabilities", "get", "set", "subscribe"]
for op in ops:
tct = GnmiCallTemplate(
name=manual_call_template.name,
call_template_type="gnmi",
auth=manual_call_template.auth,
target=manual_call_template.target,
use_tls=manual_call_template.use_tls,
metadata=manual_call_template.metadata,
metadata_fields=manual_call_template.metadata_fields,
operation=op,
stub_module=manual_call_template.stub_module,
message_module=manual_call_template.message_module,
)
inputs = JsonSchema(type="object", properties={})
outputs = JsonSchema(type="object", properties={})
tool = Tool(
name=op,
description="",
inputs=inputs,
outputs=outputs,
tags=["gnmi", op],
tool_call_template=tct,
)
tools.append(tool)

manual = UtcpManual(manual_version="1.0.0", tools=tools)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The manual version should come from the manual endpoint. You can be opinionated on how the UtcpManual should be transfered accross the network, and ideally keept the transfered object as close to the original UtcpManual as possible, so that the user on the other side can basically just expose an endpoint that returns this UtcpManual json/object

return RegisterManualResult(
success=True,
manual_call_template=manual_call_template,
manual=manual,
errors=[],
)

async def deregister_manual(self, caller, manual_call_template: CallTemplate) -> None:
return None

async def call_tool(self, caller, tool_name: str, tool_args: Dict[str, Any], tool_call_template: CallTemplate) -> Any:
if not isinstance(tool_call_template, GnmiCallTemplate):
raise ValueError("GnmiCommunicationProtocol can only be used with GnmiCallTemplate")

op = tool_call_template.operation
target = tool_call_template.target

metadata: List[tuple[str, str]] = []
if tool_call_template.metadata:
metadata.extend([(k, v) for k, v in tool_call_template.metadata.items()])
if tool_call_template.metadata_fields:
for k in tool_call_template.metadata_fields:
if k in tool_args:
metadata.append((k, str(tool_args[k])))
if tool_call_template.auth:
if isinstance(tool_call_template.auth, ApiKeyAuth):
if tool_call_template.auth.api_key:
metadata.append((tool_call_template.auth.var_name or "authorization", tool_call_template.auth.api_key))
elif isinstance(tool_call_template.auth, BasicAuth):
import base64
token = base64.b64encode(f"{tool_call_template.auth.username}:{tool_call_template.auth.password}".encode()).decode()
metadata.append(("authorization", f"Basic {token}"))
elif isinstance(tool_call_template.auth, OAuth2Auth):
token = await self._handle_oauth2(tool_call_template.auth)
metadata.append(("authorization", f"Bearer {token}"))

grpc = importlib.import_module("grpc")
aio = importlib.import_module("grpc.aio")
json_format = importlib.import_module("google.protobuf.json_format")
stub_mod = importlib.import_module(tool_call_template.stub_module)
msg_mod = importlib.import_module(tool_call_template.message_module)

if tool_call_template.use_tls:
creds = grpc.ssl_channel_credentials()
channel = aio.secure_channel(target, creds)
else:
channel = aio.insecure_channel(target)

stub = None
for attr in dir(stub_mod):
if attr.endswith("Stub"):
stub_cls = getattr(stub_mod, attr)
stub = stub_cls(channel)
break
if stub is None:
raise ValueError("gNMI stub not found in stub_module")
Copy link

Copilot AI Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The module import and stub discovery logic (lines 95-114) is duplicated in both call_tool and call_tool_streaming (lines 168-186). This duplication creates a maintenance burden. Consider extracting this into a private helper method like _create_grpc_stub to eliminate the duplication.

Copilot uses AI. Check for mistakes.

if op == "capabilities":
req = getattr(msg_mod, "CapabilityRequest")()
resp = await stub.Capabilities(req, metadata=metadata)
elif op == "get":
req = getattr(msg_mod, "GetRequest")()
paths = tool_args.get("paths", [])
for p in paths:
path_msg = getattr(msg_mod, "Path")()
for elem in [e for e in p.strip("/").split("/") if e]:
pe = getattr(msg_mod, "PathElem")(name=elem)
path_msg.elem.append(pe)
req.path.append(path_msg)
resp = await stub.Get(req, metadata=metadata)
elif op == "set":
req = getattr(msg_mod, "SetRequest")()
updates = tool_args.get("updates", [])
for upd in updates:
path_msg = getattr(msg_mod, "Path")()
for elem in [e for e in str(upd.get("path", "")).strip("/").split("/") if e]:
pe = getattr(msg_mod, "PathElem")(name=elem)
path_msg.elem.append(pe)
val = getattr(msg_mod, "TypedValue")(json_val=str(upd.get("value", "")))
update_msg = getattr(msg_mod, "Update")(path=path_msg, val=val)
req.update.append(update_msg)
resp = await stub.Set(req, metadata=metadata)
elif op == "subscribe":
req = getattr(msg_mod, "SubscribeRequest")()
sub_list = getattr(msg_mod, "SubscriptionList")()
mode = tool_args.get("mode", "stream").upper()
sub_list.mode = getattr(msg_mod, "SubscriptionList".upper(), None) or 0
Copy link

Copilot AI Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getattr(msg_mod, "SubscriptionList".upper(), None) is incorrect and will always evaluate to None. It attempts to get an attribute named "SUBSCRIPTIONLIST" (all uppercase string literal) instead of the subscription mode enum value. This should likely retrieve the subscription mode from the message module based on the mode variable, such as getattr(msg_mod, f"SubscriptionList.Mode.{mode}", 0) or similar, depending on the protobuf structure.

Suggested change
sub_list.mode = getattr(msg_mod, "SubscriptionList".upper(), None) or 0
# Set the mode enum value correctly
try:
mode_enum = getattr(getattr(msg_mod, "SubscriptionList"), "Mode")
sub_list.mode = getattr(mode_enum, mode, 0)
except AttributeError:
sub_list.mode = 0

Copilot uses AI. Check for mistakes.
paths = tool_args.get("paths", [])
for p in paths:
path_msg = getattr(msg_mod, "Path")()
for elem in [e for e in p.strip("/").split("/") if e]:
pe = getattr(msg_mod, "PathElem")(name=elem)
path_msg.elem.append(pe)
sub = getattr(msg_mod, "Subscription")(path=path_msg)
sub_list.subscription.append(sub)
req.subscribe.CopyFrom(sub_list)
raise ValueError("Subscribe is streaming; use call_tool_streaming")
Copy link

Copilot AI Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message "Subscribe is streaming; use call_tool_streaming" is confusing in the context of the subscribe operation. The subscribe request is being built (lines 142-154) but then immediately raises an error. Either the subscribe case should be removed from call_tool entirely, or this error message should clarify that the operation was attempted through the wrong method. Consider removing this case from call_tool and letting the "Unsupported gNMI operation" error on line 157 handle it instead.

Suggested change
elif op == "subscribe":
req = getattr(msg_mod, "SubscribeRequest")()
sub_list = getattr(msg_mod, "SubscriptionList")()
mode = tool_args.get("mode", "stream").upper()
sub_list.mode = getattr(msg_mod, "SubscriptionList".upper(), None) or 0
paths = tool_args.get("paths", [])
for p in paths:
path_msg = getattr(msg_mod, "Path")()
for elem in [e for e in p.strip("/").split("/") if e]:
pe = getattr(msg_mod, "PathElem")(name=elem)
path_msg.elem.append(pe)
sub = getattr(msg_mod, "Subscription")(path=path_msg)
sub_list.subscription.append(sub)
req.subscribe.CopyFrom(sub_list)
raise ValueError("Subscribe is streaming; use call_tool_streaming")

Copilot uses AI. Check for mistakes.
else:
raise ValueError("Unsupported gNMI operation")

return json_format.MessageToDict(resp)

async def call_tool_streaming(self, caller, tool_name: str, tool_args: Dict[str, Any], tool_call_template: CallTemplate) -> AsyncGenerator[Any, None]:
if not isinstance(tool_call_template, GnmiCallTemplate):
raise ValueError("GnmiCommunicationProtocol can only be used with GnmiCallTemplate")
if tool_call_template.operation != "subscribe":
result = await self.call_tool(caller, tool_name, tool_args, tool_call_template)
yield result
return
grpc = importlib.import_module("grpc")
aio = importlib.import_module("grpc.aio")
json_format = importlib.import_module("google.protobuf.json_format")
stub_mod = importlib.import_module(tool_call_template.stub_module)
msg_mod = importlib.import_module(tool_call_template.message_module)
target = tool_call_template.target
if tool_call_template.use_tls:
creds = grpc.ssl_channel_credentials()
channel = aio.secure_channel(target, creds)
else:
channel = aio.insecure_channel(target)
stub = None
for attr in dir(stub_mod):
if attr.endswith("Stub"):
stub_cls = getattr(stub_mod, attr)
stub = stub_cls(channel)
break
if stub is None:
raise ValueError("gNMI stub not found in stub_module")
metadata: List[tuple[str, str]] = []
if tool_call_template.metadata:
metadata.extend([(k, v) for k, v in tool_call_template.metadata.items()])
if tool_call_template.metadata_fields:
for k in tool_call_template.metadata_fields:
if k in tool_args:
metadata.append((k, str(tool_args[k])))
if tool_call_template.auth:
if isinstance(tool_call_template.auth, ApiKeyAuth):
if tool_call_template.auth.api_key:
metadata.append((tool_call_template.auth.var_name or "authorization", tool_call_template.auth.api_key))
elif isinstance(tool_call_template.auth, BasicAuth):
import base64
token = base64.b64encode(f"{tool_call_template.auth.username}:{tool_call_template.auth.password}".encode()).decode()
metadata.append(("authorization", f"Basic {token}"))
elif isinstance(tool_call_template.auth, OAuth2Auth):
token = await self._handle_oauth2(tool_call_template.auth)
metadata.append(("authorization", f"Bearer {token}"))
req = getattr(msg_mod, "SubscribeRequest")()
sub_list = getattr(msg_mod, "SubscriptionList")()
paths = tool_args.get("paths", [])
for p in paths:
path_msg = getattr(msg_mod, "Path")()
for elem in [e for e in p.strip("/").split("/") if e]:
pe = getattr(msg_mod, "PathElem")(name=elem)
path_msg.elem.append(pe)
sub = getattr(msg_mod, "Subscription")(path=path_msg)
sub_list.subscription.append(sub)
req.subscribe.CopyFrom(sub_list)
call = stub.Subscribe(req, metadata=metadata)
async for resp in call:
yield json_format.MessageToDict(resp)
Copy link

Copilot AI Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The gRPC channel created on lines 174-178 is never closed. gRPC channels should be properly closed after use to avoid resource leaks. Consider wrapping the channel usage in a try-finally block or using async context manager pattern to ensure the channel is closed even if the streaming operation is interrupted.

Copilot uses AI. Check for mistakes.

async def _handle_oauth2(self, auth_details: OAuth2Auth) -> str:
import aiohttp
client_id = auth_details.client_id
async with aiohttp.ClientSession() as session:
try:
body_data = {
"grant_type": "client_credentials",
"client_id": auth_details.client_id,
"client_secret": auth_details.client_secret,
"scope": auth_details.scope,
}
async with session.post(auth_details.token_url, data=body_data) as response:
response.raise_for_status()
token_response = await response.json()
return token_response["access_token"]
except Exception:
from aiohttp import BasicAuth as AiohttpBasicAuth
header_auth = AiohttpBasicAuth(auth_details.client_id, auth_details.client_secret)
header_data = {
"grant_type": "client_credentials",
"scope": auth_details.scope,
}
async with session.post(auth_details.token_url, data=header_data, auth=header_auth) as response:
response.raise_for_status()
token_response = await response.json()
return token_response["access_token"]
Copy link

Copilot AI Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The OAuth2 token fetching does not cache tokens like other protocol implementations do (e.g., HTTP, GraphQL). This will result in fetching a new token on every call, which is inefficient and may hit rate limits. The implementation should store tokens in an instance variable (e.g., self._oauth_tokens) and check for cached tokens before making a request, similar to the GraphQL implementation at lines 56-58.

Copilot uses AI. Check for mistakes.
50 changes: 50 additions & 0 deletions plugins/communication_protocols/gnmi/tests/test_gnmi_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import sys
from pathlib import Path
import pytest

plugin_src = Path(__file__).parent.parent / "src"
sys.path.insert(0, str(plugin_src))

core_src = Path(__file__).parent.parent.parent.parent.parent / "core" / "src"
sys.path.insert(0, str(core_src))

from utcp.utcp_client import UtcpClient
from utcp_gnmi import register

@pytest.mark.asyncio
async def test_register_manual_and_tools():
register()
client = await UtcpClient.create(config={
"manual_call_templates": [
{
"name": "routerA",
"call_template_type": "gnmi",
"target": "localhost:50051",
"use_tls": False,
"operation": "get"
}
]
})
tools = await client.config.tool_repository.get_tools()
names = [t.name for t in tools]
assert any(n.startswith("routerA.") for n in names)
assert any(n.endswith("subscribe") for n in names)
Comment on lines +14 to +31
Copy link

Copilot AI Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test suite lacks coverage for the core call_tool and call_tool_streaming methods in GnmiCommunicationProtocol. While test_register_manual_and_tools validates tool registration, there are no tests for actual RPC calls (capabilities, get, set) or streaming subscribe. The GraphQL plugin includes comprehensive tests that call tools (lines 103-110 in test_graphql_protocol.py). Consider adding similar tests with mocked gRPC stubs to verify the request building and response handling logic.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot l request to apply changes based on this feedback to the current PR


def test_serializer_roundtrip():
from utcp_gnmi.gnmi_call_template import GnmiCallTemplateSerializer
serializer = GnmiCallTemplateSerializer()
data = {
"name": "routerB",
"call_template_type": "gnmi",
"target": "localhost:50051",
"use_tls": False,
"metadata": {"authorization": "Bearer token"},
"metadata_fields": ["tenant-id"],
"operation": "set",
"stub_module": "gnmi_pb2_grpc",
"message_module": "gnmi_pb2"
}
obj = serializer.validate_dict(data)
out = serializer.to_dict(obj)
assert out["call_template_type"] == "gnmi"
assert out["operation"] == "set"
Loading