Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 40 additions & 10 deletions llama_stack/providers/utils/inference/inference_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import asyncio
from typing import Any

from sqlalchemy.exc import IntegrityError

from llama_stack.apis.inference import (
ListOpenAIChatCompletionResponse,
OpenAIChatCompletion,
Expand Down Expand Up @@ -129,16 +131,44 @@ async def _write_chat_completion(
raise ValueError("Inference store is not initialized")

data = chat_completion.model_dump()

await self.sql_store.insert(
table="chat_completions",
data={
"id": data["id"],
"created": data["created"],
"model": data["model"],
"choices": data["choices"],
"input_messages": [message.model_dump() for message in input_messages],
},
record_data = {
"id": data["id"],
"created": data["created"],
"model": data["model"],
"choices": data["choices"],
"input_messages": [message.model_dump() for message in input_messages],
}

try:
await self.sql_store.insert(
table="chat_completions",
data=record_data,
)
except IntegrityError as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a comment inline here with the motivation in the summary?

# Duplicate chat completion IDs can be generated during tests especially if they are replaying
# recorded responses across different tests. No need to warn or error under those circumstances.
# In the wild, this is not likely to happen at all (no evidence) so we aren't really hiding any problem.

# Check if it's a unique constraint violation
error_message = str(e.orig) if e.orig else str(e)
if self._is_unique_constraint_error(error_message):
# Update the existing record instead
await self.sql_store.update(table="chat_completions", data=record_data, where={"id": data["id"]})
else:
# Re-raise if it's not a unique constraint error
raise

def _is_unique_constraint_error(self, error_message: str) -> bool:
"""Check if the error is specifically a unique constraint violation."""
error_lower = error_message.lower()
return any(
indicator in error_lower
for indicator in [
"unique constraint failed", # SQLite
"duplicate key", # PostgreSQL
"unique violation", # PostgreSQL alternative
"duplicate entry", # MySQL
]
)

async def list_chat_completions(
Expand Down
14 changes: 14 additions & 0 deletions llama_stack/providers/utils/sqlstore/authorized_sqlstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,20 @@ async def fetch_one(

return results.data[0] if results.data else None

async def update(self, table: str, data: Mapping[str, Any], where: Mapping[str, Any]) -> None:
"""Update rows with automatic access control attribute capture."""
enhanced_data = dict(data)

current_user = get_authenticated_user()
if current_user:
enhanced_data["owner_principal"] = current_user.principal
enhanced_data["access_attributes"] = current_user.attributes
else:
enhanced_data["owner_principal"] = None
enhanced_data["access_attributes"] = None

await self.sql_store.update(table, enhanced_data, where)

async def delete(self, table: str, where: Mapping[str, Any]) -> None:
"""Delete rows with automatic access control filtering."""
await self.sql_store.delete(table, where)
Expand Down
Loading