Skip to content

Commit 77de267

Browse files
committed
SingleStore Integration: Add SingleStore store functionality with tests
1 parent 9262d1f commit 77de267

File tree

3 files changed

+321
-0
lines changed

3 files changed

+321
-0
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
8+
"""
9+
SingleStore integration module.
10+
11+
This module provides functions for interacting with SingleStore databases.
12+
"""
13+
14+
# Re-export functions from the store module
15+
from cosmotech.coal.singlestore.store import (
16+
load_from_singlestore,
17+
)
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
8+
"""
9+
SingleStore store operations module.
10+
11+
This module provides functions for interacting with SingleStore databases
12+
for store operations.
13+
"""
14+
15+
import pathlib
16+
import time
17+
import csv
18+
import singlestoredb as s2
19+
20+
from cosmotech.coal.store.csv import store_csv_file
21+
from cosmotech.coal.store.store import Store
22+
from cosmotech.coal.utils.logger import LOGGER
23+
from cosmotech.orchestrator.utils.translate import T
24+
25+
26+
def _get_data(table_name: str, output_directory: str, cursor) -> None:
    """
    Run a SQL query to fetch all data from a table and write it to a csv file.

    The file is written to ``<output_directory>/<table_name>.csv``. The header is
    taken from the keys of the first fetched row. When the table is empty, the
    header falls back to the column names exposed by ``cursor.description`` so a
    header-only file is produced; if no column name is available either, no file
    is written.

    Args:
        table_name: Table name (interpolated as-is in the query, it must come from a trusted source)
        output_directory: Output directory (expected to exist already)
        cursor: SingleStore cursor returning rows as dictionaries
    """
    start_time = time.perf_counter()
    # NOTE(review): identifiers cannot be bound as query parameters, hence the
    # interpolation; callers must not forward untrusted table names.
    cursor.execute(f"SELECT * FROM {table_name}")
    rows = cursor.fetchall()
    end_time = time.perf_counter()
    LOGGER.info(
        T("coal.logs.database.rows_fetched").format(
            table=table_name, count=len(rows), time=round(end_time - start_time, 2)
        )
    )

    if rows:
        fieldnames = list(rows[0].keys())
    else:
        # Empty table: rows[0] would raise IndexError, so rely on the DB-API
        # column metadata (first item of each description entry is the name).
        fieldnames = [column[0] for column in (cursor.description or ())]
    if not fieldnames:
        # Nothing to describe and nothing to write.
        return

    # Explicit encoding so the output does not depend on the platform locale.
    with open(f"{output_directory}/{table_name}.csv", "w", newline="", encoding="utf-8") as csv_stock:
        w = csv.DictWriter(csv_stock, fieldnames)
        w.writeheader()
        w.writerows(rows)
48+
49+
50+
def load_from_singlestore(
    single_store_host: str,
    single_store_port: int,
    single_store_db: str,
    single_store_user: str,
    single_store_password: str,
    store_folder: str,
    single_store_tables: str = "",
) -> None:
    """
    Load data from SingleStore and store it in the Store.

    Each requested table is dumped to a csv file in ``<store_folder>/singlestore``,
    then every csv file found in that folder is loaded in the Store.

    Args:
        single_store_host: SingleStore host
        single_store_port: SingleStore port
        single_store_db: SingleStore database name
        single_store_user: SingleStore username
        single_store_password: SingleStore password
        store_folder: Store folder
        single_store_tables: Comma-separated list of tables to load; surrounding
            whitespace is ignored and an empty list means "all tables of the database"
    """
    # Path methods need a Path instance: calling them unbound on a plain str
    # only works by accident on some Python versions.
    working_dir = pathlib.Path(store_folder) / "singlestore"
    working_dir.mkdir(parents=True, exist_ok=True)

    # "a, b" must target table "b", not " b"; empty entries are dropped.
    requested_tables = [name.strip() for name in single_store_tables.split(",") if name.strip()]

    start_full = time.perf_counter()

    conn = s2.connect(
        host=single_store_host,
        port=single_store_port,
        database=single_store_db,
        user=single_store_user,
        password=single_store_password,
        results_type="dicts",
    )
    with conn:
        with conn.cursor() as cur:
            if requested_tables:
                table_names = requested_tables
            else:
                cur.execute("SHOW TABLES")
                # With results_type="dicts" each row is a dict, not a bare name;
                # the table name is taken from its first (presumably only) column.
                table_names = [next(iter(row.values())) for row in cur.fetchall()]
            LOGGER.info(T("coal.logs.database.tables_to_fetch").format(tables=table_names))
            for name in table_names:
                _get_data(name, str(working_dir), cur)
    end_full = time.perf_counter()
    LOGGER.info(T("coal.logs.database.full_dataset").format(time=round(end_full - start_full, 2)))

    for csv_path in working_dir.glob("*.csv"):
        LOGGER.info(T("coal.logs.storage.found_file").format(file=csv_path.name))
        # name[:-4] drops the ".csv" suffix matched by the glob pattern.
        store_csv_file(csv_path.name[:-4], csv_path, store=Store(False, store_folder))
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
8+
import csv
9+
import os
10+
import pathlib
11+
import tempfile
12+
from unittest.mock import MagicMock, patch, mock_open, call
13+
14+
import pytest
15+
import singlestoredb as s2
16+
17+
from cosmotech.coal.singlestore.store import load_from_singlestore, _get_data
18+
19+
20+
class TestStoreFunctions:
    """Tests for top-level functions in the store module."""

    @staticmethod
    def _mock_connection(mock_connect, fetched_rows):
        """Make ``mock_connect`` return a connection whose cursor yields ``fetched_rows``.

        Both the connection and the cursor are usable as context managers.
        Returns the mocked cursor so tests can inspect the executed queries.
        """
        mock_conn = MagicMock()
        mock_cursor = MagicMock()
        mock_conn.__enter__.return_value = mock_conn
        mock_conn.cursor.return_value.__enter__.return_value = mock_cursor
        mock_cursor.fetchall.return_value = fetched_rows
        mock_connect.return_value = mock_conn
        return mock_cursor

    @staticmethod
    def _mock_csv_paths(mock_glob, count):
        """Make ``mock_glob`` return ``count`` fake paths named table1.csv, table2.csv, ..."""
        csv_paths = []
        for index in range(1, count + 1):
            csv_path = MagicMock()
            csv_path.name = f"table{index}.csv"
            csv_paths.append(csv_path)
        mock_glob.return_value = csv_paths
        return csv_paths

    @patch("cosmotech.coal.singlestore.store.Store")
    @patch("cosmotech.coal.singlestore.store.store_csv_file")
    @patch("cosmotech.coal.singlestore.store.s2.connect")
    @patch("pathlib.Path.mkdir")
    @patch("pathlib.Path.exists")
    @patch("pathlib.Path.glob")
    def test_load_from_singlestore(
        self, mock_glob, mock_exists, mock_mkdir, mock_connect, mock_store_csv_file, mock_store
    ):
        """Test the load_from_singlestore function."""
        # Arrange
        single_store_host = "localhost"
        single_store_port = 3306
        single_store_db = "test_db"
        single_store_user = "user"
        single_store_password = "password"
        store_folder = "/tmp/store"
        single_store_tables = "table1,table2"

        # Report the working directory as missing so that it has to be created
        mock_exists.return_value = False

        # Rows returned by every "SELECT *" query
        mock_cursor = self._mock_connection(mock_connect, [{"id": 1, "name": "John"}, {"id": 2, "name": "Jane"}])

        # Mock Path.glob to return paths to CSV files
        mock_csv_path1, mock_csv_path2 = self._mock_csv_paths(mock_glob, 2)

        # Mock Store instance
        mock_store_instance = MagicMock()
        mock_store.return_value = mock_store_instance

        # Act
        with patch("builtins.open", mock_open()) as mock_file:
            load_from_singlestore(
                single_store_host=single_store_host,
                single_store_port=single_store_port,
                single_store_db=single_store_db,
                single_store_user=single_store_user,
                single_store_password=single_store_password,
                store_folder=store_folder,
                single_store_tables=single_store_tables,
            )

        # Assert
        # Verify that the directory was created (how its existence is probed is
        # an implementation detail and is deliberately not asserted)
        mock_mkdir.assert_called_once()

        # Verify that the connection was established with the correct parameters
        mock_connect.assert_called_once_with(
            host=single_store_host,
            port=single_store_port,
            database=single_store_db,
            user=single_store_user,
            password=single_store_password,
            results_type="dicts",
        )

        # Verify that exactly one query per requested table was executed
        executed_queries = [executed.args[0] for executed in mock_cursor.execute.call_args_list]
        assert executed_queries == ["SELECT * FROM table1", "SELECT * FROM table2"]

        # Verify that one CSV file per table was opened for writing
        assert mock_file.call_count == 2

        # Verify that store_csv_file was called for each CSV file
        assert mock_store_csv_file.call_count == 2
        mock_store_csv_file.assert_any_call("table1", mock_csv_path1, store=mock_store_instance)
        mock_store_csv_file.assert_any_call("table2", mock_csv_path2, store=mock_store_instance)

        # Verify that Store was initialized correctly
        mock_store.assert_called_with(False, store_folder)

    def test_get_data(self):
        """Test the _get_data function."""
        # Arrange
        table_name = "test_table"
        output_directory = "/tmp/output"
        cursor = MagicMock()

        # Mock data returned from the database
        mock_rows = [{"id": 1, "name": "John"}, {"id": 2, "name": "Jane"}]
        cursor.fetchall.return_value = mock_rows

        # Act
        with patch("builtins.open", mock_open()) as mock_file:
            with patch("csv.DictWriter") as mock_dict_writer:
                # Mock the DictWriter
                mock_writer = MagicMock()
                mock_dict_writer.return_value = mock_writer

                _get_data(table_name, output_directory, cursor)

        # Assert
        # Verify that the SQL query was executed
        cursor.execute.assert_called_once_with("SELECT * FROM test_table")

        # Verify that fetchall was called
        cursor.fetchall.assert_called_once()

        # Verify that the CSV file was opened at the expected location
        mock_file.assert_called_once()
        assert mock_file.call_args.args[0] == "/tmp/output/test_table.csv"

        # Verify that the header and every fetched row were written
        mock_dict_writer.assert_called_once()
        mock_writer.writeheader.assert_called_once()
        mock_writer.writerows.assert_called_once_with(mock_rows)

    @patch("cosmotech.coal.singlestore.store.Store")
    @patch("cosmotech.coal.singlestore.store.store_csv_file")
    @patch("cosmotech.coal.singlestore.store.s2.connect")
    @patch("pathlib.Path.mkdir")
    @patch("pathlib.Path.exists")
    @patch("pathlib.Path.glob")
    def test_load_from_singlestore_no_tables_specified(
        self, mock_glob, mock_exists, mock_mkdir, mock_connect, mock_store_csv_file, mock_store
    ):
        """Test the load_from_singlestore function when no tables are specified."""
        # Arrange
        single_store_host = "localhost"
        single_store_port = 3306
        single_store_db = "test_db"
        single_store_user = "user"
        single_store_password = "password"
        store_folder = "/tmp/store"

        # Report the working directory as missing so that it has to be created
        mock_exists.return_value = False

        # The same rows answer SHOW TABLES and every following "SELECT *" query
        mock_cursor = self._mock_connection(
            mock_connect,
            [
                {"TABLE_NAME": "table1"},
                {"TABLE_NAME": "table2"},
                {"TABLE_NAME": "table3"},
            ],
        )

        # Mock Path.glob to return paths to CSV files
        self._mock_csv_paths(mock_glob, 3)

        # Mock Store instance
        mock_store_instance = MagicMock()
        mock_store.return_value = mock_store_instance

        # Act
        with patch("builtins.open", mock_open()):
            load_from_singlestore(
                single_store_host=single_store_host,
                single_store_port=single_store_port,
                single_store_db=single_store_db,
                single_store_user=single_store_user,
                single_store_password=single_store_password,
                store_folder=store_folder,
            )

        # Assert
        # Verify that the table list was discovered first, then one query per table was run
        assert mock_cursor.execute.call_args_list[0].args == ("SHOW TABLES",)
        assert mock_cursor.execute.call_count == 4

        # Verify that store_csv_file was called for each CSV file
        assert mock_store_csv_file.call_count == 3

0 commit comments

Comments
 (0)