A test implementation of an AWS Step Functions workflow, together with its migration to Temporal + PostgreSQL, used to evaluate AI-assisted transformation tooling.
Project ID: <Project_ID>
Created: 2026-02-02
AWS Account: <AWS_Account_ID>
Region: us-east-2
- Overview
- Implementations
- Temporal Quick Start
- Docker Quick Start
- Architecture
- Database Schema
- Running the System
- Testing
- API Documentation
- Migration Notes
- Troubleshooting
- Project Structure
- Implementation Journal
This project demonstrates a simple workflow that:
- Receives person data with two numbers (base and additional)
- Writes initial state to database with base number
- Waits 30 seconds
- Calculates final number (base + additional) and updates database
The workflow showcases migration from AWS-native services (Step Functions, Lambda, DynamoDB) to Temporal + PostgreSQL while maintaining identical business logic.
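The four steps can be sketched independently of either stack. The version below is a minimal illustration only: it assumes an in-memory dictionary in place of DynamoDB or PostgreSQL, and the `run_workflow` name and its injectable `sleep` parameter are hypothetical, not taken from the project code.

```python
import time
from typing import Callable, Dict

# In-memory stand-in for the database table (assumption: the real
# implementations write to DynamoDB or PostgreSQL instead).
Store = Dict[str, int]


def run_workflow(
    store: Store,
    person_name: str,
    base_number: int,
    additional_number: int,
    wait_seconds: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run the workflow steps against an in-memory store."""
    # Write the initial state with the base number.
    store[person_name] = base_number
    # Wait (30 seconds by default; injectable so a demo can skip it).
    sleep(wait_seconds)
    # Calculate the final number and update the stored value.
    final_number = base_number + additional_number
    store[person_name] = final_number
    return final_number


if __name__ == "__main__":
    db: Store = {}
    # Pass a no-op sleep so the demo finishes immediately.
    print(run_workflow(db, "Harold", 23, 17, sleep=lambda _seconds: None))
```

Both implementations described below keep this logic and differ only in where the state lives and what schedules the wait.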
Stack: AWS Step Functions + Lambda + DynamoDB
IAM Roles:
- step-one-test-role-<ROLE_SUFFIX-1>: Lambda execution role
  - ARN: arn:aws:iam::<AWS_Account_ID>:role/service-role/step-one-test-role-<ROLE_SUFFIX-1>
- StepFunctions-state-machine-test-role-<ROLE_SUFFIX-2>: Step Functions role
  - ARN: arn:aws:iam::<AWS_Account_ID>:role/service-role/StepFunctions-state-machine-test-role-<ROLE_SUFFIX-2>
IAM Policies:
- lambda_dynamodb_getitem_test: Inline policy for DynamoDB access
Lambda Functions:
- step-one-test: Initial data ingestion
  - ARN: arn:aws:lambda:us-east-2:<AWS_Account_ID>:function:step-one-test
  - Code: step-one-test/lambda_function.py
- step-two-test: Calculate and update final value
  - ARN: arn:aws:lambda:us-east-2:<AWS_Account_ID>:function:step-two-test
  - Code: step-two-test/lambda_function.py
DynamoDB:
- Table: test_db
  - ARN: arn:aws:dynamodb:us-east-2:<AWS_Account_ID>:table/test_db
  - Partition Key: personName (String)
  - Attributes: assignedNumber (String)
S3:
- Bucket: test-bucket
  - ARN: arn:aws:s3:::test-bucket
Step Functions:
- State Machine: state-machine-test
  - ARN: arn:aws:states:us-east-2:<AWS_Account_ID>:stateMachine:state-machine-test
  - Definition: step-function-test.yaml
API Gateway:
- Endpoint: https://<random_id>.execute-api.us-east-2.amazonaws.com/v1/execution
Input: {personName, baseNumber, additionalNumber}
↓
Lambda 1: step-one-test
- Writes: DynamoDB[personName] = {assignedNumber: baseNumber}
↓
Wait: 30 seconds
↓
Lambda 2: step-two-test
- Reads: current assignedNumber
- Calculates: baseNumber + additionalNumber
- Writes: DynamoDB[personName] = {assignedNumber: final}
↓
Output: {status_code, name, finalNumber}
Stack: Temporal + Python SDK + PostgreSQL + FastAPI
Located in temporal-implementation/ directory.
Temporal Workflows:
- PersonNumberWorkflow: Main workflow orchestrator
  - Location: temporal-implementation/workflows/person_number_workflow.py
  - Retry Policy: 3 attempts, 1s initial interval, 2.0 backoff coefficient
Temporal Activities:
- ingest_data_activity: Activity 1, data validation and initial database write
  - Location: temporal-implementation/activities/ingest_data.py
  - Timeout: 60 seconds
- calculate_and_update_activity: Activity 2, calculate final value and update
  - Location: temporal-implementation/activities/calculate_update.py
  - Timeout: 60 seconds
  - Features: Idempotent retry logic
Database:
- PostgreSQL database: temporal_migration_test
- Schema: temporal-implementation/database/001_create_persons_table.sql
- Access Layer: temporal-implementation/database/db_operations.py
REST API:
- FastAPI application at http://localhost:8000
- Main file: temporal-implementation/api/main.py
- Documentation: temporal-implementation/api/README.md
Worker:
- Temporal worker: temporal-implementation/worker.py
- Task Queue: person-number-task-queue
Client:
- CLI client: temporal-implementation/client.py
- Supports sync and async workflow execution
Input: {personName, baseNumber, additionalNumber}
↓
Activity 1: ingest_data_activity
- Validates: personName not empty
- Converts: baseNumber/additionalNumber strings → integers
- Writes: PostgreSQL[personName] = {assigned_number: baseNumber}
- UPSERT: ON CONFLICT (person_name) DO UPDATE
↓
Sleep: 30 seconds (workflow.sleep)
↓
Activity 2: calculate_and_update_activity
- Reads: existing assigned_number
- Validates: Idempotent retry safety check
- Calculates: baseNumber + additionalNumber
- Writes: PostgreSQL[personName] = {assigned_number: final}
↓
Output: final_number (integer)
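The validation and conversion performed by Activity 1 can be illustrated with a small standalone function. This is a sketch under assumptions, not the code in `ingest_data.py`: the `parse_input` name and the exact error messages are hypothetical, and only the rules shown in the flow above (non-empty `personName`, numeric strings converted to integers) are taken from this document.

```python
from typing import Mapping, Tuple


def parse_input(payload: Mapping[str, object]) -> Tuple[str, int, int]:
    """Validate the workflow input and convert numeric strings to integers."""
    raw_name = payload.get("personName")
    if not isinstance(raw_name, str) or not raw_name.strip():
        raise ValueError("personName must be a non-empty string")

    numbers = []
    for field in ("baseNumber", "additionalNumber"):
        if field not in payload:
            raise ValueError(f"missing required field: {field}")
        try:
            # Accepts "50" as well as 50; rejects "abc" and None.
            numbers.append(int(payload[field]))
        except (TypeError, ValueError):
            raise ValueError(
                f"{field} must be an integer, got {payload[field]!r}"
            ) from None

    return raw_name.strip(), numbers[0], numbers[1]
```

Converting at the boundary means the addition in Activity 2 operates on integers, so `"23"` and `"17"` produce 40 rather than a concatenated string.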
- Temporal CLI (v1.6.0+)
  brew install temporal
- PostgreSQL (14.21+)
  brew install postgresql@14
  brew services start postgresql@14
- Python (3.8+)
  python3 --version
temporal server start-dev
- Web UI: http://localhost:8233
- gRPC: localhost:7233
- Namespace: default
# Create database
createdb temporal_migration_test
# Run schema migration
psql -d temporal_migration_test -f temporal-implementation/database/001_create_persons_table.sql
# Verify
psql -d temporal_migration_test -c "\dt"
cd temporal-implementation
# Create virtual environment
python3 -m venv venv
source venv/bin/activate
# Install dependencies
pip install -r requirements.txt
# Copy example .env
cp .env.example .env
# Edit with your settings (defaults work for local development)
nano .env
.env file:
# Temporal Configuration
TEMPORAL_HOST=localhost:7233
TEMPORAL_NAMESPACE=default
TASK_QUEUE=person-number-task-queue
# PostgreSQL Configuration
DB_HOST=localhost
DB_PORT=5432
DB_NAME=temporal_migration_test
DB_USER=your_username
DB_PASSWORD=your_password
cd temporal-implementation
source venv/bin/activate
python worker.py
Keep this running in a separate terminal.
cd temporal-implementation
source venv/bin/activate
python run_api.py --port 8000
- API: http://localhost:8000
- Docs: http://localhost:8000/docs
- Health: http://localhost:8000/health
For a containerized setup with zero local installation (except Docker):
- Docker Desktop (macOS/Windows) or Docker Engine (Linux)
- At least 4GB RAM available
cd temporal-implementation
# Copy template (uses Docker defaults)
cp .env.template .env
No editing needed for Docker - defaults work out of the box.
./scripts/docker.sh up
This starts:
- PostgreSQL (with schema)
- Temporal Server
- Worker
- REST API
| Service | URL | Description |
|---|---|---|
| Temporal Web UI | http://localhost:8233 | Monitor workflows |
| API | http://localhost:8000 | Submit workflows |
| API Docs | http://localhost:8000/docs | Interactive API docs |
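The API listed in the table can also be called from Python. As the curl examples in this README show, the `input` field is itself a JSON-encoded string nested inside the JSON request body, which is easy to get wrong by hand. The sketch below uses only the standard library; the function names are hypothetical, and the response is assumed to be a JSON object.

```python
import json
import urllib.request
from typing import Dict

# Endpoint taken from the service table; adjust if the API runs elsewhere.
API_URL = "http://localhost:8000/v1/execution"


def build_execution_body(
    person_name: str,
    base_number: int,
    additional_number: int,
    name: str,
    state_machine_arn: str,
) -> Dict[str, str]:
    """Build the request body, encoding the inner payload as a JSON string."""
    inner = {
        "personName": person_name,
        "baseNumber": str(base_number),
        "additionalNumber": str(additional_number),
    }
    return {
        "input": json.dumps(inner),  # nested JSON string, not an object
        "name": name,
        "stateMachineArn": state_machine_arn,
    }


def start_execution(body: Dict[str, str], url: str = API_URL) -> dict:
    """POST the body to the API and return the decoded JSON response."""
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

`start_execution(build_execution_body("Alice", 50, 75, "test-001", "arn:..."))` would submit the same request as the curl command, provided the API is running.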
curl -X POST http://localhost:8000/v1/execution \
-H "Content-Type: application/json" \
-d '{
"input": "{\"personName\": \"Alice\", \"baseNumber\": \"50\", \"additionalNumber\": \"75\"}",
"name": "test-001",
"stateMachineArn": "arn:..."
}'
After ~35 seconds, verify in database:
./scripts/docker.sh shell-db
# Then in psql:
SELECT person_name, assigned_number FROM persons WHERE person_name = 'Alice';
# Expected: Alice | 125
# All services
./scripts/docker.sh logs
# Specific service
./scripts/docker.sh logs worker
./scripts/docker.sh logs api
./scripts/docker.sh down
To include Prometheus and Grafana:
./scripts/docker.sh up-monitoring
Access:
- Prometheus: http://localhost:9091
- Grafana: http://localhost:3000 (admin/admin)
See temporal-implementation/DOCKER.md for:
- Detailed configuration
- Troubleshooting
- Development workflow
- Production considerations
┌─────────────────┐
│ REST API │ FastAPI (Port 8000)
│ (main.py) │ - Validates input
└────────┬────────┘ - Generates workflow ID
│ - Starts workflow
↓
┌─────────────────┐
│ Temporal Server │ localhost:7233
│ (dev server) │ - Workflow orchestration
└────────┬────────┘ - State management
│ - Retry handling
↓
┌─────────────────┐
│ Temporal Worker│ worker.py
│ (Task Queue) │ - Executes workflows
└────────┬────────┘ - Runs activities
│
├→ PersonNumberWorkflow
│ ├→ Activity 1: ingest_data
│ ├→ Sleep: 30 seconds
│ └→ Activity 2: calculate_update
│
↓
┌─────────────────┐
│ PostgreSQL │ temporal_migration_test
│ (Database) │ - persons table
└─────────────────┘ - ACID transactions
- UPSERT support
- Client → REST API: HTTP POST with JSON payload
- REST API → Temporal: Start workflow asynchronously
- Temporal → Worker: Schedule activity execution
- Worker → Activity: Execute business logic
- Activity → Database: Read/write data
- Activity → Worker: Return result
- Worker → Temporal: Report activity completion
- Temporal → Workflow: Continue execution
- REST API → Client: Return workflow_id and run_id
Workflow-Level Retry Policy:
- Maximum Attempts: 3
- Initial Interval: 1 second
- Backoff Coefficient: 2.0 (exponential)
- Retry Sequence: 1s → 2s (the waits before attempts 2 and 3; a 4s wait would apply only to a fourth attempt)
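The wait before each retry follows from the settings above. A minimal sketch of the arithmetic, assuming Maximum Attempts counts the first attempt (as `maximum_attempts` does in Temporal retry policies) and ignoring any maximum-interval cap; the `retry_delays` helper is hypothetical:

```python
from typing import List


def retry_delays(
    max_attempts: int = 3,
    initial_interval: float = 1.0,
    backoff_coefficient: float = 2.0,
) -> List[float]:
    """Return the wait in seconds before each retry.

    Assumes max_attempts includes the first attempt, so there are
    max_attempts - 1 waits in total.
    """
    retries = max(max_attempts - 1, 0)
    return [initial_interval * backoff_coefficient ** i for i in range(retries)]


if __name__ == "__main__":
    print(retry_delays())                # [1.0, 2.0]
    print(retry_delays(max_attempts=4))  # [1.0, 2.0, 4.0]
```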
Activity-Level Timeouts:
- Start-to-Close: 60 seconds per activity
- No heartbeat timeout (activities are short-lived)
Idempotency:
- Activity 2 validates existing state before updating
- Safe to retry without causing duplicate updates
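One way such a state check could work is sketched below. This is an assumption for illustration, not the logic in `calculate_update.py`: it uses an in-memory dictionary instead of PostgreSQL, and it treats a stored value equal to the expected final number as evidence that an earlier attempt already succeeded.

```python
from typing import Dict


def calculate_and_update(
    store: Dict[str, int],
    person_name: str,
    base_number: int,
    additional_number: int,
) -> int:
    """Apply base + additional exactly once, even if the call is retried."""
    if person_name not in store:
        raise LookupError(f"no record for {person_name!r}")

    final_number = base_number + additional_number
    current = store[person_name]

    if current == final_number:
        # A previous attempt already wrote the result; do nothing.
        return final_number
    if current != base_number:
        # Neither the initial nor the final state: refuse to overwrite.
        raise ValueError(
            f"unexpected stored value {current} for {person_name!r}"
        )

    store[person_name] = final_number
    return final_number
```

Calling the function twice with the same arguments leaves the store unchanged after the first call, which is the property a retried activity needs.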
CREATE TABLE persons (
id SERIAL PRIMARY KEY,
person_name VARCHAR(255) UNIQUE NOT NULL,
assigned_number INTEGER NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_persons_person_name ON persons(person_name);
-- Trigger to update updated_at automatically
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_persons_updated_at
BEFORE UPDATE ON persons
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
Constraints:
- person_name: UNIQUE (prevents duplicate entries)
- assigned_number: INTEGER (proper typing vs DynamoDB strings)
UPSERT Pattern:
INSERT INTO persons (person_name, assigned_number)
VALUES ($1, $2)
ON CONFLICT (person_name)
DO UPDATE SET assigned_number = EXCLUDED.assigned_number
RETURNING id;
DynamoDB table (AWS implementation, for comparison):
- Partition Key: personName (String)
- Attributes: assignedNumber (String; the number is stored as a string)
- No automatic timestamps
- No constraints (allows overwrites)
Start a workflow:
curl -X POST http://localhost:8000/v1/execution \
-H "Content-Type: application/json" \
-d '{
"input": "{\"personName\": \"Alice\", \"baseNumber\": \"50\", \"additionalNumber\": \"75\"}",
"name": "test-execution-001",
"stateMachineArn": "arn:aws:states:us-east-2:<AWS_Account_ID>:stateMachine:state-machine-test"
}'
Response:
{
"workflow_id": "person-number-uuid",
"run_id": "temporal-run-id",
"status": "started"
}
Synchronous (waits for result):
cd temporal-implementation
source venv/bin/activate
python client.py --person-name Alice --base-number 50 --additional-number 75
Asynchronous (returns immediately):
python client.py --person-name Alice --base-number 50 --additional-number 75 --async
Temporal Web UI:
open http://localhost:8233
Database Query:
psql -d temporal_migration_test -c "SELECT * FROM persons ORDER BY created_at DESC LIMIT 10;"
cd temporal-implementation
source venv/bin/activate
# Run all tests
pytest
# Run specific test suites
pytest tests/test_db_operations.py # Database tests (9 tests)
pytest tests/test_activities.py # Activity unit tests (11 tests)
pytest tests/test_activities_integration.py # Integration tests (6 tests)
Test Coverage:
- Database operations: CRUD, UPSERT, error handling
- Activity logic: Input validation, number conversion, idempotency
- Integration: Full workflow execution with real database
cd temporal-implementation
python test_api.py
Tests:
- Health check endpoint
- Valid workflow execution
- Input validation (empty names, invalid numbers)
- Missing required fields
cd /path/to/project/root
python test_error_scenarios.py
Tests:
- Invalid inputs (missing fields, non-numeric values)
- Duplicate person names (UPSERT behavior)
- Activity 2 when record doesn't exist
- Retry behavior verification
Results:
- 6/7 tests pass
- 1 inconclusive (retry history parsing)
With API (mimics AWS API_Testing.py):
python API_Testing.py
- Reads all names from names.txt
- Generates random numbers for each
- Starts 124+ workflows via REST API
- Saves results to results.json
- Execution time: ~0.5 seconds (all start in parallel)
- Completion time: ~35 seconds (longest workflow + overhead)
Direct Temporal (no API):
python test_temporal_workflow.py
- Similar to API_Testing.py but calls Temporal client directly
- Saves results to temporal_results.json
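Starting many workflows at once, as these load-test scripts do, can be sketched with a thread pool. The code below is an illustration rather than the contents of `API_Testing.py`: the `start_all` helper, the 1 to 100 random range, and the `submit` callable (which stands in for the REST call or the Temporal client call) are all assumptions.

```python
import random
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def start_all(
    names: List[str],
    submit: Callable[[Dict[str, str]], dict],
    max_workers: int = 32,
) -> List[dict]:
    """Submit one workflow per name concurrently; results keep input order."""
    payloads = [
        {
            "personName": name,
            "baseNumber": str(random.randint(1, 100)),
            "additionalNumber": str(random.randint(1, 100)),
        }
        for name in names
    ]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as the payloads.
        return list(pool.map(submit, payloads))


if __name__ == "__main__":
    # Stub submitter so the sketch runs without the API or Temporal.
    def fake_submit(payload: Dict[str, str]) -> dict:
        return {"workflow_id": f"person-number-{payload['personName']}", "status": "started"}

    print(start_all(["Alice", "Harold"], fake_submit))
```

Because each submission only starts a workflow and returns, the total start time stays short while the workflows themselves run in parallel on the Temporal side.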
From AWS Console Step Functions, choose Start Execution with:
{
"personName": "Harold",
"baseNumber": "23",
"additionalNumber": "17"
}
Expected outcome:
- Initial write: {personName: "Harold", assignedNumber: "23"}
- After 30s: {personName: "Harold", assignedNumber: "40"}
See temporal-implementation/api/README.md for complete API documentation including:
- Endpoint specifications
- Request/response formats
- Error codes
- Testing examples
- Monitoring tips
Quick Reference:
| Endpoint | Method | Purpose |
|---|---|---|
| /v1/execution | POST | Start workflow |
| /health | GET | Health check |
| / | GET | API info |
| /docs | GET | Swagger UI |
| Aspect | AWS | Temporal |
|---|---|---|
| Orchestration | Step Functions | Temporal Workflows |
| Compute | Lambda Functions | Temporal Activities |
| Database | DynamoDB | PostgreSQL |
| Data Types | Strings | Proper types (INT) |
| Retry Logic | Step Function retry | Temporal retry policies |
| API | API Gateway | FastAPI |
| Execution | Sequential (limited) | Parallel (unlimited) |
| Cost | Pay per execution | Self-hosted (free dev) |
| Local Dev | Limited | Full local stack |
| Observability | CloudWatch | Temporal Web UI |
Test Suite Execution (126 workflows):
- AWS: ~1 hour (sequential due to API Gateway limits)
- Temporal: ~35 seconds (all workflows start in parallel)
- Speedup: ~103x faster
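The speedup figure follows directly from the two approximate durations above, as this short check shows (both inputs are the rounded values quoted in this section, so the result is an estimate):

```python
# Approximate durations quoted above, in seconds.
aws_seconds = 60 * 60      # ~1 hour
temporal_seconds = 35      # ~35 seconds

speedup = aws_seconds / temporal_seconds
print(f"{speedup:.1f}x")   # 102.9x, i.e. roughly 103x
```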
Development Benefits:
- Full local development environment
- No AWS costs for testing
- Instant feedback loop
- Better debugging with Temporal Web UI
All bugs in the original AWS Lambda functions have been fixed:
- ✅ Fixed step-one-test/lambda_function.py:
  - Fixed critical typo: xception → Exception
  - Added logging configuration: import logging and logger = logging.getLogger()
  - Added comprehensive input validation (empty names, invalid numbers, missing fields)
  - Enhanced error messages with specific details
  - Added function documentation
- ✅ Fixed step-two-test/lambda_function.py:
  - Added logging configuration
  - Added input validation
  - Added check for record existence before calculation
  - Enhanced error handling for DynamoDB operations
  - Added function documentation
See AWS_LAMBDA_FIXES.md for detailed documentation of all fixes.
Symptom: Connection refused errors
Solution:
# Check if running
ps aux | grep "temporal server"
# Start if needed
temporal server start-dev
Symptom: Workflows stuck in "Running" state
Solution:
# Check worker
ps aux | grep "python worker.py"
# Start worker
cd temporal-implementation
source venv/bin/activate
python worker.py
Symptom: psycopg2.OperationalError
Solution:
# Check PostgreSQL
brew services list | grep postgresql
# Start if needed
brew services start postgresql@14
# Verify connection
psql -d temporal_migration_test -c "SELECT 1"
# Check .env file has correct credentials
cat temporal-implementation/.env
Symptom: Connection refused on port 8000
Solution:
# Check if API is running
curl http://localhost:8000/health
# Check port
lsof -ti:8000
# Start API
cd temporal-implementation
python run_api.py
Common causes:
- Invalid input (empty person_name, non-numeric values)
- Database connection issue
- Record not found in Activity 2
Debug:
# Check worker logs
cd temporal-implementation
tail -f worker.log
# Check Temporal Web UI
open http://localhost:8233
# Query database
psql -d temporal_migration_test -c "SELECT * FROM persons WHERE person_name = 'YourName';"
Solution:
# Ensure all services running
temporal server start-dev # Terminal 1
cd temporal-implementation && python worker.py # Terminal 2
cd temporal-implementation && python run_api.py # Terminal 3 (if testing API)
# Check database is empty/clean
psql -d temporal_migration_test -c "TRUNCATE TABLE persons;"
# Re-run tests
pytest
.
├── README.md # This file
├── ARCHITECTURE.md # System architecture documentation
├── AWS_LAMBDA_FIXES.md # AWS Lambda bug fixes (Phase 7.3)
├── IMPLEMENTATION_NOTES.md # Complete implementation journal
├── .github/
│ └── copilot-instructions.md # AI assistant context
├── spec/
│ └── 001-spec.md # Migration specification
│
├── step-function-test.yaml # AWS Step Function definition
├── step-one-test/
│ └── lambda_function.py # AWS Lambda 1 (✅ bugs fixed)
├── step-two-test/
│ └── lambda_function.py # AWS Lambda 2 (✅ bugs fixed)
│
├── temporal-implementation/ # Temporal implementation
│ ├── .env.example # Environment template
│ ├── .env.template # Docker environment template
│ ├── .dockerignore # Docker build exclusions
│ ├── Dockerfile # Worker/API container image
│ ├── docker-compose.yml # Full stack deployment
│ ├── DOCKER.md # Docker deployment guide
│ ├── requirements.txt # Python dependencies
│ ├── worker.py # Temporal worker
│ ├── client.py # Workflow client CLI
│ ├── run_api.py # API server launcher
│ │
│ ├── workflows/ # Workflow definitions
│ │ ├── __init__.py
│ │ └── person_number_workflow.py # Main workflow
│ │
│ ├── activities/ # Activity implementations
│ │ ├── __init__.py
│ │ ├── ingest_data.py # Activity 1
│ │ └── calculate_update.py # Activity 2
│ │
│ ├── database/ # Database layer
│ │ ├── __init__.py
│ │ ├── 001_create_persons_table.sql # Schema migration
│ │ └── db_operations.py # Data access layer
│ │
│ ├── api/ # REST API
│ │ ├── __init__.py
│ │ ├── main.py # FastAPI app
│ │ └── README.md # API documentation
│ │
│ ├── config/ # Configuration
│ │ ├── __init__.py
│ │ ├── logging_config.py # Logging setup
│ │ ├── prometheus/ # Prometheus configuration
│ │ │ └── prometheus.yml # Metrics scraping config
│ │ └── grafana/ # Grafana configuration
│ │ ├── datasources/ # Datasource provisioning
│ │ └── dashboards/ # Dashboard provisioning
│ │
│ ├── scripts/ # Helper scripts
│ │ ├── docker.sh # Docker Compose helper
│ │ └── db-migrate.sh # Database migration helper
│ │
│ └── tests/ # Test suites
│ ├── __init__.py
│ ├── test_db_operations.py # Database tests (9 tests)
│ ├── test_activities.py # Activity unit tests (11 tests)
│ └── test_activities_integration.py # Integration tests (6 tests)
│
├── names.txt # Test data (126 names)
├── API_Testing.py # Test script (updated for Temporal)
├── test_temporal_workflow.py # Direct Temporal test
├── test_error_scenarios.py # Error handling tests
├── results.json # Test results
└── temporal_results.json # Temporal test results
For a complete, detailed account of the entire migration project, see IMPLEMENTATION_NOTES.md.
This comprehensive journal documents:
- Phase-by-phase implementation (Phases 1-7.3) with dates, durations, and completion status
- Technical decisions and rationale behind architectural choices
- Challenges encountered and solutions with code examples
- Test results and verification at each stage
- Performance metrics and comparison with AWS
- Lessons learned from AI-assisted development
- Future enhancement recommendations
Key Statistics:
- Project duration: 22 days (Feb 2-24, 2026)
- Lines of code: ~1,230 lines
- Lines of documentation: ~2,350 lines
- Test coverage: 165 test cases, 164 passed (99.4%)
- Performance improvement: 103x faster than AWS
- Temporal Documentation: https://docs.temporal.io
- Temporal Python SDK: https://github.com/temporalio/sdk-python
- FastAPI Documentation: https://fastapi.tiangolo.com
- PostgreSQL Documentation: https://www.postgresql.org/docs/
This is a test project for evaluating AI-assisted transformation from AWS to Temporal.
Migration Status: ✅ Complete
- Phase 1-6: Temporal implementation fully functional
- Phase 7.1: Comprehensive documentation
- Phase 7.2: Docker Compose deployment
- Phase 7.3: AWS Lambda bugs fixed
All bugs in the AWS Lambda functions have been resolved. See AWS_LAMBDA_FIXES.md for details.
For questions or issues, refer to:
- spec/001-spec.md - Migration specification
- ARCHITECTURE.md - System architecture
- temporal-implementation/DOCKER.md - Docker deployment
Test project - internal use only.