Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"editor.formatOnSave": true
},
"[typescriptreact]": {
"editor.defaultFormatter": "esbenp.prettier-vscode",
"editor.defaultFormatter": "vscode.typescript-language-features",
"editor.formatOnSave": true
},
"[css]": {
Expand Down
282 changes: 280 additions & 2 deletions app/backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import mimetypes
import os
import time
import aiofiles
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, Union, cast

Expand All @@ -16,7 +17,7 @@
SpeechSynthesizer,
)
from azure.core.exceptions import ResourceNotFoundError
from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
from azure.identity.aio import DefaultAzureCredential, AzureCliCredential, ManagedIdentityCredential, ChainedTokenCredential, get_bearer_token_provider
from azure.monitor.opentelemetry import configure_azure_monitor
from azure.search.documents.aio import SearchClient
from azure.search.documents.indexes.aio import SearchIndexClient
Expand Down Expand Up @@ -85,6 +86,8 @@
from prepdocslib.filestrategy import UploadUserFileStrategy
from prepdocslib.listfilestrategy import File

import webprepdocs

bp = Blueprint("routes", __name__, static_folder="static")
# Fix Windows registry issue with mimetypes
mimetypes.add_type("application/javascript", ".js")
Expand Down Expand Up @@ -377,6 +380,277 @@ async def list_uploaded(auth_claims: dict[str, Any]):
current_app.logger.exception("Error listing uploaded files", error)
return jsonify(files), 200

@bp.post("/chunk_upload")
@authenticated
async def chunk_upload(auth_claims: dict[str, Any]):
    """
    Endpoint for chunked file upload.

    Expects multipart form fields: 'filename', 'chunk' (the file part),
    'chunkIndex', 'totalChunks'.  Chunks are staged in a temp directory;
    once all parts are present they are assembled into one file and
    uploaded to the content container root (no user-specific folders),
    then indexed for the uploading user.
    """
    import os
    from azure.storage.blob.aio import BlobServiceClient

    form = await request.form
    files = await request.files  # Await only once

    raw_name = form.get("filename")
    chunk_index = int(form.get("chunkIndex", -1))
    total_chunks = int(form.get("totalChunks", -1))
    chunk = files.get("chunk")

    if not chunk or not raw_name or chunk_index < 0 or total_chunks <= 0:
        return jsonify({"error": "Missing or invalid required fields"}), 400

    # Strip any client-supplied directory components so the filename cannot
    # traverse out of the temp dir or write outside the container root.
    filename = os.path.basename(raw_name)

    temp_dir = os.path.join("/tmp", "chunk_upload")
    os.makedirs(temp_dir, exist_ok=True)

    # Name parts after the target file so concurrent uploads of different
    # files do not clobber each other's chunks.
    chunk_path = os.path.join(temp_dir, f"{filename}.part{chunk_index}")
    async with aiofiles.open(chunk_path, "wb") as f:
        await f.write(chunk.read())

    # Check if all chunks are present
    chunk_files = [os.path.join(temp_dir, f"{filename}.part{i}") for i in range(total_chunks)]
    if all(os.path.exists(p) for p in chunk_files):
        # Assemble the parts, in index order, into the final file
        assembled_path = os.path.join(temp_dir, filename)
        async with aiofiles.open(assembled_path, "wb") as outfile:
            for part_path in chunk_files:
                async with aiofiles.open(part_path, "rb") as infile:
                    await outfile.write(await infile.read())

        # Upload to Azure Blob Storage (content container root)
        container_name = os.environ["AZURE_STORAGE_CONTAINER"]
        storage_con_str = os.environ["AZURE_STORAGE_CON_STRING"]
        blob_service_client = BlobServiceClient.from_connection_string(storage_con_str)
        container_client = blob_service_client.get_container_client(container_name)

        blob_path = filename  # Upload to root, not user folder
        blob_client = container_client.get_blob_client(blob_path)

        async with aiofiles.open(assembled_path, "rb") as f:
            await blob_client.upload_blob(
                await f.read(), overwrite=True, metadata={"UploadedBy": auth_claims.get("oid", "")}
            )

        # Clean up the staged parts and the assembled file
        for p in chunk_files:
            os.remove(p)
        os.remove(assembled_path)

        # Call reindex_file logic after successful upload
        userid = auth_claims.get("oid", "")
        if userid:
            await indexerIncremental(filename, filename, userid, False, userid)

        return jsonify({"message": "File uploaded successfully"}), 200

    else:
        # Partial upload: just acknowledge the chunk; nothing is sent to
        # blob storage until every part has arrived.
        return jsonify({"message": f"Chunk {chunk_index + 1}/{total_chunks} uploaded"}), 202

@bp.get("/list_container_files")
@authenticated
async def list_container_files(auth_claims: dict[str, Any]):
    """
    List all files in the Azure storage container specified by
    AZURE_STORAGE_CONTAINER, including size and last modified date.
    Enhanced with error logging for better production diagnostics.
    """
    try:
        current_app.logger.info("/list_container_files endpoint hit.")
        from azure.storage.blob.aio import BlobServiceClient

        container_name = os.environ.get("AZURE_STORAGE_CONTAINER")
        storage_con_str = os.environ.get("AZURE_STORAGE_CON_STRING")

        current_app.logger.info("Container Name: %s", container_name)
        # SECURITY: never log the connection string itself -- it embeds the
        # storage account key. Log only whether it is configured.
        current_app.logger.info("Storage connection string configured: %s", bool(storage_con_str))

        if not container_name or not storage_con_str:
            raise ValueError("Environment variables AZURE_STORAGE_CONTAINER and AZURE_STORAGE_CON_STRING must be set.")

        blob_service_client = BlobServiceClient.from_connection_string(storage_con_str)
        container_client = blob_service_client.get_container_client(container_name)
        files = []
        async for blob in container_client.list_blobs():
            files.append({
                "name": blob.name,
                "size": blob.size,
                "last_modified": blob.last_modified.isoformat() if blob.last_modified else None,
            })
        return jsonify(files), 200
    except Exception as e:
        import traceback
        current_app.logger.error("Exception in /list_container_files: %s\n%s", str(e), traceback.format_exc())
        return jsonify({"error": str(e)}), 500


@bp.post("/delete_container_file")
@authenticated
async def delete_container_file(auth_claims: dict[str, Any]):
    """
    Delete a file from the Azure Blob Storage container using env vars.
    Expects JSON: { "filename": "..." }
    """
    from azure.storage.blob.aio import BlobServiceClient

    data = await request.get_json()
    filename = data.get("filename")
    if not filename:
        return jsonify({"error": "Missing filename"}), 400
    try:
        container_name = os.environ["AZURE_STORAGE_CONTAINER"]
        storage_con_str = os.environ["AZURE_STORAGE_CON_STRING"]
        blob_service_client = BlobServiceClient.from_connection_string(storage_con_str)
        container_client = blob_service_client.get_container_client(container_name)
        blob_client = container_client.get_blob_client(filename)
        await blob_client.delete_blob()
        # Interpolate the filename so the client sees which file was removed
        # (the scraped source had a broken placeholder here).
        return jsonify({"message": f"File {filename} deleted successfully"}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@bp.post("/reindex_after_delete")
@authenticated
async def reindex_after_delete(auth_claims: dict[str, Any]):
    """
    Remove a file's content from the Azure Search index after it has been
    deleted from blob storage.
    Expects JSON: { "filename": "...", "userId": "..." }
    """
    from webprepdocslib.strategy import SearchInfo
    from webprepdocslib.searchmanager import SearchManager

    data = await request.get_json()
    filename = data.get("filename")
    userId = data.get("userId")
    if not filename or not userId:
        return jsonify({"error": "Missing filename or userId"}), 400

    endpoint = f"https://{os.environ['AZURE_SEARCH_SERVICE']}.search.windows.net"
    index_name = os.environ['AZURE_SEARCH_INDEX']
    credential = DefaultAzureCredential(exclude_shared_token_cache_credential=True)
    search_info = SearchInfo(endpoint=endpoint, credential=credential, index_name=index_name)
    search_manager = SearchManager(search_info)
    # The file is identified in the index by its category (the filename);
    # path is None because the blob no longer exists.
    await search_manager.remove_content_v2(path=None, category=filename, users=userId)
    return jsonify({"message": f"Requested reindex/removal for {filename} (user: {userId}) in search index."}), 200

@bp.post("/reindex_file")
@authenticated
async def reindex_file(auth_claims: dict[str, Any]):
    """
    Reindex a selected file for a user. Expects JSON: { "filename": "...", "userid": "..." }
    """
    data = await request.get_json()
    filename = data.get("filename")
    userid = data.get("userid")
    if not filename or not userid:
        return jsonify({"error": "Missing filename or userid"}), 400
    # Call indexerIncremental to reindex the file
    await indexerIncremental(filename, filename, userid, False, userid)
    # Interpolate the filename in the status message (the scraped source
    # had a broken placeholder here).
    return jsonify({"message": f"Reindexing started for {filename} (user: {userid})"}), 200


@bp.post("/reindex_container_file")
@authenticated
async def reindex_container_file(auth_claims: dict[str, Any]):
    """
    Reindex a selected file in the main container. Expects JSON: { "filename": "...", "userid": "..." }

    NOTE(review): this duplicates /reindex_file exactly -- confirm whether
    both routes are still needed by the frontend before consolidating.
    """
    data = await request.get_json()
    filename = data.get("filename")
    userid = data.get("userid")
    if not filename or not userid:
        return jsonify({"error": "Missing filename or userid"}), 400
    await indexerIncremental(filename, filename, userid, False, userid)
    return jsonify({"message": f"Reindexing started for {filename} (user: {userid})"}), 200



@bp.get("/download_container_file")
@authenticated
async def download_container_file(auth_claims: dict[str, Any]):
    """
    Download a file from the Azure Blob Storage container using env vars.
    Expects query param: filename=...
    """
    from azure.storage.blob.aio import BlobServiceClient
    from quart import send_file
    import tempfile

    filename = request.args.get("filename")
    if not filename:
        return jsonify({"error": "Missing filename"}), 400
    try:
        service = BlobServiceClient.from_connection_string(os.environ["AZURE_STORAGE_CON_STRING"])
        blob = (
            service
            .get_container_client(os.environ["AZURE_STORAGE_CONTAINER"])
            .get_blob_client(filename)
        )
        # Stage the blob contents in a temp file, then serve that file as an
        # attachment. NOTE(review): delete=False means the temp file is never
        # removed here -- confirm whether cleanup is handled elsewhere.
        with tempfile.NamedTemporaryFile(delete=False) as staging:
            downloader = await blob.download_blob()
            staging.write(await downloader.readall())
            staging_path = staging.name
        return await send_file(staging_path, as_attachment=True, attachment_filename=filename)
    except Exception as e:
        return jsonify({"error": str(e)}), 500


async def indexerIncremental(fileName: str, fltrCat: str, userid: str, removeFile: bool, space: str):  # @PY ¦ indexerIncremental
    """
    Incrementally (re)index a user's file into the Azure Search index via
    webprepdocs.

    Args:
        fileName: Name of the file under data/<userid>/ to target; falsy to
            target the user's whole data directory.
        fltrCat: Category filter passed through to webprepdocs.
        userid: Owner of the file; also used as the index 'users' value.
        removeFile: When True, remove the file's content from the index
            instead of (re)adding it.
        space: Logical space identifier passed through to webprepdocs.

    Returns:
        A Quart JSON response; callers invoke this inside a request context.
    """
    # Use the app logger instead of bare print() so these diagnostics go
    # through the configured logging pipeline like the rest of this module.
    current_app.logger.info(
        "II¦INDXRINCTR⇒Start fileName=%s fltrCat=%s userid=%s removeFile=%s",
        fileName, fltrCat, userid, removeFile,
    )
    # Use AzureKeyCredential if key is set, otherwise fall back to DefaultAzureCredential
    from azure.core.credentials import AzureKeyCredential
    from azure.identity.aio import DefaultAzureCredential

    search_key = os.environ.get("AZURE_SEARCH_KEY", "")
    storage_key = os.environ.get("AZURE_STORAGE_KEY", "")
    if search_key:
        azure_credential = AzureKeyCredential(search_key)
    else:
        azure_credential = DefaultAzureCredential(
            exclude_environment_credential=True,
            exclude_managed_identity_credential=False,
            exclude_shared_token_cache_credential=True,
            exclude_visual_studio_code_credential=True,
            exclude_cli_credential=True,
        )
    # Resolve the local path webprepdocs will scan: a single file or the
    # whole per-user data directory.
    if fileName:
        srchTrgt = os.path.join(os.getcwd(), "data", userid, fileName)
    else:
        srchTrgt = os.path.join(os.getcwd(), "data", userid)

    current_app.logger.info(
        "indexerIncremental¦fileName=%s fltrCat=%s userid=%s removeFile=%s space=%s srchTrgt=%s",
        fileName, fltrCat, userid, removeFile, space, srchTrgt,
    )
    args_dict = {
        'files': srchTrgt,
        'storageaccount': os.environ.get("AZURE_STORAGE_ACCOUNT", ""),
        'container': os.environ.get("AZURE_STORAGE_CONTAINER", ""),
        'storagekey': storage_key,
        'openaihost': os.environ.get("OPENAI_HOST", ""),
        'openaiservice': os.environ.get("AZURE_OPENAI_SERVICE", ""),
        'openaikey': os.environ.get("AZURE_API_KEY", ""),
        'openaiorg': os.environ.get("AZURE_OPENAI_ORGANIZATION", ""),
        'openaideployment': os.environ.get("AZURE_OPENAI_EMB_DEPLOYMENT", ""),
        'openaimodelname': os.environ.get("AZURE_OPENAI_EMB_MODEL_NAME", ""),
        'searchservice': os.environ.get("AZURE_SEARCH_SERVICE", ""),
        'index': os.environ.get("AZURE_SEARCH_INDEX", ""),
        'searchanalyzername': os.environ.get("AZURE_SEARCH_ANALYZER_NAME", ""),
        'datalakestorageaccount': os.environ.get("AZURE_ADLS_GEN2_STORAGE_ACCOUNT", ""),
        'tenantid': os.environ.get("AZURE_TENANT_ID", ""),
        'verbose': False,
        'localpdfparser': True,
        'novectors': False,
        'disablebatchvectors': False,
        'removeall': False,
        'remove': removeFile,
        'useacls': False,
        'category': fltrCat,
        'searchkey': search_key,
        'users': userid,
        'space': space,
    }
    args = webprepdocs.Args(args_dict)

    file_strategy = webprepdocs.setup_file_strategy(azure_credential, args)
    await webprepdocs.runStrat(file_strategy, azure_credential, args)
    return jsonify({"message": "Document Processed Successfully"})


@bp.before_app_serving
async def setup_clients():
Expand Down Expand Up @@ -436,7 +710,11 @@ async def setup_clients():
# just use 'az login' locally, and managed identity when deployed on Azure). If you need to use keys, use separate AzureKeyCredential instances with the
# keys for each service
# If you encounter a blocking error during a DefaultAzureCredential resolution, you can exclude the problematic credential by using a parameter (ex. exclude_shared_token_cache_credential=True)
azure_credential = DefaultAzureCredential(exclude_shared_token_cache_credential=True)
# azure_credential = DefaultAzureCredential(exclude_shared_token_cache_credential=True)
azure_credential = ChainedTokenCredential(
AzureCliCredential(), # Dev
ManagedIdentityCredential() # Prod
)

# Set up clients for AI Search and Storage
search_client = SearchClient(
Expand Down
20 changes: 18 additions & 2 deletions app/backend/approaches/chatapproach.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,17 @@


class ChatApproach(Approach, ABC):
# query_prompt_few_shots: list[ChatCompletionMessageParam] = [
# {"role": "user", "content": "How did crypto do last year?"},
# {"role": "assistant", "content": "Summarize Cryptocurrency Market Dynamics from last year"},
# {"role": "user", "content": "What are my health plans?"},
# {"role": "assistant", "content": "Show available health plans"},
# ]
query_prompt_few_shots: list[ChatCompletionMessageParam] = [
{"role": "user", "content": "How did crypto do last year?"},
{"role": "assistant", "content": "Summarize Cryptocurrency Market Dynamics from last year"},
{"role": "user", "content": "What are my health plans?"},
{"role": "assistant", "content": "Show available health plans"},
{"role": "user", "content": "What is Infection Control Policy?"},
{"role": "assistant", "content": "Show available policies"},
]
NO_RESPONSE = "0"

Expand All @@ -36,6 +42,16 @@ class ChatApproach(Approach, ABC):
If you cannot generate a search query, return just the number 0.
"""

# query_prompt_template = """You are an "Whiddon Company Knowledge Assistant" that helps the employees with their Policy and Procedures questions, and questions about the employee handbook. Be brief in your answers. Answer ONLY with the facts listed in the list of sources below. If there isn't enough information below, say you don't know. Do not generate answers that don't use the sources below. If asking a clarifying question to the user would help, ask the question. For tabular information return it as an html table. Do not return markdown format. If the question is not in English, answer in the language used in the question. Each source has a name followed by colon and the actual information, always include the source name for each fact you use in the response. Use square brackets to reference the source, for example [info1.txt]. Don't combine sources, list each source separately, for example [info1.txt][info2.pdf].
# You have access to Azure AI Search index with 100's of documents.
# Generate a search query based on the conversation and the new question.
# Do not include cited source filenames and document names e.g info.txt or doc.pdf in the search query terms.
# Do not include any text inside [] or <<>> in the search query terms.
# Do not include any special characters like '+'.
# If the question is not in English, translate the question to English before generating the search query.
# If you cannot generate a search query, return just the number 0.
# """

@property
@abstractmethod
def system_message_chat_conversation(self) -> str:
Expand Down
Loading
Loading