Skip to content

Commit 2ecedc9

Browse files
committed
Store Module Enhancements: Update CSV, Pandas, PyArrow, and native Python store implementations with tests
1 parent 77de267 commit 2ecedc9

File tree

13 files changed

+1093
-62
lines changed

13 files changed

+1093
-62
lines changed

cosmotech/coal/store/__init__.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
8+
"""
9+
Store module.
10+
11+
This module provides functions for working with the Store,
12+
including loading and converting data.
13+
"""
14+
15+
# Re-export the Store class
16+
from cosmotech.coal.store.store import Store
17+
18+
# Re-export functions from the csv module
19+
from cosmotech.coal.store.csv import (
20+
store_csv_file,
21+
convert_store_table_to_csv,
22+
)
23+
24+
# Re-export functions from the native_python module
25+
from cosmotech.coal.store.native_python import (
26+
store_pylist,
27+
convert_table_as_pylist,
28+
)
29+
30+
# Re-export functions from the pandas module (imported unconditionally: pandas must be installed)
31+
32+
from cosmotech.coal.store.pandas import (
33+
store_dataframe,
34+
convert_store_table_to_dataframe as convert_store_table_to_pandas_dataframe,
35+
)
36+
37+
# Re-export functions from the pyarrow module (imported unconditionally: pyarrow must be installed)
38+
39+
from cosmotech.coal.store.pyarrow import (
40+
store_table,
41+
convert_store_table_to_dataframe as convert_store_table_to_pyarrow_table,
42+
)

cosmotech/coal/store/csv.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
18
import pathlib
29

310
import pyarrow.csv as pc
@@ -9,7 +16,7 @@ def store_csv_file(
916
table_name: str,
1017
csv_path: pathlib.Path,
1118
replace_existsing_file: bool = False,
12-
store=Store()
19+
store=Store(),
1320
):
1421
if not csv_path.exists():
1522
raise FileNotFoundError(f"File {csv_path} does not exists")
@@ -18,16 +25,14 @@ def store_csv_file(
1825
_c = data.column_names
1926
data = data.rename_columns([Store.sanitize_column(_column) for _column in _c])
2027

21-
store.add_table(table_name=table_name,
22-
data=data,
23-
replace=replace_existsing_file)
28+
store.add_table(table_name=table_name, data=data, replace=replace_existsing_file)
2429

2530

2631
def convert_store_table_to_csv(
2732
table_name: str,
2833
csv_path: pathlib.Path,
2934
replace_existsing_file: bool = False,
30-
store=Store()
35+
store=Store(),
3136
):
3237
if csv_path.name.endswith(".csv") and csv_path.exists() and not replace_existsing_file:
3338
raise FileExistsError(f"File {csv_path} already exists")
Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
18
import pyarrow as pa
29

310
from cosmotech.coal.store.store import Store
@@ -6,19 +13,13 @@
613
def store_pylist(
    table_name: str,
    data: list[dict],
    replace_existsing_file: bool = False,
    store=Store(),
):
    """Convert a list of row dictionaries to an Arrow table and add it to the store.

    :param table_name: name of the store table to write to
    :param data: rows to store, one dictionary per row
    :param replace_existsing_file: forwarded as ``replace`` to ``store.add_table``
    :param store: store receiving the table; the default instance is created
        once, when this function is defined
    """
    arrow_table = pa.Table.from_pylist(data)
    store.add_table(
        table_name=table_name,
        data=arrow_table,
        replace=replace_existsing_file,
    )
1822

1923

20-
def convert_table_as_pylist(
21-
table_name: str,
22-
store=Store()
23-
):
24+
def convert_table_as_pylist(table_name: str, store=Store()):
    """Read a table from the store and return the result of its ``to_pylist()``.

    :param table_name: name of the store table to read
    :param store: store to read from; the default instance is created once,
        when this function is defined
    """
    stored_table = store.get_table(table_name)
    return stored_table.to_pylist()

cosmotech/coal/store/pandas.py

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,26 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
18
import pyarrow
29

310
from cosmotech.coal.store.store import Store
11+
import pandas as pd
412

5-
try:
6-
import pandas as pd
7-
8-
9-
def store_dataframe(
10-
table_name: str,
11-
dataframe: pd.DataFrame,
12-
replace_existsing_file: bool = False,
13-
store=Store()
14-
):
15-
16-
data = pyarrow.Table.from_pandas(dataframe)
17-
18-
store.add_table(table_name=table_name,
19-
data=data,
20-
replace=replace_existsing_file)
2113

14+
def store_dataframe(
    table_name: str,
    dataframe: pd.DataFrame,
    replace_existsing_file: bool = False,
    store=Store(),
):
    """Convert a pandas DataFrame to an Arrow table and add it to the store.

    :param table_name: name of the store table to write to
    :param dataframe: DataFrame converted with ``pyarrow.Table.from_pandas``
    :param replace_existsing_file: forwarded as ``replace`` to ``store.add_table``
    :param store: store receiving the table; the default instance is created
        once, when this function is defined
    """
    arrow_table = pyarrow.Table.from_pandas(dataframe)
    store.add_table(
        table_name=table_name,
        data=arrow_table,
        replace=replace_existsing_file,
    )
2723

28-
return store.get_table(table_name).to_pandas()
2924

30-
except ModuleNotFoundError:
31-
pass
25+
def convert_store_table_to_dataframe(table_name: str, store=Store()) -> pd.DataFrame:
    """Read a table from the store and return the result of its ``to_pandas()``.

    :param table_name: name of the store table to read
    :param store: store to read from; the default instance is created once,
        when this function is defined
    """
    stored_table = store.get_table(table_name)
    return stored_table.to_pandas()

cosmotech/coal/store/pyarrow.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
1-
from cosmotech.coal.store.store import Store
2-
3-
try:
4-
import pyarrow as pa
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
57

8+
from cosmotech.coal.store.store import Store
69

7-
def store_table(
8-
table_name: str,
9-
data: pa.Table,
10-
replace_existsing_file: bool = False,
11-
store=Store()
12-
):
10+
import pyarrow as pa
1311

14-
store.add_table(table_name=table_name,
15-
data=data,
16-
replace=replace_existsing_file)
1712

13+
def store_table(
    table_name: str,
    data: pa.Table,
    replace_existsing_file: bool = False,
    store=Store(),
):
    """Add an Arrow table to the store as-is.

    :param table_name: name of the store table to write to
    :param data: Arrow table passed unchanged to ``store.add_table``
    :param replace_existsing_file: forwarded as ``replace`` to ``store.add_table``
    :param store: store receiving the table; the default instance is created
        once, when this function is defined
    """
    store.add_table(
        table_name=table_name,
        data=data,
        replace=replace_existsing_file,
    )
1820

19-
def convert_store_table_to_dataframe(table_name: str, store=Store()) -> pa.Table:
20-
return store.get_table(table_name)
2121

22-
except ModuleNotFoundError:
23-
pass
22+
def convert_store_table_to_dataframe(table_name: str, store=Store()) -> pa.Table:
    """Return whatever ``store.get_table`` yields for the given table name.

    Despite the function name, the annotated return type is an Arrow table,
    not a DataFrame: the value from the store is returned without conversion.

    :param table_name: name of the store table to read
    :param store: store to read from; the default instance is created once,
        when this function is defined
    """
    stored_table = store.get_table(table_name)
    return stored_table

cosmotech/coal/store/store.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,29 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
18
import os
29
import pathlib
310

411
import pyarrow
512
from adbc_driver_sqlite import dbapi
613

714
from cosmotech.coal.utils.logger import LOGGER
15+
from cosmotech.orchestrator.utils.translate import T
816

917

1018
class Store:
11-
1219
@staticmethod
1320
def sanitize_column(column_name: str) -> str:
1421
return column_name.replace(" ", "_")
1522

1623
def __init__(
1724
self,
1825
reset=False,
19-
store_location: pathlib.Path = pathlib.Path(os.environ.get("CSM_PARAMETERS_ABSOLUTE_PATH",
20-
"."))
26+
store_location: pathlib.Path = pathlib.Path(os.environ.get("CSM_PARAMETERS_ABSOLUTE_PATH", ".")),
2127
):
2228
self.store_location = pathlib.Path(store_location) / ".coal/store"
2329
self.store_location.mkdir(parents=True, exist_ok=True)
@@ -33,23 +39,23 @@ def reset(self):
3339

3440
def get_table(self, table_name: str) -> pyarrow.Table:
    """Return the full content of a stored table.

    :param table_name: name of the table to read
    :return: the result of running ``select *`` on that table through
        ``execute_query``
    :raises ValueError: if ``table_exists`` reports that the table is missing
    """
    if not self.table_exists(table_name):
        raise ValueError(T("coal.errors.data.no_table").format(table_name=table_name))
    # Quote the identifier (doubling any embedded double quote) so that names
    # containing spaces, keywords or punctuation still form a valid query
    # instead of being parsed as SQL.
    quoted_name = '"' + table_name.replace('"', '""') + '"'
    return self.execute_query(f"select * from {quoted_name}")
3844

3945
def table_exists(self, table_name) -> bool:
    """Tell whether ``table_name`` is among the names returned by ``list_tables``."""
    known_tables = self.list_tables()
    return table_name in known_tables
4147

4248
def get_table_schema(self, table_name: str) -> pyarrow.Schema:
    """Return the schema of a stored table.

    :param table_name: name of the table to inspect
    :return: the value of ``adbc_get_table_schema`` for that table
    :raises ValueError: if ``table_exists`` reports that the table is missing
    """
    table_is_missing = not self.table_exists(table_name)
    if table_is_missing:
        message = T("coal.errors.data.no_table").format(table_name=table_name)
        raise ValueError(message)
    with dbapi.connect(self._database) as connection:
        schema = connection.adbc_get_table_schema(table_name)
    return schema
4753

4854
def add_table(self, table_name: str, data: pyarrow.Table, replace: bool = False):
    """Ingest an Arrow table into the store database.

    ``data`` was previously declared as ``data=pyarrow.Table``, which made the
    ``pyarrow.Table`` class itself the default value instead of annotating the
    parameter. It is now a properly annotated, required argument.

    :param table_name: name of the target table
    :param data: Arrow table handed to the cursor's ``adbc_ingest``
    :param replace: when true the ingestion mode is ``"replace"``, otherwise
        ``"create_append"``
    """
    ingest_mode = "replace" if replace else "create_append"
    with dbapi.connect(self._database, autocommit=True) as conn:
        with conn.cursor() as curs:
            rows = curs.adbc_ingest(table_name, data, ingest_mode)
            LOGGER.debug(T("coal.logs.data_transfer.rows_inserted").format(rows=rows, table_name=table_name))
5359

5460
def execute_query(self, sql_query: str) -> pyarrow.Table:
5561
batch_size = 1024
@@ -58,7 +64,7 @@ def execute_query(self, sql_query: str) -> pyarrow.Table:
5864
try:
5965
with dbapi.connect(self._database, autocommit=True) as conn:
6066
with conn.cursor() as curs:
61-
curs.adbc_statement.set_options(**{"adbc.sqlite.query.batch_rows":str(batch_size)})
67+
curs.adbc_statement.set_options(**{"adbc.sqlite.query.batch_rows": str(batch_size)})
6268
curs.execute(sql_query)
6369
return curs.fetch_arrow_table()
6470
except OSError:

0 commit comments

Comments
 (0)