|
| 1 | +import asyncio |
| 2 | +import os |
1 | 3 | import sys
|
2 | 4 | import threading
|
3 | 5 | import time
|
4 | 6 | import traceback
|
5 | 7 | from contextlib import asynccontextmanager
|
| 8 | +from importlib import resources as importlib_resources |
6 | 9 | from typing import Any, Dict, List, Optional
|
7 | 10 |
|
8 | 11 | import pandas as pd
|
9 | 12 | import psutil
|
10 | 13 | from dateutil import parser
|
11 |
| -from fastapi import Depends, FastAPI, Request, Response, status |
| 14 | +from fastapi import ( |
| 15 | + Depends, |
| 16 | + FastAPI, |
| 17 | + Request, |
| 18 | + Response, |
| 19 | + WebSocket, |
| 20 | + WebSocketDisconnect, |
| 21 | + status, |
| 22 | +) |
12 | 23 | from fastapi.concurrency import run_in_threadpool
|
13 | 24 | from fastapi.logger import logger
|
14 | 25 | from fastapi.responses import JSONResponse
|
| 26 | +from fastapi.staticfiles import StaticFiles |
15 | 27 | from google.protobuf.json_format import MessageToDict
|
16 | 28 | from prometheus_client import Gauge, start_http_server
|
17 | 29 | from pydantic import BaseModel
|
@@ -78,6 +90,15 @@ class GetOnlineFeaturesRequest(BaseModel):
|
78 | 90 | query_string: Optional[str] = None
|
79 | 91 |
|
80 | 92 |
|
| 93 | +class ChatMessage(BaseModel): |
| 94 | + role: str |
| 95 | + content: str |
| 96 | + |
| 97 | + |
| 98 | +class ChatRequest(BaseModel): |
| 99 | + messages: List[ChatMessage] |
| 100 | + |
| 101 | + |
81 | 102 | def _get_features(request: GetOnlineFeaturesRequest, store: "feast.FeatureStore"):
|
82 | 103 | if request.feature_service:
|
83 | 104 | feature_service = store.get_feature_service(
|
@@ -113,6 +134,35 @@ def get_app(
|
113 | 134 | store: "feast.FeatureStore",
|
114 | 135 | registry_ttl_sec: int = DEFAULT_FEATURE_SERVER_REGISTRY_TTL,
|
115 | 136 | ):
|
| 137 | + """ |
| 138 | + Creates a FastAPI app that can be used to start a feature server. |
| 139 | +
|
| 140 | + Args: |
| 141 | + store: The FeatureStore to use for serving features |
| 142 | + registry_ttl_sec: The TTL in seconds for the registry cache |
| 143 | +
|
| 144 | + Returns: |
| 145 | + A FastAPI app |
| 146 | +
|
| 147 | + Example: |
| 148 | + ```python |
| 149 | + from feast import FeatureStore |
| 150 | +
|
| 151 | + store = FeatureStore(repo_path="feature_repo") |
| 152 | + app = get_app(store) |
| 153 | + ``` |
| 154 | +
|
| 155 | + The app provides the following endpoints: |
| 156 | + - `/get-online-features`: Get online features |
| 157 | + - `/retrieve-online-documents`: Retrieve online documents |
| 158 | + - `/push`: Push features to the feature store |
| 159 | + - `/write-to-online-store`: Write to the online store |
| 160 | + - `/health`: Health check |
| 161 | + - `/materialize`: Materialize features |
| 162 | + - `/materialize-incremental`: Materialize features incrementally |
| 163 | + - `/chat`: Chat UI |
| 164 | + - `/ws/chat`: WebSocket endpoint for chat |
| 165 | + """ |
116 | 166 | proto_json.patch()
|
117 | 167 | # Asynchronously refresh registry, notifying shutdown and canceling the active timer if the app is shutting down
|
118 | 168 | registry_proto = None
|
@@ -297,6 +347,21 @@ async def health():
|
297 | 347 | else Response(status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
|
298 | 348 | )
|
299 | 349 |
|
| 350 | + @app.post("/chat") |
| 351 | + async def chat(request: ChatRequest): |
| 352 | + # Process the chat request |
| 353 | + # For now, just return dummy text |
| 354 | + return {"response": "This is a dummy response from the Feast feature server."} |
| 355 | + |
| 356 | + @app.get("/chat") |
| 357 | + async def chat_ui(): |
| 358 | + # Serve the chat UI |
| 359 | + static_dir_ref = importlib_resources.files(__spec__.parent) / "static/chat" # type: ignore[name-defined, arg-type] |
| 360 | + with importlib_resources.as_file(static_dir_ref) as static_dir: |
| 361 | + with open(os.path.join(static_dir, "index.html")) as f: |
| 362 | + content = f.read() |
| 363 | + return Response(content=content, media_type="text/html") |
| 364 | + |
300 | 365 | @app.post("/materialize", dependencies=[Depends(inject_user_details)])
|
301 | 366 | def materialize(request: MaterializeRequest) -> None:
|
302 | 367 | for feature_view in request.feature_views or []:
|
@@ -337,6 +402,46 @@ async def rest_exception_handler(request: Request, exc: Exception):
|
337 | 402 | content=str(exc),
|
338 | 403 | )
|
339 | 404 |
|
| 405 | + # Chat WebSocket connection manager |
| 406 | + class ConnectionManager: |
| 407 | + def __init__(self): |
| 408 | + self.active_connections: List[WebSocket] = [] |
| 409 | + |
| 410 | + async def connect(self, websocket: WebSocket): |
| 411 | + await websocket.accept() |
| 412 | + self.active_connections.append(websocket) |
| 413 | + |
| 414 | + def disconnect(self, websocket: WebSocket): |
| 415 | + self.active_connections.remove(websocket) |
| 416 | + |
| 417 | + async def send_message(self, message: str, websocket: WebSocket): |
| 418 | + await websocket.send_text(message) |
| 419 | + |
| 420 | + manager = ConnectionManager() |
| 421 | + |
| 422 | + @app.websocket("/ws/chat") |
| 423 | + async def websocket_endpoint(websocket: WebSocket): |
| 424 | + await manager.connect(websocket) |
| 425 | + try: |
| 426 | + while True: |
| 427 | + message = await websocket.receive_text() |
| 428 | + # Process the received message (currently unused but kept for future implementation) |
| 429 | + # For now, just return dummy text |
| 430 | + response = f"You sent: '{message}'. This is a dummy response from the Feast feature server." |
| 431 | + |
| 432 | + # Stream the response word by word |
| 433 | + words = response.split() |
| 434 | + for word in words: |
| 435 | + await manager.send_message(word + " ", websocket) |
| 436 | + await asyncio.sleep(0.1) # Add a small delay between words |
| 437 | + except WebSocketDisconnect: |
| 438 | + manager.disconnect(websocket) |
| 439 | + |
| 440 | + # Mount static files |
| 441 | + static_dir_ref = importlib_resources.files(__spec__.parent) / "static" # type: ignore[name-defined, arg-type] |
| 442 | + with importlib_resources.as_file(static_dir_ref) as static_dir: |
| 443 | + app.mount("/static", StaticFiles(directory=static_dir), name="static") |
| 444 | + |
340 | 445 | return app
|
341 | 446 |
|
342 | 447 |
|
|
0 commit comments