-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
89 lines (70 loc) · 2.24 KB
/
main.py
File metadata and controls
89 lines (70 loc) · 2.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import os
from dotenv import load_dotenv
# Load environment variables before importing modules that depend on them
load_dotenv()
from mcp.server.fastmcp import FastMCP
from modules import fhir
# Initialize FastMCP
# The listening port is taken from the PORT environment variable (which the
# load_dotenv() call above may have populated); 8000 is used when it is unset.
port = int(os.getenv("PORT", "8000"))

# "0.0.0.0" is the wildcard address, i.e. the server is asked to listen on
# every network interface rather than only on localhost.
mcp = FastMCP("HAPI-FHIR-Server", host="0.0.0.0", port=port)
# --- Generic FHIR Tools ---
@mcp.tool()
def create_resource(resource_type: str, resource: dict) -> dict:
    """
    Create a new FHIR resource.

    Args:
        resource_type: The type of resource (e.g., "Patient", "Observation").
        resource: The resource data.
    """
    # Thin MCP wrapper: both arguments are handed to the fhir module unchanged
    # and its result is returned as-is.
    created = fhir.create_resource(resource_type, resource)
    return created
@mcp.tool()
def read_resource(resource_type: str, resource_id: str) -> dict:
    """
    Read a FHIR resource by ID.

    Args:
        resource_type: The type of resource.
        resource_id: The ID of the resource.
    """
    # Thin MCP wrapper: the lookup itself is performed by the fhir module.
    fetched = fhir.read_resource(resource_type, resource_id)
    return fetched
@mcp.tool()
def update_resource(resource_type: str, resource_id: str, resource: dict) -> dict:
    """
    Update a FHIR resource.

    Args:
        resource_type: The type of resource.
        resource_id: The ID of the resource.
        resource: The updated resource data.
    """
    # Thin MCP wrapper: type, ID and payload are forwarded positionally, in
    # this order, to the fhir module.
    updated = fhir.update_resource(resource_type, resource_id, resource)
    return updated
@mcp.tool()
def delete_resource(resource_type: str, resource_id: str) -> str:
    """
    Delete a FHIR resource.

    Args:
        resource_type: The type of resource.
        resource_id: The ID of the resource.
    """
    # Thin MCP wrapper: whatever the fhir module reports for the deletion is
    # returned to the caller unchanged (annotated here as a string).
    outcome = fhir.delete_resource(resource_type, resource_id)
    return outcome
@mcp.tool()
def search_resources(resource_type: str, query: str | None = None) -> dict:
    """
    Search for FHIR resources.

    Args:
        resource_type: The type of resource.
        query: Optional query string (e.g., "name=doe&active=true").
            May be omitted or null.
    """
    # `query` is annotated as `str | None` (not plain `str`) so the annotation
    # agrees with its None default and an explicit null from a client is a
    # valid argument rather than a type mismatch.
    # The value, including None, is forwarded unchanged to the fhir module.
    return fhir.search_resources(resource_type, query)
@mcp.tool()
def create_transaction(bundle: dict) -> dict:
    """
    Process a FHIR transaction bundle.

    Args:
        bundle: The FHIR Bundle resource.
    """
    # Thin MCP wrapper: the bundle is passed to the fhir module untouched and
    # its result is returned as-is.
    result = fhir.create_transaction(bundle)
    return result
if __name__ == "__main__":
    # Start the server only when this file is executed as a script, so that
    # importing the module does not begin serving. The tools registered above
    # are exposed over the "streamable-http" transport.
    mcp.run(transport="streamable-http")