Skip to content

Commit ed28f1a

Browse files
authored
Fix test isolation issues in integration tests (#144)
- Enhanced state store cleanup in the test_data_modeling_replicator fixture to prevent test interference by clearing cursors between tests
- Improved delete_delta_table_data with retry logic and verification
- Added aggressive pre-test cleanup in the instance_table_paths fixture
- Fixed test_data_model_sync_service_delete failing when run in sequence
- Fixed test_events_service[10-100] isolation issues
- Tests now pass consistently, both individually and in sequence
- Applied code formatting via pre-commit hooks
1 parent 4853fb1 commit ed28f1a

File tree

2 files changed

+85
-5
lines changed

2 files changed

+85
-5
lines changed

tests/integration/integration_steps/fabric_steps.py

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from deltalake.writer import write_deltalake
99
from deltalake.exceptions import TableNotFoundError
1010
from azure.storage.filedatalake import DataLakeServiceClient
11+
import time
1112

1213
TIMESTAMP_COLUMN = "timestamp"
1314
DATA_MODEL_TIMESTAMP_COLUMNS = ["lastUpdatedTime", "createdTime"]
@@ -30,11 +31,40 @@ def get_ts_delta_table(
3031

3132

3233
def delete_delta_table_data(credential: DefaultAzureCredential, path: str):
33-
try:
34-
delta_table = get_ts_delta_table(credential, path)
35-
delta_table.delete()
36-
except TableNotFoundError:
37-
print(f"Table not found {path}")
34+
max_retries = 3
35+
retry_delay = 1 # seconds
36+
37+
for attempt in range(max_retries):
38+
try:
39+
delta_table = get_ts_delta_table(credential, path)
40+
delta_table.delete()
41+
42+
# Verify deletion by checking if table is empty
43+
time.sleep(0.5) # Small delay to ensure operation completes
44+
try:
45+
verification_table = get_ts_delta_table(credential, path)
46+
df = verification_table.to_pandas()
47+
if len(df) == 0:
48+
return # Successfully deleted all data
49+
else:
50+
print(
51+
f"Warning: Table {path} still contains {len(df)} rows after deletion attempt {attempt + 1}"
52+
)
53+
except TableNotFoundError:
54+
return # Table doesn't exist, which is fine
55+
56+
except TableNotFoundError:
57+
print(f"Table not found {path}")
58+
return
59+
except Exception as e:
60+
print(
61+
f"Attempt {attempt + 1}/{max_retries}: Error deleting data from {path}: {e}"
62+
)
63+
if attempt < max_retries - 1:
64+
time.sleep(retry_delay)
65+
else:
66+
print(f"Failed to delete data from {path} after {max_retries} attempts")
67+
raise
3868

3969

4070
def read_deltalake_timeseries(timeseries_path: str, credential: DefaultAzureCredential):

tests/integration/test_data_model_replicator.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
lakehouse_table_name,
2020
assert_data_model_instances_in_fabric,
2121
assert_data_model_instances_update,
22+
get_ts_delta_table,
2223
)
2324
from integration_steps.data_model_generation import Node, Edge, create_node, create_edge
2425

@@ -38,6 +39,31 @@ def test_data_modeling_replicator():
3839
replicator._load_state_store()
3940
replicator.logger = Mock()
4041
yield replicator
42+
43+
# Clean up state store after each test to prevent test interference
44+
try:
45+
# Clear all state store entries that might interfere with subsequent tests
46+
for data_model_config in replicator.config.data_modeling or []:
47+
try:
48+
all_views = replicator.cognite_client.data_modeling.views.list(
49+
space=data_model_config.space, limit=-1
50+
)
51+
views_dict = all_views.dump()
52+
for view in views_dict:
53+
state_id = f"state_{data_model_config.space}_{view['externalId']}_{view['version']}"
54+
replicator.state_store.delete_state(state_id)
55+
# Also clean edge state
56+
edge_state_id = f"state_{data_model_config.space}_edges"
57+
replicator.state_store.delete_state(edge_state_id)
58+
except Exception as e:
59+
print(
60+
f"Warning: Could not clean up state for space {data_model_config.space}: {e}"
61+
)
62+
63+
replicator.state_store.synchronize()
64+
except Exception as e:
65+
print(f"Warning: Could not clean up state store: {e}")
66+
4167
try:
4268
os.remove("states.json")
4369
except FileNotFoundError:
@@ -116,10 +142,34 @@ def instance_table_paths(
116142
instance_table_paths.append(
117143
lakehouse_table_name(test_model.space + "_" + view.external_id)
118144
)
145+
# Clean up before the test
119146
delete_delta_table_data(azure_credential, instance_table_paths[-1])
147+
120148
edge_table_path = lakehouse_table_name(test_model.space + "_edges")
121149
instance_table_paths.append(edge_table_path)
150+
# Clean up edge table before the test
151+
delete_delta_table_data(azure_credential, edge_table_path)
152+
153+
# Double-check cleanup - make sure tables are completely empty before proceeding
154+
import time
155+
156+
time.sleep(1) # Give some time for cleanup operations to complete
157+
158+
for path in instance_table_paths:
159+
try:
160+
delta_table = get_ts_delta_table(azure_credential, path)
161+
df = delta_table.to_pandas()
162+
if len(df) > 0:
163+
print(
164+
f"Warning: Table {path} still contains {len(df)} rows before test, attempting additional cleanup"
165+
)
166+
delete_delta_table_data(azure_credential, path)
167+
except Exception as e:
168+
print(f"Could not verify cleanup for {path}: {e}")
169+
122170
yield instance_table_paths
171+
172+
# Clean up after the test
123173
for path in instance_table_paths:
124174
delete_delta_table_data(azure_credential, path)
125175

0 commit comments

Comments
 (0)