@@ -305,71 +305,207 @@ def _get_table_metadata(self, table: pa.Table, duration: float, batch_count: int
305305
306306## Testing
307307
308- ### Integration Test Structure
308+ ### Generalized Test Infrastructure
309309
310- Create integration tests in ` tests/integration/test_{system}_loader.py ` :
310+ The project uses a generalized test infrastructure that eliminates code duplication across loader tests. Instead of writing standalone tests for each loader, you inherit from shared base test classes.
311+
312+ ### Architecture
313+
314+ ```
315+ tests/integration/loaders/
316+ ├── conftest.py # Base classes and fixtures
317+ ├── test_base_loader.py # 7 core tests (all loaders inherit)
318+ ├── test_base_streaming.py # 5 streaming tests (for loaders with reorg support)
319+ └── backends/
320+ ├── test_postgresql.py # PostgreSQL-specific config + tests
321+ ├── test_redis.py # Redis-specific config + tests
322+ └── test_example.py # Your loader tests here
323+ ```
324+
325+ ### Step 1: Create Configuration Fixture
326+
327+ Add your loader's configuration fixture to ` tests/conftest.py ` :
328+
329+ ``` python
330+ @pytest.fixture (scope = ' session' )
331+ def example_test_config (request ):
332+ """ Example loader configuration from testcontainer or environment"""
333+ # Use testcontainers for CI, or fall back to environment variables
334+ if TESTCONTAINERS_AVAILABLE and USE_TESTCONTAINERS :
335+ # Set up testcontainer (if applicable)
336+ example_container = request.getfixturevalue(' example_container' )
337+ return {
338+ ' host' : example_container.get_container_host_ip(),
339+ ' port' : example_container.get_exposed_port(5432 ),
340+ ' database' : ' test_db' ,
341+ ' user' : ' test_user' ,
342+ ' password' : ' test_pass' ,
343+ }
344+ else :
345+ # Fall back to environment variables
346+ return {
347+ ' host' : os.getenv(' EXAMPLE_HOST' , ' localhost' ),
348+ ' port' : int (os.getenv(' EXAMPLE_PORT' , ' 5432' )),
349+ ' database' : os.getenv(' EXAMPLE_DB' , ' test_db' ),
350+ ' user' : os.getenv(' EXAMPLE_USER' , ' test_user' ),
351+ ' password' : os.getenv(' EXAMPLE_PASSWORD' , ' test_pass' ),
352+ }
353+ ```
354+
355+ ### Step 2: Create Test Configuration Class
356+
357+ Create ` tests/integration/loaders/backends/test_example.py ` :
311358
312359``` python
313- # tests/integration/test_example_loader.py
360+ """
361+ Example loader integration tests using generalized test infrastructure.
362+ """
314363
364+ from typing import Any, Dict, List, Optional
315365import pytest
316- import pyarrow as pa
317- from src.amp.loaders.base import LoadMode
366+
318367from src.amp.loaders.implementations.example_loader import ExampleLoader
368+ from tests.integration.loaders.conftest import LoaderTestConfig
369+ from tests.integration.loaders.test_base_loader import BaseLoaderTests
370+ from tests.integration.loaders.test_base_streaming import BaseStreamingTests
371+
372+
373+ class ExampleTestConfig (LoaderTestConfig ):
374+ """ Example-specific test configuration"""
375+
376+ loader_class = ExampleLoader
377+ config_fixture_name = ' example_test_config'
378+
379+ # Declare loader capabilities
380+ supports_overwrite = True
381+ supports_streaming = True # Set to False if no streaming support
382+ supports_multi_network = True # For blockchain loaders with reorg
383+ supports_null_values = True
384+
385+ def get_row_count (self , loader : ExampleLoader, table_name : str ) -> int :
386+ """ Get row count from table"""
387+ # Implement using your loader's API
388+ return loader._connection.query(f " SELECT COUNT(*) FROM { table_name} " )[0 ][' count' ]
389+
390+ def query_rows (
391+ self ,
392+ loader : ExampleLoader,
393+ table_name : str ,
394+ where : Optional[str ] = None ,
395+ order_by : Optional[str ] = None
396+ ) -> List[Dict[str , Any]]:
397+ """ Query rows from table"""
398+ query = f " SELECT * FROM { table_name} "
399+ if where:
400+ query += f " WHERE { where} "
401+ if order_by:
402+ query += f " ORDER BY { order_by} "
403+ return loader._connection.query(query)
404+
405+ def cleanup_table (self , loader : ExampleLoader, table_name : str ) -> None :
406+ """ Drop table"""
407+ loader._connection.execute(f " DROP TABLE IF EXISTS { table_name} " )
408+
409+ def get_column_names (self , loader : ExampleLoader, table_name : str ) -> List[str ]:
410+ """ Get column names from table"""
411+ result = loader._connection.query(
412+ f " SELECT column_name FROM information_schema.columns WHERE table_name = ' { table_name} ' "
413+ )
414+ return [row[' column_name' ] for row in result]
319415
320- @pytest.fixture
321- def example_config ():
322- return {
323- ' host' : ' localhost' ,
324- ' port' : 5432 ,
325- ' database' : ' test_db' ,
326- ' user' : ' test_user' ,
327- ' password' : ' test_pass'
328- }
329416
330- @pytest.fixture
331- def test_data ():
332- return pa.Table.from_pydict({
333- ' id' : [1 , 2 , 3 ],
334- ' name' : [' a' , ' b' , ' c' ],
335- ' value' : [1.0 , 2.0 , 3.0 ]
336- })
417+ # Core tests - ALL loaders must inherit these
418+ class TestExampleCore (BaseLoaderTests ):
419+ """ Inherits 7 core tests: connection, context manager, batching, modes, null handling, errors"""
420+ config = ExampleTestConfig()
421+
337422
423+ # Streaming tests - Only for loaders with streaming/reorg support
424+ class TestExampleStreaming (BaseStreamingTests ):
425+ """ Inherits 5 streaming tests: metadata columns, reorg deletion, overlapping ranges, multi-network, microbatch dedup"""
426+ config = ExampleTestConfig()
427+
428+
429+ # Loader-specific tests
338430@pytest.mark.integration
339431@pytest.mark.example
340- class TestExampleLoaderIntegration :
341- def test_connection (self , example_config ):
342- loader = ExampleLoader(example_config)
343-
344- loader.connect()
345- assert loader.is_connected
346-
347- loader.disconnect()
348- assert not loader.is_connected
349-
350- def test_basic_loading (self , example_config , test_data ):
351- loader = ExampleLoader(example_config)
352-
432+ class TestExampleSpecific :
433+ """ Example-specific functionality tests"""
434+ config = ExampleTestConfig()
435+
436+ def test_custom_feature (self , loader , test_table_name , cleanup_tables ):
437+ """ Test example-specific functionality"""
438+ cleanup_tables.append(test_table_name)
439+
353440 with loader:
354- result = loader.load_table(test_data, ' test_table ' )
355-
441+ # Test your loader's unique features
442+ result = loader.some_custom_method(test_table_name)
356443 assert result.success
357- assert result.rows_loaded == 3
358- assert result.metadata[' operation' ] == ' load_table'
359- assert result.metadata[' batches_processed' ] > 0
444+ ```
445+
446+ ### What You Get Automatically
447+
448+ By inheriting from the base test classes, you automatically get:
449+
450+ ** From ` BaseLoaderTests ` (7 core tests):**
451+ - ` test_connection ` - Connection establishment and disconnection
452+ - ` test_context_manager ` - Context manager functionality
453+ - ` test_batch_loading ` - Basic batch loading
454+ - ` test_append_mode ` - Append mode operations
455+ - ` test_overwrite_mode ` - Overwrite mode operations
456+ - ` test_null_handling ` - Null value handling
457+ - ` test_error_handling ` - Error scenarios
458+
459+ ** From ` BaseStreamingTests ` (5 streaming tests):**
460+ - ` test_streaming_metadata_columns ` - Metadata column creation
461+ - ` test_reorg_deletion ` - Blockchain reorganization handling
462+ - ` test_reorg_overlapping_ranges ` - Overlapping range invalidation
463+ - ` test_reorg_multi_network ` - Multi-network reorg isolation
464+ - ` test_microbatch_deduplication ` - Microbatch duplicate detection
465+
466+ ### Required LoaderTestConfig Methods
467+
468+ You must implement these four methods in your ` LoaderTestConfig ` subclass:
469+
470+ ``` python
471+ def get_row_count (self , loader , table_name : str ) -> int :
472+ """ Return number of rows in table"""
473+
474+ def query_rows (self , loader , table_name : str , where = None , order_by = None ) -> List[Dict]:
475+ """ Query and return rows as list of dicts"""
476+
477+ def cleanup_table (self , loader , table_name : str ) -> None :
478+ """ Drop/delete the table"""
479+
480+ def get_column_names (self , loader , table_name : str ) -> List[str ]:
481+ """ Return list of column names"""
482+ ```
483+
484+ ### Capability Flags
485+
486+ Set these flags in your ` LoaderTestConfig ` to control which tests run:
487+
488+ ``` python
489+ supports_overwrite = True # Can overwrite existing data
490+ supports_streaming = True # Supports streaming with metadata
491+ supports_multi_network = True # Supports multi-network isolation (blockchain loaders)
492+ supports_null_values = True # Handles NULL values correctly
360493```
361494
362495### Running Tests
363496
364497``` bash
365- # Run all integration tests
366- make test-integration
498+ # Run all tests for your loader
499+ uv run pytest tests/integration/loaders/backends/test_example.py -v
500+
501+ # Run only core tests
502+ uv run pytest tests/integration/loaders/backends/test_example.py::TestExampleCore -v
367503
368- # Run specific loader tests
369- make test-example
504+ # Run only streaming tests
505+ uv run pytest tests/integration/loaders/backends/test_example.py::TestExampleStreaming -v
370506
371- # Run with environment variables
372- uv run --env-file .test.env pytest tests/integration/test_example_loader .py -v
507+ # Run specific test
508+ uv run pytest tests/integration/loaders/backends/test_example .py::TestExampleCore::test_connection -v
373509```
374510
375511## Best Practices
0 commit comments