-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
40 lines (33 loc) · 1.26 KB
/
server.py
File metadata and controls
40 lines (33 loc) · 1.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from mcp.server.fastmcp import FastMCP
from typing import Annotated, Literal
from pydantic import Field
import uuid
import requests
# FastMCP server instance; the tool functions in this file register on it via @mcp.tool.
mcp = FastMCP(name="BikeRental_MCPServer")
@mcp.tool(description="Register a bike with an optional bike type and optional location")
def register_bike(
    # Optional parameters are annotated to inform a model about the expected parameter data types.
    bikeType: str | None = None,
    location: str | None = None,
) -> str:
    """Send a RegisterBikeCommand for a bike with a freshly generated id.

    Args:
        bikeType: Optional bike type, forwarded unchanged in the command payload.
        location: Optional location, forwarded unchanged in the command payload.

    Returns:
        A confirmation message, returned only after the command endpoint
        answered with a non-error HTTP status.

    Raises:
        requests.RequestException: If the request cannot be sent, times out,
            or the endpoint answers with a 4xx/5xx status.
    """
    url = "http://localhost:8024/v2/commands?context=default"
    payload = {
        "payload": {
            # A random UUID is generated for every registration.
            "bikeId": str(uuid.uuid4()),
            "bikeType": bikeType,
            "location": location,
        },
        # The command (implemented as a Java record, see the Axon application) to send for registering a bike.
        "name": "io.axoniq.demo.bikerental.coreapi.rental.RegisterBikeCommand",
    }
    # Without a timeout, requests may wait forever on an unresponsive server.
    response = requests.post(url, json=payload, timeout=10)
    # Do not report success when the endpoint rejected the command.
    response.raise_for_status()
    return "Command was sent successfully"
@mcp.tool(description="Get registered bikes")
def get_bikes() -> list:
    """Fetch the registered bikes from the bike rental application's REST API.

    Returns:
        The decoded JSON body of the ``/bikes`` response.

    Raises:
        requests.RequestException: If the request cannot be sent, times out,
            or the endpoint answers with a 4xx/5xx status.
        ValueError: If the response body is not valid JSON.
    """
    # NOTE: Due to an issue with the Query API of Axon Server, the REST API of
    # the Axon application is used as an intermediary for sending the query.
    url = "http://localhost:8080/bikes"
    # Without a timeout, requests may wait forever on an unresponsive server.
    response = requests.get(url, timeout=10)
    # Fail loudly instead of returning an error body as if it were the bike list.
    response.raise_for_status()
    return response.json()