Skip to content

Commit 31d82d2

Browse files
committed
Update loader implementation docs to explain new shared test structure
1 parent bd57cee commit 31d82d2

File tree

1 file changed

+181
-45
lines changed

1 file changed

+181
-45
lines changed

docs/implementing_data_loaders.md

Lines changed: 181 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -305,71 +305,207 @@ def _get_table_metadata(self, table: pa.Table, duration: float, batch_count: int
305305

306306
## Testing
307307

308-
### Integration Test Structure
308+
### Generalized Test Infrastructure
309309

310-
Create integration tests in `tests/integration/test_{system}_loader.py`:
310+
The project uses a generalized test infrastructure that eliminates code duplication across loader tests. Instead of writing standalone tests for each loader, you inherit from shared base test classes.
311+
312+
### Architecture
313+
314+
```
315+
tests/integration/loaders/
316+
├── conftest.py # Base classes and fixtures
317+
├── test_base_loader.py # 7 core tests (all loaders inherit)
318+
├── test_base_streaming.py # 5 streaming tests (for loaders with reorg support)
319+
└── backends/
320+
├── test_postgresql.py # PostgreSQL-specific config + tests
321+
├── test_redis.py # Redis-specific config + tests
322+
└── test_example.py # Your loader tests here
323+
```
324+
325+
### Step 1: Create Configuration Fixture
326+
327+
Add your loader's configuration fixture to `tests/conftest.py`:
328+
329+
```python
330+
@pytest.fixture(scope='session')
331+
def example_test_config(request):
332+
"""Example loader configuration from testcontainer or environment"""
333+
# Use testcontainers for CI, or fall back to environment variables
334+
if TESTCONTAINERS_AVAILABLE and USE_TESTCONTAINERS:
335+
# Set up testcontainer (if applicable)
336+
example_container = request.getfixturevalue('example_container')
337+
return {
338+
'host': example_container.get_container_host_ip(),
339+
'port': example_container.get_exposed_port(5432),
340+
'database': 'test_db',
341+
'user': 'test_user',
342+
'password': 'test_pass',
343+
}
344+
else:
345+
# Fall back to environment variables
346+
return {
347+
'host': os.getenv('EXAMPLE_HOST', 'localhost'),
348+
'port': int(os.getenv('EXAMPLE_PORT', '5432')),
349+
'database': os.getenv('EXAMPLE_DB', 'test_db'),
350+
'user': os.getenv('EXAMPLE_USER', 'test_user'),
351+
'password': os.getenv('EXAMPLE_PASSWORD', 'test_pass'),
352+
}
353+
```
354+
355+
### Step 2: Create Test Configuration Class
356+
357+
Create `tests/integration/loaders/backends/test_example.py`:
311358

312359
```python
313-
# tests/integration/test_example_loader.py
360+
"""
361+
Example loader integration tests using generalized test infrastructure.
362+
"""
314363

364+
from typing import Any, Dict, List, Optional
315365
import pytest
316-
import pyarrow as pa
317-
from src.amp.loaders.base import LoadMode
366+
318367
from src.amp.loaders.implementations.example_loader import ExampleLoader
368+
from tests.integration.loaders.conftest import LoaderTestConfig
369+
from tests.integration.loaders.test_base_loader import BaseLoaderTests
370+
from tests.integration.loaders.test_base_streaming import BaseStreamingTests
371+
372+
373+
class ExampleTestConfig(LoaderTestConfig):
374+
"""Example-specific test configuration"""
375+
376+
loader_class = ExampleLoader
377+
config_fixture_name = 'example_test_config'
378+
379+
# Declare loader capabilities
380+
supports_overwrite = True
381+
supports_streaming = True # Set to False if no streaming support
382+
supports_multi_network = True # For blockchain loaders with reorg
383+
supports_null_values = True
384+
385+
def get_row_count(self, loader: ExampleLoader, table_name: str) -> int:
386+
"""Get row count from table"""
387+
# Implement using your loader's API
388+
return loader._connection.query(f"SELECT COUNT(*) FROM {table_name}")[0]['count']
389+
390+
def query_rows(
391+
self,
392+
loader: ExampleLoader,
393+
table_name: str,
394+
where: Optional[str] = None,
395+
order_by: Optional[str] = None
396+
) -> List[Dict[str, Any]]:
397+
"""Query rows from table"""
398+
query = f"SELECT * FROM {table_name}"
399+
if where:
400+
query += f" WHERE {where}"
401+
if order_by:
402+
query += f" ORDER BY {order_by}"
403+
return loader._connection.query(query)
404+
405+
def cleanup_table(self, loader: ExampleLoader, table_name: str) -> None:
406+
"""Drop table"""
407+
loader._connection.execute(f"DROP TABLE IF EXISTS {table_name}")
408+
409+
def get_column_names(self, loader: ExampleLoader, table_name: str) -> List[str]:
410+
"""Get column names from table"""
411+
result = loader._connection.query(
412+
f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table_name}'"
413+
)
414+
return [row['column_name'] for row in result]
319415

320-
@pytest.fixture
321-
def example_config():
322-
return {
323-
'host': 'localhost',
324-
'port': 5432,
325-
'database': 'test_db',
326-
'user': 'test_user',
327-
'password': 'test_pass'
328-
}
329416

330-
@pytest.fixture
331-
def test_data():
332-
return pa.Table.from_pydict({
333-
'id': [1, 2, 3],
334-
'name': ['a', 'b', 'c'],
335-
'value': [1.0, 2.0, 3.0]
336-
})
417+
# Core tests - ALL loaders must inherit these
418+
class TestExampleCore(BaseLoaderTests):
419+
"""Inherits 7 core tests: connection, context manager, batching, modes, null handling, errors"""
420+
config = ExampleTestConfig()
421+
337422

423+
# Streaming tests - Only for loaders with streaming/reorg support
424+
class TestExampleStreaming(BaseStreamingTests):
425+
"""Inherits 5 streaming tests: metadata columns, reorg deletion, overlapping ranges, multi-network, microbatch dedup"""
426+
config = ExampleTestConfig()
427+
428+
429+
# Loader-specific tests
338430
@pytest.mark.integration
339431
@pytest.mark.example
340-
class TestExampleLoaderIntegration:
341-
def test_connection(self, example_config):
342-
loader = ExampleLoader(example_config)
343-
344-
loader.connect()
345-
assert loader.is_connected
346-
347-
loader.disconnect()
348-
assert not loader.is_connected
349-
350-
def test_basic_loading(self, example_config, test_data):
351-
loader = ExampleLoader(example_config)
352-
432+
class TestExampleSpecific:
433+
"""Example-specific functionality tests"""
434+
config = ExampleTestConfig()
435+
436+
def test_custom_feature(self, loader, test_table_name, cleanup_tables):
437+
"""Test example-specific functionality"""
438+
cleanup_tables.append(test_table_name)
439+
353440
with loader:
354-
result = loader.load_table(test_data, 'test_table')
355-
441+
# Test your loader's unique features
442+
result = loader.some_custom_method(test_table_name)
356443
assert result.success
357-
assert result.rows_loaded == 3
358-
assert result.metadata['operation'] == 'load_table'
359-
assert result.metadata['batches_processed'] > 0
444+
```
445+
446+
### What You Get Automatically
447+
448+
By inheriting from the base test classes, you automatically get:
449+
450+
**From `BaseLoaderTests` (7 core tests):**
451+
- `test_connection` - Connection establishment and disconnection
452+
- `test_context_manager` - Context manager functionality
453+
- `test_batch_loading` - Basic batch loading
454+
- `test_append_mode` - Append mode operations
455+
- `test_overwrite_mode` - Overwrite mode operations
456+
- `test_null_handling` - Null value handling
457+
- `test_error_handling` - Error scenarios
458+
459+
**From `BaseStreamingTests` (5 streaming tests):**
460+
- `test_streaming_metadata_columns` - Metadata column creation
461+
- `test_reorg_deletion` - Blockchain reorganization handling
462+
- `test_reorg_overlapping_ranges` - Overlapping range invalidation
463+
- `test_reorg_multi_network` - Multi-network reorg isolation
464+
- `test_microbatch_deduplication` - Microbatch duplicate detection
465+
466+
### Required LoaderTestConfig Methods
467+
468+
You must implement these four methods in your `LoaderTestConfig` subclass:
469+
470+
```python
471+
def get_row_count(self, loader, table_name: str) -> int:
472+
"""Return number of rows in table"""
473+
474+
def query_rows(self, loader, table_name: str, where=None, order_by=None) -> List[Dict]:
475+
"""Query and return rows as list of dicts"""
476+
477+
def cleanup_table(self, loader, table_name: str) -> None:
478+
"""Drop/delete the table"""
479+
480+
def get_column_names(self, loader, table_name: str) -> List[str]:
481+
"""Return list of column names"""
482+
```
483+
484+
### Capability Flags
485+
486+
Set these flags in your `LoaderTestConfig` to control which tests run:
487+
488+
```python
489+
supports_overwrite = True # Can overwrite existing data
490+
supports_streaming = True # Supports streaming with metadata
491+
supports_multi_network = True # Supports multi-network isolation (blockchain loaders)
492+
supports_null_values = True # Handles NULL values correctly
360493
```
361494

362495
### Running Tests
363496

364497
```bash
365-
# Run all integration tests
366-
make test-integration
498+
# Run all tests for your loader
499+
uv run pytest tests/integration/loaders/backends/test_example.py -v
500+
501+
# Run only core tests
502+
uv run pytest tests/integration/loaders/backends/test_example.py::TestExampleCore -v
367503

368-
# Run specific loader tests
369-
make test-example
504+
# Run only streaming tests
505+
uv run pytest tests/integration/loaders/backends/test_example.py::TestExampleStreaming -v
370506

371-
# Run with environment variables
372-
uv run --env-file .test.env pytest tests/integration/test_example_loader.py -v
507+
# Run specific test
508+
uv run pytest tests/integration/loaders/backends/test_example.py::TestExampleCore::test_connection -v
373509
```
374510

375511
## Best Practices

0 commit comments

Comments
 (0)