Skip to content

Commit 2b6fc34

Browse files
authored
Flying docs (#200)
* Adds basic procedure logic * Adds alembic version * Adds max_file_size * Adds archived column * Adds new env var id column * comment * Redo of alembic file * changes route * PR comments * Adds cascade logic to existing alembic revision * Submodule update
1 parent 62ee8a1 commit 2b6fc34

File tree

7 files changed

+134
-4
lines changed

7 files changed

+134
-4
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
"""Adds tmp file indicators
2+
3+
Revision ID: 889ef4df126f
4+
Revises: 4861b97fcd5d
5+
Create Date: 2024-04-10 14:31:06.210778
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
from sqlalchemy.dialects import postgresql
11+
12+
# revision identifiers, used by Alembic.
13+
revision = '889ef4df126f'
14+
down_revision = '4861b97fcd5d'
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade():
20+
# ### commands auto generated by Alembic - please adjust! ###
21+
op.add_column('conversation', sa.Column('has_tmp_files', sa.Boolean(), nullable=True), schema='cognition')
22+
op.add_column('conversation', sa.Column('archived', sa.Boolean(), nullable=True), schema='cognition')
23+
op.add_column('project', sa.Column('allow_file_upload', sa.Boolean(), nullable=True), schema='cognition')
24+
op.add_column('project', sa.Column('max_file_size_mb', sa.Float(), nullable=True), schema='cognition')
25+
op.add_column('project', sa.Column('open_ai_env_var_id', postgresql.UUID(as_uuid=True), nullable=True), schema='cognition')
26+
op.create_index(op.f('ix_cognition_project_open_ai_env_var_id'), 'project', ['open_ai_env_var_id'], unique=False, schema='cognition')
27+
op.create_foreign_key(None, 'project', 'environment_variable', ['open_ai_env_var_id'], ['id'], source_schema='cognition', referent_schema='cognition', ondelete='SET NULL')
28+
op.drop_constraint('message_strategy_id_fkey', 'message', schema='cognition', type_='foreignkey')
29+
op.create_foreign_key('message_strategy_id_fkey', 'message', 'strategy', ['strategy_id'], ['id'], source_schema='cognition', referent_schema='cognition', ondelete='SET NULL')
30+
# ### end Alembic commands ###
31+
32+
33+
def downgrade():
34+
# ### commands auto generated by Alembic - please adjust! ###
35+
op.drop_constraint(None, 'project', schema='cognition', type_='foreignkey')
36+
op.drop_index(op.f('ix_cognition_project_open_ai_env_var_id'), table_name='project', schema='cognition')
37+
op.drop_column('project', 'open_ai_env_var_id', schema='cognition')
38+
op.drop_column('project', 'max_file_size_mb', schema='cognition')
39+
op.drop_column('project', 'allow_file_upload', schema='cognition')
40+
op.drop_column('conversation', 'archived', schema='cognition')
41+
op.drop_column('conversation', 'has_tmp_files', schema='cognition')
42+
op.drop_constraint('message_strategy_id_fkey', 'message', schema='cognition', type_='foreignkey')
43+
op.create_foreign_key('message_strategy_id_fkey', 'message', 'strategy', ['strategy_id'], ['id'], source_schema='cognition', referent_schema='cognition', ondelete='CASCADE')
44+
# ### end Alembic commands ###

api/transfer.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from submodules.model.enums import NotificationType
3838
from submodules.model.models import UploadTask
3939
from util import daemon, notification
40+
from controller.transfer.cognition.minio_upload import handle_cognition_file_upload
4041

4142
from controller.task_queue import manager as task_queue_manager
4243
from submodules.model.enums import TaskType, RecordTokenizationScope
@@ -51,12 +52,18 @@ async def post(self, request) -> PlainTextResponse:
5152
data = await request.json()
5253
file_path = data["Key"]
5354

54-
if len(file_path.split("/")) != 4:
55+
parts = file_path.split("/")
56+
57+
if parts[1] == "_cognition":
58+
handle_cognition_file_upload(parts)
59+
return PlainTextResponse("OK")
60+
61+
if len(parts) != 4:
5562
# We need handling for lf execution notification here.
5663
# ATM we have a different path of handling in util/payload_scheduler.py update_records method
5764
return PlainTextResponse("OK")
5865

59-
org_id, project_id, upload_task_id, file_name = file_path.split("/")
66+
org_id, project_id, upload_task_id, file_name = parts
6067
if len(project_id) != 36:
6168
return PlainTextResponse("OK")
6269
if upload_task_id == "download":
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from typing import Any, Dict, Tuple, Callable
2+
import os
3+
import submodules.s3.controller as s3
4+
5+
import requests
6+
from submodules.model.business_objects import (
7+
task_queue as task_queue_db_bo,
8+
general,
9+
)
10+
11+
BASE_URI = os.getenv("COGNITION_GATEWAY")
12+
13+
14+
def get_task_functions() -> Tuple[Callable, Callable, int]:
15+
return __start_task, __check_finished, 1
16+
17+
18+
def __start_task(task: Dict[str, Any]) -> bool:
19+
# check task still relevant
20+
task_db_obj = task_queue_db_bo.get(task["id"])
21+
if task_db_obj is None or task_db_obj.is_active:
22+
return False
23+
24+
task_db_obj.is_active = True
25+
general.commit()
26+
27+
action = task["task_info"]
28+
conversation_id = action["conversation_id"]
29+
cognition_project_id = action["cognition_project_id"]
30+
requests.post(
31+
f"{BASE_URI}/api/v1/converters/internal/projects/{cognition_project_id}/conversation/{conversation_id}/parse-tmp-file",
32+
json={"minio_path": action["minio_path"], "bucket": action["bucket"]},
33+
)
34+
return True
35+
36+
37+
def __check_finished(task: Dict[str, Any]) -> bool:
38+
39+
action = task["task_info"]
40+
41+
return not s3.object_exists(action["bucket"], action["minio_path"])

controller/task_queue/manager.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
attribute_calculation as attribute_calculation_handler,
1818
task_queue as task_queue_handler,
1919
markdown_file as markdown_file_handler,
20+
parse_cognition_tmp_file as cognition_tmp_file,
2021
)
2122
from .util import if_task_queue_send_websocket
2223

@@ -90,6 +91,8 @@ def get_task_function_by_type(task_type: str) -> Tuple[Callable, Callable, int]:
9091
return task_queue_handler.get_task_functions()
9192
if task_type == enums.TaskType.PARSE_MARKDOWN_FILE.value:
9293
return markdown_file_handler.get_task_functions()
94+
if task_type == enums.TaskType.PARSE_COGNITION_TMP_FILE.value:
95+
return cognition_tmp_file.get_task_functions()
9396
raise ValueError(f"Task type {task_type} not supported yet")
9497

9598

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from typing import List
2+
from submodules.model.cognition_objects import project as cognition_project
3+
from submodules.model.cognition_objects import conversation
4+
from submodules.model.enums import TaskType
5+
from controller.task_queue import manager as task_queue_manager
6+
7+
8+
def handle_cognition_file_upload(path_parts: List[str]):
9+
10+
if path_parts[1] != "_cognition":
11+
return
12+
13+
if path_parts[3] == "chat_tmp_files" and path_parts[5] == "queued":
14+
cognition_project_id = path_parts[2]
15+
conversation_id = path_parts[4]
16+
cognition_prj = cognition_project.get(cognition_project_id)
17+
if not cognition_prj:
18+
return
19+
project_id = str(cognition_prj.refinery_references_project_id)
20+
conversation_item = conversation.get(cognition_project_id, conversation_id)
21+
if not conversation_item:
22+
return
23+
24+
task_queue_manager.add_task(
25+
project_id,
26+
TaskType.PARSE_COGNITION_TMP_FILE,
27+
conversation_item.created_by,
28+
{
29+
"cognition_project_id": cognition_project_id,
30+
"conversation_id": conversation_id,
31+
"minio_path": "/".join(path_parts[1:]),
32+
"bucket": path_parts[0],
33+
},
34+
True, # not sure if prio is right here as the prio tasks should only take < 1 min but waiting for the normal queue will take ages depending on the queue
35+
)

0 commit comments

Comments
 (0)