diff --git a/alembic/versions/9d5fb67e29f7_config_sets.py b/alembic/versions/9d5fb67e29f7_config_sets.py
new file mode 100644
index 00000000..a26eb0ac
--- /dev/null
+++ b/alembic/versions/9d5fb67e29f7_config_sets.py
@@ -0,0 +1,51 @@
+"""Config sets
+
+
+Revision ID: 9d5fb67e29f7
+Revises: f428a22ecdb3
+Create Date: 2025-11-03 15:28:47.686657
+
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = '9d5fb67e29f7'
+down_revision = 'f428a22ecdb3'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('etl_config_preset',
+    sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
+    sa.Column('organization_id', postgresql.UUID(as_uuid=True), nullable=True),
+    sa.Column('project_id', postgresql.UUID(as_uuid=True), nullable=True),
+    sa.Column('name', sa.String(), nullable=True),
+    sa.Column('description', sa.String(), nullable=True),
+    sa.Column('created_at', sa.DateTime(), nullable=True),
+    sa.Column('created_by', postgresql.UUID(as_uuid=True), nullable=True),
+    sa.Column('etl_config', sa.JSON(), nullable=True),
+    sa.Column('add_config', sa.JSON(), nullable=True),
+    sa.ForeignKeyConstraint(['created_by'], ['user.id'], ondelete='SET NULL'),
+    sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], ondelete='CASCADE'),
+    sa.ForeignKeyConstraint(['project_id'], ['cognition.project.id'], ondelete='CASCADE'),
+    sa.PrimaryKeyConstraint('id'),
+    sa.UniqueConstraint('name'),
+    schema='cognition'
+    )
+    op.create_index(op.f('ix_cognition_etl_config_preset_created_by'), 'etl_config_preset', ['created_by'], unique=False, schema='cognition')
+    op.create_index(op.f('ix_cognition_etl_config_preset_organization_id'), 'etl_config_preset', ['organization_id'], unique=False, schema='cognition')
+    op.create_index(op.f('ix_cognition_etl_config_preset_project_id'), 'etl_config_preset', ['project_id'], unique=False, schema='cognition')
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_index(op.f('ix_cognition_etl_config_preset_project_id'), table_name='etl_config_preset', schema='cognition')
+    op.drop_index(op.f('ix_cognition_etl_config_preset_organization_id'), table_name='etl_config_preset', schema='cognition')
+    op.drop_index(op.f('ix_cognition_etl_config_preset_created_by'), table_name='etl_config_preset', schema='cognition')
+    op.drop_table('etl_config_preset', schema='cognition')
+    # ### end Alembic commands ###
diff --git a/alembic/versions/f428a22ecdb3_adds_etl_task_table.py b/alembic/versions/f428a22ecdb3_adds_etl_task_table.py
new file mode 100644
index 00000000..952f49cf
--- /dev/null
+++ b/alembic/versions/f428a22ecdb3_adds_etl_task_table.py
@@ -0,0 +1,324 @@
+"""adds etl task table
+
+Revision ID: f428a22ecdb3
+Revises: 199a0d8aefbe
+Create Date: 2025-10-30 10:45:20.843280
+
+"""
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = "f428a22ecdb3"
+down_revision = "199a0d8aefbe"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table(
+        "etl_task",
+        sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False),
+        sa.Column("organization_id", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("created_at", sa.DateTime(), nullable=True),
+        sa.Column("created_by", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("file_path", sa.String(), nullable=True),
+        sa.Column("file_size_bytes", sa.BigInteger(), nullable=True),
+        sa.Column("tokenizer", sa.String(), nullable=True),
+        sa.Column("cache_config", sa.JSON(), nullable=True),
+        sa.Column("extract_config", sa.JSON(), nullable=True),
+        sa.Column("split_config", sa.JSON(), nullable=True),
+        sa.Column("transform_config", sa.JSON(), nullable=True),
+        sa.Column("load_config", sa.JSON(), nullable=True),
+        sa.Column("notify_config", sa.JSON(), nullable=True),
+        sa.Column("llm_config", sa.JSON(), nullable=True),
+        sa.Column("started_at", sa.DateTime(), nullable=True),
+        sa.Column("finished_at", sa.DateTime(), nullable=True),
+        sa.Column("state", sa.String(), nullable=True),
+        sa.Column("is_active", sa.Boolean(), nullable=True),
+        sa.Column("priority", sa.Integer(), nullable=True),
+        sa.Column("error_message", sa.String(), nullable=True),
+        sa.ForeignKeyConstraint(["created_by"], ["user.id"], ondelete="SET NULL"),
+        sa.ForeignKeyConstraint(
+            ["organization_id"], ["organization.id"], ondelete="CASCADE"
+        ),
+        sa.PrimaryKeyConstraint("id"),
+        schema="global",
+    )
+    op.create_index(
+        op.f("ix_global_etl_task_created_by"),
+        "etl_task",
+        ["created_by"],
+        unique=False,
+        schema="global",
+    )
+    op.create_index(
+        op.f("ix_global_etl_task_organization_id"),
+        "etl_task",
+        ["organization_id"],
+        unique=False,
+        schema="global",
+    )
+    op.add_column(
+        "markdown_file",
+        sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True),
+        schema="cognition",
+    )
+    op.create_index(
+        op.f("ix_cognition_markdown_file_etl_task_id"),
+        "markdown_file",
+        ["etl_task_id"],
+        unique=False,
+        schema="cognition",
+    )
+    op.create_unique_constraint(
+        "unique_markdown_file_etl_task_id",
+        "markdown_file",
+        ["id", "etl_task_id"],
+        schema="cognition",
+    )
+    op.create_foreign_key(
+        None,
+        "markdown_file",
+        "etl_task",
+        ["etl_task_id"],
+        ["id"],
+        source_schema="cognition",
+        referent_schema="global",
+        ondelete="CASCADE",
+    )
+    op.add_column(
+        "github_file",
+        sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True),
+        schema="integration",
+    )
+    op.drop_constraint(
+        "unique_github_file_source", "github_file", schema="integration", type_="unique"
+    )
+    op.create_unique_constraint(
+        "unique_github_file_source",
+        "github_file",
+        ["integration_id", "running_id", "source", "etl_task_id"],
+        schema="integration",
+    )
+    op.create_index(
+        op.f("ix_integration_github_file_etl_task_id"),
+        "github_file",
+        ["etl_task_id"],
+        unique=False,
+        schema="integration",
+    )
+    op.create_foreign_key(
+        None,
+        "github_file",
+        "etl_task",
+        ["etl_task_id"],
+        ["id"],
+        source_schema="integration",
+        referent_schema="global",
+        ondelete="CASCADE",
+    )
+    op.add_column(
+        "github_issue",
+        sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True),
+        schema="integration",
+    )
+    op.drop_constraint(
+        "unique_github_issue_source",
+        "github_issue",
+        schema="integration",
+        type_="unique",
+    )
+    op.create_unique_constraint(
+        "unique_github_issue_source",
+        "github_issue",
+        ["integration_id", "running_id", "source", "etl_task_id"],
+        schema="integration",
+    )
+    op.create_index(
+        op.f("ix_integration_github_issue_etl_task_id"),
"github_issue", + ["etl_task_id"], + unique=False, + schema="integration", + ) + op.create_foreign_key( + None, + "github_issue", + "etl_task", + ["etl_task_id"], + ["id"], + source_schema="integration", + referent_schema="global", + ondelete="CASCADE", + ) + op.add_column( + "pdf", + sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True), + schema="integration", + ) + op.drop_constraint("unique_pdf_source", "pdf", schema="integration", type_="unique") + op.create_unique_constraint( + "unique_pdf_source", + "pdf", + ["integration_id", "running_id", "source", "etl_task_id"], + schema="integration", + ) + op.create_index( + op.f("ix_integration_pdf_etl_task_id"), + "pdf", + ["etl_task_id"], + unique=False, + schema="integration", + ) + op.create_foreign_key( + None, + "pdf", + "etl_task", + ["etl_task_id"], + ["id"], + source_schema="integration", + referent_schema="global", + ondelete="CASCADE", + ) + op.add_column( + "sharepoint", + sa.Column("etl_task_id", postgresql.UUID(as_uuid=True), nullable=True), + schema="integration", + ) + op.drop_constraint( + "unique_sharepoint_source", "sharepoint", schema="integration", type_="unique" + ) + op.create_unique_constraint( + "unique_sharepoint_source", + "sharepoint", + ["integration_id", "running_id", "source", "etl_task_id"], + schema="integration", + ) + op.create_index( + op.f("ix_integration_sharepoint_etl_task_id"), + "sharepoint", + ["etl_task_id"], + unique=False, + schema="integration", + ) + op.create_foreign_key( + None, + "sharepoint", + "etl_task", + ["etl_task_id"], + ["id"], + source_schema="integration", + referent_schema="global", + ondelete="CASCADE", + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint( + "sharepoint_etl_task_id_fkey", + "sharepoint", + schema="integration", + type_="foreignkey", + ) + op.drop_index( + op.f("ix_integration_sharepoint_etl_task_id"), + table_name="sharepoint", + schema="integration", + ) + op.drop_constraint( + "unique_sharepoint_source", "sharepoint", schema="integration", type_="unique" + ) + op.create_unique_constraint( + "unique_sharepoint_source", + "sharepoint", + ["integration_id", "running_id", "source"], + schema="integration", + ) + op.drop_column("sharepoint", "etl_task_id", schema="integration") + op.drop_constraint( + "pdf_etl_task_id_fkey", "pdf", schema="integration", type_="foreignkey" + ) + op.drop_index( + op.f("ix_integration_pdf_etl_task_id"), table_name="pdf", schema="integration" + ) + op.drop_constraint("unique_pdf_source", "pdf", schema="integration", type_="unique") + op.create_unique_constraint( + "unique_pdf_source", + "pdf", + ["integration_id", "running_id", "source"], + schema="integration", + ) + op.drop_column("pdf", "etl_task_id", schema="integration") + op.drop_constraint( + "github_issue_etl_task_id_fkey", + "github_issue", + schema="integration", + type_="foreignkey", + ) + op.drop_index( + op.f("ix_integration_github_issue_etl_task_id"), + table_name="github_issue", + schema="integration", + ) + op.drop_constraint( + "unique_github_issue_source", + "github_issue", + schema="integration", + type_="unique", + ) + op.create_unique_constraint( + "unique_github_issue_source", + "github_issue", + ["integration_id", "running_id", "source"], + schema="integration", + ) + op.drop_column("github_issue", "etl_task_id", schema="integration") + op.drop_constraint( + "github_file_etl_task_id_fkey", + "github_file", + schema="integration", + type_="foreignkey", + ) + 
+    op.drop_index(
+        op.f("ix_integration_github_file_etl_task_id"),
+        table_name="github_file",
+        schema="integration",
+    )
+    op.drop_constraint(
+        "unique_github_file_source", "github_file", schema="integration", type_="unique"
+    )
+    op.create_unique_constraint(
+        "unique_github_file_source",
+        "github_file",
+        ["integration_id", "running_id", "source"],
+        schema="integration",
+    )
+    op.drop_column("github_file", "etl_task_id", schema="integration")
+    op.drop_constraint(
+        "markdown_file_etl_task_id_fkey",
+        "markdown_file",
+        schema="cognition",
+        type_="foreignkey",
+    )
+    op.drop_index(
+        op.f("ix_cognition_markdown_file_etl_task_id"),
+        table_name="markdown_file",
+        schema="cognition",
+    )
+    op.drop_column("markdown_file", "etl_task_id", schema="cognition")
+    op.drop_index(
+        op.f("ix_global_etl_task_organization_id"),
+        table_name="etl_task",
+        schema="global",
+    )
+    op.drop_index(
+        op.f("ix_global_etl_task_created_by"), table_name="etl_task", schema="global"
+    )
+    op.drop_table("etl_task", schema="global")
+    # ### end Alembic commands ###
diff --git a/controller/monitor/manager.py b/controller/monitor/manager.py
index f4bdd43b..907b5bf6 100644
--- a/controller/monitor/manager.py
+++ b/controller/monitor/manager.py
@@ -126,3 +126,14 @@ def cancel_integration_task(
     task_monitor.set_integration_task_to_failed(
         integration_id, error_message="Cancelled by task manager"
     )
+
+
+def cancel_etl_task(
+    task_info: Dict[str, Any],
+) -> None:
+
+    etl_task_id = task_info.get("etlTaskId")
+
+    task_monitor.set_etl_task_to_failed(
+        etl_task_id, error_message="Cancelled by task manager"
+    )
diff --git a/controller/transfer/cognition/minio_upload.py b/controller/transfer/cognition/minio_upload.py
index 3ea51107..791f765e 100644
--- a/controller/transfer/cognition/minio_upload.py
+++ b/controller/transfer/cognition/minio_upload.py
@@ -1,9 +1,14 @@
 from typing import List
-from submodules.model.cognition_objects import file_reference as file_reference_db_bo
-from submodules.model.enums import TaskType, FileCachingProcessingScope
+
 from controller.task_master import manager as task_master_manager
 from submodules.model import enums
 from submodules.model.business_objects import general
+from submodules.model.global_objects import etl_task as etl_task_bo
+from submodules.model.cognition_objects import (
+    file_reference as file_reference_db_bo,
+    markdown_file as markdown_file_bo,
+    markdown_dataset as markdown_dataset_bo,
+)
 
 
 def handle_cognition_file_upload(path_parts: List[str]):
@@ -21,7 +26,10 @@ def handle_cognition_file_upload(path_parts: List[str]):
         or file_reference.state == enums.FileCachingState.COMPLETED.value
     ):
         # file_reference is None or already processed in queue
-        print("File reference duplication error, file is already processed", flush=True)
+        print(
+            "File reference duplication error, file is already processed",
+            flush=True,
+        )
         if file_reference:
             print(f"File reference id: {str(file_reference.id)}", flush=True)
             print(f"File name: {file_reference.original_file_name}", flush=True)
@@ -29,26 +37,95 @@
     file_reference.state = enums.FileCachingState.COMPLETED.value
     general.commit()
 
-    prio = (
+    chunk_size = 1000
+    priority = -1
+    if (
         file_reference.meta_data.get("transformation_initiator")
         == enums.FileCachingInitiator.TMP_DOC_RETRIEVAL.value
-    )
+    ):
+        priority = 1
+
+    markdown_file = markdown_file_bo.get(
+        org_id, file_reference.meta_data.get("markdown_file_id")
+    )
+    if not markdown_file:
+        print(
+            "ERROR: Markdown file not found for the given markdown_file_id",
+            flush=True,
+        )
+        raise ValueError(
+            f"Markdown file not found for file reference {file_reference.id}"
+        )
+
+    markdown_dataset = markdown_dataset_bo.get(
+        org_id=org_id, id=markdown_file.dataset_id
+    )
+
+    etl_task = etl_task_bo.get_or_create_markdown_file_etl_task(
+        org_id=org_id,
+        file_reference=file_reference,
+        markdown_file=markdown_file,
+        markdown_dataset=markdown_dataset,
+        extractor=markdown_file.meta_data.get("extractor"),
+        fallback_extractors=[
+            enums.ETLExtractorPDF.PDF2MD,
+            enums.ETLExtractorPDF.VISION,
+        ],
+        cache_config={
+            enums.ETLCacheKeys.FILE_CACHE.value: True,
+            enums.ETLCacheKeys.EXTRACTION.value: True,
+            enums.ETLCacheKeys.SPLITTING.value: True,
+            enums.ETLCacheKeys.TRANSFORMATION.value: True,
+        },
+        split_config={
+            "strategy": enums.ETLSplitStrategy.CHUNK.value,
+            "chunk_size": chunk_size,
+        },
+        transform_config={
+            "transformers": [
+                {  # NOTE: __call_gpt_with_key only reads user_prompt
+                    "enabled": False,  # this transformer is disabled because it often hangs the ETL process
+                    "name": enums.ETLTransformer.CLEANSE.value,
+                    "system_prompt": None,
+                    "user_prompt": None,
+                },
+                {
+                    "enabled": True,
+                    "name": enums.ETLTransformer.TEXT_TO_TABLE.value,
+                    "system_prompt": None,
+                    "user_prompt": None,
+                },
+                {
+                    "enabled": False,
+                    "name": enums.ETLTransformer.SUMMARIZE.value,
+                    "system_prompt": None,
+                    "user_prompt": None,
+                },
+            ]
+        },
+        load_config={
+            "refinery_project": {"enabled": False, "id": None},
+            "markdown_file": {"enabled": True, "id": str(markdown_file.id)},
+        },
+        notify_config={
+            "http": {
+                "url": "http://cognition-gateway:80/etl/finished/{markdown_file_id}",
+                "format": {
+                    "markdown_file_id": str(markdown_file.id),
+                },
+                "method": "POST",
+            }
+        },
+        priority=priority,
+    )
+
+    markdown_file_bo.update(
+        org_id=org_id, markdown_file_id=markdown_file.id, etl_task_id=etl_task.id
+    )
 
-    extraction_method = file_reference.meta_data.get("extraction_method")
     task_master_manager.queue_task(
-        str(file_reference.organization_id),
+        org_id,
         str(file_reference.created_by),
-        TaskType.PARSE_COGNITION_FILE,
-        {
-            "parse_scope": FileCachingProcessingScope.EXTRACT_TRANSFORM.value,
-            "file_reference_id": str(file_reference.id),
-            "extraction_method": extraction_method,
-            "meta_data": file_reference.meta_data,
-            "extraction_key": file_reference.meta_data.get("extraction_key"),
-            "transformation_key": file_reference.meta_data.get(
-                "transformation_key"
-            ),
-            "file_name": file_reference.original_file_name,
-        },
-        prio,  # not sure if prio is right here as the prio tasks should only take < 1 min but waiting for the normal queue will take ages depending on the queue
+        enums.TaskType.EXECUTE_ETL,
+        {"etl_task_id": str(etl_task.id)},
     )
diff --git a/fast_api/routes/misc.py b/fast_api/routes/misc.py
index d3a62f2f..e98af18f 100644
--- a/fast_api/routes/misc.py
+++ b/fast_api/routes/misc.py
@@ -136,6 +136,8 @@ def cancel_task(
         )
     elif task_type == enums.TaskType.EXECUTE_INTEGRATION.value:
         controller_manager.cancel_integration_task(task_info)
+    elif task_type == enums.TaskType.EXECUTE_ETL.value:
+        controller_manager.cancel_etl_task(task_info)
     else:
         raise ValueError(f"{task_type} is no valid task type")
 
diff --git a/submodules/model b/submodules/model
index c996e22c..2d919530 160000
--- a/submodules/model
+++ b/submodules/model
@@ -1 +1 @@
-Subproject commit c996e22c1ec38ef43a46b495aa1e450a8383df50
+Subproject commit 2d919530f6c3f09c6b9f6b79481c99d520d857dd