Skip to content

Commit 64ef99c

Browse files
committed
test(coal): restructure test suite to match code refactoring
- Add new tests for cosmotech_api/apis modules (dataset, runner, simple_apis, workspace) - Add new tests for cosmotech_api/objects modules (connection, parameters) - Add tests for store/output channels (aws, azure storage, postgres, splitter, interface) - Add test for utils/configuration module - Update AWS S3 tests - Update Azure ADX tests (store, tables, utils) - Update PostgreSQL tests (runner, store, utils) and reorganize file structure - Remove tests for deprecated cosmotech_api modules - Remove tests for deprecated dataset and runner submodules - Remove test for deprecated azure/functions module
1 parent e00aca6 commit 64ef99c

File tree

54 files changed

+3600
-7855
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+3600
-7855
lines changed

tests/unit/coal/test_aws/test_aws_s3.py

Lines changed: 187 additions & 144 deletions
Large diffs are not rendered by default.
Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
8+
import os
9+
from unittest.mock import ANY, MagicMock, patch
10+
11+
import pyarrow as pa
12+
import pytest
13+
from azure.kusto.data import KustoClient
14+
from azure.kusto.ingest import IngestionResult, QueuedIngestClient
15+
16+
from cosmotech.coal.azure.adx.store import (
17+
process_tables,
18+
send_pyarrow_table_to_adx,
19+
send_store_to_adx,
20+
send_table_data,
21+
)
22+
from cosmotech.coal.store.store import Store
23+
24+
25+
class TestAdxStoreFunctions:
26+
"""Tests for ADX store module functions."""
27+
28+
@pytest.fixture
29+
def mock_ingest_client(self):
30+
"""Create a mock QueuedIngestClient."""
31+
return MagicMock(spec=QueuedIngestClient)
32+
33+
@pytest.fixture
34+
def mock_kusto_client(self):
35+
"""Create a mock KustoClient."""
36+
return MagicMock(spec=KustoClient)
37+
38+
@pytest.fixture
39+
def sample_table(self):
40+
"""Create a sample PyArrow table for testing."""
41+
return pa.table({"id": ["1", "2", "3"], "name": ["Alice", "Bob", "Charlie"], "value": [1.5, 2.5, 3.5]})
42+
43+
def test_send_table_data(self, mock_ingest_client, sample_table):
44+
"""Test send_table_data function."""
45+
# Arrange
46+
database = "test-database"
47+
table_name = "test-table"
48+
operation_tag = "test-tag"
49+
50+
mock_result = MagicMock(spec=IngestionResult)
51+
mock_result.source_id = "test-source-id-123"
52+
53+
with patch("cosmotech.coal.azure.adx.store.send_pyarrow_table_to_adx") as mock_send:
54+
mock_send.return_value = mock_result
55+
56+
# Act
57+
source_id, returned_table_name = send_table_data(
58+
mock_ingest_client, database, table_name, sample_table, operation_tag
59+
)
60+
61+
# Assert
62+
assert source_id == "test-source-id-123"
63+
assert returned_table_name == table_name
64+
mock_send.assert_called_once_with(mock_ingest_client, database, table_name, sample_table, operation_tag)
65+
66+
def test_send_pyarrow_table_to_adx(self, mock_ingest_client, sample_table):
67+
"""Test send_pyarrow_table_to_adx function."""
68+
# Arrange
69+
database = "test-database"
70+
table_name = "test-table"
71+
drop_by_tag = "test-tag"
72+
73+
mock_result = MagicMock(spec=IngestionResult)
74+
mock_ingest_client.ingest_from_file.return_value = mock_result
75+
76+
# Act
77+
result = send_pyarrow_table_to_adx(mock_ingest_client, database, table_name, sample_table, drop_by_tag)
78+
79+
# Assert
80+
assert result == mock_result
81+
mock_ingest_client.ingest_from_file.assert_called_once()
82+
83+
# Check that the ingestion properties were set correctly
84+
call_args = mock_ingest_client.ingest_from_file.call_args
85+
properties = call_args[0][1]
86+
assert properties.database == database
87+
assert properties.table == table_name
88+
assert drop_by_tag in properties.drop_by_tags
89+
90+
def test_send_pyarrow_table_to_adx_no_tag(self, mock_ingest_client, sample_table):
91+
"""Test send_pyarrow_table_to_adx without drop_by_tag."""
92+
# Arrange
93+
database = "test-database"
94+
table_name = "test-table"
95+
96+
mock_result = MagicMock(spec=IngestionResult)
97+
mock_ingest_client.ingest_from_file.return_value = mock_result
98+
99+
# Act
100+
result = send_pyarrow_table_to_adx(mock_ingest_client, database, table_name, sample_table, drop_by_tag=None)
101+
102+
# Assert
103+
assert result == mock_result
104+
mock_ingest_client.ingest_from_file.assert_called_once()
105+
106+
# Check that drop_by_tags is None
107+
call_args = mock_ingest_client.ingest_from_file.call_args
108+
properties = call_args[0][1]
109+
assert properties.drop_by_tags is None
110+
111+
def test_send_pyarrow_table_to_adx_cleans_temp_file(self, mock_ingest_client, sample_table):
112+
"""Test that send_pyarrow_table_to_adx cleans up temporary files."""
113+
# Arrange
114+
database = "test-database"
115+
table_name = "test-table"
116+
117+
mock_result = MagicMock(spec=IngestionResult)
118+
mock_ingest_client.ingest_from_file.return_value = mock_result
119+
120+
created_files = []
121+
122+
def track_file(file_path, properties):
123+
created_files.append(file_path)
124+
return mock_result
125+
126+
mock_ingest_client.ingest_from_file.side_effect = track_file
127+
128+
# Act
129+
send_pyarrow_table_to_adx(mock_ingest_client, database, table_name, sample_table)
130+
131+
# Assert
132+
assert len(created_files) == 1
133+
# File should be cleaned up
134+
assert not os.path.exists(created_files[0])
135+
136+
@patch("cosmotech.coal.azure.adx.store.Store")
137+
def test_process_tables(self, mock_store_class, mock_kusto_client, mock_ingest_client, sample_table):
138+
"""Test process_tables function."""
139+
# Arrange
140+
mock_store = MagicMock(spec=Store)
141+
mock_store_class.return_value = mock_store
142+
mock_store.list_tables.return_value = ["table1", "table2"]
143+
mock_store.get_table.return_value = sample_table
144+
145+
database = "test-database"
146+
operation_tag = "test-tag"
147+
148+
mock_result = MagicMock(spec=IngestionResult)
149+
mock_result.source_id = "source-id-1"
150+
151+
with (
152+
patch("cosmotech.coal.azure.adx.store.check_and_create_table"),
153+
patch("cosmotech.coal.azure.adx.store.send_table_data") as mock_send,
154+
):
155+
mock_send.return_value = ("source-id-1", "table1")
156+
157+
# Act
158+
source_ids, mapping = process_tables(
159+
mock_store, mock_kusto_client, mock_ingest_client, database, operation_tag
160+
)
161+
162+
# Assert
163+
assert len(source_ids) == 2
164+
assert "source-id-1" in source_ids
165+
assert "source-id-1" in mapping
166+
167+
@patch("cosmotech.coal.azure.adx.store.Store")
168+
def test_process_tables_empty_table(self, mock_store_class, mock_kusto_client, mock_ingest_client):
169+
"""Test process_tables skips empty tables."""
170+
# Arrange
171+
mock_store = MagicMock(spec=Store)
172+
mock_store_class.return_value = mock_store
173+
mock_store.list_tables.return_value = ["empty_table"]
174+
175+
# Create an empty table
176+
empty_table = pa.table({"id": []})
177+
mock_store.get_table.return_value = empty_table
178+
179+
database = "test-database"
180+
operation_tag = "test-tag"
181+
182+
with patch("cosmotech.coal.azure.adx.store.send_table_data") as mock_send:
183+
# Act
184+
source_ids, mapping = process_tables(
185+
mock_store, mock_kusto_client, mock_ingest_client, database, operation_tag
186+
)
187+
188+
# Assert
189+
assert len(source_ids) == 0
190+
assert len(mapping) == 0
191+
mock_send.assert_not_called()
192+
193+
@patch("cosmotech.coal.azure.adx.store.initialize_clients")
194+
@patch("cosmotech.coal.azure.adx.store.Store")
195+
@patch("cosmotech.coal.azure.adx.store.process_tables")
196+
@patch("cosmotech.coal.azure.adx.store.monitor_ingestion")
197+
@patch("cosmotech.coal.azure.adx.store.handle_failures")
198+
def test_send_store_to_adx_success(
199+
self, mock_handle_failures, mock_monitor, mock_process, mock_store_class, mock_init_clients
200+
):
201+
"""Test send_store_to_adx successful execution."""
202+
# Arrange
203+
mock_kusto = MagicMock(spec=KustoClient)
204+
mock_ingest = MagicMock(spec=QueuedIngestClient)
205+
mock_init_clients.return_value = (mock_kusto, mock_ingest)
206+
207+
mock_store = MagicMock(spec=Store)
208+
mock_store_class.return_value = mock_store
209+
210+
mock_process.return_value = (["source-id-1"], {"source-id-1": "table1"})
211+
mock_monitor.return_value = False # No failures
212+
mock_handle_failures.return_value = False # Should not abort
213+
214+
# Act
215+
result = send_store_to_adx(
216+
adx_uri="https://adx.example.com",
217+
adx_ingest_uri="https://ingest.adx.example.com",
218+
database_name="test-db",
219+
wait=True,
220+
tag="custom-tag",
221+
)
222+
223+
# Assert
224+
assert result is True
225+
mock_init_clients.assert_called_once()
226+
mock_process.assert_called_once()
227+
mock_monitor.assert_called_once()
228+
mock_handle_failures.assert_called_once()
229+
230+
@patch("cosmotech.coal.azure.adx.store.initialize_clients")
231+
@patch("cosmotech.coal.azure.adx.store.Store")
232+
@patch("cosmotech.coal.azure.adx.store.process_tables")
233+
@patch("cosmotech.coal.azure.adx.store.handle_failures")
234+
def test_send_store_to_adx_no_wait(self, mock_handle_failures, mock_process, mock_store_class, mock_init_clients):
235+
"""Test send_store_to_adx without waiting for ingestion."""
236+
# Arrange
237+
mock_kusto = MagicMock(spec=KustoClient)
238+
mock_ingest = MagicMock(spec=QueuedIngestClient)
239+
mock_init_clients.return_value = (mock_kusto, mock_ingest)
240+
241+
mock_store = MagicMock(spec=Store)
242+
mock_store_class.return_value = mock_store
243+
244+
mock_process.return_value = (["source-id-1"], {"source-id-1": "table1"})
245+
mock_handle_failures.return_value = False
246+
247+
# Act
248+
result = send_store_to_adx(
249+
adx_uri="https://adx.example.com",
250+
adx_ingest_uri="https://ingest.adx.example.com",
251+
database_name="test-db",
252+
wait=False,
253+
)
254+
255+
# Assert
256+
assert result is True
257+
mock_handle_failures.assert_called_once_with(mock_kusto, "test-db", ANY, False)
258+
259+
@patch("cosmotech.coal.azure.adx.store.initialize_clients")
260+
@patch("cosmotech.coal.azure.adx.store.Store")
261+
@patch("cosmotech.coal.azure.adx.store.process_tables")
262+
@patch("cosmotech.coal.azure.adx.store.handle_failures")
263+
def test_send_store_to_adx_with_failures(
264+
self, mock_handle_failures, mock_process, mock_store_class, mock_init_clients
265+
):
266+
"""Test send_store_to_adx when failures occur and should abort."""
267+
# Arrange
268+
mock_kusto = MagicMock(spec=KustoClient)
269+
mock_ingest = MagicMock(spec=QueuedIngestClient)
270+
mock_init_clients.return_value = (mock_kusto, mock_ingest)
271+
272+
mock_store = MagicMock(spec=Store)
273+
mock_store_class.return_value = mock_store
274+
275+
mock_process.return_value = (["source-id-1"], {"source-id-1": "table1"})
276+
mock_handle_failures.return_value = True # Should abort
277+
278+
# Act
279+
result = send_store_to_adx(
280+
adx_uri="https://adx.example.com", adx_ingest_uri="https://ingest.adx.example.com", database_name="test-db"
281+
)
282+
283+
# Assert
284+
assert result is False
285+
286+
@patch("cosmotech.coal.azure.adx.store.initialize_clients")
287+
@patch("cosmotech.coal.azure.adx.store.Store")
288+
@patch("cosmotech.coal.azure.adx.store.process_tables")
289+
@patch("cosmotech.coal.azure.adx.store._drop_by_tag")
290+
def test_send_store_to_adx_exception_rollback(self, mock_drop, mock_process, mock_store_class, mock_init_clients):
291+
"""Test send_store_to_adx performs rollback on exception."""
292+
# Arrange
293+
mock_kusto = MagicMock(spec=KustoClient)
294+
mock_ingest = MagicMock(spec=QueuedIngestClient)
295+
mock_init_clients.return_value = (mock_kusto, mock_ingest)
296+
297+
mock_store = MagicMock(spec=Store)
298+
mock_store_class.return_value = mock_store
299+
300+
mock_process.side_effect = Exception("Test exception")
301+
302+
# Act & Assert
303+
with pytest.raises(Exception, match="Test exception"):
304+
send_store_to_adx(
305+
adx_uri="https://adx.example.com",
306+
adx_ingest_uri="https://ingest.adx.example.com",
307+
database_name="test-db",
308+
tag="rollback-tag",
309+
)
310+
311+
# Verify rollback was called
312+
mock_drop.assert_called_once_with(mock_kusto, "test-db", "rollback-tag")

0 commit comments

Comments
 (0)