Migration Plan: AWS Step Functions → Temporal + PostgreSQL

Goal

Migrate the existing AWS Step Functions workflow from AWS-native services to Temporal for workflow orchestration and PostgreSQL for data persistence. This migration will demonstrate AI-assisted transformation while preserving the same business logic: receive person data with numbers, write the initial state, wait 30 seconds, then calculate and update the final value.

Current Architecture

Workflow: AWS Step Functions

  • State 1: Lambda invoke (write initial data to DynamoDB)
  • State 2: Wait 30 seconds
  • State 3: Lambda invoke (read, calculate, update DynamoDB)

Database: DynamoDB (test_db)

  • Partition key: personName (String)
  • Attribute: assignedNumber (String)

Invocation: API Gateway endpoint triggers the Step Function execution

Target Architecture

Workflow: Temporal (Python SDK)

  • Activity 1: Write initial data to PostgreSQL
  • Timer: Wait 30 seconds
  • Activity 2: Read, calculate, update PostgreSQL

Database: PostgreSQL

  • Table with appropriate schema for person data
  • Proper typing (numbers as integers, not strings)

Invocation: REST API or Temporal client trigger

Implementation Steps

Phase 1: Environment Setup

Step 1.1: Set up local Temporal development environment

  • Install Temporal CLI using Homebrew or download
  • Start Temporal server locally using temporal server start-dev
  • Verify Temporal is running by accessing Web UI at http://localhost:8233

Verification: Web UI loads and shows an empty workflow list

Step 1.2: Set up PostgreSQL locally

  • Install PostgreSQL (via Homebrew, Docker, or Postgres.app)
  • Create database: temporal_migration_test
  • Create user with appropriate permissions
  • Test connection using psql or preferred client

Verification: Can connect to database and run basic SQL queries

Step 1.3: Initialize Python project structure

  • Create new directory structure:
    temporal-implementation/
      ├── workflows/
      ├── activities/
      ├── database/
      ├── tests/
      ├── requirements.txt
      ├── config/
      └── .env.example
    
  • Set up virtual environment
  • Install core dependencies: temporalio, psycopg2-binary, python-dotenv, fastapi, uvicorn (using raw SQL, not ORM)
  • Create .env file for database credentials (not tracked in git)
  • Configure logging to STDERR with appropriate log level

Verification: Virtual environment activates, dependencies install without errors, logging outputs to STDERR

Phase 2: Database Migration

Step 2.1: Design PostgreSQL schema

  • Create migration SQL file for persons table:
    • id (SERIAL PRIMARY KEY)
    • person_name (VARCHAR UNIQUE NOT NULL)
    • assigned_number (INTEGER NOT NULL)
    • created_at (TIMESTAMP DEFAULT NOW())
    • updated_at (TIMESTAMP DEFAULT NOW())
  • Add index on person_name for fast lookups
  • Use UPSERT pattern (INSERT ... ON CONFLICT UPDATE) to handle duplicate names
  • Create trigger to automatically update updated_at timestamp on UPDATE operations

Verification: Run SQL against local PostgreSQL, table created successfully with correct types; test UPSERT with duplicate name; verify trigger updates updated_at
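The migration SQL described above could be sketched as follows (table, function, and trigger names are illustrative; the trigger syntax assumes PostgreSQL 11+, and the UPSERT statement itself lives in the database access layer):

```sql
-- persons table with typed columns
CREATE TABLE IF NOT EXISTS persons (
    id              SERIAL PRIMARY KEY,
    person_name     VARCHAR(255) UNIQUE NOT NULL,
    assigned_number INTEGER NOT NULL,
    created_at      TIMESTAMP DEFAULT NOW(),
    updated_at      TIMESTAMP DEFAULT NOW()
);

-- The UNIQUE constraint already creates an index on person_name,
-- which also covers the fast-lookup requirement.

-- Trigger function to refresh updated_at on every UPDATE
CREATE OR REPLACE FUNCTION touch_updated_at() RETURNS trigger AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER persons_touch_updated_at
    BEFORE UPDATE ON persons
    FOR EACH ROW
    EXECUTE FUNCTION touch_updated_at();
```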

Step 2.2: Create database access layer

  • Create functions using raw SQL with psycopg2
  • Read database credentials from .env file using python-dotenv
  • Establish one database connection per activity invocation (connection shared across all database function calls within a single activity)
  • Create functions for database operations:
    • get_connection() - reads .env and returns database connection
    • upsert_person(conn, name, number) - inserts or updates, returns success/failure
    • get_person(conn, name) - returns person record or None
    • update_person_number(conn, name, new_number) - returns success/failure
  • Use parameterized queries to prevent SQL injection
  • Handle connection errors gracefully
  • Activities are responsible for opening the connection at the start and closing it at the end, using try/finally blocks

Verification: Write unit tests that insert, read, update, and upsert test data successfully
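One possible shape for this layer, as a minimal sketch: the function and table names follow the plan, while the environment variable names (DB_HOST, DB_NAME, DB_USER, DB_PASSWORD) are assumptions for what the .env file would define.

```python
# Sketch of database/db.py: raw SQL via psycopg2, parameterized throughout.
import os

# UPSERT pattern from Step 2.1: insert, or update on duplicate person_name
UPSERT_SQL = """
    INSERT INTO persons (person_name, assigned_number)
    VALUES (%s, %s)
    ON CONFLICT (person_name)
    DO UPDATE SET assigned_number = EXCLUDED.assigned_number
"""

def get_connection():
    """Read credentials from the environment (loaded from .env) and connect."""
    import psycopg2  # imported lazily so the module loads without the driver installed
    return psycopg2.connect(
        host=os.environ["DB_HOST"],
        dbname=os.environ["DB_NAME"],
        user=os.environ["DB_USER"],
        password=os.environ["DB_PASSWORD"],
    )

def upsert_person(conn, name: str, number: int) -> bool:
    """Insert or update a person record; returns True on success."""
    with conn.cursor() as cur:
        cur.execute(UPSERT_SQL, (name, number))
    conn.commit()
    return True

def get_person(conn, name: str):
    """Return the person record as a dict, or None if not found."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT person_name, assigned_number FROM persons WHERE person_name = %s",
            (name,),
        )
        row = cur.fetchone()
    return {"person_name": row[0], "assigned_number": row[1]} if row else None

def update_person_number(conn, name: str, new_number: int) -> bool:
    """Update assigned_number; returns True if exactly one row changed."""
    with conn.cursor() as cur:
        cur.execute(
            "UPDATE persons SET assigned_number = %s WHERE person_name = %s",
            (new_number, name),
        )
        updated = cur.rowcount == 1
    conn.commit()
    return updated
```

Note the single `conn` argument threaded through every function, matching the one-connection-per-activity decision above.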

Phase 3: Temporal Activities Implementation

Step 3.1: Implement Activity 1 - Initial data write

  • Create activities/ingest_data.py
  • Implement activity function using Temporal Python SDK conventions:
    • Accept parameters: person_name, base_number, additional_number
    • Validate inputs (non-empty name, non-empty numeric strings)
    • Convert base_number and additional_number to integers, raising ValueError if conversion fails
    • Open database connection at start of activity using try/finally
    • Call database layer to upsert record with assigned_number = base_number (as integer)
    • Close database connection in finally block before returning
    • Return tuple: (person_name, base_number, additional_number) where person_name is string, base_number and additional_number are integers
  • Add proper error handling with Temporal activity exceptions
  • Add activity-level logging

Verification:

  • Unit test the activity function using Temporal testing framework (mocking database)
  • Integration test with real database connection
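The validation-and-conversion step of Activity 1 can be isolated as a pure helper, which keeps it unit-testable without a database. This is a sketch; the helper name is hypothetical, and the real activity (decorated with @activity.defn from the temporalio SDK) would call it before opening the connection inside try/finally.

```python
def validate_and_convert(person_name: str, base_number: str, additional_number: str):
    """Validate raw string inputs and convert the numbers to integers.

    Raises ValueError on an empty name or non-numeric number strings,
    matching the activity's input contract described above.
    """
    if not person_name or not person_name.strip():
        raise ValueError("person_name must be a non-empty string")
    try:
        base = int(base_number)
        additional = int(additional_number)
    except (TypeError, ValueError):
        raise ValueError("base_number and additional_number must be integer strings")
    # Return the tuple shape the workflow passes on to Activity 2
    return person_name, base, additional
```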

Step 3.2: Implement Activity 2 - Calculate and update

  • Create activities/calculate_update.py
  • Implement activity function using Temporal Python SDK conventions:
    • Accept parameters: person_name, base_number, additional_number (person_name as string, base_number and additional_number as integers from Activity 1)
    • Open database connection at start of activity using try/finally
    • Query database for existing record
    • If record doesn't exist, raise error (fail workflow)
    • If record exists, calculate expected final value: expected_final = base_number + additional_number
    • Check if existing_number (from database) equals either base_number OR expected_final:
      • If existing_number == base_number: First run, proceed with update
      • If existing_number == expected_final: Retry after successful update, skip update and return success
      • Otherwise: Data corruption detected, raise error
    • If update needed: calculate new_number = base_number + additional_number and update database (trigger will update updated_at automatically)
    • Close database connection in finally block before returning
    • Return the final number as an integer
  • Add proper error handling with Temporal activity exceptions
  • Add activity-level logging
  • Retry policy configured in workflow will handle database connection failures and timeouts

Verification:

  • Unit test with mocked database using Temporal testing framework
  • Integration test that performs write → read → update sequence
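The retry-safe branch logic in Step 3.2 can be captured in a small pure function (the function name is hypothetical; the real activity would call it after reading the record):

```python
def decide_action(existing_number: int, base_number: int, additional_number: int) -> str:
    """Idempotency check for Activity 2: returns 'update' or 'skip',
    or raises when the stored value indicates data corruption."""
    expected_final = base_number + additional_number
    if existing_number == base_number:
        return "update"  # first run: proceed with the update
    if existing_number == expected_final:
        return "skip"    # retry after a successful update: idempotent no-op
    raise RuntimeError(
        f"data corruption: found {existing_number}, "
        f"expected {base_number} or {expected_final}"
    )
```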

Phase 4: Temporal Workflow Implementation

Step 4.1: Create workflow definition

  • Create workflows/person_number_workflow.py
  • Define workflow class/function matching Temporal Python SDK patterns
  • Implement workflow logic:
    • Accept input: person_name, base_number, additional_number
    • Execute Activity 1 (ingest data), capture return values
    • Sleep for 30 seconds using asyncio.sleep() (a durable timer inside Temporal Python workflows; hard-coded duration)
    • Execute Activity 2 (calculate and update), passing all three values: person_name, base_number, additional_number
    • Return the final number as an integer (from Activity 2)
  • Configure activity options:
    • Start-to-close timeout: 60 seconds (for both Activity 1 and Activity 2)
    • Retry policies (configured in Step 4.2)
  • Set workflow options (execution timeout, workflow ID reuse policy to let Temporal handle collisions)

Verification: Workflow compiles without syntax errors, passes static analysis, passes Temporal testing framework tests
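Steps 4.1 and 4.2 together might look like the sketch below. The activity function names (ingest_data, calculate_update) and module paths are assumptions from this plan, and the retry values are the ones specified in Step 4.2.

```python
# Sketch of workflows/person_number_workflow.py (temporalio SDK).
import asyncio
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

with workflow.unsafe.imports_passed_through():
    from activities.ingest_data import ingest_data
    from activities.calculate_update import calculate_update

# Retry policy matching the current Step Function behavior
RETRY = RetryPolicy(
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2.0,
    maximum_attempts=3,
)

@workflow.defn
class PersonNumberWorkflow:
    @workflow.run
    async def run(self, person_name: str, base_number: str, additional_number: str) -> int:
        # Activity 1: validate, convert, and upsert the initial record
        name, base, additional = await workflow.execute_activity(
            ingest_data,
            args=[person_name, base_number, additional_number],
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=RETRY,
        )
        # Durable 30-second timer (hard-coded per the plan)
        await asyncio.sleep(30)
        # Activity 2: read, validate idempotently, and update
        return await workflow.execute_activity(
            calculate_update,
            args=[name, base, additional],
            start_to_close_timeout=timedelta(seconds=60),
            retry_policy=RETRY,
        )
```

Note that workflow definitions are async in the Python SDK even though the activities and database layer stay synchronous.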

Step 4.2: Configure retry and error handling

  • Set activity retry policies matching current Step Function behavior:
    • Initial interval: 1 second
    • Max attempts: 3
    • Backoff coefficient: 2.0
  • Retry policy handles all transient errors (database connection failures, timeouts, etc.)
  • Activity-level errors (record not found, validation failures) will exhaust retries and fail workflow
  • Add workflow-level error handling for activity failures
  • No compensation logic needed: if Activity 2 fails, leave initial write in place (matches AWS behavior)
  • Add workflow-level logging

Verification: Review configuration matches documented retry behavior from copilot-instructions.md

Step 4.3: Create worker implementation

  • Create worker.py to register workflow and activities
  • Configure worker connection to Temporal server
  • Set task queue name: person-number-task-queue
  • Register all workflows and activities
  • Add graceful shutdown handling

Verification: Worker starts successfully, logs show connection to Temporal, no registration errors
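A minimal worker sketch, assuming the workflow and activity names above and the default local Temporal address:

```python
# Sketch of worker.py (temporalio SDK; import paths follow this plan).
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from workflows.person_number_workflow import PersonNumberWorkflow
from activities.ingest_data import ingest_data
from activities.calculate_update import calculate_update

async def main() -> None:
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="person-number-task-queue",
        workflows=[PersonNumberWorkflow],
        activities=[ingest_data, calculate_update],
    )
    # run() blocks until the worker is shut down (e.g. on Ctrl+C)
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```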

Phase 5: Testing & Validation

Step 5.1: Create workflow invocation script

  • Create client.py or similar to start workflows programmatically
  • Implement function to:
    • Connect to Temporal
    • Start workflow with sample input
    • Wait for result or return workflow ID
    • Print/return final output
  • Support both synchronous (wait for result) and asynchronous (fire and forget) modes

Verification: Can start workflow manually from command line with test data

Step 5.2: Port existing test script

  • Create test_temporal_workflow.py based on existing API_Testing.py
  • Read names from names.txt
  • Generate random numbers
  • Invoke Temporal workflows instead of API Gateway
  • Collect results (query workflow history or wait for completion)
  • Save results to temporal_results.json

Verification: Script completes all test cases, results file generated

Step 5.3: Validate correctness

  • Manually compare results between AWS implementation and Temporal implementation:
    • Run same inputs through both systems
    • Verify finalNumber = baseNumber + additionalNumber in both cases
    • Check timing (should still be ~30 seconds with full wait time)
  • Query PostgreSQL directly to verify data persistence
  • Use Temporal Web UI to inspect workflow histories
  • Document comparison results

Verification: Results match between systems for identical inputs (manual validation)

Step 5.4: Test error scenarios

  • Test invalid inputs (missing fields, non-numeric values) using Temporal testing framework against dev server
  • Test database connection failures (stop PostgreSQL mid-workflow)
  • Test duplicate person names (should update existing record via UPSERT)
  • Test Activity 2 when record doesn't exist (should fail with error)
  • Verify retry behavior works as configured (3 attempts with exponential backoff)
  • Check that errors are properly logged and surfaced
  • Tests run against local Temporal dev server and wait full 30 seconds (no time mocking)

Verification: Errors are handled gracefully, retries occur as expected, workflows fail appropriately when Activity 2 record missing, duplicate names update records successfully

Phase 6: API Layer

Step 6.1: Create REST API for workflow triggering

  • Implement FastAPI endpoint replicating existing API Gateway interface
  • Accept JSON payload matching current format:
    {
      "input": "{\"personName\": \"Name\", \"baseNumber\": \"10\", \"additionalNumber\": \"20\"}",
      "name": "unique-execution-id",
      "stateMachineArn": "arn:aws:states:us-east-2:<AWS_Account_ID>:stateMachine:state-machine-test"
    }
  • Parse nested JSON input string and extract personName, baseNumber, additionalNumber
  • Validate extracted values:
    • Ensure personName is non-empty string
    • Ensure baseNumber and additionalNumber are valid integer strings
    • Convert baseNumber and additionalNumber to integers, return 400 error if conversion fails
  • Generate new UUID for Temporal workflow ID (ignore name field from payload)
  • Start Temporal workflow asynchronously with validated integer values (Temporal will handle any workflow ID collisions via ID reuse policy)
  • Return custom JSON response structure:
    {
      "workflow_id": "<uuid>",
      "run_id": "<temporal-run-id>",
      "status": "started"
    }
  • Use HTTP status codes for error responses (400 for invalid input, 500 for server errors)
  • No authentication required (internal only)

Verification: Can invoke workflows via HTTP POST with same payload format as existing system; returns workflow_id and run_id
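The nested-JSON parsing and validation in Step 6.1 is pure and easy to unit test. A sketch follows; the field names come from the existing API Gateway payload, while the helper name is hypothetical (the FastAPI handler would call it and map ValueError to HTTP 400).

```python
import json

def parse_payload(payload: dict) -> tuple[str, int, int]:
    """Extract and validate (person_name, base_number, additional_number).

    Raises ValueError for anything the endpoint should answer with HTTP 400.
    """
    try:
        # The "input" field is itself a JSON string, matching the AWS format
        inner = json.loads(payload["input"])
    except (KeyError, TypeError, json.JSONDecodeError):
        raise ValueError("payload must contain an 'input' field with a JSON string")

    person_name = inner.get("personName", "")
    if not isinstance(person_name, str) or not person_name.strip():
        raise ValueError("personName must be a non-empty string")

    try:
        base = int(inner["baseNumber"])
        additional = int(inner["additionalNumber"])
    except (KeyError, TypeError, ValueError):
        raise ValueError("baseNumber and additionalNumber must be integer strings")

    return person_name, base, additional
```

The "name" and "stateMachineArn" fields are deliberately ignored, matching the decision above to generate a fresh UUID for the Temporal workflow ID.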

Step 6.2: Update test script for API invocation

  • Update API_Testing.py to point to new local endpoint (instead of AWS API Gateway)
  • Keep same test data from names.txt and validation logic
  • Request payload format remains unchanged (nested JSON with escaped inner string)
  • Update response parsing: AWS returns execution ARN and state machine details, while Temporal API returns {workflow_id, run_id, status}
  • Update how response.json() is processed and stored in results dictionary to handle new response structure
  • Ensure results are functionally equivalent to direct client invocation

Verification: Existing test script works with minimal changes (update URL and response parsing)

Phase 7: Documentation & Cleanup

Step 7.1: Document new architecture

  • Update README.md with Temporal setup instructions
  • Document how to run worker
  • Document how to start workflows
  • Add database schema documentation
  • Create architecture diagram showing workflow flow

Verification: Another developer can follow README and run the system

Step 7.2: Create deployment artifacts (optional)

  • Dockerfile for worker service (optional for local development)
  • Docker Compose file for local development (Temporal, PostgreSQL, worker, Prometheus)
  • Environment variable configuration template
  • Helper scripts for database migrations
  • Prometheus configuration for Temporal metrics

Verification: docker-compose up starts entire system successfully with metrics collection

Step 7.3: Fix known bugs from original implementation

  • Fix typo in exception handling (xceptionException)
  • Add proper logging configuration
  • Add input validation
  • Improve error messages

Verification: Code quality checks pass, no obvious bugs remain

Migration Strategy Decisions

Based on requirements, the following decisions have been made:

Deployment & Infrastructure

  • Environment: Local development only (no cloud deployment)
  • Temporal: Local Temporal server using temporal server start-dev
  • PostgreSQL: Local PostgreSQL instance
  • Observability: Temporal server configured with Prometheus integration
  • Authentication: None required (internal only)

Implementation Approach

  • Testing: Use Temporal's testing framework against local dev server; tests wait full 30 seconds (no time mocking)
  • API Framework: FastAPI with uvicorn for REST endpoint
  • API: Replicate existing API Gateway payload format; return custom JSON response with workflow_id and run_id
  • Data Migration: None needed - starting with fresh infrastructure
  • Duplicate Handling: Update existing records when duplicate person_name is submitted (UPSERT in Activity 1)
  • Database: Use raw SQL with psycopg2 (synchronous, one connection per activity invocation)
  • Database Connection Management: Manual open/close with try/finally blocks
  • Database Credentials: Store in .env file, read with python-dotenv
  • Database Schema: Trigger automatically updates updated_at timestamp
  • Python Style: Synchronous activities and database access (workflow definitions are async, as the Temporal Python SDK requires)
  • Logging: Configure in project setup to output to STDERR
  • Workflow IDs: Generate new UUIDs for Temporal workflows; let Temporal handle collisions
  • Error Handling: HTTP status codes for API errors; Temporal activity exceptions for workflow errors; retry policy handles all transient errors
  • Activity Timeouts: 60 seconds start-to-close timeout for both Activity 1 and Activity 2
  • Type Conversion: API layer and Activity 1 validate and convert string inputs to integers before database operations
  • Activity 2 Validation: Idempotent retry-safe validation - checks if existing_number equals either base_number (first run) OR base_number + additional_number (retry after successful update); raises error for other values indicating data corruption
  • Wait Duration: Hard-coded 30 seconds (not configurable)
  • Activity 2 Behavior: Idempotent for retries after successful update; fails if record doesn't exist
  • Activity Data Flow: Activity 1 returns tuple (person_name as string, base_number as integer, additional_number as integer); workflow passes all three to Activity 2; Activity 2 returns final number as integer; workflow returns final number as integer
  • Compensation: No cleanup on Activity 2 failure (leave initial write)
  • Validation: Manual comparison between AWS and Temporal implementations (no automated cross-system tests)

Success Criteria

  • Temporal workflow successfully executes all three steps
  • Data persists correctly to PostgreSQL
  • 30-second wait is honored
  • Final calculation is correct (baseNumber + additionalNumber)
  • All 126 test names from names.txt process successfully
  • Error handling and retries work as configured
  • Workflow can be invoked programmatically
  • Results are queryable from both Temporal Web UI and PostgreSQL
  • Documentation is complete and accurate
  • API endpoint replicates existing API Gateway interface
  • Existing test script works with minimal modifications

Questions

No outstanding questions. This spec is complete, internally consistent, and ready for implementation.