From c7a944a7625ccbf6f92c39185e58b474f0f35360 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 24 Oct 2025 12:51:38 +0200 Subject: [PATCH 01/31] chore: update submodules --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index fa52e172..3d4317aa 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit fa52e1725d0691979895644d63c0b61728ea771b +Subproject commit 3d4317aaea9cfd2d8f241e87de67cc39fe98f55a From f63a8bad07c612e18b92279663994492353233c7 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 24 Oct 2025 12:51:47 +0200 Subject: [PATCH 02/31] perf(alembic): add etl task table --- .../ac36ed533db7_adds_etl_task_table.py | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 alembic/versions/ac36ed533db7_adds_etl_task_table.py diff --git a/alembic/versions/ac36ed533db7_adds_etl_task_table.py b/alembic/versions/ac36ed533db7_adds_etl_task_table.py new file mode 100644 index 00000000..3fc8ad9f --- /dev/null +++ b/alembic/versions/ac36ed533db7_adds_etl_task_table.py @@ -0,0 +1,89 @@ +"""adds etl task table + +Revision ID: ac36ed533db7 +Revises: 24ca8432bd8b +Create Date: 2025-10-24 10:50:23.262494 + +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "ac36ed533db7" +down_revision = "24ca8432bd8b" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table( + "etl_task", + sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("created_at", sa.DateTime(), nullable=True), + sa.Column("created_by", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("markdown_file_id", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("sharepoint_file_id", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("extract_config", sa.JSON(), nullable=True), + sa.Column("transform_config", sa.JSON(), nullable=True), + sa.Column("load_config", sa.JSON(), nullable=True), + sa.Column("notify_config", sa.JSON(), nullable=True), + sa.Column("priority", sa.Boolean(), nullable=True), + sa.Column("is_active", sa.Boolean(), nullable=True), + sa.Column("started_at", sa.DateTime(), nullable=True), + sa.Column("finished_at", sa.DateTime(), nullable=True), + sa.Column("state", sa.String(), nullable=True), + sa.Column("error_message", sa.String(), nullable=True), + sa.ForeignKeyConstraint(["created_by"], ["user.id"], ondelete="SET NULL"), + sa.ForeignKeyConstraint( + ["markdown_file_id"], ["cognition.markdown_file.id"], ondelete="CASCADE" + ), + sa.ForeignKeyConstraint( + ["sharepoint_file_id"], ["integration.sharepoint.id"], ondelete="CASCADE" + ), + sa.PrimaryKeyConstraint("id"), + schema="global", + ) + op.create_index( + op.f("ix_global_etl_task_created_by"), + "etl_task", + ["created_by"], + unique=False, + schema="global", + ) + op.create_index( + op.f("ix_global_etl_task_markdown_file_id"), + "etl_task", + ["markdown_file_id"], + unique=False, + schema="global", + ) + op.create_index( + op.f("ix_global_etl_task_sharepoint_file_id"), + "etl_task", + ["sharepoint_file_id"], + unique=False, + schema="global", + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.drop_index( + op.f("ix_global_etl_task_sharepoint_file_id"), + table_name="etl_task", + schema="global", + ) + op.drop_index( + op.f("ix_global_etl_task_markdown_file_id"), + table_name="etl_task", + schema="global", + ) + op.drop_index( + op.f("ix_global_etl_task_created_by"), table_name="etl_task", schema="global" + ) + op.drop_table("etl_task", schema="global") + # ### end Alembic commands ### From 7b5fcbe2cac5196756a2336a5e81817e63813e95 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 24 Oct 2025 13:03:44 +0200 Subject: [PATCH 03/31] chore: update submodules --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index 3d4317aa..c002a6e2 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 3d4317aaea9cfd2d8f241e87de67cc39fe98f55a +Subproject commit c002a6e2d612517d88749e967691e38ab82aa7d9 From 4f57597ed6f18989a7e05690b92d7548e088ac92 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 24 Oct 2025 13:03:55 +0200 Subject: [PATCH 04/31] perf(alembic): update etl task table --- ...l_task_table.py => e46afed08420_adds_etl_task_table.py} | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) rename alembic/versions/{ac36ed533db7_adds_etl_task_table.py => e46afed08420_adds_etl_task_table.py} (95%) diff --git a/alembic/versions/ac36ed533db7_adds_etl_task_table.py b/alembic/versions/e46afed08420_adds_etl_task_table.py similarity index 95% rename from alembic/versions/ac36ed533db7_adds_etl_task_table.py rename to alembic/versions/e46afed08420_adds_etl_task_table.py index 3fc8ad9f..26d203b8 100644 --- a/alembic/versions/ac36ed533db7_adds_etl_task_table.py +++ b/alembic/versions/e46afed08420_adds_etl_task_table.py @@ -1,8 +1,8 @@ """adds etl task table -Revision ID: ac36ed533db7 +Revision ID: e46afed08420 Revises: 24ca8432bd8b -Create Date: 2025-10-24 10:50:23.262494 +Create Date: 2025-10-24 11:02:58.188338 """ @@ -11,7 +11,7 @@ from 
sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. -revision = "ac36ed533db7" +revision = "e46afed08420" down_revision = "24ca8432bd8b" branch_labels = None depends_on = None @@ -30,6 +30,7 @@ def upgrade(): sa.Column("transform_config", sa.JSON(), nullable=True), sa.Column("load_config", sa.JSON(), nullable=True), sa.Column("notify_config", sa.JSON(), nullable=True), + sa.Column("llm_config", sa.JSON(), nullable=True), sa.Column("priority", sa.Boolean(), nullable=True), sa.Column("is_active", sa.Boolean(), nullable=True), sa.Column("started_at", sa.DateTime(), nullable=True), From aa9f19359dff300ef990d29eb1b58f1cd53b1cc3 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 24 Oct 2025 13:12:10 +0200 Subject: [PATCH 05/31] chore: update submodules --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index c002a6e2..c12e193d 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit c002a6e2d612517d88749e967691e38ab82aa7d9 +Subproject commit c12e193dde22f860f0e47f04e91878e858308183 From c939facae3143741bfa29c2d7837efff862c82fe Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 24 Oct 2025 13:12:20 +0200 Subject: [PATCH 06/31] perf(alembic): add org_id column --- ...py => b9ad3672dd8d_adds_etl_task_table.py} | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) rename alembic/versions/{e46afed08420_adds_etl_task_table.py => b9ad3672dd8d_adds_etl_task_table.py} (81%) diff --git a/alembic/versions/e46afed08420_adds_etl_task_table.py b/alembic/versions/b9ad3672dd8d_adds_etl_task_table.py similarity index 81% rename from alembic/versions/e46afed08420_adds_etl_task_table.py rename to alembic/versions/b9ad3672dd8d_adds_etl_task_table.py index 26d203b8..9aa7ee72 100644 --- a/alembic/versions/e46afed08420_adds_etl_task_table.py +++ b/alembic/versions/b9ad3672dd8d_adds_etl_task_table.py @@ -1,8 +1,8 @@ """adds etl task table 
-Revision ID: e46afed08420 +Revision ID: b9ad3672dd8d Revises: 24ca8432bd8b -Create Date: 2025-10-24 11:02:58.188338 +Create Date: 2025-10-24 11:11:47.341059 """ @@ -11,7 +11,7 @@ from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. -revision = "e46afed08420" +revision = "b9ad3672dd8d" down_revision = "24ca8432bd8b" branch_labels = None depends_on = None @@ -22,6 +22,7 @@ def upgrade(): op.create_table( "etl_task", sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("organization_id", postgresql.UUID(as_uuid=True), nullable=True), sa.Column("created_at", sa.DateTime(), nullable=True), sa.Column("created_by", postgresql.UUID(as_uuid=True), nullable=True), sa.Column("markdown_file_id", postgresql.UUID(as_uuid=True), nullable=True), @@ -31,16 +32,19 @@ def upgrade(): sa.Column("load_config", sa.JSON(), nullable=True), sa.Column("notify_config", sa.JSON(), nullable=True), sa.Column("llm_config", sa.JSON(), nullable=True), - sa.Column("priority", sa.Boolean(), nullable=True), - sa.Column("is_active", sa.Boolean(), nullable=True), sa.Column("started_at", sa.DateTime(), nullable=True), sa.Column("finished_at", sa.DateTime(), nullable=True), sa.Column("state", sa.String(), nullable=True), + sa.Column("is_active", sa.Boolean(), nullable=True), + sa.Column("priority", sa.Integer(), nullable=True), sa.Column("error_message", sa.String(), nullable=True), sa.ForeignKeyConstraint(["created_by"], ["user.id"], ondelete="SET NULL"), sa.ForeignKeyConstraint( ["markdown_file_id"], ["cognition.markdown_file.id"], ondelete="CASCADE" ), + sa.ForeignKeyConstraint( + ["organization_id"], ["organization.id"], ondelete="CASCADE" + ), sa.ForeignKeyConstraint( ["sharepoint_file_id"], ["integration.sharepoint.id"], ondelete="CASCADE" ), @@ -61,6 +65,13 @@ def upgrade(): unique=False, schema="global", ) + op.create_index( + op.f("ix_global_etl_task_organization_id"), + "etl_task", + ["organization_id"], + unique=False, + schema="global", + 
) op.create_index( op.f("ix_global_etl_task_sharepoint_file_id"), "etl_task", @@ -78,6 +89,11 @@ def downgrade(): table_name="etl_task", schema="global", ) + op.drop_index( + op.f("ix_global_etl_task_organization_id"), + table_name="etl_task", + schema="global", + ) op.drop_index( op.f("ix_global_etl_task_markdown_file_id"), table_name="etl_task", From 4d258417a16364790f457f4daba800d9dd655d90 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 24 Oct 2025 13:47:38 +0200 Subject: [PATCH 07/31] chore: update submodules --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index c12e193d..571073f8 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit c12e193dde22f860f0e47f04e91878e858308183 +Subproject commit 571073f8770c9618cae12b9dbf67417424c98ee9 From 217832c3ea6fab17ef08050b9352542690358747 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Fri, 24 Oct 2025 13:47:48 +0200 Subject: [PATCH 08/31] perf(alembic): update etl task table --- ...tl_task_table.py => 8a5e0469e9d0_adds_etl_task_table.py} | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) rename alembic/versions/{b9ad3672dd8d_adds_etl_task_table.py => 8a5e0469e9d0_adds_etl_task_table.py} (97%) diff --git a/alembic/versions/b9ad3672dd8d_adds_etl_task_table.py b/alembic/versions/8a5e0469e9d0_adds_etl_task_table.py similarity index 97% rename from alembic/versions/b9ad3672dd8d_adds_etl_task_table.py rename to alembic/versions/8a5e0469e9d0_adds_etl_task_table.py index 9aa7ee72..a4b90bac 100644 --- a/alembic/versions/b9ad3672dd8d_adds_etl_task_table.py +++ b/alembic/versions/8a5e0469e9d0_adds_etl_task_table.py @@ -1,8 +1,8 @@ """adds etl task table -Revision ID: b9ad3672dd8d +Revision ID: 8a5e0469e9d0 Revises: 24ca8432bd8b -Create Date: 2025-10-24 11:11:47.341059 +Create Date: 2025-10-24 11:47:15.649814 """ @@ -11,7 +11,7 @@ from sqlalchemy.dialects import postgresql # revision identifiers, used by 
Alembic. -revision = "b9ad3672dd8d" +revision = "8a5e0469e9d0" down_revision = "24ca8432bd8b" branch_labels = None depends_on = None From c21859456d427b2044363c7bc182dbc40934d362 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Sun, 26 Oct 2025 09:39:16 +0100 Subject: [PATCH 09/31] chore: update submodules --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index 571073f8..2bea8576 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 571073f8770c9618cae12b9dbf67417424c98ee9 +Subproject commit 2bea8576426ce2a305138e56d45874e46cfdd0b4 From cb64bf8a4bf19a33b30bf8edbbb6db0923e78837 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Sun, 26 Oct 2025 09:39:26 +0100 Subject: [PATCH 10/31] perf: add file_size_bytes to etl_task --- ...task_table.py => e07dd53f5fcb_adds_etl_task_table.py} | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) rename alembic/versions/{8a5e0469e9d0_adds_etl_task_table.py => e07dd53f5fcb_adds_etl_task_table.py} (92%) diff --git a/alembic/versions/8a5e0469e9d0_adds_etl_task_table.py b/alembic/versions/e07dd53f5fcb_adds_etl_task_table.py similarity index 92% rename from alembic/versions/8a5e0469e9d0_adds_etl_task_table.py rename to alembic/versions/e07dd53f5fcb_adds_etl_task_table.py index a4b90bac..1f54096a 100644 --- a/alembic/versions/8a5e0469e9d0_adds_etl_task_table.py +++ b/alembic/versions/e07dd53f5fcb_adds_etl_task_table.py @@ -1,8 +1,8 @@ """adds etl task table -Revision ID: 8a5e0469e9d0 +Revision ID: e07dd53f5fcb Revises: 24ca8432bd8b -Create Date: 2025-10-24 11:47:15.649814 +Create Date: 2025-10-25 20:13:45.417677 """ @@ -11,7 +11,7 @@ from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. 
-revision = "8a5e0469e9d0" +revision = "e07dd53f5fcb" down_revision = "24ca8432bd8b" branch_labels = None depends_on = None @@ -27,6 +27,9 @@ def upgrade(): sa.Column("created_by", postgresql.UUID(as_uuid=True), nullable=True), sa.Column("markdown_file_id", postgresql.UUID(as_uuid=True), nullable=True), sa.Column("sharepoint_file_id", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("file_path", sa.String(), nullable=True), + sa.Column("file_size_bytes", sa.BigInteger(), nullable=True), + sa.Column("tokenizer", sa.String(), nullable=True), sa.Column("extract_config", sa.JSON(), nullable=True), sa.Column("transform_config", sa.JSON(), nullable=True), sa.Column("load_config", sa.JSON(), nullable=True), From bd1b6332456f00c2c6b59dd5153ce7195d80a680 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 27 Oct 2025 01:16:12 +0100 Subject: [PATCH 11/31] chore: update submodules --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index 2bea8576..e6d425b1 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 2bea8576426ce2a305138e56d45874e46cfdd0b4 +Subproject commit e6d425b1d6a7da0eeaed016f19960763ba20ee98 From 92e89cd9129e77600f9c6f081283715f08494bf5 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 27 Oct 2025 01:16:22 +0100 Subject: [PATCH 12/31] perf: add split_config --- ...l_task_table.py => 5f8de6fcce30_adds_etl_task_table.py} | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) rename alembic/versions/{e07dd53f5fcb_adds_etl_task_table.py => 5f8de6fcce30_adds_etl_task_table.py} (95%) diff --git a/alembic/versions/e07dd53f5fcb_adds_etl_task_table.py b/alembic/versions/5f8de6fcce30_adds_etl_task_table.py similarity index 95% rename from alembic/versions/e07dd53f5fcb_adds_etl_task_table.py rename to alembic/versions/5f8de6fcce30_adds_etl_task_table.py index 1f54096a..2354f4c1 100644 --- a/alembic/versions/e07dd53f5fcb_adds_etl_task_table.py +++ 
b/alembic/versions/5f8de6fcce30_adds_etl_task_table.py @@ -1,8 +1,8 @@ """adds etl task table -Revision ID: e07dd53f5fcb +Revision ID: 5f8de6fcce30 Revises: 24ca8432bd8b -Create Date: 2025-10-25 20:13:45.417677 +Create Date: 2025-10-26 20:43:48.216324 """ @@ -11,7 +11,7 @@ from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. -revision = "e07dd53f5fcb" +revision = "5f8de6fcce30" down_revision = "24ca8432bd8b" branch_labels = None depends_on = None @@ -31,6 +31,7 @@ def upgrade(): sa.Column("file_size_bytes", sa.BigInteger(), nullable=True), sa.Column("tokenizer", sa.String(), nullable=True), sa.Column("extract_config", sa.JSON(), nullable=True), + sa.Column("split_config", sa.JSON(), nullable=True), sa.Column("transform_config", sa.JSON(), nullable=True), sa.Column("load_config", sa.JSON(), nullable=True), sa.Column("notify_config", sa.JSON(), nullable=True), From 5b18a3cfc440b74e667755793a8234b2229b1bd8 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 27 Oct 2025 21:53:18 +0100 Subject: [PATCH 13/31] chore: update submodules --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index e6d425b1..1e6dff4d 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit e6d425b1d6a7da0eeaed016f19960763ba20ee98 +Subproject commit 1e6dff4d80e135c44816918341cbb36071a58c73 From 97b01bc49dad820606932392eab0523619378d78 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Mon, 27 Oct 2025 21:53:29 +0100 Subject: [PATCH 14/31] perf(etl): fkey alignment --- .../5f8de6fcce30_adds_etl_task_table.py | 110 ------- .../60cb22681e6c_adds_etl_task_table.py | 295 ++++++++++++++++++ 2 files changed, 295 insertions(+), 110 deletions(-) delete mode 100644 alembic/versions/5f8de6fcce30_adds_etl_task_table.py create mode 100644 alembic/versions/60cb22681e6c_adds_etl_task_table.py diff --git a/alembic/versions/5f8de6fcce30_adds_etl_task_table.py 
b/alembic/versions/5f8de6fcce30_adds_etl_task_table.py deleted file mode 100644 index 2354f4c1..00000000 --- a/alembic/versions/5f8de6fcce30_adds_etl_task_table.py +++ /dev/null @@ -1,110 +0,0 @@ -"""adds etl task table - -Revision ID: 5f8de6fcce30 -Revises: 24ca8432bd8b -Create Date: 2025-10-26 20:43:48.216324 - -""" - -from alembic import op -import sqlalchemy as sa -from sqlalchemy.dialects import postgresql - -# revision identifiers, used by Alembic. -revision = "5f8de6fcce30" -down_revision = "24ca8432bd8b" -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.create_table( - "etl_task", - sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False), - sa.Column("organization_id", postgresql.UUID(as_uuid=True), nullable=True), - sa.Column("created_at", sa.DateTime(), nullable=True), - sa.Column("created_by", postgresql.UUID(as_uuid=True), nullable=True), - sa.Column("markdown_file_id", postgresql.UUID(as_uuid=True), nullable=True), - sa.Column("sharepoint_file_id", postgresql.UUID(as_uuid=True), nullable=True), - sa.Column("file_path", sa.String(), nullable=True), - sa.Column("file_size_bytes", sa.BigInteger(), nullable=True), - sa.Column("tokenizer", sa.String(), nullable=True), - sa.Column("extract_config", sa.JSON(), nullable=True), - sa.Column("split_config", sa.JSON(), nullable=True), - sa.Column("transform_config", sa.JSON(), nullable=True), - sa.Column("load_config", sa.JSON(), nullable=True), - sa.Column("notify_config", sa.JSON(), nullable=True), - sa.Column("llm_config", sa.JSON(), nullable=True), - sa.Column("started_at", sa.DateTime(), nullable=True), - sa.Column("finished_at", sa.DateTime(), nullable=True), - sa.Column("state", sa.String(), nullable=True), - sa.Column("is_active", sa.Boolean(), nullable=True), - sa.Column("priority", sa.Integer(), nullable=True), - sa.Column("error_message", sa.String(), nullable=True), - sa.ForeignKeyConstraint(["created_by"], 
["user.id"], ondelete="SET NULL"), - sa.ForeignKeyConstraint( - ["markdown_file_id"], ["cognition.markdown_file.id"], ondelete="CASCADE" - ), - sa.ForeignKeyConstraint( - ["organization_id"], ["organization.id"], ondelete="CASCADE" - ), - sa.ForeignKeyConstraint( - ["sharepoint_file_id"], ["integration.sharepoint.id"], ondelete="CASCADE" - ), - sa.PrimaryKeyConstraint("id"), - schema="global", - ) - op.create_index( - op.f("ix_global_etl_task_created_by"), - "etl_task", - ["created_by"], - unique=False, - schema="global", - ) - op.create_index( - op.f("ix_global_etl_task_markdown_file_id"), - "etl_task", - ["markdown_file_id"], - unique=False, - schema="global", - ) - op.create_index( - op.f("ix_global_etl_task_organization_id"), - "etl_task", - ["organization_id"], - unique=False, - schema="global", - ) - op.create_index( - op.f("ix_global_etl_task_sharepoint_file_id"), - "etl_task", - ["sharepoint_file_id"], - unique=False, - schema="global", - ) - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! 
### - op.drop_index( - op.f("ix_global_etl_task_sharepoint_file_id"), - table_name="etl_task", - schema="global", - ) - op.drop_index( - op.f("ix_global_etl_task_organization_id"), - table_name="etl_task", - schema="global", - ) - op.drop_index( - op.f("ix_global_etl_task_markdown_file_id"), - table_name="etl_task", - schema="global", - ) - op.drop_index( - op.f("ix_global_etl_task_created_by"), table_name="etl_task", schema="global" - ) - op.drop_table("etl_task", schema="global") - # ### end Alembic commands ### diff --git a/alembic/versions/60cb22681e6c_adds_etl_task_table.py b/alembic/versions/60cb22681e6c_adds_etl_task_table.py new file mode 100644 index 00000000..adef8d80 --- /dev/null +++ b/alembic/versions/60cb22681e6c_adds_etl_task_table.py @@ -0,0 +1,295 @@ +"""adds etl task table + +Revision ID: 60cb22681e6c +Revises: 24ca8432bd8b +Create Date: 2025-10-27 20:49:11.247647 + +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "60cb22681e6c" +down_revision = "24ca8432bd8b" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table( + "etl_task", + sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False), + sa.Column("organization_id", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("created_at", sa.DateTime(), nullable=True), + sa.Column("created_by", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("file_path", sa.String(), nullable=True), + sa.Column("file_size_bytes", sa.BigInteger(), nullable=True), + sa.Column("tokenizer", sa.String(), nullable=True), + sa.Column("extract_config", sa.JSON(), nullable=True), + sa.Column("split_config", sa.JSON(), nullable=True), + sa.Column("transform_config", sa.JSON(), nullable=True), + sa.Column("load_config", sa.JSON(), nullable=True), + sa.Column("notify_config", sa.JSON(), nullable=True), + sa.Column("llm_config", sa.JSON(), nullable=True), + sa.Column("started_at", sa.DateTime(), nullable=True), + sa.Column("finished_at", sa.DateTime(), nullable=True), + sa.Column("state", sa.String(), nullable=True), + sa.Column("is_active", sa.Boolean(), nullable=True), + sa.Column("priority", sa.Integer(), nullable=True), + sa.Column("error_message", sa.String(), nullable=True), + sa.ForeignKeyConstraint(["created_by"], ["user.id"], ondelete="SET NULL"), + sa.ForeignKeyConstraint( + ["organization_id"], ["organization.id"], ondelete="CASCADE" + ), + sa.PrimaryKeyConstraint("id"), + schema="global", + ) + op.create_index( + op.f("ix_global_etl_task_created_by"), + "etl_task", + ["created_by"], + unique=False, + schema="global", + ) + op.create_index( + op.f("ix_global_etl_task_organization_id"), + "etl_task", + ["organization_id"], + unique=False, + schema="global", + ) + op.add_column( + "markdown_file", + sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True), + schema="cognition", + ) + op.create_index( + op.f("ix_cognition_markdown_file_etl_task_id"), + "markdown_file", + ["etl_task_id"], + unique=False, + schema="cognition", + ) + op.create_foreign_key( + None, + "markdown_file", + "etl_task", 
+ ["etl_task_id"], + ["id"], + source_schema="cognition", + referent_schema="global", + ondelete="CASCADE", + ) + op.add_column( + "github_file", + sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True), + schema="integration", + ) + op.drop_constraint( + "unique_github_file_source", "github_file", schema="integration", type_="unique" + ) + op.create_unique_constraint( + "unique_github_file_source", + "github_file", + ["integration_id", "running_id", "source", "etl_task_id"], + schema="integration", + ) + op.create_index( + op.f("ix_integration_github_file_etl_task_id"), + "github_file", + ["etl_task_id"], + unique=False, + schema="integration", + ) + op.create_foreign_key( + None, + "github_file", + "etl_task", + ["etl_task_id"], + ["id"], + source_schema="integration", + referent_schema="global", + ondelete="CASCADE", + ) + op.add_column( + "github_issue", + sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True), + schema="integration", + ) + op.drop_constraint( + "unique_github_issue_source", + "github_issue", + schema="integration", + type_="unique", + ) + op.create_unique_constraint( + "unique_github_issue_source", + "github_issue", + ["integration_id", "running_id", "source", "etl_task_id"], + schema="integration", + ) + op.create_index( + op.f("ix_integration_github_issue_etl_task_id"), + "github_issue", + ["etl_task_id"], + unique=False, + schema="integration", + ) + op.create_foreign_key( + None, + "github_issue", + "etl_task", + ["etl_task_id"], + ["id"], + source_schema="integration", + referent_schema="global", + ondelete="CASCADE", + ) + op.add_column( + "pdf", + sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True), + schema="integration", + ) + op.drop_constraint("unique_pdf_source", "pdf", schema="integration", type_="unique") + op.create_unique_constraint( + "unique_pdf_source", + "pdf", + ["integration_id", "running_id", "source", "etl_task_id"], + schema="integration", + ) + op.create_index( + 
op.f("ix_integration_pdf_etl_task_id"), + "pdf", + ["etl_task_id"], + unique=False, + schema="integration", + ) + op.create_foreign_key( + None, + "pdf", + "etl_task", + ["etl_task_id"], + ["id"], + source_schema="integration", + referent_schema="global", + ondelete="CASCADE", + ) + op.add_column( + "sharepoint", + sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True), + schema="integration", + ) + op.drop_constraint( + "unique_sharepoint_source", "sharepoint", schema="integration", type_="unique" + ) + op.create_unique_constraint( + "unique_sharepoint_source", + "sharepoint", + ["integration_id", "running_id", "source", "etl_task_id"], + schema="integration", + ) + op.create_index( + op.f("ix_integration_sharepoint_etl_task_id"), + "sharepoint", + ["etl_task_id"], + unique=False, + schema="integration", + ) + op.create_foreign_key( + None, + "sharepoint", + "etl_task", + ["etl_task_id"], + ["id"], + source_schema="integration", + referent_schema="global", + ondelete="CASCADE", + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.drop_constraint(None, "sharepoint", schema="integration", type_="foreignkey") + op.drop_index( + op.f("ix_integration_sharepoint_etl_task_id"), + table_name="sharepoint", + schema="integration", + ) + op.drop_constraint( + "unique_sharepoint_source", "sharepoint", schema="integration", type_="unique" + ) + op.create_unique_constraint( + "unique_sharepoint_source", + "sharepoint", + ["integration_id", "running_id", "source"], + schema="integration", + ) + op.drop_column("sharepoint", "etl_task_id", schema="integration") + op.drop_constraint(None, "pdf", schema="integration", type_="foreignkey") + op.drop_index( + op.f("ix_integration_pdf_etl_task_id"), table_name="pdf", schema="integration" + ) + op.drop_constraint("unique_pdf_source", "pdf", schema="integration", type_="unique") + op.create_unique_constraint( + "unique_pdf_source", + "pdf", + ["integration_id", "running_id", "source"], + schema="integration", + ) + op.drop_column("pdf", "etl_task_id", schema="integration") + op.drop_constraint(None, "github_issue", schema="integration", type_="foreignkey") + op.drop_index( + op.f("ix_integration_github_issue_etl_task_id"), + table_name="github_issue", + schema="integration", + ) + op.drop_constraint( + "unique_github_issue_source", + "github_issue", + schema="integration", + type_="unique", + ) + op.create_unique_constraint( + "unique_github_issue_source", + "github_issue", + ["integration_id", "running_id", "source"], + schema="integration", + ) + op.drop_column("github_issue", "etl_task_id", schema="integration") + op.drop_constraint(None, "github_file", schema="integration", type_="foreignkey") + op.drop_index( + op.f("ix_integration_github_file_etl_task_id"), + table_name="github_file", + schema="integration", + ) + op.drop_constraint( + "unique_github_file_source", "github_file", schema="integration", type_="unique" + ) + op.create_unique_constraint( + "unique_github_file_source", + "github_file", + ["integration_id", "running_id", "source"], + 
schema="integration", + ) + op.drop_column("github_file", "etl_task_id", schema="integration") + op.drop_constraint(None, "markdown_file", schema="cognition", type_="foreignkey") + op.drop_index( + op.f("ix_cognition_markdown_file_etl_task_id"), + table_name="markdown_file", + schema="cognition", + ) + op.drop_column("markdown_file", "etl_task_id", schema="cognition") + op.drop_index( + op.f("ix_global_etl_task_organization_id"), + table_name="etl_task", + schema="global", + ) + op.drop_index( + op.f("ix_global_etl_task_created_by"), table_name="etl_task", schema="global" + ) + op.drop_table("etl_task", schema="global") + # ### end Alembic commands ### From 7a40c8106cd0cd44f7b5a9d5330e6761c8d5e81f Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Tue, 28 Oct 2025 09:36:16 +0100 Subject: [PATCH 15/31] chore: update submodules --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index 1e6dff4d..2ddce6ed 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 1e6dff4d80e135c44816918341cbb36071a58c73 +Subproject commit 2ddce6edc29e7e8525f5f53d4d30d13b39ee02ef From 79b05b1d837a51c75d7886647adaf4f4ea78c72f Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Tue, 28 Oct 2025 09:36:21 +0100 Subject: [PATCH 16/31] perf: task cancellation --- controller/monitor/manager.py | 11 +++++++++++ fast_api/routes/misc.py | 2 ++ 2 files changed, 13 insertions(+) diff --git a/controller/monitor/manager.py b/controller/monitor/manager.py index f4bdd43b..907b5bf6 100644 --- a/controller/monitor/manager.py +++ b/controller/monitor/manager.py @@ -126,3 +126,14 @@ def cancel_integration_task( task_monitor.set_integration_task_to_failed( integration_id, error_message="Cancelled by task manager" ) + + +def cancel_etl_task( + task_info: Dict[str, Any], +) -> None: + + etl_task_id = task_info.get("etlTaskId") + + task_monitor.set_etl_task_to_failed( + etl_task_id, error_message="Cancelled by task 
manager" + ) diff --git a/fast_api/routes/misc.py b/fast_api/routes/misc.py index d3a62f2f..e98af18f 100644 --- a/fast_api/routes/misc.py +++ b/fast_api/routes/misc.py @@ -136,6 +136,8 @@ def cancel_task( ) elif task_type == enums.TaskType.EXECUTE_INTEGRATION.value: controller_manager.cancel_integration_task(task_info) + elif task_type == enums.TaskType.EXECUTE_ETL.value: + controller_manager.cancel_etl_task(task_info) else: raise ValueError(f"{task_type} is no valid task type") From 2b4ad4edb572d09c31cf8e414997dacb7ac6e58f Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Wed, 29 Oct 2025 19:09:33 +0100 Subject: [PATCH 17/31] chore: update submodules --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index 2ddce6ed..99323bda 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 2ddce6edc29e7e8525f5f53d4d30d13b39ee02ef +Subproject commit 99323bda6dc969569a882d729f016b743dc12509 From 6264d2868662beee7459d58fa18f73ee0b0fe01a Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 01:02:01 +0100 Subject: [PATCH 18/31] fix: update submodules merge conflict --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index 99323bda..5549a617 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 99323bda6dc969569a882d729f016b743dc12509 +Subproject commit 5549a617a856699c95e7a76f51ad058541f0c2ad From 1aff7a81af5e7ef1c5acb0cbbaa2c9b614762890 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 01:22:38 +0100 Subject: [PATCH 19/31] perf: add cache_config --- ...py => bf8e8646ebdc_adds_etl_task_table.py} | 50 ++++++++++++++++--- 1 file changed, 42 insertions(+), 8 deletions(-) rename alembic/versions/{60cb22681e6c_adds_etl_task_table.py => bf8e8646ebdc_adds_etl_task_table.py} (88%) diff --git a/alembic/versions/60cb22681e6c_adds_etl_task_table.py 
b/alembic/versions/bf8e8646ebdc_adds_etl_task_table.py similarity index 88% rename from alembic/versions/60cb22681e6c_adds_etl_task_table.py rename to alembic/versions/bf8e8646ebdc_adds_etl_task_table.py index adef8d80..fc475d48 100644 --- a/alembic/versions/60cb22681e6c_adds_etl_task_table.py +++ b/alembic/versions/bf8e8646ebdc_adds_etl_task_table.py @@ -1,8 +1,8 @@ """adds etl task table -Revision ID: 60cb22681e6c +Revision ID: bf8e8646ebdc Revises: 24ca8432bd8b -Create Date: 2025-10-27 20:49:11.247647 +Create Date: 2025-10-30 00:21:05.246324 """ @@ -11,7 +11,7 @@ from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. -revision = "60cb22681e6c" +revision = "bf8e8646ebdc" down_revision = "24ca8432bd8b" branch_labels = None depends_on = None @@ -28,6 +28,7 @@ def upgrade(): sa.Column("file_path", sa.String(), nullable=True), sa.Column("file_size_bytes", sa.BigInteger(), nullable=True), sa.Column("tokenizer", sa.String(), nullable=True), + sa.Column("cache_config", sa.JSON(), nullable=True), sa.Column("extract_config", sa.JSON(), nullable=True), sa.Column("split_config", sa.JSON(), nullable=True), sa.Column("transform_config", sa.JSON(), nullable=True), @@ -61,6 +62,11 @@ def upgrade(): unique=False, schema="global", ) + op.add_column( + "conversation", + sa.Column("incognito_mode", sa.Boolean(), nullable=True), + schema="cognition", + ) op.add_column( "markdown_file", sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True), @@ -73,6 +79,12 @@ def upgrade(): unique=False, schema="cognition", ) + op.create_unique_constraint( + "unique_markdown_file_etl_task_id", + "markdown_file", + ["id", "etl_task_id"], + schema="cognition", + ) op.create_foreign_key( None, "markdown_file", @@ -213,7 +225,12 @@ def upgrade(): def downgrade(): # ### commands auto generated by Alembic - please adjust! 
### - op.drop_constraint(None, "sharepoint", schema="integration", type_="foreignkey") + op.drop_constraint( + "sharepoint_etl_task_id_fkey", + "sharepoint", + schema="integration", + type_="foreignkey", + ) op.drop_index( op.f("ix_integration_sharepoint_etl_task_id"), table_name="sharepoint", @@ -229,7 +246,9 @@ def downgrade(): schema="integration", ) op.drop_column("sharepoint", "etl_task_id", schema="integration") - op.drop_constraint(None, "pdf", schema="integration", type_="foreignkey") + op.drop_constraint( + "pdf_etl_task_id_fkey", "pdf", schema="integration", type_="foreignkey" + ) op.drop_index( op.f("ix_integration_pdf_etl_task_id"), table_name="pdf", schema="integration" ) @@ -241,7 +260,12 @@ def downgrade(): schema="integration", ) op.drop_column("pdf", "etl_task_id", schema="integration") - op.drop_constraint(None, "github_issue", schema="integration", type_="foreignkey") + op.drop_constraint( + "github_issue_etl_task_id_fkey", + "github_issue", + schema="integration", + type_="foreignkey", + ) op.drop_index( op.f("ix_integration_github_issue_etl_task_id"), table_name="github_issue", @@ -260,7 +284,12 @@ def downgrade(): schema="integration", ) op.drop_column("github_issue", "etl_task_id", schema="integration") - op.drop_constraint(None, "github_file", schema="integration", type_="foreignkey") + op.drop_constraint( + "github_file_etl_task_id_fkey", + "github_file", + schema="integration", + type_="foreignkey", + ) op.drop_index( op.f("ix_integration_github_file_etl_task_id"), table_name="github_file", @@ -276,7 +305,12 @@ def downgrade(): schema="integration", ) op.drop_column("github_file", "etl_task_id", schema="integration") - op.drop_constraint(None, "markdown_file", schema="cognition", type_="foreignkey") + op.drop_constraint( + "markdown_file_etl_task_id_fkey", + "markdown_file", + schema="cognition", + type_="foreignkey", + ) op.drop_index( op.f("ix_cognition_markdown_file_etl_task_id"), table_name="markdown_file", From 
e3a19db0f31d3e16ad030eb27a82e596891e6243 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 01:22:53 +0100 Subject: [PATCH 20/31] perf: align /notify to etl provider --- controller/transfer/cognition/etl.py | 107 ++++++++++++++++++ controller/transfer/cognition/minio_upload.py | 84 ++++++++++---- 2 files changed, 172 insertions(+), 19 deletions(-) create mode 100644 controller/transfer/cognition/etl.py diff --git a/controller/transfer/cognition/etl.py b/controller/transfer/cognition/etl.py new file mode 100644 index 00000000..47e17a27 --- /dev/null +++ b/controller/transfer/cognition/etl.py @@ -0,0 +1,107 @@ +from typing import Optional + +from submodules.model import enums +from submodules.model.models import ( + EtlTask, + CognitionMarkdownFile, + CognitionMarkdownDataset, +) +from submodules.model.global_objects import etl_task as etl_task_bo +from submodules.model.cognition_objects import markdown_file as markdown_file_bo + +DEFAULT_FILE_TYPE = enums.ETLFileType.PDF +DEFAULT_EXTRACTORS = { + enums.ETLFileType.MD: enums.ETLExtractor.MD.FILESYSTEM, + enums.ETLFileType.PDF: enums.ETLExtractor.PDF.PDF2MD, +} + +DEFAULT_FALLBACK_EXTRACTORS = { + enums.ETLFileType.PDF: [ + enums.ETLExtractor.PDF.PDF2MD, + enums.ETLExtractor.PDF.VISION, + enums.ETLExtractor.PDF.AZURE_DI, + ], +} + + +def get_or_create_task( + markdown_file: CognitionMarkdownFile, + markdown_dataset: CognitionMarkdownDataset, + file_size_bytes: int, + minio_path: str, + original_file_name: str, + file_type: Optional[enums.ETLFileType] = None, + extractor: Optional[enums.ETLExtractor] = None, + fallback_extractors: Optional[list[enums.ETLExtractor]] = None, + split_strategy: Optional[enums.ETLSplitStrategy] = None, + chunk_size: Optional[int] = 1000, + priority: Optional[int] = -1, +) -> EtlTask: + if markdown_file.etl_task_id: + if etl_task := etl_task_bo.get_by_id(markdown_file.etl_task_id): + return etl_task + + file_type = file_type or DEFAULT_FILE_TYPE + split_strategy = 
split_strategy or enums.ETLSplitStrategy.CHUNK + extractor = extractor or DEFAULT_EXTRACTORS[file_type] + fallback_extractors = list( + filter( + lambda x: x != extractor, + (fallback_extractors or DEFAULT_FALLBACK_EXTRACTORS.get(file_type, [])), + ) + ) + + etl_task = etl_task_bo.create( + org_id=markdown_dataset.organization_id, + user_id=markdown_file.created_by, + file_size_bytes=file_size_bytes, + extract_config={ + "file_type": file_type.value, + "extractor": extractor.value, + "fallback_extractors": [fe.value for fe in fallback_extractors], + "minio_path": minio_path, + "original_file_name": original_file_name, + }, + split_config={ + "strategy": split_strategy.value, + "chunk_size": chunk_size, + }, + transform_config={ + "transformers": [ + { + "name": enums.ETLTransformer.CLEANSE.value, + "system_prompt": None, + "user_prompt": None, + }, + { + "name": enums.ETLTransformer.TEXT_TO_TABLE.value, + "system_prompt": None, + "user_prompt": None, + }, + ] + }, + load_config={ + "refinery_project": {"enabled": False, "id": None}, + "markdown_file": {"enabled": True, "id": str(markdown_file.id)}, + }, + notify_config={ + "http": { + "url": "http://cognition-gateway:80/etl/finished/{markdown_file_id}", + "format": { + "markdown_file_id": str(markdown_file.id), + }, + "method": "POST", + } + }, + llm_config=markdown_dataset.llm_config, + tokenizer=markdown_dataset.tokenizer, + priority=priority, + ) + + markdown_file_bo.update( + org_id=markdown_file.organization_id, + markdown_file_id=markdown_file.id, + etl_task_id=etl_task.id, + ) + + return etl_task diff --git a/controller/transfer/cognition/minio_upload.py b/controller/transfer/cognition/minio_upload.py index 3ea51107..6f49c26a 100644 --- a/controller/transfer/cognition/minio_upload.py +++ b/controller/transfer/cognition/minio_upload.py @@ -1,7 +1,12 @@ from typing import List -from submodules.model.cognition_objects import file_reference as file_reference_db_bo -from submodules.model.enums import TaskType, 
FileCachingProcessingScope + +from controller.transfer.cognition import etl as etl_util from controller.task_master import manager as task_master_manager +from submodules.model.cognition_objects import ( + file_reference as file_reference_db_bo, + markdown_file as markdown_file_bo, + markdown_dataset as markdown_dataset_bo, +) from submodules.model import enums from submodules.model.business_objects import general @@ -21,7 +26,10 @@ def handle_cognition_file_upload(path_parts: List[str]): or file_reference.state == enums.FileCachingState.COMPLETED.value ): # file_reference is None or already processed in queue - print("File reference duplication error, file is already processed", flush=True) + print( + "File reference duplication error, file is already processed", + flush=True, + ) if file_reference: print(f"File reference id: {str(file_reference.id)}", flush=True) print(f"File name: {file_reference.original_file_name}", flush=True) @@ -29,26 +37,64 @@ def handle_cognition_file_upload(path_parts: List[str]): file_reference.state = enums.FileCachingState.COMPLETED.value general.commit() - prio = ( + priority = -1 + if ( file_reference.meta_data.get("transformation_initiator") == enums.FileCachingInitiator.TMP_DOC_RETRIEVAL.value + ): + priority = 1 + # task_master_manager.queue_task( + # str(file_reference.organization_id), + # str(file_reference.created_by), + # TaskType.PARSE_COGNITION_FILE, + # { + # "parse_scope": FileCachingProcessingScope.EXTRACT_TRANSFORM.value, + # "file_reference_id": str(file_reference.id), + # "extraction_method": extraction_method, + # "meta_data": file_reference.meta_data, + # "extraction_key": file_reference.meta_data.get("extraction_key"), + # "transformation_key": file_reference.meta_data.get( + # "transformation_key" + # ), + # "file_name": file_reference.original_file_name, + # }, + # prio, # not sure if prio is right here as the prio tasks should only take < 1 min but waiting for the normal queue will take ages depending on the 
queue + # ) + + markdown_file = markdown_file_bo.get( + org_id, file_reference.meta_data.get("markdown_file_id") + ) + if not markdown_file: + print( + "ERROR: Markdown file not found for the given markdown_file_id", + flush=True, + ) + raise ValueError( + f"Markdown file not found for file reference {file_reference.id}" + ) + + markdown_dataset = markdown_dataset_bo.get( + org_id=org_id, id=markdown_file.dataset_id + ) + file_type = enums.ETLFileType.from_string(markdown_file.category_origin) + etl_task = etl_util.get_or_create_task( + markdown_file=markdown_file, + markdown_dataset=markdown_dataset, + minio_path=file_reference.minio_path, + original_file_name=file_reference.original_file_name, + file_size_bytes=file_reference.file_size_bytes, + file_type=file_type, + extractor=enums.ETLExtractorPDF.from_string( + markdown_file.meta_data.get("extractor") + ), + split_strategy=markdown_file.meta_data.get("split_strategy"), + chunk_size=markdown_file.meta_data.get("chunk_size"), + priority=priority, ) - extraction_method = file_reference.meta_data.get("extraction_method") task_master_manager.queue_task( - str(file_reference.organization_id), + org_id, str(file_reference.created_by), - TaskType.PARSE_COGNITION_FILE, - { - "parse_scope": FileCachingProcessingScope.EXTRACT_TRANSFORM.value, - "file_reference_id": str(file_reference.id), - "extraction_method": extraction_method, - "meta_data": file_reference.meta_data, - "extraction_key": file_reference.meta_data.get("extraction_key"), - "transformation_key": file_reference.meta_data.get( - "transformation_key" - ), - "file_name": file_reference.original_file_name, - }, - prio, # not sure if prio is right here as the prio tasks should only take < 1 min but waiting for the normal queue will take ages depending on the queue + enums.TaskType.EXECUTE_ETL, + {"etl_task_id": str(etl_task.id)}, ) From 94af4814e2beb0225697619965e959fcdbb54422 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 01:27:03 +0100 
Subject: [PATCH 21/31] chore: update submodules --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index 5549a617..bf33020b 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 5549a617a856699c95e7a76f51ad058541f0c2ad +Subproject commit bf33020b6f5c18470d61efa9a2c04dddd954e8f4 From fb71eb2700c118e0f3ce817a709c81e037da2cba Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 10:15:49 +0100 Subject: [PATCH 22/31] chore: update submodules --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index bf33020b..f5681d71 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit bf33020b6f5c18470d61efa9a2c04dddd954e8f4 +Subproject commit f5681d719c62ad33259696274049239910707ed0 From 0b7c896e295259f1e7dca1528d075b36b4fbaf56 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 10:16:02 +0100 Subject: [PATCH 23/31] perf: update minio_upload for execute_etl --- controller/transfer/cognition/minio_upload.py | 71 +++++++++++++++---- 1 file changed, 57 insertions(+), 14 deletions(-) diff --git a/controller/transfer/cognition/minio_upload.py b/controller/transfer/cognition/minio_upload.py index 6f49c26a..7371b4f6 100644 --- a/controller/transfer/cognition/minio_upload.py +++ b/controller/transfer/cognition/minio_upload.py @@ -1,14 +1,14 @@ from typing import List -from controller.transfer.cognition import etl as etl_util from controller.task_master import manager as task_master_manager +from submodules.model import enums +from submodules.model.business_objects import general +from submodules.model.global_objects import etl_task as etl_task_bo from submodules.model.cognition_objects import ( file_reference as file_reference_db_bo, markdown_file as markdown_file_bo, markdown_dataset as markdown_dataset_bo, ) -from submodules.model import enums -from 
submodules.model.business_objects import general def handle_cognition_file_upload(path_parts: List[str]): @@ -37,6 +37,7 @@ def handle_cognition_file_upload(path_parts: List[str]): file_reference.state = enums.FileCachingState.COMPLETED.value general.commit() + chunk_size = 1000 priority = -1 if ( file_reference.meta_data.get("transformation_initiator") @@ -76,19 +77,61 @@ def handle_cognition_file_upload(path_parts: List[str]): markdown_dataset = markdown_dataset_bo.get( org_id=org_id, id=markdown_file.dataset_id ) - file_type = enums.ETLFileType.from_string(markdown_file.category_origin) - etl_task = etl_util.get_or_create_task( + + etl_task = etl_task_bo.get_or_create_markdown_file_etl_task( + org_id=org_id, + file_reference=file_reference, markdown_file=markdown_file, markdown_dataset=markdown_dataset, - minio_path=file_reference.minio_path, - original_file_name=file_reference.original_file_name, - file_size_bytes=file_reference.file_size_bytes, - file_type=file_type, - extractor=enums.ETLExtractorPDF.from_string( - markdown_file.meta_data.get("extractor") - ), - split_strategy=markdown_file.meta_data.get("split_strategy"), - chunk_size=markdown_file.meta_data.get("chunk_size"), + extractor=markdown_file.meta_data.get("extractor"), + fallback_extractors=[ + enums.ETLExtractorPDF.PDF2MD, + enums.ETLExtractorPDF.VISION, + ], + cache_config={ + "use_file_cache": True, + "use_extraction_cache": False, + "use_transformation_cache": True, + }, + split_config={ + "strategy": enums.ETLSplitStrategy.CHUNK.value, + "chunk_size": chunk_size, + }, + transform_config={ + "transformers": [ + { # NOTE: __call_gpt_with_key only reads user_prompt + "enabled": True, + "name": enums.ETLTransformer.CLEANSE.value, + "system_prompt": None, + "user_prompt": None, + }, + { + "enabled": True, + "name": enums.ETLTransformer.TEXT_TO_TABLE.value, + "system_prompt": None, + "user_prompt": None, + }, + { + "enabled": False, + "name": enums.ETLTransformer.SUMMARIZE.value, + "system_prompt": 
None, + "user_prompt": None, + }, + ] + }, + load_config={ + "refinery_project": {"enabled": False, "id": None}, + "markdown_file": {"enabled": True, "id": str(markdown_file.id)}, + }, + notify_config={ + "http": { + "url": "http://cognition-gateway:80/etl/finished/{markdown_file_id}", + "format": { + "markdown_file_id": str(markdown_file.id), + }, + "method": "POST", + } + }, priority=priority, ) From 5e0aafee5fdee167ab00ae4ac54dbf530a65d70d Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 11:21:32 +0100 Subject: [PATCH 24/31] fix: markdown_file update after etl_task creation --- controller/transfer/cognition/minio_upload.py | 21 ++++--------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/controller/transfer/cognition/minio_upload.py b/controller/transfer/cognition/minio_upload.py index 7371b4f6..30fb75a9 100644 --- a/controller/transfer/cognition/minio_upload.py +++ b/controller/transfer/cognition/minio_upload.py @@ -44,23 +44,6 @@ def handle_cognition_file_upload(path_parts: List[str]): == enums.FileCachingInitiator.TMP_DOC_RETRIEVAL.value ): priority = 1 - # task_master_manager.queue_task( - # str(file_reference.organization_id), - # str(file_reference.created_by), - # TaskType.PARSE_COGNITION_FILE, - # { - # "parse_scope": FileCachingProcessingScope.EXTRACT_TRANSFORM.value, - # "file_reference_id": str(file_reference.id), - # "extraction_method": extraction_method, - # "meta_data": file_reference.meta_data, - # "extraction_key": file_reference.meta_data.get("extraction_key"), - # "transformation_key": file_reference.meta_data.get( - # "transformation_key" - # ), - # "file_name": file_reference.original_file_name, - # }, - # prio, # not sure if prio is right here as the prio tasks should only take < 1 min but waiting for the normal queue will take ages depending on the queue - # ) markdown_file = markdown_file_bo.get( org_id, file_reference.meta_data.get("markdown_file_id") @@ -135,6 +118,10 @@ def 
handle_cognition_file_upload(path_parts: List[str]): priority=priority, ) + markdown_file_bo.update( + org_id=org_id, markdown_file_id=markdown_file.id, etl_task_id=etl_task.id + ) + task_master_manager.queue_task( org_id, str(file_reference.created_by), From 709d22d8902b64c28ecd7865b8706906ec639a22 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 11:46:09 +0100 Subject: [PATCH 25/31] chore: merge dev --- ...9_added_incognito_mode_to_conversations.py | 28 ++++++++++ .../versions/199a0d8aefbe_new_org_column.py | 56 +++++++++++++++++++ ...ccb41_dummy_alembic_to_update_incognito.py | 35 ++++++++++++ app.py | 1 + controller/auth/kratos.py | 20 ++++++- controller/auth/manager.py | 34 +++++++---- controller/user/manager.py | 39 +++++-------- fast_api/routes/organization.py | 5 +- submodules/model | 2 +- util/clean_up.py | 17 ++++++ 10 files changed, 195 insertions(+), 42 deletions(-) create mode 100644 alembic/versions/059f0d62a6b9_added_incognito_mode_to_conversations.py create mode 100644 alembic/versions/199a0d8aefbe_new_org_column.py create mode 100644 alembic/versions/c6d1cbcccb41_dummy_alembic_to_update_incognito.py diff --git a/alembic/versions/059f0d62a6b9_added_incognito_mode_to_conversations.py b/alembic/versions/059f0d62a6b9_added_incognito_mode_to_conversations.py new file mode 100644 index 00000000..633d8b0d --- /dev/null +++ b/alembic/versions/059f0d62a6b9_added_incognito_mode_to_conversations.py @@ -0,0 +1,28 @@ +"""Added incognito mode to conversations + +Revision ID: 059f0d62a6b9 +Revises: 24ca8432bd8b +Create Date: 2025-10-15 14:17:59.216693 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '059f0d62a6b9' +down_revision = '24ca8432bd8b' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.add_column('conversation', sa.Column('incognito_mode', sa.Boolean(), nullable=True), schema='cognition') + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('conversation', 'incognito_mode', schema='cognition') + # ### end Alembic commands ### diff --git a/alembic/versions/199a0d8aefbe_new_org_column.py b/alembic/versions/199a0d8aefbe_new_org_column.py new file mode 100644 index 00000000..d08c2e8b --- /dev/null +++ b/alembic/versions/199a0d8aefbe_new_org_column.py @@ -0,0 +1,56 @@ +"""new org column + +Revision ID: 199a0d8aefbe +Revises: c6d1cbcccb41 +Create Date: 2025-10-20 07:36:33.488523 + +""" + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "199a0d8aefbe" +down_revision = "c6d1cbcccb41" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column( + "user", sa.Column("messages_created_this_month", sa.BigInteger(), nullable=True) + ) + + op.create_table( + "timed_executions", + sa.Column("time_key", sa.String(), nullable=False), + sa.Column("last_executed_at", sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint("time_key"), + sa.UniqueConstraint("time_key"), + schema="global", + ) + connection = op.get_bind() + + update_dataset_sql = """ + UPDATE public.user + SET messages_created_this_month = 0 + WHERE messages_created_this_month IS NULL + """ + connection.execute(update_dataset_sql) + op.drop_column("user", "oidc_identifier") + op.create_unique_constraint(None, "timed_executions", ["time_key"], schema="global") + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.drop_column("user", "messages_created_this_month") + op.drop_table("timed_executions", schema="global") + op.drop_constraint(None, "timed_executions", schema="global", type_="unique") + op.add_column( + "user", + sa.Column("oidc_identifier", sa.VARCHAR(), autoincrement=False, nullable=True), + ) + # ### end Alembic commands ### diff --git a/alembic/versions/c6d1cbcccb41_dummy_alembic_to_update_incognito.py b/alembic/versions/c6d1cbcccb41_dummy_alembic_to_update_incognito.py new file mode 100644 index 00000000..6caac863 --- /dev/null +++ b/alembic/versions/c6d1cbcccb41_dummy_alembic_to_update_incognito.py @@ -0,0 +1,35 @@ +"""Dummy alembic to update incognito + +Revision ID: c6d1cbcccb41 +Revises: 059f0d62a6b9 +Create Date: 2025-10-27 15:38:06.608300 + +""" + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "c6d1cbcccb41" +down_revision = "059f0d62a6b9" +branch_labels = None +depends_on = None + + +def upgrade(): + connection = op.get_bind() + # ### commands auto generated by Alembic - please adjust! ### + pass + # ### end Alembic commands ### + update_dataset_sql = """ + UPDATE cognition.conversation + SET incognito_mode = FALSE + WHERE incognito_mode IS NULL; """ + connection.execute(update_dataset_sql) + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + pass + # ### end Alembic commands ### diff --git a/app.py b/app.py index a5998a7d..6c973e29 100644 --- a/app.py +++ b/app.py @@ -185,3 +185,4 @@ session.start_session_cleanup_thread() log_storage.start_persist_thread() sums_table_manager.start_sums_table_thread() +clean_up.start_timed_executions_thread() diff --git a/controller/auth/kratos.py b/controller/auth/kratos.py index a93c09ef..bd7b7b31 100644 --- a/controller/auth/kratos.py +++ b/controller/auth/kratos.py @@ -254,7 +254,7 @@ def email_with_link(to_email: str, recovery_link: str) -> None: f"{LANGUAGE_MESSAGES['de']}{recovery_link}\n\n{LANGUAGE_EXPIRATION_INFO['de']}\n\n\n------\n\n{LANGUAGE_MESSAGES['en']}{recovery_link}\n\n{LANGUAGE_EXPIRATION_INFO['en']}", ) msg["Subject"] = INVITATION_SUBJECT - msg["From"] = "no-reply@kern.ai" + msg["From"] = "signup@kern.ai" msg["To"] = to_email with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server: @@ -265,6 +265,24 @@ def email_with_link(to_email: str, recovery_link: str) -> None: server.send_message(msg) +def send_bulk_emails(emails: List[str], recovery_links: List[str]) -> None: + + with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server: + if SMTP_USER and SMTP_PASSWORD: + server.ehlo() + server.starttls() + server.login(SMTP_USER, SMTP_PASSWORD) + + for to_email, recovery_link in zip(emails, recovery_links): + msg = MIMEText( + f"{LANGUAGE_MESSAGES['de']}{recovery_link}\n\n{LANGUAGE_EXPIRATION_INFO['de']}\n\n\n------\n\n{LANGUAGE_MESSAGES['en']}{recovery_link}\n\n{LANGUAGE_EXPIRATION_INFO['en']}", + ) + msg["Subject"] = INVITATION_SUBJECT + msg["From"] = "signup@kern.ai" + msg["To"] = to_email + server.send_message(msg) + + def check_user_exists(email: str) -> bool: request = requests.get( f"{KRATOS_ADMIN_URL}/identities?preview_credentials_identifier_similar={quote(email)}" diff --git a/controller/auth/manager.py b/controller/auth/manager.py index b002d7a5..ea30f34c 100644 --- a/controller/auth/manager.py +++ b/controller/auth/manager.py @@ -12,7 +12,7 @@ from 
controller.user import manager as user_manager from controller.organization import manager as organization_manager from submodules.model import enums, exceptions -from submodules.model.business_objects import organization +from submodules.model.business_objects import general, organization from submodules.model.business_objects.user import check_email_in_full_admin from submodules.model.models import Organization, Project, User import sqlalchemy @@ -183,32 +183,44 @@ def invite_users( team_ids: Optional[List[str]] = None, ): user_ids = [] + recovery_links = [] + organization = organization_manager.get_organization_by_name(organization_name) + if organization is None: + raise exceptions.EntityNotFoundException("Organization not found") for email in emails: # Create accounts for the email user = kratos.create_user_kratos(email, provider) if not user: raise AuthManagerError("User creation failed") user_ids.append(user["id"]) - # Assign the account to the organization - user_manager.update_organization_of_user(organization_name, email) + user_database = user_manager.get_or_create_user(user["id"], with_commit=False) + if not user_database: + raise AuthManagerError("User creation in database failed") - # Assign the user role - user_manager.update_user_role(user["id"], user_role) + user_database.language_display = language - # Add the preferred language - user_manager.update_user_field(user["id"], "language_display", language) + try: + role = enums.UserRoles[user_role.upper()].value + except KeyError: + raise ValueError(f"Invalid role: {role}") + user_database.role = role + user_database.organization_id = organization.id # Add the user to the teams if team_ids: - user_manager.add_user_to_teams(creation_user_id, user["id"], team_ids) + user_manager.add_user_to_teams( + creation_user_id, user["id"], team_ids, with_commit=False + ) # Get the recovery link for the email recovery_link = kratos.get_recovery_link(user["id"]) if not recovery_link: raise AuthManagerError("Failed 
to get recovery link") - - # Send the recovery link to the email - kratos.email_with_link(email, recovery_link["recovery_link"]) + recovery_links.append(recovery_link["recovery_link"]) + general.commit() + kratos.send_bulk_emails(emails, recovery_links) + kratos.__refresh_identity_cache() + organization_manager.sync_organization_sharepoint_integrations(organization.id) return user_ids diff --git a/controller/user/manager.py b/controller/user/manager.py index ea760d2e..81df8e35 100644 --- a/controller/user/manager.py +++ b/controller/user/manager.py @@ -1,4 +1,4 @@ -from typing import Dict, Optional, Any +from typing import Dict, List, Optional, Any from submodules.model import User, daemon, enums from submodules.model.business_objects import user, general from controller.auth import kratos @@ -17,12 +17,14 @@ def get_user(user_id: str) -> User: return user_item -def get_or_create_user(user_id: str) -> User: +def get_or_create_user(user_id: str, with_commit: bool = True) -> User: user_item = user.get(user_id) if not user_item: - user_item = user.create(user_id, with_commit=True) - kratos.__refresh_identity_cache() - update_last_interaction(user_item.id) + user_item = user.create(user_id, with_commit=with_commit) + if with_commit: + kratos.__refresh_identity_cache() + else: + update_last_interaction(user_item.id) return user_item @@ -89,10 +91,13 @@ def update_user_field(user_id: str, field: str, value: Any) -> User: return user_item -def add_user_to_teams(creation_user_id: str, user_id: str, team_ids: list) -> User: +def add_user_to_teams( + creation_user_id: str, user_id: str, team_ids: list, with_commit: bool = True +) -> User: for team_id in team_ids: team_member_db_co.create(team_id, user_id, creation_user_id, with_commit=False) - general.commit() + if with_commit: + general.commit() def remove_organization_from_user(user_mail: str) -> None: @@ -113,7 +118,7 @@ def get_active_users_filtered( sort_direction: Optional[str] = None, offset: Optional[int] = None, 
limit: Optional[int] = None, -) -> User: +) -> List[User]: now = datetime.now() last_interaction_range = (now - timedelta(minutes=minutes)) if minutes > 0 else None return user.get_active_users_after_filter( @@ -175,22 +180,4 @@ def __migrate_kratos_users(): if user_database.sso_provider != sso_provider: user_database.sso_provider = sso_provider - if user_database.oidc_identifier is None: - user_search = kratos.__search_kratos_for_user_mail( - user_identity["traits"]["email"] - ) - if user_search and user_search["credentials"]: - if user_search["credentials"].get("oidc", None): - oidc = ( - user_search["credentials"] - .get("oidc", {}) - .get("identifiers", None)[0] - ) - if oidc: - oidc = oidc.split(":") - if len(oidc) > 1: - user_database.oidc_identifier = oidc[1] - else: - user_database.oidc_identifier = None - general.commit() diff --git a/fast_api/routes/organization.py b/fast_api/routes/organization.py index e6cba633..bbd5fe22 100644 --- a/fast_api/routes/organization.py +++ b/fast_api/routes/organization.py @@ -1,5 +1,4 @@ import json -from controller.auth import kratos from fastapi import APIRouter, Request, Body from fast_api.models import ( AddUserToOrganizationBody, @@ -95,7 +94,6 @@ def get_user_info(request: Request): # in use cognition-ui & admin dashboard (07.01.25) @router.get("/get-user-info-extended") def get_user_info_extended(request: Request): - kratos.__refresh_identity_cache() user = auth_manager.get_user_by_info(request.state.info) name = resolve_user_name_by_id(user.id) user_dict = { @@ -282,6 +280,7 @@ def get_mapped_sorted_paginated_users( "created_at": user.created_at.isoformat() if user.created_at else None, "metadata_public": user.metadata_public, "sso_provider": user.sso_provider, + "messages_created_this_month": user.messages_created_this_month, } for user in active_users ] @@ -304,7 +303,7 @@ def delete_user(request: Request, body: DeleteUserBody = Body(...)): # in use admin-dashboard (08.01.25) 
-@router.post("/missing-users-interaction") +@router.post("/missing-users-interaction-and-message-count") def get_missing_users_interaction(request: Request, body: MissingUsersBody = Body(...)): auth_manager.check_admin_access(request.state.info) data = user.get_missing_users(body.user_ids) diff --git a/submodules/model b/submodules/model index f5681d71..7efb80fa 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit f5681d719c62ad33259696274049239910707ed0 +Subproject commit 7efb80fa7dfa0619d4a9549cf69478e4cb23cb5c diff --git a/util/clean_up.py b/util/clean_up.py index 6ef88b98..97d4a54d 100644 --- a/util/clean_up.py +++ b/util/clean_up.py @@ -1,6 +1,9 @@ from submodules.model.business_objects import upload_task import os import shutil +from submodules.model.daemon import run_without_db_token +from time import sleep +from submodules.model.global_objects import timed_executions def clean_up_database() -> None: @@ -21,3 +24,17 @@ def clean_up_disk() -> None: shutil.rmtree(file_path) except Exception as e: print("Failed to delete %s. 
Reason: %s" % (file_path, e)) + + +def start_timed_executions_thread() -> None: + run_without_db_token(__run_timed_executions) + + +def __run_timed_executions() -> None: + sleep(10) # wait a bit until app is started + while True: + try: + timed_executions.execute_time_key_update(with_commit=True) + except Exception as e: + print(f"Error during timed executions: {e}") + sleep(3600) # run every hour From b07fc7ddcb7775ce1740804c8bcc7b85ae4bc15b Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 11:46:31 +0100 Subject: [PATCH 26/31] perf: add etl task table --- ...ble.py => f428a22ecdb3_adds_etl_task_table.py} | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) rename alembic/versions/{bf8e8646ebdc_adds_etl_task_table.py => f428a22ecdb3_adds_etl_task_table.py} (97%) diff --git a/alembic/versions/bf8e8646ebdc_adds_etl_task_table.py b/alembic/versions/f428a22ecdb3_adds_etl_task_table.py similarity index 97% rename from alembic/versions/bf8e8646ebdc_adds_etl_task_table.py rename to alembic/versions/f428a22ecdb3_adds_etl_task_table.py index fc475d48..952f49cf 100644 --- a/alembic/versions/bf8e8646ebdc_adds_etl_task_table.py +++ b/alembic/versions/f428a22ecdb3_adds_etl_task_table.py @@ -1,8 +1,8 @@ """adds etl task table -Revision ID: bf8e8646ebdc -Revises: 24ca8432bd8b -Create Date: 2025-10-30 00:21:05.246324 +Revision ID: f428a22ecdb3 +Revises: 199a0d8aefbe +Create Date: 2025-10-30 10:45:20.843280 """ @@ -11,8 +11,8 @@ from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. 
-revision = "bf8e8646ebdc" -down_revision = "24ca8432bd8b" +revision = "f428a22ecdb3" +down_revision = "199a0d8aefbe" branch_labels = None depends_on = None @@ -62,11 +62,6 @@ def upgrade(): unique=False, schema="global", ) - op.add_column( - "conversation", - sa.Column("incognito_mode", sa.Boolean(), nullable=True), - schema="cognition", - ) op.add_column( "markdown_file", sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True), From 41c9573f8596c799a41ee863ab22f1098648f1fd Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 12:25:22 +0100 Subject: [PATCH 27/31] perf: disable CLEANSE as default --- controller/transfer/cognition/etl.py | 107 ------------------ controller/transfer/cognition/minio_upload.py | 2 +- 2 files changed, 1 insertion(+), 108 deletions(-) delete mode 100644 controller/transfer/cognition/etl.py diff --git a/controller/transfer/cognition/etl.py b/controller/transfer/cognition/etl.py deleted file mode 100644 index 47e17a27..00000000 --- a/controller/transfer/cognition/etl.py +++ /dev/null @@ -1,107 +0,0 @@ -from typing import Optional - -from submodules.model import enums -from submodules.model.models import ( - EtlTask, - CognitionMarkdownFile, - CognitionMarkdownDataset, -) -from submodules.model.global_objects import etl_task as etl_task_bo -from submodules.model.cognition_objects import markdown_file as markdown_file_bo - -DEFAULT_FILE_TYPE = enums.ETLFileType.PDF -DEFAULT_EXTRACTORS = { - enums.ETLFileType.MD: enums.ETLExtractor.MD.FILESYSTEM, - enums.ETLFileType.PDF: enums.ETLExtractor.PDF.PDF2MD, -} - -DEFAULT_FALLBACK_EXTRACTORS = { - enums.ETLFileType.PDF: [ - enums.ETLExtractor.PDF.PDF2MD, - enums.ETLExtractor.PDF.VISION, - enums.ETLExtractor.PDF.AZURE_DI, - ], -} - - -def get_or_create_task( - markdown_file: CognitionMarkdownFile, - markdown_dataset: CognitionMarkdownDataset, - file_size_bytes: int, - minio_path: str, - original_file_name: str, - file_type: Optional[enums.ETLFileType] = None, - 
extractor: Optional[enums.ETLExtractor] = None, - fallback_extractors: Optional[list[enums.ETLExtractor]] = None, - split_strategy: Optional[enums.ETLSplitStrategy] = None, - chunk_size: Optional[int] = 1000, - priority: Optional[int] = -1, -) -> EtlTask: - if markdown_file.etl_task_id: - if etl_task := etl_task_bo.get_by_id(markdown_file.etl_task_id): - return etl_task - - file_type = file_type or DEFAULT_FILE_TYPE - split_strategy = split_strategy or enums.ETLSplitStrategy.CHUNK - extractor = extractor or DEFAULT_EXTRACTORS[file_type] - fallback_extractors = list( - filter( - lambda x: x != extractor, - (fallback_extractors or DEFAULT_FALLBACK_EXTRACTORS.get(file_type, [])), - ) - ) - - etl_task = etl_task_bo.create( - org_id=markdown_dataset.organization_id, - user_id=markdown_file.created_by, - file_size_bytes=file_size_bytes, - extract_config={ - "file_type": file_type.value, - "extractor": extractor.value, - "fallback_extractors": [fe.value for fe in fallback_extractors], - "minio_path": minio_path, - "original_file_name": original_file_name, - }, - split_config={ - "strategy": split_strategy.value, - "chunk_size": chunk_size, - }, - transform_config={ - "transformers": [ - { - "name": enums.ETLTransformer.CLEANSE.value, - "system_prompt": None, - "user_prompt": None, - }, - { - "name": enums.ETLTransformer.TEXT_TO_TABLE.value, - "system_prompt": None, - "user_prompt": None, - }, - ] - }, - load_config={ - "refinery_project": {"enabled": False, "id": None}, - "markdown_file": {"enabled": True, "id": str(markdown_file.id)}, - }, - notify_config={ - "http": { - "url": "http://cognition-gateway:80/etl/finished/{markdown_file_id}", - "format": { - "markdown_file_id": str(markdown_file.id), - }, - "method": "POST", - } - }, - llm_config=markdown_dataset.llm_config, - tokenizer=markdown_dataset.tokenizer, - priority=priority, - ) - - markdown_file_bo.update( - org_id=markdown_file.organization_id, - markdown_file_id=markdown_file.id, - etl_task_id=etl_task.id, - ) 
- - return etl_task diff --git a/controller/transfer/cognition/minio_upload.py b/controller/transfer/cognition/minio_upload.py index 30fb75a9..c5a34b36 100644 --- a/controller/transfer/cognition/minio_upload.py +++ b/controller/transfer/cognition/minio_upload.py @@ -83,7 +83,7 @@ def handle_cognition_file_upload(path_parts: List[str]): transform_config={ "transformers": [ { # NOTE: __call_gpt_with_key only reads user_prompt - "enabled": True, + "enabled": False, # this transformer is disabled because it often hangs the ETL process "name": enums.ETLTransformer.CLEANSE.value, "system_prompt": None, "user_prompt": None, From 2554a43809814fb3b7282781d58df07d365e9c69 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 13:19:24 +0100 Subject: [PATCH 28/31] chore: update submodules --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index 7efb80fa..e103f2f6 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 7efb80fa7dfa0619d4a9549cf69478e4cb23cb5c +Subproject commit e103f2f65f4764e2bc19c1e6d6b58d7390547f18 From 52f715e67ca50e4e15a1da75ef672657c4e1f212 Mon Sep 17 00:00:00 2001 From: andhreljaKern Date: Thu, 30 Oct 2025 13:19:34 +0100 Subject: [PATCH 29/31] perf: standard cache config keys --- controller/transfer/cognition/minio_upload.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/controller/transfer/cognition/minio_upload.py b/controller/transfer/cognition/minio_upload.py index c5a34b36..791f765e 100644 --- a/controller/transfer/cognition/minio_upload.py +++ b/controller/transfer/cognition/minio_upload.py @@ -72,9 +72,10 @@ def handle_cognition_file_upload(path_parts: List[str]): enums.ETLExtractorPDF.VISION, ], cache_config={ - "use_file_cache": True, - "use_extraction_cache": False, - "use_transformation_cache": True, + enums.ETLCacheKeys.FILE_CACHE.value: True, + enums.ETLCacheKeys.EXTRACTION.value: True, + 
enums.ETLCacheKeys.SPLITTING.value: True, + enums.ETLCacheKeys.TRANSFORMATION.value: True, }, split_config={ "strategy": enums.ETLSplitStrategy.CHUNK.value, From 356f38ee6d8c69158d2eb4090aa7f691cae40676 Mon Sep 17 00:00:00 2001 From: JWittmeyer Date: Mon, 3 Nov 2025 14:30:23 +0100 Subject: [PATCH 30/31] Merge with dev --- submodules/model | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/model b/submodules/model index e103f2f6..293c1493 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit e103f2f65f4764e2bc19c1e6d6b58d7390547f18 +Subproject commit 293c14937f8f8d48ac582541cb083988170f98f9 From 2770d256429ff25c9e9030396986a77de15bb84f Mon Sep 17 00:00:00 2001 From: JWittmeyer Date: Mon, 3 Nov 2025 16:29:39 +0100 Subject: [PATCH 31/31] Alembic new table & submodule fix import --- alembic/versions/9d5fb67e29f7_config_sets.py | 51 ++++++++++++++++++++ submodules/model | 2 +- 2 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 alembic/versions/9d5fb67e29f7_config_sets.py diff --git a/alembic/versions/9d5fb67e29f7_config_sets.py b/alembic/versions/9d5fb67e29f7_config_sets.py new file mode 100644 index 00000000..a26eb0ac --- /dev/null +++ b/alembic/versions/9d5fb67e29f7_config_sets.py @@ -0,0 +1,51 @@ +"""Config sets + + +Revision ID: 9d5fb67e29f7 +Revises: f428a22ecdb3 +Create Date: 2025-11-03 15:28:47.686657 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = '9d5fb67e29f7' +down_revision = 'f428a22ecdb3' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table('etl_config_preset', + sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False), + sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True), + sa.Column('project_id', postgresql.UUID(as_uuid=True), nullable=True), + sa.Column('name', sa.String(), nullable=True), + sa.Column('description', sa.String(), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('created_by', postgresql.UUID(as_uuid=True), nullable=True), + sa.Column('etl_config', sa.JSON(), nullable=True), + sa.Column('add_config', sa.JSON(), nullable=True), + sa.ForeignKeyConstraint(['created_by'], ['user.id'], ondelete='SET NULL'), + sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'), + sa.ForeignKeyConstraint(['project_id'], ['cognition.project.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('name'), + schema='cognition' + ) + op.create_index(op.f('ix_cognition_etl_config_preset_created_by'), 'etl_config_preset', ['created_by'], unique=False, schema='cognition') + op.create_index(op.f('ix_cognition_etl_config_preset_organization_id'), 'etl_config_preset', ['organization_id'], unique=False, schema='cognition') + op.create_index(op.f('ix_cognition_etl_config_preset_project_id'), 'etl_config_preset', ['project_id'], unique=False, schema='cognition') + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.drop_index(op.f('ix_cognition_etl_config_preset_project_id'), table_name='etl_config_preset', schema='cognition') + op.drop_index(op.f('ix_cognition_etl_config_preset_organization_id'), table_name='etl_config_preset', schema='cognition') + op.drop_index(op.f('ix_cognition_etl_config_preset_created_by'), table_name='etl_config_preset', schema='cognition') + op.drop_table('etl_config_preset', schema='cognition') + # ### end Alembic commands ### diff --git a/submodules/model b/submodules/model index 293c1493..2d919530 160000 --- a/submodules/model +++ b/submodules/model @@ -1 +1 @@ -Subproject commit 293c14937f8f8d48ac582541cb083988170f98f9 +Subproject commit 2d919530f6c3f09c6b9f6b79481c99d520d857dd