
[BUG] Import error on restore_task_instance #43

@betancourtl

Description


Is there an existing issue for this?

  • I have searched the existing issues

What MWAA versions are you seeing the problem on?

2.9.2

Current Behavior

I'm following the steps below to back up, clean, and restore metadata from env A to env B. Both environments are on version 2.9. The reason I'm migrating to a new environment is that we are changing KMS keys.

Steps:

  1. run backup_metadata.py on env A
  2. run cleanup_metadata.py on env B
  3. run restore_metadata.py on env B (import error on restore_task_instance)

https://pypi.org/project/mwaa-dr/
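
For context, the mwaa-dr package linked above is installed on MWAA through the environment's `requirements.txt` (version pin omitted here; pick one deliberately in practice):

```
# requirements.txt
mwaa-dr
```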

restore_metadata.py

from airflow import DAG
from custom_dr_factory_2_9 import CustomDRFactory_2_9

factory = CustomDRFactory_2_9(
    dag_id='restore_metadata',
    path_prefix='data',
    storage_type='S3'
)

dag: DAG = factory.create_restore_dag()

backup_metadata.py

# Importing DAG is necessary for DAG detection
from airflow import DAG
from custom_dr_factory_2_9 import CustomDRFactory_2_9

factory = CustomDRFactory_2_9(
    dag_id='backup_metadata',
    path_prefix='data',
    storage_type='S3'
)

# Assigning the returned dag to a global variable is necessary for DAG detection
dag: DAG = factory.create_backup_dag()

cleanup_metadata.py

from airflow import DAG
from mwaa_dr.v_2_9.dr_factory import DRFactory_2_9

factory = DRFactory_2_9(
    dag_id='cleanup_metadata',
    path_prefix='data',
    storage_type='S3'
)

dag: DAG = factory.create_cleanup_dag()

custom_dr_factory_2_9.py

I had to create a custom dr_factory as a workaround for log_template_id issues.

from mwaa_dr.framework.model.base_table import BaseTable
from mwaa_dr.framework.model.dependency_model import DependencyModel
from mwaa_dr.v_2_9.dr_factory import DRFactory_2_9


class CustomDRFactory_2_9(DRFactory_2_9):

    def dag_run(self, model: DependencyModel[BaseTable]) -> BaseTable:
        """
        Create a BaseTable model for the dag_run table in Apache Airflow 2.8.1.
        In particular, adds the `clear_number` field to the 2.7.3 dag_run table.
        Args:
            model (DependencyModel[BaseTable]): The dependency model for the dag_run table.

        Returns:
            BaseTable: The BaseTable model for the dag_run table.
        """
        return BaseTable(
            name="dag_run",
            model=model,
            columns=[
                "clear_number",  # New Field
                "conf",
                "creating_job_id",
                "dag_hash",
                "dag_id",
                "data_interval_end",
                "data_interval_start",
                "end_date",
                "execution_date",
                "external_trigger",
                "last_scheduling_decision",
                # "log_template_id",
                "queued_at",
                "run_id",
                "run_type",
                "start_date",
                "state",
                "updated_at",
            ],
            export_mappings={"conf": "'\\x' || encode(conf,'hex') as conf"},
            storage_type=self.storage_type,
            path_prefix=self.path_prefix,
            batch_size=self.batch_size,
        )
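
A related workaround idea (hypothetical, not verified against the library) would be to mirror the `dag_run` override above with a `task_instance` override that drops the offending `next_kwargs` column from the column list, passing the reduced list to `BaseTable(...)` in the same way. The names `TASK_INSTANCE_COLUMNS` and `REDUCED_COLUMNS` are mine; the column list itself is transcribed from the `COPY` statement in the error log. Note that dropping `next_kwargs` means deferred-operator state would not survive the migration:

```python
# task_instance columns, transcribed from the COPY statement in the error log.
TASK_INSTANCE_COLUMNS = [
    "dag_id", "map_index", "run_id", "task_id", "custom_operator_name",
    "duration", "end_date", "executor_config", "external_executor_id",
    "hostname", "job_id", "max_tries", "next_kwargs", "next_method",
    "operator", "pid", "pool", "pool_slots", "priority_weight", "queue",
    "queued_by_job_id", "queued_dttm", "rendered_map_index", "start_date",
    "state", "task_display_name", "trigger_id", "trigger_timeout",
    "try_number", "unixname", "updated_at",
]

# Hypothetical reduced list for a task_instance override in the custom
# factory (same BaseTable(...) pattern as the dag_run override above).
REDUCED_COLUMNS = [c for c in TASK_INSTANCE_COLUMNS if c != "next_kwargs"]
```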

error

ip-10-133-4-166.ec2.internal
*** Reading remote log from Cloudwatch log_group: airflow-USN-Airflow-data-warehouse-MWAAV292-1IEZUDU5TZEVC-airflow-Task log_stream: dag_id=restore_metadata/run_id=manual__2025-06-06T13_45_36.284037+00_00/task_id=restore_task_instance/attempt=1.log.
[2025-06-06, 13:46:15 UTC] {local_task_job_runner.py:120} ▼ Pre task execution logs
[2025-06-06, 13:46:15 UTC] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: restore_metadata.restore_task_instance manual__2025-06-06T13:45:36.284037+00:00 [queued]>
[2025-06-06, 13:46:15 UTC] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: restore_metadata.restore_task_instance manual__2025-06-06T13:45:36.284037+00:00 [queued]>
[2025-06-06, 13:46:15 UTC] {taskinstance.py:2306} INFO - Starting attempt 1 of 1
[2025-06-06, 13:46:15 UTC] {taskinstance.py:2330} INFO - Executing <Task(PythonOperator): restore_task_instance> on 2025-06-06 13:45:36.284037+00:00
[2025-06-06, 13:46:15 UTC] {standard_task_runner.py:63} INFO - Started process 113 to run task
[2025-06-06, 13:46:15 UTC] {standard_task_runner.py:90} INFO - Running: ['airflow', 'tasks', 'run', 'restore_metadata', 'restore_task_instance', 'manual__2025-06-06T13:45:36.284037+00:00', '--job-id', '1341990', '--raw', '--subdir', 'DAGS_FOLDER/restore_metadata.py', '--cfg-path', '/tmp/tmpk9fm5hnc']
[2025-06-06, 13:46:15 UTC] {standard_task_runner.py:91} INFO - Job 1341990: Subtask restore_task_instance
[2025-06-06, 13:46:16 UTC] {taskinstance.py:2648} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='restore_metadata' AIRFLOW_CTX_TASK_ID='restore_task_instance' AIRFLOW_CTX_EXECUTION_DATE='2025-06-06T13:45:36.284037+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2025-06-06T13:45:36.284037+00:00'
[2025-06-06, 13:46:16 UTC] {taskinstance.py:430} ▲▲▲ Log group end
[2025-06-06, 13:46:16 UTC] {logging_mixin.py:188} INFO - Restore SQL: COPY task_instance (dag_id, map_index, run_id, task_id, custom_operator_name, duration, end_date, executor_config, external_executor_id, hostname, job_id, max_tries, next_kwargs, next_method, operator, pid, pool, pool_slots, priority_weight, queue, queued_by_job_id, queued_dttm, rendered_map_index, start_date, state, task_display_name, trigger_id, trigger_timeout, try_number, unixname, updated_at) FROM STDIN WITH (FORMAT CSV, HEADER FALSE, DELIMITER '|')
[2025-06-06, 13:46:24 UTC] {taskinstance.py:441} ▼ Post task execution logs
[2025-06-06, 13:46:24 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 235, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 252, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/mwaa_dr/framework/model/base_table.py", line 368, in restore
    cursor.copy_expert(restore_sql, StringIO("".join(batch)))
psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type json
DETAIL:  Token "'" is invalid.
CONTEXT:  JSON data, line 1: {'...
COPY task_instance, line 2939, column next_kwargs: "{'__var': {}, '__type': 'dict'}"

[2025-06-06, 13:46:24 UTC] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=restore_metadata, task_id=restore_task_instance, run_id=manual__2025-06-06T13:45:36.284037+00:00, execution_date=20250606T134536, start_date=20250606T134615, end_date=20250606T134624
[2025-06-06, 13:46:24 UTC] {logging_mixin.py:188} INFO - {'dag': 'restore_metadata', 'dag_run': 'manual__2025-06-06T13:45:36.284037+00:00', 'tasks': ['restore_task_fail => None', 'restore_active_dag => None', 'restore_xcom => None', 'restore_end => None', 'teardown => None', 'notify_success => None', 'setup => success', 'restore_start => success', 'restore_task_instance => running', 'restore_log => running', 'restore_trigger => success', 'restore_connection => success', 'restore_dag_run => success', 'restore_variable => success', 'restore_slot_pool => success', 'restore_job => success'], 'status': 'Fail'}
[2025-06-06, 13:46:24 UTC] {standard_task_runner.py:110} ERROR - Failed to execute job 1341990 for task restore_task_instance (invalid input syntax for type json
DETAIL:  Token "'" is invalid.
CONTEXT:  JSON data, line 1: {'...
COPY task_instance, line 2939, column next_kwargs: "{'__var': {}, '__type': 'dict'}"
; 113)
[2025-06-06, 13:46:24 UTC] {local_task_job_runner.py:240} INFO - Task exited with return code 1
[2025-06-06, 13:46:24 UTC] {local_task_job_runner.py:222} ▲▲▲ Log group end
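
The immediate cause of the `COPY` failure is that the exported `next_kwargs` value is a Python-literal dict (`{'__var': {}, '__type': 'dict'}`, single quotes), which Postgres rejects as invalid JSON. As a hypothetical workaround (not part of mwaa-dr), the exported task_instance CSV could be post-processed before restore. `fix_next_kwargs` and `NEXT_KWARGS_COL` below are my own names; the column position is read off the `COPY` statement in the log:

```python
import ast
import csv
import io
import json

# 0-based position of next_kwargs in the COPY column list from the log above.
NEXT_KWARGS_COL = 12


def fix_next_kwargs(csv_text: str) -> str:
    """Re-serialize Python-literal next_kwargs values as proper JSON."""
    out = io.StringIO()
    reader = csv.reader(io.StringIO(csv_text), delimiter="|")
    writer = csv.writer(out, delimiter="|")
    for row in reader:
        value = row[NEXT_KWARGS_COL]
        if value.startswith("{'"):  # Python repr (single quotes), not JSON
            row[NEXT_KWARGS_COL] = json.dumps(ast.literal_eval(value))
        writer.writerow(row)
    return out.getvalue()
```

This only rewrites cells that start with `{'`; valid JSON values and empty cells pass through unchanged.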


Expected Behavior

No response

Steps To Reproduce

Steps:

  1. run backup_metadata.py on env A
  2. run cleanup_metadata.py on env B
  3. run restore_metadata.py on env B (fails on restore_task_instance)

Anything else?

No response

Metadata


Labels

bug (Something isn't working)
