
Temporal Adapter

Overview

The Temporal adapter provides orchestrated execution of the ETL pipeline. Unlike the standalone adapter, which requires you to run each script independently and manually update code to pass data between steps, the Temporal adapter automatically:

  • Orchestrates data flow: Passes data from function to function automatically
  • Manages state transitions: Moves through each stage (extract → identify → transform → load) without manual intervention
  • Handles errors: Automatically retries failed activities and continues processing other items
  • Tracks progress: Provides visibility into workflow execution through the Temporal UI
  • Runs to completion: Processes all items from start to finish (or error) without manual steps

The workflow automatically handles the entire pipeline from extracting content from your CMS through loading transformed content into Arc XP, with built-in error handling and retry logic.

Note on Orchestration Approach

Temporal is one way to orchestrate the adapter functions; you can orchestrate the functions in your adapter in any way that works for your organization. This repository uses Temporal to demonstrate one approach to coordinating the adapter steps into a cohesive ETL application.

Alternative orchestration approaches include:

  • AWS Step Functions - Serverless workflow orchestration
  • Bespoke orchestration code - Custom orchestration logic using your preferred framework or language
  • Other workflow engines - Airflow, Prefect, Dagster, or similar tools

The adapter functions themselves (in the adapter/ directory) are designed to be orchestration-agnostic and can be called from any orchestration system. The Temporal implementation serves as a reference example of how to coordinate these functions into a production-ready ETL pipeline.
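As a sketch of the bespoke approach, the steps can be chained with plain Python. The function bodies below are hypothetical stand-ins, not the repository's actual imports or signatures; swap in the real functions from the adapter/ directory:

```python
# Minimal bespoke orchestration sketch. The four step functions are
# hypothetical stand-ins for the real adapter functions in adapter/.

def extract_items():
    # Stand-in for the extract step (e.g., calling the source CMS API).
    return [{"headline": "Example story", "url": "https://example.com/story"}]

def identify_item(item):
    # Stand-in for the identify step: attach a content type and an ID.
    return {**item, "type": "story", "ans_id": "EXAMPLEID"}

def transform_item(item):
    # Stand-in for the transform step: convert to an ANS-shaped dict.
    return {"_id": item["ans_id"], "type": item["type"],
            "headlines": {"basic": item["headline"]}}

def load_item(ans):
    # Stand-in for the load step: submit to Arc XP (here, just report success).
    return {"ans_id": ans["_id"], "loaded": True}

def run_pipeline():
    """Chain extract -> identify -> transform -> load, continuing past failures."""
    results = []
    for item in extract_items():
        try:
            results.append(load_item(transform_item(identify_item(item))))
        except Exception as exc:  # log and keep going, like the Temporal workflow
            results.append({"error": str(exc), "source": item})
    return results

print(run_pipeline())
```

The point of the sketch is the shape, not the framework: Temporal replaces the try/except and the loop with durable retries, timeouts, and per-item child workflows.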

Setup

One-time

  1. Create and activate a virtual environment (if not already done):

    python -m venv venv
    source venv/bin/activate  # On Windows: venv\Scripts\activate
  2. Install dependencies (includes Temporal Python SDK):

    pip install -r requirements.txt
  3. Install the Temporal Python SDK separately only if you are not using requirements.txt; in this repo, the Temporal SDK is installed automatically when you install the dependencies in requirements.txt.

  4. Install the Temporal CLI (e.g., using Homebrew on macOS):

brew install temporal

Running

  1. Activate the virtual environment (if not already active):

    source venv/bin/activate  # On Windows: venv\Scripts\activate
  2. Start a local development Temporal Server:

temporal server start-dev

localhost:7233 is the default address and port for the Temporal Frontend Service when running a local Temporal development server. This is where your Temporal Clients and Workers connect to interact with the Temporal Platform.

When you start the local Temporal Service using the Temporal CLI (e.g., with a command like temporal server start-dev), it automatically starts the Frontend Service on localhost:7233 and the Temporal Web UI on http://localhost:8233.

This setup provides a convenient way to develop and test Temporal applications locally without needing to configure a full production-ready Temporal Cluster. Most Temporal SDKs will default to connecting to localhost:7233 if no other connection address is specified.

  3. Populate a .env file at the root of the arc-xp-etl-migration-starter repository with appropriate values for
  • an Arc XP bearer token (BEARER_TOKEN)
  • the Arc XP environment where your objects are to be saved: sandbox or production (ENV)
  • the Arc XP organization ID (ORG)
  • the Arc XP website ID where the objects are to be saved (WEBSITE)
  • (Optional) the Temporal server URL (TEMPORAL_SERVER_URL, defaults to localhost:7233)
  • (Optional) the extract API endpoint URL (EXTRACT_API_URL, defaults to http://127.0.0.1:8000/)
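For example, a .env with placeholder values (all values below are illustrative, not real credentials):

```
BEARER_TOKEN=your-arc-xp-bearer-token
ENV=sandbox
ORG=yourorg
WEBSITE=yourwebsite
TEMPORAL_SERVER_URL=localhost:7233
EXTRACT_API_URL=http://127.0.0.1:8000/
```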
  4. In another terminal, activate the virtual environment and start the fake-news API (see /fake-news/ReadMe.md). You can view the feed in your browser at http://127.0.0.1:8000.

Orchestration

Use Workflows to orchestrate the steps of your data pipeline. The Workflow executes the Activities (steps) and handles any failures that occur using retries and timeouts.

  1. In another terminal, activate the virtual environment and start the Temporal Worker:

python run_worker.py

Note: When you make changes to workflow or activity code, you need to stop run_worker.py (Ctrl+C) and run it again to pick up the changes.

  2. In another terminal, activate the virtual environment and start the Temporal Workflow. This causes Temporal to pull in the entire feed from the FastAPI service, transform each story and its images, and send the transformed content to Arc XP.

python adapter_temporal/run_workflow.py

  3. In a browser, open the Temporal Web UI, usually accessible at http://localhost:8233/namespaces/default/workflows.

You will see logs print to the terminal window where run_worker.py is running, or you can follow the visual path of the processes in the Temporal Web UI.

Note: An example log file output is included in the fixtures directory. The log file shows the entire output of one pass at processing fake-news content into Arc XP content.

INFO:temporalio.workflow:Starting AdapterTransformWorkflow for ans_id: TUQRVGSOLVSUFNW4DX24ZRFQKQ ({'attempt': 1, 'namespace': 'default', 'run_id': '019b70d0-4532-7130-81a7-e72292f7608b', 'task_queue': 'temporal-adapter-task-queue', 'workflow_id': 'transform_TUQRVGSOLVSUFNW4DX24ZRFQKQ', 'workflow_type': 'AdapterTransformWorkflow'})
INFO:temporalio.workflow:Content type: story, Converter: StoryConverter ({'attempt': 1, 'namespace': 'default', 'run_id': '019b70d0-4532-7130-81a7-e72292f7608b', 'task_queue': 'temporal-adapter-task-queue', 'workflow_id': 'transform_TUQRVGSOLVSUFNW4DX24ZRFQKQ', 'workflow_type': 'AdapterTransformWorkflow'})
INFO:temporalio.workflow:Calling transform_content_item activity ({'attempt': 1, 'namespace': 'default', 'run_id': '019b70d0-4532-7130-81a7-e72292f7608b', 'task_queue': 'temporal-adapter-task-queue', 'workflow_id': 'transform_TUQRVGSOLVSUFNW4DX24ZRFQKQ', 'workflow_type': 'AdapterTransformWorkflow'})
INFO:temporalio.workflow:Item 2 identified: ans_id=TUQRVGSOLVSUFNW4DX24ZRFQKQ, type=story ({'attempt': 1, 'namespace': 'default', 'run_id': '395292c6-ee46-4b4f-9db5-1015c113fbcd', 'task_queue': 'temporal-adapter-task-queue', 'workflow_id': 'temporal-adapter-workflow', 'workflow_type': 'TemporalAdapterWorkflow'})
INFO:temporalio.workflow:Inventorying item TUQRVGSOLVSUFNW4DX24ZRFQKQ story ({'attempt': 1, 'namespace': 'default', 'run_id': '395292c6-ee46-4b4f-9db5-1015c113fbcd', 'task_queue': 'temporal-adapter-task-queue', 'workflow_id': 'temporal-adapter-workflow', 'workflow_type': 'TemporalAdapterWorkflow'})
INFO:temporalio.activity:Inventory content item: TUQRVGSOLVSUFNW4DX24ZRFQKQ story ({'activity_id': '5', 'activity_type': 'add_update_inventory', 'attempt': 1, 'namespace': 'default', 'task_queue': 'temporal-adapter-task-queue', 'workflow_id': 'temporal-adapter-workflow', 'workflow_run_id': '395292c6-ee46-4b4f-9db5-1015c113fbcd', 'workflow_type': 'TemporalAdapterWorkflow'})
INFO:temporalio.activity:Converting TUQRVGSOLVSUFNW4DX24ZRFQKQ story using StoryConverter ({'activity_id': '1', 'activity_type': 'transform_content_item', 'attempt': 1, 'namespace': 'default', 'task_queue': 'temporal-adapter-task-queue', 'workflow_id': 'transform_TUQRVGSOLVSUFNW4DX24ZRFQKQ', 'workflow_run_id': '019b70d0-4532-7130-81a7-e72292f7608b', 'workflow_type': 'AdapterTransformWorkflow'})
Database created/opened at: /Users/Repositories/arc-xp-etl-migration-starter/adapter/databases/example_etl_fakenews_inventory_sandbox.db
INFO:temporalio.activity:Successfully converted TUQRVGSOLVSUFNW4DX24ZRFQKQ. Images to convert: 1 ({'activity_id': '1', 'activity_type': 'transform_content_item', 'attempt': 1, 'namespace': 'default', 'task_queue': 'temporal-adapter-task-queue', 'workflow_id': 'transform_TUQRVGSOLVSUFNW4DX24ZRFQKQ', 'workflow_run_id': '019b70d0-4532-7130-81a7-e72292f7608b', 'workflow_type': 'AdapterTransformWorkflow'})
INFO:temporalio.workflow:Inventorying transformed item TUQRVGSOLVSUFNW4DX24ZRFQKQ ({'attempt': 1, 'namespace': 'default', 'run_id': '019b70d0-4532-7130-81a7-e72292f7608b', 'task_queue': 'temporal-adapter-task-queue', 'workflow_id': 'transform_TUQRVGSOLVSUFNW4DX24ZRFQKQ', 'workflow_type': 'AdapterTransformWorkflow'})
INFO:temporalio.workflow:Identifying item 3/15 ({'attempt': 1, 'namespace': 'default', 'run_id': '395292c6-ee46-4b4f-9db5-1015c113fbcd', 'task_queue': 'temporal-adapter-task-queue', 'workflow_id': 'temporal-adapter-workflow', 'workflow_type': 'TemporalAdapterWorkflow'})
INFO:temporalio.activity:Inventory content item: TUQRVGSOLVSUFNW4DX24ZRFQKQ story ({'activity_id': '2', 'activity_type': 'add_update_inventory', 'attempt': 1, 'namespace': 'default', 'task_queue': 'temporal-adapter-task-queue', 'workflow_id': 'transform_TUQRVGSOLVSUFNW4DX24ZRFQKQ', 'workflow_run_id': '019b70d0-4532-7130-81a7-e72292f7608b', 'workflow_type': 'AdapterTransformWorkflow'})
Database created/opened at: /Users/Repositories/arc-xp-etl-migration-starter/adapter/databases/example_etl_fakenews_inventory_sandbox.db

Workflow Architecture

The Temporal adapter uses a two-level workflow architecture:

┌─────────────────────────────────────────────────────────────────┐
│                     TemporalAdapterWorkflow                     │
│                         (Main Workflow)                         │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
                  ┌────────────────────────┐
                  │ call_extract_api       │
                  │ Extract from API       │
                  └────────────────────────┘
                              │
                              ▼
                  ┌────────────────────────┐
                  │ For each item          │
                  └────────────────────────┘
                              │
                              ▼
                  ┌────────────────────────┐
                  │ identify_extract_item  │
                  │ Identify type & ID     │
                  └────────────────────────┘
                              │
                              ▼
                  ┌────────────────────────┐
                  │ add_update_inventory   │
                  │ Status: "inventoried"  │
                  └────────────────────────┘
                              │
                              ▼
    ┌───────────────────────────────────────────────────────┐
    │            Start AdapterTransformWorkflow             │
    │                   (Child Workflow)                    │
    └───────────────────────────────────────────────────────┘
                              │
                              ▼
                  ┌────────────────────────┐
                  │ transform_content_item │
                  │ Convert to ANS         │
                  └────────────────────────┘
                              │
                              ▼
                  ┌────────────────────────┐
                  │ add_update_inventory   │
                  │ Status: "transformed"  │
                  └────────────────────────┘
                              │
                              ▼
                  ┌────────────────────────┐
                  │ If images exist:       │
                  │ transform_image_item   │
                  │ (async, parallel)      │
                  └────────────────────────┘
                              │
                              ▼
                  ┌────────────────────────┐
                  │ load_to_arcxp          │
                  │ Submit to Migration    │
                  │ Center API             │
                  └────────────────────────┘
                              │
                              ▼
                  ┌────────────────────────┐
                  │ add_update_inventory   │
                  │ Status: "transformed_  │
                  │ and_submitted_to_      │
                  │ migration_center"      │
                  └────────────────────────┘

Workflow Components

TemporalAdapterWorkflow (Main Workflow)

  • Extracts content from the fake-news CMS API
  • Identifies each content item's type and generates Arc XP IDs
  • Creates inventory records with "inventoried" status
  • Spawns child workflows for content transformation

AdapterTransformWorkflow (Child Workflow)

  • Receives identified content items with metadata (org, env, website)
  • Transforms content using the appropriate converter (StoryConverter, ImageConverter, etc.)
  • Updates inventory with "transformed" status
  • Transforms images asynchronously if present in content
  • Loads transformed content to Arc XP using Bulk Migration Center API
  • Updates inventory with "transformed_and_submitted_to_migration_center" status
  • Handles errors gracefully: logs source content and continues processing

Activities

  • call_extract_api: Fetches content from the fake-news CMS source
  • identify_extract_item: Identifies content type and generates Arc XP identifiers
  • add_update_inventory: Manages inventory tracking in SQLite database with status notes
  • transform_content_item: Converts source content to Arc XP ANS format using appropriate converter
  • transform_image_item: Converts image URLs to Arc XP ANS format
  • load_to_arcxp: Submits transformed content to Arc XP using the Migration Center Bulk API (handles 50-item limit by chunking)
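The 50-item chunking that load_to_arcxp performs can be sketched as follows. This is a simplified illustration; the real activity also handles authentication and response parsing, and the submit parameter here is a placeholder for the actual HTTP call:

```python
# Sketch of chunking a list of ANS payloads for the Migration Center
# Bulk API, which accepts at most 50 items per request.

CHUNK_SIZE = 50

def chunk(items, size=CHUNK_SIZE):
    """Yield successive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

def load_in_chunks(ans_items, submit):
    """Submit ans_items in chunks of CHUNK_SIZE, collecting per-chunk results."""
    return [submit(batch) for batch in chunk(ans_items)]

# Example: 120 items produce chunks of 50, 50, and 20.
sizes = load_in_chunks(list(range(120)), submit=len)
print(sizes)
```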

Analyzing Workflow Logs for Problematic Items

When reviewing workflow execution logs, you can identify items that failed at different stages of the pipeline. The example log file (fixtures/workflow_logging_result.txt) demonstrates how to analyze logs to find items that need attention.

How to Identify Problematic Items

Search the logs for ERROR entries to find items that failed. The workflow logs include:

  • Error messages with full exception details
  • Source content that caused the failure (logged after errors)
  • Item position in the processing queue (e.g., "item 3/17")
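A quick way to pull ERROR entries (with their line numbers) out of a log file is a few lines of standard-library Python; the sample text below stands in for the contents of fixtures/workflow_logging_result.txt:

```python
# Sketch: scan a workflow log for ERROR entries. The sample text below
# is illustrative; in practice, read the fixture file instead.
sample_log = """\
INFO:temporalio.workflow:Identifying item 3/17
ERROR:temporalio.activity:Error identifying content item: missing url
INFO:temporalio.workflow:Identifying item 4/17
"""

def find_errors(log_text):
    """Return (line_number, line) pairs for every ERROR entry."""
    return [(n, line) for n, line in enumerate(log_text.splitlines(), start=1)
            if line.startswith("ERROR")]

for n, line in find_errors(sample_log):
    print(f"line {n}: {line}")
```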

Three Items Requiring Attention in the Example Log Fixture

Analysis of the example log file reveals three items that failed at different stages:

1. Item 3/17: Failed Identification

Error Location: Lines 39-64 in workflow_logging_result.txt

Problem: The content item could not be identified as a valid Arc XP content type because it's missing the url field, which is required for story identification.

Error Message:

ERROR:temporalio.activity:Error identifying content item: Source content cannot be identified as Arc XP content type. Update logic in identify_content_type().
UnidentifiedContentException: Source content cannot be identified as Arc XP content type.

Source Content:

{
  "author": "Ronald McErrorson, Reuters",
  "blurb": "",
  "extra_properties": {"num_content_elements": 10},
  "headline": "This fake news should error during identification",
  "hero": "x",
  "many_lines_of_content": {},
  "publish_date": "2015-10-20T22:42:10.899059",
  "section": "Elections"
}

How to Fix:

  1. Update the source data to include a url field (this may require a secondary query during extraction to locate URLs for this kind of content), or
  2. Modify adapter/identify.py to handle content items that don't have a url field by:
    • Adding alternative identification logic (e.g., using headline + publish_date as a unique identifier)
    • Updating identify_content_type() to support content without URLs
      • Note, however, that Arc XP stories require URLs, and when migrating stories that have already been published, the best practice is to assign the same URL under which they were previously published.
      • If your extract does not contain URLs or other significant data, you may need to conduct a secondary extract and merge the two sources of data before running your identify step.
    • Ensuring the identification logic matches your source CMS data structure
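If you do adopt a headline + publish_date fallback, the identifier must be deterministic so that re-runs update the same Arc XP document instead of creating duplicates. A sketch of one approach is below; the 26-character base32 shape matches the IDs seen in the logs, but confirm the exact ID scheme against your own adapter/identify.py before relying on it:

```python
import base64
import hashlib

def fallback_ans_id(headline, publish_date):
    """Derive a deterministic ID from headline + publish_date.

    Sketch only: confirm the ID format against adapter/identify.py before use.
    """
    digest = hashlib.sha256(f"{headline}|{publish_date}".encode("utf-8")).digest()
    # Base32-encoding the first 16 bytes yields 26 characters once the
    # trailing '=' padding is stripped.
    return base64.b32encode(digest[:16]).decode("ascii").rstrip("=")

ans_id = fallback_ans_id("This fake news should error during identification",
                         "2015-10-20T22:42:10.899059")
print(ans_id)
```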

2. Item 6/17: Failed Transformation

Error Location: Lines 143-190 in workflow_logging_result.txt

Problem: The publish_date field contains an invalid date format ("not-a-valid-date-format") that cannot be parsed by the arrow library.

Error Message:

ERROR:temporalio.activity:Error converting content item ACAYEWVUFRCDE3EYXQBJEC566Y: 
Could not match input 'not-a-valid-date-format' to any of the following formats: 
YYYY-MM-DD, YYYY-M-DD, YYYY-M-D, YYYY/MM/DD, ...
arrow.parser.ParserError: Could not match input 'not-a-valid-date-format'

Source Content:

{
  "author": "Ronald McErrorson, Reuters",
  "blurb": "This will cause errors",
  "headline": "This fake news should error during conversion",
  "publish_date": "not-a-valid-date-format",
  "url": "https://example.com/error-story-conversion"
}

How to Fix:

  1. Fix the source data to provide a valid date format, or
  2. Enhance the set_publish_date() method in adapter/convert_story.py to:
    • Add validation and error handling for invalid date formats
    • Omit the date from the transformed data (usually not preferred, since publish dates are important story metadata)
    • Log warnings for invalid dates while still allowing transformation to proceed
    • Support additional date formats that your source CMS might use
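A hedged sketch of such a fallback using only the standard library (the real converter uses the arrow library, and the formats list here is illustrative, not exhaustive):

```python
from datetime import datetime, timezone

# Illustrative set of formats your source CMS might emit.
FALLBACK_FORMATS = ["%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S",
                    "%Y-%m-%d", "%Y/%m/%d"]

def parse_publish_date(value, default=None):
    """Try each known format; warn and return `default` if none match."""
    for fmt in FALLBACK_FORMATS:
        try:
            return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    print(f"WARNING: unparseable publish_date {value!r}; using default")
    return default

good = parse_publish_date("2015-10-20T22:42:10.899059")
bad = parse_publish_date("not-a-valid-date-format")
print(good, bad)
```

With this shape, invalid dates log a warning and fall back to a default rather than aborting the whole transformation.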

3. Item 17/17: Failed Load to Arc XP

Error Location: Line 518 in workflow_logging_result.txt

Problem: The transformed ANS payload was rejected by Arc XP's Migration Center API because it contains invalid content_elements types. Specifically, the type "heading1" is not recognized by Arc XP's ANS schema.

Error Message:

ERROR:temporalio.activity:Failed to load chunk 1: 400 - {
  "failureCount": 1,
  "content": [{
    "success": false,
    "reasons": [{
      "ansId": "LJR5N6SXUEYC3FJU6NRNN6RGFU",
      "ansType": "story",
      "totalErrors": 2,
      "errors": [
        {
          "dataPath": ".content_elements[2].type",
          "params": "heading1",
          "message": "Could not find schema for specified type"
        },
        {
          "dataPath": ".content_elements[4].type",
          "params": "heading1",
          "message": "Could not find schema for specified type"
        }
      ]
    }]
  }]
}

This error message arrived after the ANS was submitted to Arc XP via the Migration Center API. The Migration Center API has reporting endpoints you can query to determine the status of an item's submission; the /migrations/v3/report/detail endpoint returns the full submission result, including this error message.

How to Fix:

  1. Review the Arc XP ANS documentation to identify the correct content element type for headings
  2. Update the set_content_elements() method in adapter/convert_story.py to:
    • Replace "heading1" with a valid ANS type (likely "header" with a numeric level property)
    • Verify all content element types match Arc XP's ANS schema
    • Add validation to ensure only valid ANS types are used
    • Check the converter class used (in this case, StoryConverterRecipeBody) to ensure it generates valid ANS
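For example, a normalization pass over content_elements could rewrite unrecognized heading types before submission. This sketch assumes ANS represents headings as a "header" element with a numeric level property; verify that mapping against the ANS schema version your organization uses:

```python
import re

def normalize_heading_elements(content_elements):
    """Rewrite 'heading<N>' types to header elements with a level property.

    Assumes ANS uses {"type": "header", "level": N}; verify against the
    ANS schema before relying on this mapping.
    """
    normalized = []
    for element in content_elements:
        match = re.fullmatch(r"heading(\d)", element.get("type", ""))
        if match:
            element = {**element, "type": "header", "level": int(match.group(1))}
        normalized.append(element)
    return normalized

elements = [
    {"type": "text", "content": "Intro paragraph"},
    {"type": "heading1", "content": "Ingredients"},
]
print(normalize_heading_elements(elements))
```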

Next Steps for Developers

To get these three items successfully to Arc XP:

  1. Examine the source data for each problematic item:

    • Review the source CMS to understand why these items have invalid data
    • Determine if this is a data quality issue or a missing feature in the adapter
  2. Update the adapter code to handle edge cases:

    • Add validation and error handling for missing required fields
    • Support alternative data formats and structures
    • Ensure all generated ANS conforms to Arc XP's schema
  3. Test fixes incrementally:

    • Use the standalone adapter to test transformations on individual problematic items
    • Verify the transformed ANS payloads are valid before running the full Temporal workflow
    • Re-run the workflow and verify all items process successfully
  4. Monitor for similar issues:

    • Review logs after each workflow run to identify new problematic items
    • Consider adding validation earlier in the pipeline to catch issues before transformation
    • Document common data quality issues and their solutions
  5. Query the inventory database to track items that failed:

    from adapter.inventory import InventoryDB
    inventory = InventoryDB(org="cetest", env="sandbox")
    failed_items = inventory.load_inventory(
        content_type="story",
        status_note="transformation_failed"
    )

The workflow's error handling ensures that these failures don't stop processing of other items, but they should be addressed to achieve 100% success rate.

Learning More About Temporal

If you're new to Temporal or want to deepen your understanding, here are some helpful resources: