Commit 70f7cd2

Add generalized tests for DeltaLake, Iceberg, and LMDB loaders
Migrates the final three loader test suites to use the shared base test infrastructure
1 parent 42f0c07 commit 70f7cd2

File tree: 3 files changed (+941, -0 lines)
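The diff below (the DeltaLake file; the Iceberg and LMDB files presumably follow the same pattern) plugs into the shared base test infrastructure referenced in the commit message. For orientation, here is a minimal sketch of the contract the per-backend config appears to implement, inferred from this diff alone; it is hypothetical, and the real LoaderTestConfig in tests/integration/loaders/conftest.py may differ:

    # Hypothetical sketch of the shared config contract, inferred from the
    # attributes and hooks the DeltaLake subclass below overrides.
    from typing import Any, Dict, List, Optional


    class LoaderTestConfig:
        # Which loader class the suite exercises and which pytest fixture
        # supplies its configuration.
        loader_class: type = None
        config_fixture_name: str = ''

        # Capability flags that let the base tests skip unsupported cases.
        supports_overwrite: bool = False
        supports_streaming: bool = False
        supports_multi_network: bool = False
        supports_null_values: bool = False

        # Backend-specific hooks used by the generalized assertions.
        def get_row_count(self, loader: Any, table_name: str) -> int:
            raise NotImplementedError

        def query_rows(
            self, loader: Any, table_name: str, where: Optional[str] = None, order_by: Optional[str] = None
        ) -> List[Dict[str, Any]]:
            raise NotImplementedError

        def cleanup_table(self, loader: Any, table_name: str) -> None:
            raise NotImplementedError

        def get_column_names(self, loader: Any, table_name: str) -> List[str]:
            raise NotImplementedError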
Lines changed: 294 additions & 0 deletions
@@ -0,0 +1,294 @@
"""
DeltaLake-specific loader integration tests.

This module provides DeltaLake-specific test configuration and tests that
inherit from the generalized base test classes.
"""

from pathlib import Path
from typing import Any, Dict, List, Optional

import pytest

try:
    from deltalake import DeltaTable

    from src.amp.loaders.implementations.deltalake_loader import DeltaLakeLoader
    from tests.integration.loaders.conftest import LoaderTestConfig
    from tests.integration.loaders.test_base_loader import BaseLoaderTests
    from tests.integration.loaders.test_base_streaming import BaseStreamingTests
except ImportError:
    pytest.skip('amp modules not available', allow_module_level=True)

class DeltaLakeTestConfig(LoaderTestConfig):
    """DeltaLake-specific test configuration"""

    loader_class = DeltaLakeLoader
    config_fixture_name = 'delta_basic_config'

    supports_overwrite = True
    supports_streaming = True
    supports_multi_network = True
    supports_null_values = True

    def get_row_count(self, loader: DeltaLakeLoader, table_name: str) -> int:
        """Get row count from DeltaLake table"""
        # DeltaLake uses the table_path as the identifier
        table_path = loader.config.table_path
        dt = DeltaTable(table_path)
        df = dt.to_pyarrow_table()
        return len(df)

    def query_rows(
        self, loader: DeltaLakeLoader, table_name: str, where: Optional[str] = None, order_by: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Query rows from DeltaLake table"""
        table_path = loader.config.table_path
        dt = DeltaTable(table_path)
        df = dt.to_pyarrow_table()

        # Convert to list of dicts (simple implementation, no filtering)
        result = []
        for i in range(min(100, len(df))):
            row = {col: df[col][i].as_py() for col in df.column_names}
            result.append(row)
        return result

    def cleanup_table(self, loader: DeltaLakeLoader, table_name: str) -> None:
        """Delete DeltaLake table directory"""
        import shutil

        table_path = loader.config.table_path
        if Path(table_path).exists():
            shutil.rmtree(table_path, ignore_errors=True)

    def get_column_names(self, loader: DeltaLakeLoader, table_name: str) -> List[str]:
        """Get column names from DeltaLake table"""
        table_path = loader.config.table_path
        dt = DeltaTable(table_path)
        schema = dt.schema()
        return [field.name for field in schema.fields]

@pytest.mark.delta_lake
class TestDeltaLakeCore(BaseLoaderTests):
    """DeltaLake core loader tests (inherited from base)"""

    config = DeltaLakeTestConfig()


@pytest.mark.delta_lake
class TestDeltaLakeStreaming(BaseStreamingTests):
    """DeltaLake streaming tests (inherited from base)"""

    config = DeltaLakeTestConfig()

@pytest.mark.delta_lake
class TestDeltaLakeSpecific:
    """DeltaLake-specific tests that cannot be generalized"""

    def test_partitioning(self, delta_partitioned_config, small_test_data):
        """Test DeltaLake partitioning functionality"""
        import pyarrow as pa

        loader = DeltaLakeLoader(delta_partitioned_config)

        # Verify partitioning is configured
        assert loader.partition_by == ['year', 'month', 'day']

        with loader:
            result = loader.load_table(small_test_data, 'test_table')
            assert result.success == True
            assert result.rows_loaded == 5

            # Verify partitions were created
            dt = DeltaTable(loader.config.table_path)
            files = dt.file_uris()
            # Partitioned tables create subdirectories
            assert len(files) > 0

    def test_optimization_operations(self, delta_basic_config, comprehensive_test_data):
        """Test DeltaLake OPTIMIZE operations"""
        loader = DeltaLakeLoader(delta_basic_config)

        with loader:
            # Load data
            result = loader.load_table(comprehensive_test_data, 'test_table')
            assert result.success == True

            # DeltaLake may auto-optimize if configured
            dt = DeltaTable(loader.config.table_path)
            version = dt.version()
            assert version >= 0

            # Verify table can be read after optimization
            df = dt.to_pyarrow_table()
            assert len(df) == 1000

    def test_schema_evolution(self, delta_basic_config, small_test_data):
        """Test DeltaLake schema evolution"""
        import pyarrow as pa

        loader = DeltaLakeLoader(delta_basic_config)

        with loader:
            # Load initial data
            result = loader.load_table(small_test_data, 'test_table')
            assert result.success == True

            # Add new column to schema
            extended_data = {
                **{col: small_test_data[col].to_pylist() for col in small_test_data.column_names},
                'new_column': [100, 200, 300, 400, 500],
            }
            extended_table = pa.Table.from_pydict(extended_data)

            # Load with new schema (if schema evolution enabled)
            from src.amp.loaders.base import LoadMode

            result2 = loader.load_table(extended_table, 'test_table', mode=LoadMode.APPEND)

            # Result depends on merge_schema configuration
            dt = DeltaTable(loader.config.table_path)
            schema = dt.schema()
            # New column may or may not be present depending on config
            assert len(schema.fields) >= len(small_test_data.schema)

    def test_table_history(self, delta_basic_config, small_test_data):
        """Test DeltaLake table version history"""
        from src.amp.loaders.base import LoadMode

        loader = DeltaLakeLoader(delta_basic_config)

        with loader:
            # Load initial data
            loader.load_table(small_test_data, 'test_table')

            # Append more data
            loader.load_table(small_test_data, 'test_table', mode=LoadMode.APPEND)

            # Check version history
            dt = DeltaTable(loader.config.table_path)
            version = dt.version()
            assert version >= 1  # At least 2 operations (create + append)

            # Verify history is accessible
            history = dt.history()
            assert len(history) >= 1

    def test_metadata_completeness(self, delta_basic_config, comprehensive_test_data):
        """Test DeltaLake metadata in load results"""
        loader = DeltaLakeLoader(delta_basic_config)

        with loader:
            result = loader.load_table(comprehensive_test_data, 'test_table')

            assert result.success == True
            assert 'delta_version' in result.metadata
            assert 'files_added' in result.metadata
            assert result.metadata['delta_version'] >= 0

    def test_query_operations(self, delta_basic_config, comprehensive_test_data):
        """Test querying DeltaLake tables"""
        loader = DeltaLakeLoader(delta_basic_config)

        with loader:
            result = loader.load_table(comprehensive_test_data, 'test_table')
            assert result.success == True

            # Query the table
            dt = DeltaTable(loader.config.table_path)
            df = dt.to_pyarrow_table()

            # Verify data integrity
            assert len(df) == 1000
            assert 'id' in df.column_names
            assert 'user_id' in df.column_names

    def test_file_size_calculation(self, delta_basic_config, comprehensive_test_data):
        """Test file size calculation for DeltaLake tables"""
        loader = DeltaLakeLoader(delta_basic_config)

        with loader:
            result = loader.load_table(comprehensive_test_data, 'test_table')
            assert result.success == True

            # Get table size
            dt = DeltaTable(loader.config.table_path)
            files = dt.file_uris()
            assert len(files) > 0

            # Calculate total size
            total_size = 0
            for file_uri in files:
                # Remove file:// prefix if present
                file_path = file_uri.replace('file://', '')
                if Path(file_path).exists():
                    total_size += Path(file_path).stat().st_size

            assert total_size > 0

    def test_concurrent_operations_safety(self, delta_basic_config, small_test_data):
        """Test that DeltaLake handles concurrent operations safely"""
        from concurrent.futures import ThreadPoolExecutor, as_completed

        from src.amp.loaders.base import LoadMode

        loader = DeltaLakeLoader(delta_basic_config)

        with loader:
            # Load initial data
            loader.load_table(small_test_data, 'test_table')

            # Try concurrent appends
            def append_data(i):
                return loader.load_table(small_test_data, 'test_table', mode=LoadMode.APPEND)

            with ThreadPoolExecutor(max_workers=3) as executor:
                futures = [executor.submit(append_data, i) for i in range(3)]
                results = [future.result() for future in as_completed(futures)]

            # All operations should succeed
            assert all(r.success for r in results)

            # Verify final row count
            dt = DeltaTable(loader.config.table_path)
            df = dt.to_pyarrow_table()
            assert len(df) == 20  # 5 initial + 3 * 5 appends

@pytest.mark.delta_lake
@pytest.mark.slow
class TestDeltaLakePerformance:
    """DeltaLake performance tests"""

    def test_large_data_loading(self, delta_basic_config):
        """Test loading large datasets to DeltaLake"""
        import pyarrow as pa

        # Create large dataset
        large_data = {
            'id': list(range(50000)),
            'value': [i * 0.123 for i in range(50000)],
            'category': [f'cat_{i % 100}' for i in range(50000)],
            'year': [2024 if i < 40000 else 2023 for i in range(50000)],
            'month': [(i // 100) % 12 + 1 for i in range(50000)],
            'day': [(i // 10) % 28 + 1 for i in range(50000)],
        }
        large_table = pa.Table.from_pydict(large_data)

        loader = DeltaLakeLoader(delta_basic_config)

        with loader:
            result = loader.load_table(large_table, 'test_table')

            assert result.success == True
            assert result.rows_loaded == 50000
            assert result.duration < 60  # Should complete within 60 seconds

            # Verify data integrity
            dt = DeltaTable(loader.config.table_path)
            df = dt.to_pyarrow_table()
            assert len(df) == 50000
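The suites above are selected with the delta_lake and slow markers. If those markers are not already declared in the project's pytest configuration, a run such as pytest -m delta_lake would emit unknown-marker warnings; a minimal, hypothetical way to register them from the shared conftest (the repository may instead declare them in pytest.ini or pyproject.toml):

    # Hypothetical marker registration; the marker names are taken from the
    # decorators used above, everything else is an assumption.
    def pytest_configure(config):
        config.addinivalue_line('markers', 'delta_lake: DeltaLake loader integration tests')
        config.addinivalue_line('markers', 'slow: long-running performance tests')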
