- The repo uses uv. You'll want to install `uv`, and then you can `uv sync` or use the `install` make command.
- You'll need to install the pre-commit hooks. Make sure your environment is active in the terminal by running `source .venv/bin/activate`, then issue the command: `uv run -- pre-commit install --install-hooks`
- Set up any desired configuration. For development work you need to prefix the environment variables from the `src/settings.py` file with `DEV_` in your `.env` file. Reference `.env.example` for examples.
To develop and test with cloud storage providers and secret managers, you'll need to set up authentication credentials. These are only needed for development; production environments use IAM roles, Managed Identity, or Application Default Credentials.
For AWS Secrets Manager and S3 access:

```
DEV_AWS_ACCESS_KEY_ID=your-aws-access-key-id
DEV_AWS_SECRET_ACCESS_KEY=your-aws-secret-access-key
DEV_AWS_SESSION_TOKEN=your-session-token  # Optional, for temporary credentials
DEV_AWS_REGION=us-east-1
```
For Azure Key Vault and Blob Storage access:

```
DEV_AZURE_CLIENT_ID=your-azure-client-id
DEV_AZURE_CLIENT_SECRET=your-azure-client-secret
DEV_AZURE_TENANT_ID=your-azure-tenant-id
DEV_AZURE_KEY_VAULT_URL=https://your-vault.vault.azure.net/
```
For GCP Secret Manager and GCS access:

```
DEV_GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
DEV_GOOGLE_CLOUD_PROJECT=your-gcp-project-id
```
To add support for a new file type (e.g., TSV, XML, Avro), you need to:
- Create a Source class in `src/sources/base.py`:
  - Extend `DataSource` and add any file-type-specific configuration fields
  - Example: `CSVSource` has `delimiter`, `encoding`, and `skip_rows` fields
  - Example: `ExcelSource` has `sheet_name` and `skip_rows` fields
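As a sketch, a hypothetical `TSVSource` following the `CSVSource` pattern could look like this. The `DataSource` base class below is a stand-in for the real one in `src/sources/base.py`, and its fields are assumptions for illustration only:

```python
from dataclasses import dataclass

# Stand-in for the repo's DataSource base class (src/sources/base.py);
# the fields shown here are illustrative, not the real base class.
@dataclass
class DataSource:
    name: str
    file_pattern: str = "*"

# Hypothetical TSVSource mirroring CSVSource's delimiter/encoding/skip_rows fields.
@dataclass
class TSVSource(DataSource):
    delimiter: str = "\t"
    encoding: str = "utf-8"
    skip_rows: int = 0
```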
- Create a Reader class in `src/pipeline/read/`:
  - Extend `BaseReader` from `src/pipeline/read/base.py`
  - Set the `SOURCE_TYPE` class variable to your new Source class
  - Implement the `read()` method so that it:
    - Validates headers exist and match required fields
    - Streams data from the file in batches
    - Yields batches of dictionaries (one dict per row/record)
  - Implement the `starting_row_number` property (for error reporting)
  - Override `_get_file_stream()` if you need custom file handling (e.g., text encoding)
  - See `src/pipeline/read/csv.py` or `src/pipeline/read/excel.py` for examples
- Register the Reader in `src/pipeline/read/factory.py`:
  - Add your file extension(s) to the `_readers` dictionary
  - Map the extension to your Reader class
  - Example: `".tsv": TSVReader` or `".xml": XMLReader`
- Update `ReaderFactory.create_reader()` if needed:
  - Add any new source configuration fields to the `include` set in `reader_kwargs`
  - This ensures those fields are passed to your Reader's `__init__` method
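Conceptually, the `include` set filters source configuration down to the keyword arguments your Reader accepts. A rough sketch (field names are assumptions):

```python
# Hypothetical source configuration; only fields named in `include` reach
# the Reader's __init__ as reader_kwargs.
source_config = {"delimiter": "\t", "encoding": "utf-8", "skip_rows": 1, "sheet_name": None}

include = {"delimiter", "encoding", "skip_rows"}  # add your new fields here
reader_kwargs = {k: v for k, v in source_config.items() if k in include}
```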
- Test your implementation:
  - Create test files in `src/tests/`
  - Ensure your Reader handles:
    - Missing headers
    - Missing required columns
    - Empty files
    - Cloud storage (S3, GCS, Azure) if applicable
    - Gzip compression if applicable
- Add tests:
  - Create fixture data in `src/tests/fixtures/<filetype>_files.py`:
    - Define test data constants as lists/dicts representing file contents
    - Include scenarios: blank headers, duplicates, validation errors, missing columns, failed audits, no data
    - See `src/tests/fixtures/csv_files.py` or `src/tests/fixtures/excel_files.py` for examples
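Hypothetical fixture constants for a TSV file type, covering a few of the scenarios listed above (column names and values are illustrative):

```python
# Sketch of src/tests/fixtures/tsv_files.py-style constants; each constant is a
# list of rows, with the first row acting as the header.
VALID_TSV_DATA = [
    ["id", "name", "amount"],
    ["1", "ada", "10.50"],
    ["2", "grace", "20.00"],
]

BLANK_HEADER_TSV_DATA = [
    ["id", "", "amount"],  # blank header should trigger the header validation error
    ["1", "ada", "10.50"],
]

DUPLICATE_GRAIN_TSV_DATA = [
    ["id", "name", "amount"],
    ["1", "ada", "10.50"],
    ["1", "ada", "10.50"],  # duplicate grain row
]
```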
  - Create a test source configuration in `src/tests/fixtures/sources.py`:
    - Define a `TEST_<FILETYPE>_SOURCE` using your new Source class
    - Include a test `TableModel` with appropriate fields
    - Add it to the `test_sources` list in `conftest.py`'s `setup_test_db_and_directories` fixture
  - Create a file creation fixture in `src/tests/conftest.py`:
    - Add a `create_<filetype>_file` fixture function
    - It should accept `file_name` and `data` parameters
    - Create the file in `session_temp_dir` using your file format's library
    - Clean up files after tests (see `create_csv_file` or `create_excel_file` for patterns)
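A sketch of the create/clean-up pattern for a TSV file. In `conftest.py` this would be a pytest fixture writing into `session_temp_dir`; here a plain context manager with `tempfile` stands in so the shape is self-contained:

```python
import csv
import os
import tempfile
from contextlib import contextmanager

# Hypothetical helper mirroring the create_csv_file pattern: write the rows as
# a tab-delimited file, yield the path, then clean up afterwards.
@contextmanager
def create_tsv_file(file_name, data):
    tmp_dir = tempfile.mkdtemp()
    path = os.path.join(tmp_dir, file_name)
    with open(path, "w", newline="", encoding="utf-8") as f:
        csv.writer(f, delimiter="\t").writerows(data)
    try:
        yield path
    finally:
        os.remove(path)  # clean up after the test
        os.rmdir(tmp_dir)
```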
  - Create a test file `src/tests/test_<filetype>.py`:
    - Test blank/missing headers → should raise `MissingHeaderError`
    - Test duplicate grain → should raise `GrainValidationError`
    - Test validation errors exceeding the threshold → should raise `ValidationThresholdExceededError`
    - Test validation errors below the threshold → should succeed but create DLQ records
    - Test missing required columns → should raise `MissingColumnsError`
    - Test a failed audit → should raise `AuditFailedError`
    - Test no data → should raise `NoDataInFileError` (if applicable)
    - Test successful processing → should return `(True, filename, None)`
    - Test that DLQ records are created correctly for validation errors
    - Test file-type specific features (e.g., Excel date conversion, JSON array paths)
    - Use the pattern: `test_processor.results.clear()`, process file, assert results
    - See `src/tests/test_csv.py` or `src/tests/test_excel.py` for complete examples
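The header-validation case above can be sketched as follows. Both `MissingHeaderError` and `validate_headers` are stand-ins for the repo's real exception and the Reader-level check; only the shape of the test is the point:

```python
# Hypothetical shape of a test in src/tests/test_tsv.py.
class MissingHeaderError(Exception):  # stand-in for the repo's exception
    pass

def validate_headers(headers, required):
    # mirrors the "blank/missing headers" check a Reader performs
    if not headers or any(not h.strip() for h in headers):
        raise MissingHeaderError("blank or missing headers")
    missing = set(required) - set(headers)
    if missing:
        raise MissingHeaderError(f"missing required columns: {sorted(missing)}")

def test_blank_headers_raise():
    try:
        validate_headers(["id", ""], required=["id", "name"])
    except MissingHeaderError:
        return  # expected failure path
    raise AssertionError("expected MissingHeaderError")
```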
  - Test gzip support (if applicable):
    - Create a `create_<filetype>_gz_file` fixture in `conftest.py`
    - Add a test in `src/tests/test_gzip.py` to verify compressed files work
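A compressed-file helper can reuse the plain-file logic plus `gzip`. As above, this is a hypothetical sketch of the fixture's body, not the repo's actual `conftest.py` code:

```python
import csv
import gzip
import io
import os
import tempfile

# Hypothetical create_tsv_gz_file helper: serialize the rows as TSV, then
# write them gzip-compressed, mirroring the gzip fixture pattern.
def create_tsv_gz_file(file_name, data, directory=None):
    directory = directory or tempfile.mkdtemp()
    path = os.path.join(directory, file_name)
    buf = io.StringIO()
    csv.writer(buf, delimiter="\t").writerows(data)
    with gzip.open(path, "wt", encoding="utf-8") as f:
        f.write(buf.getvalue())
    return path
```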
  - Run tests:
    - Use `make test`
    - Tests are currently fast to run
To add support for a new database (e.g., Oracle, Snowflake, Redshift), you need to:
- Update the SQLAlchemy connection in `src/process/db.py`:
  - Ensure SQLAlchemy supports your database (you may need an additional driver)
  - Update `setup_db()` if needed for database-specific setup
  - Add any required type adapters (see `_register_pendulum_adapters()` for examples)
  - Update `SUPPORTED_DATABASE_DRIVERS` in `src/settings.py` so that the drivername is properly extracted from the database URL
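Roughly, the drivername is the dialect part of a SQLAlchemy-style URL. A sketch of the idea (the actual logic in `src/settings.py` may differ):

```python
# Illustrative gate of the kind SUPPORTED_DATABASE_DRIVERS provides; the
# membership set and parsing below are assumptions, not the repo's code.
SUPPORTED_DATABASE_DRIVERS = {"postgresql", "bigquery", "oracle"}  # add yours

def extract_drivername(database_url: str) -> str:
    # "postgresql+psycopg2://user@host/db" -> "postgresql"
    drivername = database_url.split("://", 1)[0].split("+", 1)[0]
    if drivername not in SUPPORTED_DATABASE_DRIVERS:
        raise ValueError(f"unsupported database driver: {drivername}")
    return drivername
```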
- Create a Writer class in `src/pipeline/write/`:
  - Extend `BaseWriter` from `src/pipeline/write/base.py`
  - Override `__init__()` if you need database-specific initialization
  - Override `_convert_record()` if you need database-specific type conversions
  - Override `write()` if you need custom batch insertion logic (e.g., BigQuery uses bulk loading)
  - See `src/pipeline/write/postgresql.py` for a simple example
  - See `src/pipeline/write/bigquery.py` for a complex example with custom write logic
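A sketch of the `_convert_record()` override, with a stand-in `BaseWriter` (the real one in `src/pipeline/write/base.py` also manages connections and batching). The date-to-ISO-string conversion is purely an example of a database-specific adjustment:

```python
import datetime

# Minimal stand-in for BaseWriter (src/pipeline/write/base.py).
class BaseWriter:
    def _convert_record(self, record: dict) -> dict:
        return record

# Hypothetical OracleWriter overriding _convert_record for a
# database-specific type conversion (dates -> ISO strings, as an example).
class OracleWriter(BaseWriter):
    def _convert_record(self, record: dict) -> dict:
        return {
            k: v.isoformat() if isinstance(v, datetime.date) else v
            for k, v in record.items()
        }
```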
- Create an Auditor class in `src/pipeline/audit/`:
  - Extend `BaseAuditor` from `src/pipeline/audit/base.py`
  - Override `_check_grain_uniqueness()` if your database has different SQL syntax
  - Override `_execute_audit_query()` if your database has different SQL syntax
  - See `src/pipeline/audit/postgresql.py` for a standard example
  - See `src/pipeline/audit/bigquery.py` for an example with custom SQL (FARM_FINGERPRINT)
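The kind of query `_check_grain_uniqueness()` runs is a grouped duplicate count; a sketch (identifier quoting omitted, and the real Auditor executes this against the database rather than just building the string):

```python
# Illustrative grain-uniqueness SQL; BigQuery's version would swap in
# FARM_FINGERPRINT-based logic, and other databases may need their own syntax.
def build_grain_uniqueness_sql(table: str, grain_columns: list[str]) -> str:
    cols = ", ".join(grain_columns)
    return (
        f"SELECT {cols}, COUNT(*) AS dup_count "
        f"FROM {table} GROUP BY {cols} HAVING COUNT(*) > 1"
    )
```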
- Create a Publisher class in `src/pipeline/publish/`:
  - Extend `BasePublisher` from `src/pipeline/publish/base.py`
  - Override `create_publish_sql()` if your database uses different MERGE/UPSERT syntax
  - Override `publish()` if you need custom merge logic
  - See `src/pipeline/publish/postgresql.py` for a standard example
  - See `src/pipeline/publish/bigquery.py` for an example with custom SQL
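As a rough illustration of what a `create_publish_sql()` override produces, here is an ANSI-style MERGE builder. The real Publisher derives this from the table model, and the exact syntax (MERGE vs. `INSERT ... ON CONFLICT`) varies by database:

```python
# Illustrative MERGE statement builder; parameter names and SQL shape are
# assumptions, not the repo's actual create_publish_sql() signature.
def create_publish_sql(target: str, staging: str,
                       grain: list[str], columns: list[str]) -> str:
    on = " AND ".join(f"t.{c} = s.{c}" for c in grain)
    sets = ", ".join(f"t.{c} = s.{c}" for c in columns if c not in grain)
    cols = ", ".join(columns)
    vals = ", ".join(f"s.{c}" for c in columns)
    return (
        f"MERGE INTO {target} t USING {staging} s ON {on} "
        f"WHEN MATCHED THEN UPDATE SET {sets} "
        f"WHEN NOT MATCHED THEN INSERT ({cols}) VALUES ({vals})"
    )
```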
- Register all three classes in their respective factories:
  - Add to `WriterFactory._writers` in `src/pipeline/write/factory.py`
  - Add to `AuditorFactory._auditors` in `src/pipeline/audit/factory.py`
  - Add to `PublisherFactory._publishers` in `src/pipeline/publish/factory.py`
  - Use the database driver name from `config.DRIVERNAME` as the key
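The registration pattern, sketched for the Writer factory (the concrete writer classes and the `create` method shape below are placeholders; the real factories follow their own API):

```python
# Illustrative factory registration keyed by driver name (config.DRIVERNAME).
class PostgreSQLWriter: ...
class OracleWriter: ...

class WriterFactory:
    _writers = {
        "postgresql": PostgreSQLWriter,
        "oracle": OracleWriter,  # new database keyed by its driver name
    }

    @classmethod
    def create(cls, drivername: str):
        return cls._writers[drivername]()
```

The Auditor and Publisher factories follow the same lookup-by-drivername pattern.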
- Update database-specific utilities in `src/pipeline/db_utils.py` if needed:
  - `_get_timezone_aware_datetime_type()`: for datetime column types
  - `db_serialize_json_for_dlq_table()`: for JSON serialization in the DLQ
  - `db_start_log()`: for ID generation (BigQuery doesn't support auto-increment)
- Update table creation in `src/process/db.py` if needed:
  - Check `create_tables()` for any database-specific table definitions
  - Update `file_load_dlq` or `file_load_log` table creation if needed
- Test your implementation:
  - Test with sample data files
  - Add your own make command in the Makefile for manual dev test runs
  - Verify that:
    - Table creation works correctly
    - Batch inserts work efficiently
    - Grain validation queries work
    - Merge/publish operations work correctly
    - Error handling and DLQ insertion work