-
Notifications
You must be signed in to change notification settings - Fork 1
Query for last chat messages #199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
c97570c
560c122
4e83dae
c478218
00b84d7
4d31258
fbc7609
9222071
3b77e65
7c38a50
5cd521b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ | |
| CognitionIntegrationType, | ||
| ) | ||
| from ..util import prevent_sql_injection | ||
| from submodules.model import enums | ||
|
|
||
| FINISHED_STATES = [ | ||
| CognitionMarkdownFileState.FINISHED.value, | ||
|
|
@@ -329,3 +330,171 @@ def get_distinct_item_ids_for_all_permissions( | |
| return [] | ||
|
|
||
| return [row[0] for row in results if row and row[0]] | ||
|
|
||
|
|
||
| def get_last_integrations_tasks() -> List[Dict[str, Any]]: | ||
| query = f""" | ||
| WITH embeddings_by_state AS ( | ||
| SELECT | ||
| e.project_id, | ||
| e.state, | ||
| COUNT(*) AS count, | ||
| jsonb_agg( | ||
| jsonb_build_object( | ||
| 'createdBy', e.created_by, | ||
| 'finishedAt', e.finished_at, | ||
| 'id', e.id, | ||
| 'name', e.name, | ||
| 'startedAt', e.started_at, | ||
| 'state', e.state | ||
| ) ORDER BY e.started_at DESC | ||
| ) AS embeddings | ||
| FROM embedding e | ||
| LEFT JOIN project p ON p.id = e.project_id | ||
| GROUP BY e.project_id, e.state | ||
| ), | ||
|
|
||
| embedding_agg AS ( | ||
| SELECT | ||
| e.project_id, | ||
| ( | ||
| SELECT jsonb_object_agg( | ||
| ebs.state, | ||
| jsonb_build_object( | ||
| 'count', ebs.count, | ||
| 'embeddings', ebs.embeddings | ||
| ) | ||
| ) | ||
| FROM embeddings_by_state ebs | ||
| WHERE ebs.project_id = e.project_id | ||
| ) AS embeddings_by_state | ||
|
|
||
| FROM embedding e | ||
| LEFT JOIN project p ON p.id = e.project_id | ||
| GROUP BY e.project_id | ||
| ), | ||
|
|
||
| attribute_by_state AS ( | ||
| SELECT | ||
| a.project_id, | ||
| a.state, | ||
| COUNT(*) AS count, | ||
| jsonb_agg( | ||
| jsonb_build_object( | ||
| 'dataType', a.data_type, | ||
| 'finishedAt', a.finished_at, | ||
| 'id', a.id, | ||
| 'name', a.name, | ||
| 'startedAt', a.started_at, | ||
| 'state', a.state | ||
| ) ORDER BY a.started_at DESC | ||
| ) AS attributes | ||
| FROM attribute a | ||
| LEFT JOIN project p ON p.id = a.project_id | ||
| WHERE a.state NOT IN ('UPLOADED','AUTOMATICALLY_CREATED') | ||
| GROUP BY a.project_id, a.state | ||
| ), | ||
|
|
||
| attribute_agg AS ( | ||
| SELECT | ||
| a.project_id, | ||
| ( | ||
| SELECT jsonb_object_agg( | ||
| abs.state, | ||
| jsonb_build_object( | ||
| 'count', abs.count, | ||
| 'attributes', abs.attributes | ||
| ) | ||
| ) | ||
| FROM attribute_by_state abs | ||
| WHERE abs.project_id = a.project_id | ||
| ) AS attributes_by_state | ||
|
|
||
| FROM attribute a | ||
| LEFT JOIN project p ON p.id = a.project_id | ||
| WHERE a.state NOT IN ('UPLOADED','AUTOMATICALLY_CREATED') | ||
| GROUP BY a.project_id | ||
| ), | ||
|
Comment on lines
+398
to
+417
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above |
||
|
|
||
| record_tokenization_tasks_by_state AS ( | ||
| SELECT | ||
| rtt.project_id, | ||
| rtt.state, | ||
| COUNT(*) AS count, | ||
| jsonb_agg( | ||
| jsonb_build_object( | ||
| 'finishedAt', rtt.finished_at, | ||
| 'id', rtt.id, | ||
| 'startedAt', rtt.started_at, | ||
| 'state', rtt.state, | ||
| 'type', rtt.type | ||
| ) ORDER BY rtt.started_at DESC | ||
| ) AS record_tokenization_tasks | ||
| FROM record_tokenization_task rtt | ||
| LEFT JOIN project p ON p.id = rtt.project_id | ||
| GROUP BY rtt.project_id, rtt.state | ||
| ), | ||
|
|
||
| record_tokenization_task_agg AS ( | ||
| SELECT | ||
| rtt.project_id, | ||
| ( | ||
| SELECT jsonb_object_agg( | ||
| rtts.state, | ||
| jsonb_build_object( | ||
| 'count', rtts.count, | ||
| 'record_tokenization_tasks', rtts.record_tokenization_tasks | ||
| ) | ||
| ) | ||
| FROM record_tokenization_tasks_by_state rtts | ||
| WHERE rtts.project_id = rtt.project_id | ||
| ) AS record_tokenization_tasks_by_state | ||
|
|
||
| FROM record_tokenization_task rtt | ||
| LEFT JOIN project p ON p.id = rtt.project_id | ||
| GROUP BY rtt.project_id | ||
| ), | ||
|
Comment on lines
+438
to
+456
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above |
||
|
|
||
| integration_data AS ( | ||
| SELECT | ||
| i.id AS integration_id, | ||
| i.name AS integration_name, | ||
| i.error_message, | ||
| i.started_at, | ||
| i.finished_at, | ||
| i.state, | ||
| i.organization_id, | ||
| i.project_id, | ||
| i.created_by, | ||
| i.type, | ||
| jsonb_build_object( | ||
| 'embeddingsByState', coalesce(ea.embeddings_by_state, '[]'::jsonb), | ||
| 'attributesByState', coalesce(aa.attributes_by_state, '[]'::jsonb), | ||
| 'recordTokenizationTasksByState', coalesce(rtt.record_tokenization_tasks_by_state, '[]'::jsonb) | ||
| ) AS full_data | ||
| FROM cognition.integration i | ||
| LEFT JOIN embedding_agg ea ON ea.project_id = i.project_id | ||
| LEFT JOIN attribute_agg aa ON aa.project_id = i.project_id | ||
| LEFT JOIN record_tokenization_task_agg rtt ON rtt.project_id = i.project_id | ||
|
Comment on lines
+476
to
+478
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we usually write the on in the next line so e.g. |
||
| ) | ||
|
|
||
| SELECT | ||
| o.id AS organization_id, | ||
| o.name AS organization_name, | ||
| i.integration_id, | ||
| i.integration_name, | ||
| i.error_message, | ||
| i.started_at, | ||
| i.finished_at, | ||
| i.state, | ||
| i.full_data, | ||
| i.created_by, | ||
| i.type, | ||
| p.name AS project_name | ||
| FROM organization o | ||
| JOIN integration_data i ON i.organization_id = o.id | ||
| LEFT JOIN project p ON p.id = i.project_id | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. afaik doesn't need to be a left join since prject should always exist for an integration Also we could build this directly in the previousintegration data by joining there with org & project |
||
| ORDER BY o.id, i.started_at DESC | ||
| """ | ||
|
|
||
| return general.execute_all(query) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -218,3 +218,45 @@ def delete_many(org_id: str, md_file_ids: List[str], with_commit: bool = True) - | |
| CognitionMarkdownFile.id.in_(md_file_ids), | ||
| ).delete(synchronize_session=False) | ||
| general.flush_or_commit(with_commit) | ||
|
|
||
|
|
||
| def get_last_etl_tasks( | ||
| states: List[str], | ||
| created_at_from: str, | ||
| created_at_to: Optional[str] = None, | ||
| ) -> List[Any]: | ||
|
|
||
| states = prevent_sql_injection(states, isinstance(states, list)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. afaik we need to prevent every string with a list comprehension not the whole list |
||
| created_at_from = prevent_sql_injection( | ||
| created_at_from, isinstance(created_at_from, str) | ||
| ) | ||
| if created_at_to: | ||
| created_at_to = prevent_sql_injection( | ||
| created_at_to, isinstance(created_at_to, str) | ||
| ) | ||
| created_at_to_filter = "" | ||
|
|
||
| if created_at_to: | ||
| created_at_to_filter = f"AND mf.created_at <= '{created_at_to}'" | ||
|
|
||
| query = f""" | ||
| SELECT * | ||
| FROM ( | ||
| SELECT mf.created_at, mf.created_by, mf.started_at, mf.finished_at, mf.file_name, mf.error, mf.state, md.id AS dataset_id, md.name AS dataset_name, md.organization_id, o.name AS organization_name, | ||
| ROW_NUMBER() OVER ( | ||
| PARTITION BY md.organization_id, md.id | ||
| ORDER BY mf.created_at DESC | ||
| ) AS rn | ||
| FROM cognition.markdown_file mf | ||
| JOIN cognition.markdown_dataset md ON md.id = mf.dataset_id | ||
| JOIN organization o ON o.id = md.organization_id | ||
| WHERE | ||
| mf.created_at >= '{created_at_from}' | ||
| AND mf.state IN ({', '.join(f"'{state}'" for state in states)}) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could potentially fail if states is empty list so maybe an early return for this since without any state it would be empty anyway |
||
| {created_at_to_filter} | ||
| ) sub | ||
| WHERE rn <= 5 | ||
| ORDER BY organization_id, dataset_id, created_at DESC | ||
| """ | ||
|
|
||
| return general.execute_all(query) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,7 @@ | ||
| from typing import Any, Dict, List, Optional, Union, Tuple | ||
| from datetime import datetime | ||
|
|
||
| from submodules.model.enums import MessageType | ||
| from ..business_objects import general | ||
| from ..session import session | ||
| from ..models import CognitionMessage | ||
|
|
@@ -605,3 +607,48 @@ def get_count_by_project_id(project_id: str) -> int: | |
| ) | ||
| .count() | ||
| ) | ||
|
|
||
|
|
||
| def get_last_chat_messages( | ||
| message_type: str, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we could give it the type in the request body directly so fastapi parses it and we can compare without .value |
||
| starting_from: str, | ||
| ending_to: Optional[str] = None, | ||
| ) -> List[Any]: | ||
|
|
||
| message_type = prevent_sql_injection(message_type, isinstance(message_type, str)) | ||
| starting_from = prevent_sql_injection(starting_from, isinstance(starting_from, str)) | ||
| if ending_to: | ||
| ending_to = prevent_sql_injection(ending_to, isinstance(ending_to, str)) | ||
|
|
||
| message_type_filter = "" | ||
| ending_to_filter = "" | ||
|
|
||
| if message_type == MessageType.WITH_ERROR.value: | ||
| message_type_filter = "AND c.error IS NOT NULL" | ||
| elif message_type == MessageType.WITHOUT_ERROR.value: | ||
| message_type_filter = "AND c.error IS NULL" | ||
| if ending_to: | ||
| ending_to_filter = f"AND m.created_at <= '{ending_to}'" | ||
|
|
||
| query = f""" | ||
| SELECT * | ||
| FROM ( | ||
| SELECT m.created_at, m.created_by, m.question, m.answer, m.initiated_via, c.error, cp.id AS project_id, cp.name AS project_name, cp.organization_id, o.name AS organization_name, c.id AS conversation_id, | ||
| ROW_NUMBER() OVER ( | ||
| PARTITION BY cp.organization_id, cp.id | ||
| ORDER BY m.created_at DESC | ||
| ) AS rn | ||
| FROM cognition.message m | ||
| JOIN cognition.conversation c ON c.id = m.conversation_id | ||
| JOIN cognition.project cp ON cp.id = m.project_id | ||
| JOIN organization o ON o.id = cp.organization_id | ||
| WHERE | ||
| m.created_at >= '{starting_from}' | ||
| {message_type_filter} | ||
| {ending_to_filter} | ||
| ) sub | ||
| WHERE rn <= 5 | ||
| ORDER BY organization_id, project_id, created_at DESC | ||
| """ | ||
|
|
||
| return general.execute_all(query) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,12 @@ | ||
| from typing import List, Optional | ||
| from typing import Any, List, Optional | ||
| from datetime import datetime | ||
|
|
||
| from submodules.model.util import prevent_sql_injection | ||
|
|
||
| from ..business_objects import general | ||
| from ..session import session | ||
| from ..models import CognitionStrategy | ||
| from ..enums import StrategyComplexity | ||
| from ..enums import StrategyComplexity, StrategyStepType | ||
|
|
||
|
|
||
| def get(project_id: str, strategy_id: str) -> CognitionStrategy: | ||
|
|
@@ -107,3 +109,76 @@ def delete_all_by_project_id(project_id: str, with_commit: bool = True) -> None: | |
| CognitionStrategy.project_id == project_id | ||
| ).delete() | ||
| general.flush_or_commit(with_commit) | ||
|
|
||
|
|
||
| def get_strategies_info( | ||
| step_types: List[str], | ||
| created_at_from: str, | ||
| created_at_to: Optional[str] = None, | ||
| ) -> List[Any]: | ||
|
|
||
| step_types = prevent_sql_injection(step_types, isinstance(step_types, list)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see above |
||
| created_at_from = prevent_sql_injection( | ||
| created_at_from, isinstance(created_at_from, str) | ||
| ) | ||
| if created_at_to: | ||
| created_at_to = prevent_sql_injection( | ||
| created_at_to, isinstance(created_at_to, str) | ||
| ) | ||
| created_at_to_filter = "" | ||
|
|
||
| if created_at_to: | ||
| created_at_to_filter = f"AND ss.created_at <= '{created_at_to}'" | ||
|
|
||
| step_types_sql = ", ".join([f"'{st}'" for st in step_types]) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could be faulty for empty lists so early return recommended |
||
|
|
||
| query = f""" | ||
| WITH step_data AS ( | ||
| SELECT | ||
| s.id AS strategy_id, s.name AS strategy_name, | ||
| ss.id AS step_id, ss.created_by,ss.created_at, ss.name AS step_name, ss.step_type, | ||
| p.name AS project_name, p.id AS project_id, | ||
| o.name AS organization_name,o.id AS organization_id, | ||
| st.config::jsonb AS template_config | ||
| FROM cognition.strategy s | ||
| JOIN cognition.strategy_step ss ON ss.strategy_id = s.id | ||
| JOIN cognition.project p ON p.id = s.project_id | ||
| JOIN organization o ON o.id = p.organization_id | ||
| LEFT JOIN cognition.step_templates st ON st.id = (ss.config->>'templateId')::uuid | ||
| WHERE ss.created_at >= '{created_at_from}' | ||
| {created_at_to_filter} | ||
| ) | ||
| SELECT strategy_id, strategy_name, step_id, created_by, created_at, step_name, step_type, project_name,project_id, organization_name,organization_id, | ||
| CASE | ||
| WHEN step_type = '{StrategyStepType.TEMPLATED.value}' | ||
| AND EXISTS ( | ||
| SELECT 1 | ||
| FROM jsonb_array_elements(template_config->'steps') t | ||
| WHERE t->>'stepType' IN ({step_types_sql}) | ||
| ) | ||
|
Comment on lines
+154
to
+158
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. exists filter are usually slow. and they are run multiple times in the query. maybe we can preselect the step type (single value) and the template step types (array) in the qith query to then in a sourrounding query filter on whats needed |
||
| THEN ( | ||
| SELECT array_agg((t->>'stepType') || ':' || (t->>'stepName')) | ||
| FROM jsonb_array_elements(template_config->'steps') t | ||
| ) | ||
| WHEN step_type IN ({step_types_sql}) | ||
| THEN ARRAY[(step_type || ':' || step_name)] | ||
| ELSE NULL | ||
| END AS templated_step_names | ||
| FROM step_data | ||
| WHERE | ||
| step_type IN ({step_types_sql}) | ||
| OR ( | ||
| step_type = '{StrategyStepType.TEMPLATED.value}' | ||
| AND EXISTS ( | ||
| SELECT 1 | ||
| FROM jsonb_array_elements(template_config->'steps') t | ||
| WHERE t->>'stepType' IN ({step_types_sql}) | ||
| ) | ||
| ) | ||
| ORDER BY strategy_id, created_at DESC | ||
| """ | ||
|
|
||
| if len(step_types) == 0: | ||
| return [] | ||
|
|
||
| return general.execute_all(query) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to join with project?
Also if we just want the object aggregated embeddings this could be one query instead of the embeddings_by_state & embedding_agg by wrapping it in another select