Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions deployment/kustomizations/base/cm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ data:
llm: litellm_llm.default
- name: data_assistance
llm: litellm_llm.default
- name: data_exploration_assistance
llm: litellm_llm.default
- name: user_clarification_assistance
llm: litellm_llm.default
- name: sql_pairs_indexing
document_store: qdrant
embedder: litellm_embedder.default
Expand Down
4 changes: 4 additions & 0 deletions docker/config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ pipes:
llm: litellm_llm.default
- name: data_assistance
llm: litellm_llm.default
- name: data_exploration_assistance
llm: litellm_llm.default
- name: user_clarification_assistance
llm: litellm_llm.default
- name: sql_pairs_indexing
document_store: qdrant
embedder: litellm_embedder.default
Expand Down
7 changes: 7 additions & 0 deletions wren-ai-service/src/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ def create_service_container(
**pipe_components["user_guide_assistance"],
wren_ai_docs=wren_ai_docs,
),
"data_exploration_assistance": generation.DataExplorationAssistance(
**pipe_components["data_exploration_assistance"],
),
"user_clarification_assistance": generation.UserClarificationAssistance(
**pipe_components["user_clarification_assistance"],
),
"db_schema_retrieval": _db_schema_retrieval_pipeline,
"historical_question": retrieval.HistoricalQuestionRetrieval(
**pipe_components["historical_question_retrieval"],
Expand All @@ -146,6 +152,7 @@ def create_service_container(
**pipe_components["followup_sql_generation"],
),
"sql_functions_retrieval": _sql_functions_retrieval_pipeline,
"sql_executor": _sql_executor_pipeline,
},
allow_intent_classification=settings.allow_intent_classification,
allow_sql_generation_reasoning=settings.allow_sql_generation_reasoning,
Expand Down
4 changes: 4 additions & 0 deletions wren-ai-service/src/pipelines/generation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .chart_adjustment import ChartAdjustment
from .chart_generation import ChartGeneration
from .data_assistance import DataAssistance
from .data_exploration_assistance import DataExplorationAssistance
from .followup_sql_generation import FollowUpSQLGeneration
from .followup_sql_generation_reasoning import FollowUpSQLGenerationReasoning
from .intent_classification import IntentClassification
Expand All @@ -15,6 +16,7 @@
from .sql_question import SQLQuestion
from .sql_regeneration import SQLRegeneration
from .sql_tables_extraction import SQLTablesExtraction
from .user_clarification_assistance import UserClarificationAssistance
from .user_guide_assistance import UserGuideAssistance

__all__ = [
Expand All @@ -36,4 +38,6 @@
"FollowUpSQLGenerationReasoning",
"MisleadingAssistance",
"SQLTablesExtraction",
"DataExplorationAssistance",
"UserClarificationAssistance",
]
20 changes: 14 additions & 6 deletions wren-ai-service/src/pipelines/generation/data_assistance.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
data_assistance_system_prompt = """
### TASK ###
You are a data analyst great at answering user's questions about given database schema.
Please carefully read user's question and database schema to answer it in easy to understand manner
Please carefully read user's question, intent for the question, and database schema to answer it in easy to understand manner
using the Markdown format. Your goal is to help guide user understand its database!

### INSTRUCTIONS ###
Expand All @@ -41,8 +41,16 @@
{{ db_schema }}
{% endfor %}

{% if histories %}
### PREVIOUS QUESTIONS ###
{% for history in histories %}
{{ history.question }}
{% endfor %}
{% endif %}

### INPUT ###
User's question: {{query}}
Intent for user's question: {{intent_reasoning}}
Language: {{language}}

Custom Instruction: {{ custom_instruction }}
Expand All @@ -55,19 +63,17 @@
@observe(capture_input=False)
def prompt(
query: str,
intent_reasoning: str,
db_schemas: list[str],
language: str,
histories: list[AskHistory],
prompt_builder: PromptBuilder,
custom_instruction: str,
) -> dict:
previous_query_summaries = (
[history.question for history in histories] if histories else []
)
query = "\n".join(previous_query_summaries) + "\n" + query

_prompt = prompt_builder.run(
query=query,
intent_reasoning=intent_reasoning,
histories=histories,
db_schemas=db_schemas,
language=language,
custom_instruction=custom_instruction,
Expand Down Expand Up @@ -150,6 +156,7 @@ async def _get_streaming_results(query_id):
async def run(
self,
query: str,
intent_reasoning: str,
db_schemas: list[str],
language: str,
query_id: Optional[str] = None,
Expand All @@ -161,6 +168,7 @@ async def run(
["data_assistance"],
inputs={
"query": query,
"intent_reasoning": intent_reasoning,
"db_schemas": db_schemas,
"language": language,
"query_id": query_id or "",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import asyncio
import logging
import sys
from typing import Any, Optional

from hamilton import base
from hamilton.async_driver import AsyncDriver
from haystack.components.builders.prompt_builder import PromptBuilder
from langfuse.decorators import observe

from src.core.pipeline import BasicPipeline
from src.core.provider import LLMProvider
from src.pipelines.common import clean_up_new_lines
from src.utils import trace_cost
from src.web.v1.services.ask import AskHistory

logger = logging.getLogger("wren-ai-service")


# System prompt: frames the LLM as a data analyst that must answer the user's
# question strictly from the supplied SQL result data, replying in Markdown
# and in the user's language.
data_exploration_assistance_system_prompt = """
You are a great data analyst good at exploring data.
You are given a user question, an intent for the question, and a sql data.
You need to understand the user question, the intent for the question, and the sql data, and then answer the user question.

### INSTRUCTIONS ###
1. Your answer should be in the same language as the language user provided.
2. You must follow the sql data to answer the user question.
3. You should provide your answer in Markdown format.
4. You have the following skills:
- explain the data in a easy to understand manner
- provide insights and trends in the data
- find out anomalies and outliers in the data
5. You only need to use the skills required to answer the user question based on the user question and the sql data.

### OUTPUT FORMAT ###
Please provide your response in proper Markdown format without ```markdown``` tags.
"""

# User prompt template (Jinja2, rendered by haystack's PromptBuilder): injects
# prior questions (when histories are present), the current question, the
# intent reasoning, the target language, the SQL result data, and an optional
# custom instruction.
data_exploration_assistance_user_prompt_template = """
{% if histories %}
### PREVIOUS QUESTIONS ###
{% for history in histories %}
{{ history.question }}
{% endfor %}
{% endif %}

### INPUT ###
User Question: {{query}}
Intent for user's question: {{intent_reasoning}}
Language: {{language}}
SQL Data:
{{ sql_data }}

Custom Instruction: {{ custom_instruction }}

Please think step by step.
"""


## Start of Pipeline
@observe(capture_input=False)
def prompt(
    query: str,
    intent_reasoning: str,
    histories: list[AskHistory],
    language: str,
    sql_data: dict,
    prompt_builder: PromptBuilder,
    custom_instruction: str,
) -> dict:
    """Render the user prompt for the data-exploration generator.

    Fills the Jinja2 template with the question, intent reasoning, prior
    question history, language, SQL result data, and custom instruction,
    then normalizes excess blank lines before handing it to the LLM step.
    """
    rendered = prompt_builder.run(
        query=query,
        intent_reasoning=intent_reasoning,
        language=language,
        sql_data=sql_data,
        histories=histories,
        custom_instruction=custom_instruction,
    )
    cleaned = clean_up_new_lines(rendered.get("prompt"))
    return {"prompt": cleaned}


@observe(as_type="generation", capture_input=False)
@trace_cost
async def data_exploration_assistance(
    prompt: dict, generator: Any, query_id: str
) -> dict:
    """Invoke the LLM generator with the rendered prompt.

    The `query_id` is forwarded so the generator's streaming callback can
    route chunks to the right consumer queue.
    """
    user_prompt = prompt.get("prompt")
    return await generator(prompt=user_prompt, query_id=query_id)


## End of Pipeline


class DataExplorationAssistance(BasicPipeline):
    """Streaming pipeline that answers a user's question about SQL result data.

    Tokens emitted by the LLM are pushed into a per-query ``asyncio.Queue``
    (keyed by ``query_id``) so callers can consume them incrementally via
    ``get_streaming_results``.
    """

    def __init__(
        self,
        llm_provider: LLMProvider,
        **kwargs,
    ):
        # One asyncio.Queue per in-flight query_id. Entries are removed when
        # the stream completes or when the consumer times out.
        self._user_queues = {}
        self._components = {
            "generator": llm_provider.get_generator(
                system_prompt=data_exploration_assistance_system_prompt,
                streaming_callback=self._streaming_callback,
            ),
            "prompt_builder": PromptBuilder(
                template=data_exploration_assistance_user_prompt_template
            ),
        }

        super().__init__(
            AsyncDriver({}, sys.modules[__name__], result_builder=base.DictResult())
        )

    def _streaming_callback(self, chunk, query_id):
        """Push a streamed chunk into the queue belonging to ``query_id``.

        Invoked by the generator from within the running event loop, so
        ``asyncio.create_task`` is safe here.
        """
        if query_id not in self._user_queues:
            self._user_queues[
                query_id
            ] = asyncio.Queue()  # Create a new queue for the user if it doesn't exist
        # Put the chunk content into the user's queue
        asyncio.create_task(self._user_queues[query_id].put(chunk.content))
        if chunk.meta.get("finish_reason"):
            # Sentinel marking end-of-stream for this query
            asyncio.create_task(self._user_queues[query_id].put("<DONE>"))

    async def get_streaming_results(self, query_id):
        """Yield streamed chunks for ``query_id`` until ``<DONE>`` or timeout.

        Stops (and cleans up the queue) after 120s with no new chunk.
        """
        if query_id not in self._user_queues:
            self._user_queues[query_id] = asyncio.Queue()

        queue = self._user_queues[query_id]
        while True:
            try:
                # Wait for an item from the user's queue. Use a local variable
                # (not an instance attribute) so concurrent consumers for
                # different query_ids cannot clobber each other's chunks.
                item = await asyncio.wait_for(queue.get(), timeout=120)
            except asyncio.TimeoutError:
                # asyncio.wait_for raises asyncio.TimeoutError, which is only
                # an alias of builtin TimeoutError from Python 3.11 onward —
                # catch the asyncio one explicitly for older runtimes.
                # Drop the stale queue so abandoned streams don't leak memory.
                self._user_queues.pop(query_id, None)
                break
            if item == "<DONE>":  # Check for end-of-stream signal
                del self._user_queues[query_id]
                break
            if item:  # Skip empty chunks
                yield item

    @observe(name="Data Exploration Assistance")
    async def run(
        self,
        query: str,
        intent_reasoning: str,
        sql_data: dict,
        language: str,
        query_id: Optional[str] = None,
        histories: Optional[list[AskHistory]] = None,
        custom_instruction: Optional[str] = None,
    ):
        """Execute the pipeline end-to-end.

        The returned dict is the Hamilton driver result; streamed output is
        consumed separately through ``get_streaming_results(query_id)``.
        """
        logger.info("Data Exploration Assistance pipeline is running...")
        return await self._pipe.execute(
            ["data_exploration_assistance"],
            inputs={
                "query": query,
                "intent_reasoning": intent_reasoning,
                "language": language,
                "query_id": query_id or "",
                "sql_data": sql_data,
                "histories": histories or [],
                "custom_instruction": custom_instruction or "",
                **self._components,
            },
        )
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
{% for history in histories %}
Question:
{{ history.question }}
SQL:
{{ history.sql }}
Response:
{{ history.response }}
{% endfor %}

### QUESTION ###
Expand Down
Loading
Loading