-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathintrospection.py
More file actions
126 lines (91 loc) · 3.52 KB
/
introspection.py
File metadata and controls
126 lines (91 loc) · 3.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""Runtime service introspection with ``enable_describe``.

Demonstrates how to discover methods, their types, and parameter
signatures at runtime using the built-in ``__describe__`` RPC method
and the ``introspect()`` helper.

Run::

    python examples/introspection.py
"""
from __future__ import annotations
import threading
from dataclasses import dataclass
from typing import Protocol
import pyarrow as pa
from vgi_rpc import (
CallContext,
MethodType,
OutputCollector,
ProducerState,
RpcServer,
Stream,
StreamState,
introspect,
make_pipe_pair,
)
# ---------------------------------------------------------------------------
# 1. Define a Protocol with a mix of unary and stream methods
# ---------------------------------------------------------------------------
# Structural interface passed to ``RpcServer`` as its first argument in
# ``main()``. Every method body is a ``...`` placeholder: this class only
# declares names, parameter/return annotations and docstrings.
# NOTE(review): the docstrings below are presumably surfaced through
# ``__describe__`` -- confirm before rewording them.
class DemoService(Protocol):
    """A demo service for introspection."""

    # Plain value in, plain value out (one of the "unary" methods).
    def greet(self, name: str) -> str:
        """Greet someone by name."""
        ...

    # Plain values in, plain value out (the other "unary" method).
    def add(self, a: float, b: float) -> float:
        """Add two numbers."""
        ...

    # Declared to return a ``Stream`` instead of a plain value (the "stream" method).
    def count(self, limit: int) -> Stream[StreamState]:
        """Count from 1 up to *limit*."""
        ...
# ---------------------------------------------------------------------------
# 2. Implement the service (methods are only called for schema extraction)
# ---------------------------------------------------------------------------
@dataclass
class CountState(ProducerState):
    """Counter carried between ``produce`` calls of the ``count`` stream.

    ``current`` is the next value to emit; ``limit`` is the last value
    that will be emitted.
    """

    current: int
    limit: int

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        """Emit the next counter value, or finish once the limit is passed.

        Args:
            out: Collector that receives the emitted row or the finish signal.
            ctx: Call context; not used by this producer.
        """
        if self.current > self.limit:
            # Past the limit: nothing more to emit.
            out.finish()
        else:
            # One single-row batch per call, then advance the counter.
            out.emit_pydict({"value": [self.current]})
            self.current += 1
# Output schema for the ``count`` stream: a single int64 column named
# "value", the same key ``CountState.produce`` passes to ``emit_pydict``.
_COUNT_SCHEMA = pa.schema([pa.field("value", pa.int64())])
class DemoServiceImpl:
    """Implementation object handed to ``RpcServer`` alongside ``DemoService``."""

    def greet(self, name: str) -> str:
        """Return a greeting that embeds *name*."""
        greeting = f"Hello, {name}!"
        return greeting

    def add(self, a: float, b: float) -> float:
        """Return the sum of *a* and *b*."""
        total = a + b
        return total

    def count(self, limit: int) -> Stream[CountState]:
        """Return a stream whose state starts at 1 and stops after *limit*."""
        initial_state = CountState(current=1, limit=limit)
        return Stream(output_schema=_COUNT_SCHEMA, state=initial_state)
# ---------------------------------------------------------------------------
# 3. Introspect the service at runtime
# ---------------------------------------------------------------------------
def main() -> None:
    """Serve ``DemoService`` on a background thread and print what ``introspect()`` reports.

    The server is created with ``enable_describe=True`` and served over one
    end of a pipe pair; the other end is passed to ``introspect()``. The
    client pipe is closed and the thread joined (up to 5 seconds) even if
    introspection raises.
    """
    server = RpcServer(DemoService, DemoServiceImpl(), enable_describe=True)
    client_pipe, server_pipe = make_pipe_pair()
    worker = threading.Thread(target=server.serve, args=(server_pipe,), daemon=True)
    worker.start()

    try:
        description = introspect(client_pipe)

        print(f"Service: {description.protocol_name}")
        print(f"protocol_hash: {description.protocol_hash}")
        print(f"Methods ({len(description.methods)}):")

        for method_name in sorted(description.methods):
            info = description.methods[method_name]

            if info.method_type == MethodType.UNARY:
                kind = "unary"
            else:
                kind = "stream"

            # Arrow schema is the wire-canonical type information.
            rendered = [f"{col.name}: {col.type}" for col in info.params_schema]
            params = ", ".join(rendered)

            print(f" {method_name} ({kind})")
            if params:
                print(f" {params}")
    finally:
        client_pipe.close()
        worker.join(timeout=5)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()