
Commit bd28090

Unprototyping ETL min (#179)

* creates revision to extend cognition markdown tables
* create new revision after merging additional changes into the submodule model
* transfer changes from the etl branch
* removes debug print
* Adds env var
* replace last db revision, adds env var column to md dataset
* Adds threading logic
* dev submodule change

Co-authored-by: JWittmeyer <[email protected]>
1 parent 37d1da6 commit bd28090

7 files changed: 301 additions, 1 deletion
New file: Alembic migration, revision f6bca8990840

Lines changed: 210 additions & 0 deletions

```python
"""extends cognition markdown tables

Revision ID: f6bca8990840
Revises: 3d0e01981f06
Create Date: 2023-12-20 10:54:14.354971

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "f6bca8990840"
down_revision = "3d0e01981f06"
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "markdown_dataset",
        sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column("organization_id", postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column("refinery_project_id", postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column(
            "environment_variable_id", postgresql.UUID(as_uuid=True), nullable=True
        ),
        sa.Column("created_by", postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=True),
        sa.Column("name", sa.String(), nullable=True),
        sa.Column("description", sa.String(), nullable=True),
        sa.Column("tokenizer", sa.String(), nullable=True),
        sa.Column("category_origin", sa.String(), nullable=True),
        sa.ForeignKeyConstraint(["created_by"], ["user.id"], ondelete="SET NULL"),
        sa.ForeignKeyConstraint(
            ["environment_variable_id"],
            ["cognition.environment_variable.id"],
            ondelete="SET NULL",
        ),
        sa.ForeignKeyConstraint(
            ["organization_id"], ["organization.id"], ondelete="CASCADE"
        ),
        sa.ForeignKeyConstraint(
            ["refinery_project_id"], ["project.id"], ondelete="SET NULL"
        ),
        sa.PrimaryKeyConstraint("id"),
        schema="cognition",
    )
    op.create_index(
        op.f("ix_cognition_markdown_dataset_created_by"),
        "markdown_dataset",
        ["created_by"],
        unique=False,
        schema="cognition",
    )
    op.create_index(
        op.f("ix_cognition_markdown_dataset_environment_variable_id"),
        "markdown_dataset",
        ["environment_variable_id"],
        unique=False,
        schema="cognition",
    )
    op.create_index(
        op.f("ix_cognition_markdown_dataset_organization_id"),
        "markdown_dataset",
        ["organization_id"],
        unique=False,
        schema="cognition",
    )
    op.create_index(
        op.f("ix_cognition_markdown_dataset_refinery_project_id"),
        "markdown_dataset",
        ["refinery_project_id"],
        unique=False,
        schema="cognition",
    )
    op.create_table(
        "markdown_llm_logs",
        sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column("markdown_file_id", postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=True),
        sa.Column("finished_at", sa.DateTime(), nullable=True),
        sa.Column("model_used", sa.String(), nullable=True),
        sa.Column("input", sa.String(), nullable=True),
        sa.Column("output", sa.String(), nullable=True),
        sa.Column("error", sa.String(), nullable=True),
        sa.ForeignKeyConstraint(
            ["markdown_file_id"], ["cognition.markdown_file.id"], ondelete="CASCADE"
        ),
        sa.PrimaryKeyConstraint("id"),
        schema="cognition",
    )
    op.create_index(
        op.f("ix_cognition_markdown_llm_logs_markdown_file_id"),
        "markdown_llm_logs",
        ["markdown_file_id"],
        unique=False,
        schema="cognition",
    )
    op.add_column(
        "environment_variable",
        sa.Column("organization_id", postgresql.UUID(as_uuid=True), nullable=True),
        schema="cognition",
    )
    op.create_index(
        op.f("ix_cognition_environment_variable_organization_id"),
        "environment_variable",
        ["organization_id"],
        unique=False,
        schema="cognition",
    )
    op.create_foreign_key(
        None,
        "environment_variable",
        "organization",
        ["organization_id"],
        ["id"],
        source_schema="cognition",
        ondelete="CASCADE",
    )
    op.add_column(
        "markdown_file",
        sa.Column("dataset_id", postgresql.UUID(as_uuid=True), nullable=True),
        schema="cognition",
    )
    op.add_column(
        "markdown_file",
        sa.Column("started_at", sa.DateTime(), nullable=True),
        schema="cognition",
    )
    op.add_column(
        "markdown_file",
        sa.Column("finished_at", sa.DateTime(), nullable=True),
        schema="cognition",
    )
    op.add_column(
        "markdown_file",
        sa.Column("state", sa.String(), nullable=True),
        schema="cognition",
    )
    op.create_index(
        op.f("ix_cognition_markdown_file_dataset_id"),
        "markdown_file",
        ["dataset_id"],
        unique=False,
        schema="cognition",
    )
    op.create_foreign_key(
        None,
        "markdown_file",
        "markdown_dataset",
        ["dataset_id"],
        ["id"],
        source_schema="cognition",
        referent_schema="cognition",
        ondelete="CASCADE",
    )
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint(None, "markdown_file", schema="cognition", type_="foreignkey")
    op.drop_index(
        op.f("ix_cognition_markdown_file_dataset_id"),
        table_name="markdown_file",
        schema="cognition",
    )
    op.drop_column("markdown_file", "state", schema="cognition")
    op.drop_column("markdown_file", "finished_at", schema="cognition")
    op.drop_column("markdown_file", "started_at", schema="cognition")
    op.drop_column("markdown_file", "dataset_id", schema="cognition")
    op.drop_constraint(
        None, "environment_variable", schema="cognition", type_="foreignkey"
    )
    op.drop_index(
        op.f("ix_cognition_environment_variable_organization_id"),
        table_name="environment_variable",
        schema="cognition",
    )
    op.drop_column("environment_variable", "organization_id", schema="cognition")
    op.drop_index(
        op.f("ix_cognition_markdown_llm_logs_markdown_file_id"),
        table_name="markdown_llm_logs",
        schema="cognition",
    )
    op.drop_table("markdown_llm_logs", schema="cognition")
    op.drop_index(
        op.f("ix_cognition_markdown_dataset_refinery_project_id"),
        table_name="markdown_dataset",
        schema="cognition",
    )
    op.drop_index(
        op.f("ix_cognition_markdown_dataset_organization_id"),
        table_name="markdown_dataset",
        schema="cognition",
    )
    op.drop_index(
        op.f("ix_cognition_markdown_dataset_environment_variable_id"),
        table_name="markdown_dataset",
        schema="cognition",
    )
    op.drop_index(
        op.f("ix_cognition_markdown_dataset_created_by"),
        table_name="markdown_dataset",
        schema="cognition",
    )
    op.drop_table("markdown_dataset", schema="cognition")
    # ### end Alembic commands ###
```
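One reversibility caveat: `upgrade()` creates both foreign keys unnamed (`op.create_foreign_key(None, ...)`), so PostgreSQL auto-generates their names, while `downgrade()` passes `None` to `op.drop_constraint`, which Alembic cannot resolve at runtime. A minimal sketch of the usual fix, using illustrative constraint names that are not taken from the commit:

```python
# In upgrade(): name the constraint explicitly so the downgrade can find it.
# "markdown_file_dataset_id_fkey" is an illustrative name, not from the commit.
op.create_foreign_key(
    "markdown_file_dataset_id_fkey",
    "markdown_file",
    "markdown_dataset",
    ["dataset_id"],
    ["id"],
    source_schema="cognition",
    referent_schema="cognition",
    ondelete="CASCADE",
)

# In downgrade(): drop it by the same explicit name.
op.drop_constraint(
    "markdown_file_dataset_id_fkey",
    "markdown_file",
    schema="cognition",
    type_="foreignkey",
)
```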

api/transfer.py

Lines changed: 28 additions & 0 deletions

```diff
@@ -19,6 +19,7 @@
     general,
     organization,
     tokenization,
+    project as refinery_project,
 )

 from submodules.model.cognition_objects import project as cognition_project
@@ -233,6 +234,33 @@ def put(self, request) -> PlainTextResponse:
         return PlainTextResponse("OK")


+class CognitionParseMarkdownFile(HTTPEndpoint):
+    def post(self, request) -> PlainTextResponse:
+        refinery_project_id = request.path_params["project_id"]
+        refinery_project_item = refinery_project.get(refinery_project_id)
+        if not refinery_project_item:
+            return PlainTextResponse("Bad project id", status_code=400)
+
+        dataset_id = request.path_params["dataset_id"]
+        file_id = request.path_params["file_id"]
+
+        # via thread to ensure the endpoint returns immediately
+
+        daemon.run(
+            task_queue_manager.add_task,
+            refinery_project_id,
+            TaskType.PARSE_MARKDOWN_FILE,
+            refinery_project_item.created_by,
+            {
+                "org_id": str(refinery_project_item.organization_id),
+                "dataset_id": dataset_id,
+                "file_id": file_id,
+            },
+        )
+
+        return PlainTextResponse("OK")
+
+
 class AssociationsImport(HTTPEndpoint):
     async def post(self, request) -> JSONResponse:
         project_id = request.path_params["project_id"]
```
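Together with the route registered in app.py below, the endpoint can be exercised with a plain POST. This is a hypothetical smoke test; the base URL and the three UUIDs are placeholders, not values from the commit:

```python
# Hypothetical smoke test against a locally running gateway.
import requests

BASE = "http://localhost:80"  # wherever the refinery gateway is exposed
project_id = "00000000-0000-0000-0000-000000000001"
dataset_id = "00000000-0000-0000-0000-000000000002"
file_id = "00000000-0000-0000-0000-000000000003"

resp = requests.post(
    f"{BASE}/project/{project_id}/cognition/datasets/{dataset_id}"
    f"/files/{file_id}/queue"
)
# The endpoint returns immediately; the parse itself runs via the task queue.
assert resp.status_code == 200 and resp.text == "OK"
```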

app.py

Lines changed: 5 additions & 0 deletions

```diff
@@ -13,6 +13,7 @@
     UploadTaskInfo,
     CognitionImport,
     CognitionPrepareProject,
+    CognitionParseMarkdownFile,
 )
 from middleware.database_session import DatabaseSessionHandler
 from starlette.applications import Starlette
@@ -55,6 +56,10 @@
         "/project/{cognition_project_id:str}/cognition/continue/{task_id:str}/finalize",
         CognitionPrepareProject,
     ),
+    Route(
+        "/project/{project_id:str}/cognition/datasets/{dataset_id:str}/files/{file_id:str}/queue",
+        CognitionParseMarkdownFile,
+    ),
     Route("/project/{project_id:str}/import/task/{task_id:str}", UploadTaskInfo),
     Route("/project", ProjectCreationFromWorkflow),
     Route("/is_managed", IsManagedRest),
```
New file: task-queue handler for markdown files (imported as markdown_file in controller/task_queue/manager.py)

Lines changed: 53 additions & 0 deletions

```python
from typing import Any, Dict, Tuple, Callable
import os

import requests
from submodules.model.business_objects import (
    task_queue as task_queue_db_bo,
    general,
)
from submodules.model.cognition_objects import (
    markdown_file as markdown_file_db_bo,
)
from submodules.model.enums import CognitionMarkdownFileState

BASE_URI = os.getenv("COGNITION_GATEWAY")

TASK_DONE_STATES = [
    CognitionMarkdownFileState.FINISHED.value,
    CognitionMarkdownFileState.FAILED.value,
]


def get_task_functions() -> Tuple[Callable, Callable, int]:
    return __start_task, __check_finished, 1


def __start_task(task: Dict[str, Any]) -> bool:
    # check task still relevant
    task_db_obj = task_queue_db_bo.get(task["id"])
    if task_db_obj is None or task_db_obj.is_active:
        return False

    action = task["task_info"]
    org_id = action["org_id"]
    dataset_id = action["dataset_id"]
    file_id = action["file_id"]

    task_db_obj.is_active = True
    general.commit()
    requests.post(
        f"{BASE_URI}/api/v1/converters/internal/datasets/{dataset_id}/files/{file_id}/parse",
        json={"orgId": org_id},
    )
    return True


def __check_finished(task: Dict[str, Any]) -> bool:
    action = task["task_info"]
    org_id = action["org_id"]
    file_id = action["file_id"]
    markdown_file_entity = markdown_file_db_bo.get(org_id=org_id, md_file_id=file_id)
    if markdown_file_entity is None:
        return True
    return markdown_file_entity.state in TASK_DONE_STATES
```
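The handler implements the queue's start/poll contract: `__start_task` marks the task active and fires the parse request at the cognition gateway, and `__check_finished` reports completion once the markdown file reaches a terminal state. Below is a minimal sketch of how the `(start, check, interval)` triple could be driven; the real loop lives in the task-queue manager, which this diff does not show, and reading the trailing `1` as a poll interval in seconds is an assumption:

```python
# Sketch only: the actual task-queue manager is not part of this diff, and
# interpreting the third tuple element as seconds between polls is assumed.
import time


def run_task(task: dict) -> None:
    start, check_finished, interval = get_task_functions()
    if not start(task):
        return  # task is stale or already active
    while not check_finished(task):
        time.sleep(interval)
```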

controller/task_queue/manager.py

Lines changed: 3 additions & 0 deletions

```diff
@@ -16,6 +16,7 @@
     tokenization as tokenization_handler,
     attribute_calculation as attribute_calculation_handler,
     task_queue as task_queue_handler,
+    markdown_file as markdown_file_handler,
 )
 from .util import if_task_queue_send_websocket

@@ -87,6 +88,8 @@ def get_task_function_by_type(task_type: str) -> Tuple[Callable, Callable, int]:
         return attribute_calculation_handler.get_task_functions()
     if task_type == enums.TaskType.TASK_QUEUE.value:
         return task_queue_handler.get_task_functions()
+    if task_type == enums.TaskType.PARSE_MARKDOWN_FILE.value:
+        return markdown_file_handler.get_task_functions()
     raise ValueError(f"Task type {task_type} not supported yet")
```
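Callers resolve the new handler through the same dispatch as the existing ones; for example (the enum value is taken from the diff above):

```python
# Resolves to markdown_file_handler.get_task_functions() after this change.
start, check_finished, interval = get_task_function_by_type(
    enums.TaskType.PARSE_MARKDOWN_FILE.value
)
```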

start

Lines changed: 1 addition & 0 deletions

```diff
@@ -72,6 +72,7 @@ docker run -d --rm \
 -e TOKENIZER=http://refinery-tokenizer:80 \
 -e DOC_OCK=http://refinery-doc-ock:80 \
 -e GATES=http://gates-gateway:80 \
+-e COGNITION_GATEWAY=http://cognition-gateway:80 \
 -e KRATOS_ADMIN_URL=http://kratos:4434 \
 -e TASK_QUEUE_SLOTS=1 \
 -e PRIORITY_TASK_QUEUE_SLOTS=1 \
```

This is the `COGNITION_GATEWAY` base URI that the new task-queue handler reads via `os.getenv` when posting parse requests.
