|
| 1 | +""" |
| 2 | +Simple example of using FastAPI-MCP to add an MCP server to a FastAPI app. |
| 3 | +ref. https://github.com/tadata-org/fastapi_mcp/blob/v0.3.4/examples/shared/apps/items.py |
| 4 | +""" |
| 5 | + |
| 6 | +from fastapi import FastAPI, HTTPException, Query |
| 7 | +from pydantic import BaseModel |
| 8 | + |
| 9 | +app = FastAPI() |
| 10 | + |
| 11 | + |
| 12 | +class Item(BaseModel): |
| 13 | + id: int |
| 14 | + name: str |
| 15 | + description: str | None = None |
| 16 | + price: float |
| 17 | + tags: list[str] = [] |
| 18 | + |
| 19 | + |
| 20 | +items_db: dict[int, Item] = {} |
| 21 | + |
| 22 | + |
| 23 | +@app.get("/items/", response_model=list[Item], tags=["items"], operation_id="list_items") |
| 24 | +async def list_items(skip: int = 0, limit: int = 10): |
| 25 | + """ |
| 26 | + List all items in the database. |
| 27 | +
|
| 28 | + Returns a list of items, with pagination support. |
| 29 | + """ |
| 30 | + return list(items_db.values())[skip : skip + limit] |
| 31 | + |
| 32 | + |
| 33 | +@app.get("/items/{item_id}", response_model=Item, tags=["items"], operation_id="get_item") |
| 34 | +async def read_item(item_id: int): |
| 35 | + """ |
| 36 | + Get a specific item by its ID. |
| 37 | +
|
| 38 | + Raises a 404 error if the item does not exist. |
| 39 | + """ |
| 40 | + if item_id not in items_db: |
| 41 | + raise HTTPException(status_code=404, detail="Item not found") |
| 42 | + return items_db[item_id] |
| 43 | + |
| 44 | + |
| 45 | +@app.post("/items/", response_model=Item, tags=["items"], operation_id="create_item") |
| 46 | +async def create_item(item: Item): |
| 47 | + """ |
| 48 | + Create a new item in the database. |
| 49 | +
|
| 50 | + Returns the created item with its assigned ID. |
| 51 | + """ |
| 52 | + items_db[item.id] = item |
| 53 | + return item |
| 54 | + |
| 55 | + |
| 56 | +@app.put("/items/{item_id}", response_model=Item, tags=["items"], operation_id="update_item") |
| 57 | +async def update_item(item_id: int, item: Item): |
| 58 | + """ |
| 59 | + Update an existing item. |
| 60 | +
|
| 61 | + Raises a 404 error if the item does not exist. |
| 62 | + """ |
| 63 | + if item_id not in items_db: |
| 64 | + raise HTTPException(status_code=404, detail="Item not found") |
| 65 | + |
| 66 | + item.id = item_id |
| 67 | + items_db[item_id] = item |
| 68 | + return item |
| 69 | + |
| 70 | + |
| 71 | +@app.delete("/items/{item_id}", tags=["items"], operation_id="delete_item") |
| 72 | +async def delete_item(item_id: int): |
| 73 | + """ |
| 74 | + Delete an item from the database. |
| 75 | +
|
| 76 | + Raises a 404 error if the item does not exist. |
| 77 | + """ |
| 78 | + if item_id not in items_db: |
| 79 | + raise HTTPException(status_code=404, detail="Item not found") |
| 80 | + |
| 81 | + del items_db[item_id] |
| 82 | + return {"message": "Item deleted successfully"} |
| 83 | + |
| 84 | + |
| 85 | +@app.get("/items/search/", response_model=list[Item], tags=["search"], operation_id="search_items") |
| 86 | +async def search_items( |
| 87 | + q: str | None = Query(None, description="Search query string"), |
| 88 | + min_price: float | None = Query(None, description="Minimum price"), |
| 89 | + max_price: float | None = Query(None, description="Maximum price"), |
| 90 | + tags: list[str] = Query([], description="Filter by tags"), |
| 91 | +): |
| 92 | + """ |
| 93 | + Search for items with various filters. |
| 94 | +
|
| 95 | + Returns a list of items that match the search criteria. |
| 96 | + """ |
| 97 | + results = list(items_db.values()) |
| 98 | + |
| 99 | + if q: |
| 100 | + q = q.lower() |
| 101 | + results = [ |
| 102 | + item for item in results if q in item.name.lower() or (item.description and q in item.description.lower()) |
| 103 | + ] |
| 104 | + |
| 105 | + if min_price is not None: |
| 106 | + results = [item for item in results if item.price >= min_price] |
| 107 | + if max_price is not None: |
| 108 | + results = [item for item in results if item.price <= max_price] |
| 109 | + |
| 110 | + if tags: |
| 111 | + results = [item for item in results if all(tag in item.tags for tag in tags)] |
| 112 | + |
| 113 | + return results |
| 114 | + |
| 115 | + |
| 116 | +sample_items = [ |
| 117 | + Item(id=1, name="Hammer", description="A tool for hammering nails", price=9.99, tags=["tool", "hardware"]), |
| 118 | + Item(id=2, name="Screwdriver", description="A tool for driving screws", price=7.99, tags=["tool", "hardware"]), |
| 119 | + Item(id=3, name="Wrench", description="A tool for tightening bolts", price=12.99, tags=["tool", "hardware"]), |
| 120 | + Item(id=4, name="Saw", description="A tool for cutting wood", price=19.99, tags=["tool", "hardware", "cutting"]), |
| 121 | + Item(id=5, name="Drill", description="A tool for drilling holes", price=49.99, tags=["tool", "hardware", "power"]), |
| 122 | +] |
| 123 | +for item in sample_items: |
| 124 | + items_db[item.id] = item |
0 commit comments