Skip to content

Commit 89f2205

Browse files
committed
PostgreSQL Integration: Add PostgreSQL runner and store functionality with tests
1 parent c79367e commit 89f2205

File tree

5 files changed

+562
-0
lines changed

5 files changed

+562
-0
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
8+
"""
9+
PostgreSQL integration module.
10+
11+
This module provides functions for interacting with PostgreSQL databases.
12+
"""
13+
14+
# Re-export functions from the runner module
15+
from cosmotech.coal.postgresql.runner import (
16+
send_runner_metadata_to_postgresql,
17+
)
18+
19+
# Re-export functions from the store module
20+
from cosmotech.coal.postgresql.store import (
21+
dump_store_to_postgresql,
22+
)
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
8+
"""
9+
PostgreSQL runner operations module.
10+
11+
This module provides functions for interacting with PostgreSQL databases
12+
for runner metadata operations.
13+
"""
14+
15+
from adbc_driver_postgresql import dbapi
16+
17+
from cosmotech.coal.cosmotech_api.connection import get_api_client
18+
from cosmotech.coal.cosmotech_api.runner.metadata import get_runner_metadata
19+
from cosmotech.coal.utils.logger import LOGGER
20+
from cosmotech.coal.utils.postgresql import generate_postgresql_full_uri
21+
from cosmotech.orchestrator.utils.translate import T
22+
23+
24+
def send_runner_metadata_to_postgresql(
    organization_id: str,
    workspace_id: str,
    runner_id: str,
    postgres_host: str,
    postgres_port: int,
    postgres_db: str,
    postgres_schema: str,
    postgres_user: str,
    postgres_password: str,
    table_prefix: str = "Cosmotech_",
) -> None:
    """
    Send runner metadata to a PostgreSQL database.

    The runner is fetched through the Cosmo Tech API, then a single row keyed
    by the runner id is inserted into (or updated in) the table
    ``{postgres_schema}.{table_prefix}RunnerMetadata``. The table is created
    first if it does not exist yet.

    Args:
        organization_id: Organization ID
        workspace_id: Workspace ID
        runner_id: Runner ID
        postgres_host: PostgreSQL host
        postgres_port: PostgreSQL port
        postgres_db: PostgreSQL database name
        postgres_schema: PostgreSQL schema
        postgres_user: PostgreSQL username
        postgres_password: PostgreSQL password
        table_prefix: Table prefix
    """
    # Get runner metadata (the first item returned by get_api_client is used
    # as the API client context manager)
    with get_api_client()[0] as api_client:
        runner = get_runner_metadata(api_client, organization_id, workspace_id, runner_id)

    # Generate PostgreSQL URI
    postgresql_full_uri = generate_postgresql_full_uri(
        postgres_host, postgres_port, postgres_db, postgres_user, postgres_password
    )

    # Identifiers cannot be sent as bind parameters, so the schema and table
    # names are interpolated into the statements. They come from the caller's
    # configuration, not from the runner payload.
    schema_table = f"{postgres_schema}.{table_prefix}RunnerMetadata"
    sql_create_table = f"""
        CREATE TABLE IF NOT EXISTS {schema_table} (
          id varchar(32) PRIMARY KEY,
          name varchar(256),
          last_run_id varchar(32),
          run_template_id varchar(32)
        );
    """
    # The ADBC PostgreSQL driver forwards the statement text to the server
    # as-is, so values must be bound with PostgreSQL's native $N placeholders
    # (printf-style %s placeholders are not rewritten by the driver).
    # NOTE(review): on conflict only name and last_run_id are refreshed;
    # run_template_id keeps its first inserted value - confirm this is intended.
    sql_upsert = f"""
        INSERT INTO {schema_table} (id, name, last_run_id, run_template_id)
          VALUES ($1, $2, $3, $4)
          ON CONFLICT (id)
          DO
            UPDATE SET name = EXCLUDED.name, last_run_id = EXCLUDED.last_run_id;
    """

    # Connect to PostgreSQL and update runner metadata
    with dbapi.connect(postgresql_full_uri, autocommit=True) as conn:
        with conn.cursor() as curs:
            LOGGER.info(f"creating table {schema_table}")
            curs.execute(sql_create_table)
            # NOTE(review): the connection is opened with autocommit=True, so
            # the explicit commits look redundant; kept to preserve behavior.
            conn.commit()
            LOGGER.info("adding/updating runner metadata")
            curs.execute(
                sql_upsert,
                (
                    runner.get("id"),
                    runner.get("name"),
                    runner.get("lastRunId"),
                    runner.get("runTemplateId"),
                ),
            )
            conn.commit()
            LOGGER.info("Runner metadata table has been updated")

cosmotech/coal/postgresql/store.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
8+
"""
9+
PostgreSQL store operations module.
10+
11+
This module provides functions for interacting with PostgreSQL databases
12+
for store operations.
13+
"""
14+
15+
from time import perf_counter
16+
import pyarrow
17+
18+
from cosmotech.coal.store.store import Store
19+
from cosmotech.coal.utils.logger import LOGGER
20+
from cosmotech.coal.utils.postgresql import send_pyarrow_table_to_postgresql
21+
from cosmotech.orchestrator.utils.translate import T
22+
23+
24+
def dump_store_to_postgresql(
    store_folder: str,
    postgres_host: str,
    postgres_port: int,
    postgres_db: str,
    postgres_schema: str,
    postgres_user: str,
    postgres_password: str,
    table_prefix: str = "Cosmotech_",
    replace: bool = True,
) -> None:
    """
    Copy every table of a Store into a PostgreSQL database.

    Each Store table is read and forwarded to
    ``send_pyarrow_table_to_postgresql`` under the name
    ``{table_prefix}{table_name}``. Tables without rows are skipped. Row
    counts and timings are reported through the logger.

    Args:
        store_folder: Folder containing the Store
        postgres_host: PostgreSQL host
        postgres_port: PostgreSQL port
        postgres_db: PostgreSQL database name
        postgres_schema: PostgreSQL schema
        postgres_user: PostgreSQL username
        postgres_password: PostgreSQL password
        table_prefix: Table prefix
        replace: Whether to replace existing tables
    """
    store = Store(store_location=store_folder)
    table_names = list(store.list_tables())

    # Nothing to export: report it and leave early.
    if not table_names:
        LOGGER.info(T("coal.logs.database.store_empty"))
        return

    LOGGER.info(T("coal.logs.database.sending_data").format(table=f"{postgres_db}.{postgres_schema}"))
    sent_rows_total = 0
    export_started = perf_counter()

    for table_name in table_names:
        table_started = perf_counter()
        target_table_name = f"{table_prefix}{table_name}"
        LOGGER.info(T("coal.logs.database.table_entry").format(table=target_table_name))

        content = store.get_table(table_name)
        if len(content) == 0:
            LOGGER.info(T("coal.logs.database.no_rows"))
            continue
        table_loaded = perf_counter()

        sent_rows = send_pyarrow_table_to_postgresql(
            content,
            target_table_name,
            postgres_host,
            postgres_port,
            postgres_db,
            postgres_schema,
            postgres_user,
            postgres_password,
            replace,
        )
        sent_rows_total += sent_rows
        table_sent = perf_counter()

        # Per-table report: row count at info level, timings at debug level.
        load_elapsed = table_loaded - table_started
        send_elapsed = table_sent - table_loaded
        LOGGER.info(T("coal.logs.database.row_count").format(count=sent_rows))
        LOGGER.debug(
            T("coal.logs.progress.operation_timing").format(
                operation="Load from datastore", time=f"{load_elapsed:0.3}"
            )
        )
        LOGGER.debug(
            T("coal.logs.progress.operation_timing").format(
                operation="Send to postgresql", time=f"{send_elapsed:0.3}"
            )
        )

    # Overall summary across all exported tables.
    export_elapsed = perf_counter() - export_started
    LOGGER.info(
        T("coal.logs.database.rows_fetched").format(
            table="all tables",
            count=sent_rows_total,
            time=f"{export_elapsed:0.3}",
        )
    )
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
8+
import pytest
9+
from unittest.mock import MagicMock, patch
10+
11+
from cosmotech.coal.postgresql.runner import send_runner_metadata_to_postgresql
12+
13+
14+
class TestRunnerFunctions:
    """Tests for the module-level functions of the PostgreSQL runner module."""

    @patch("cosmotech.coal.postgresql.runner.get_api_client")
    @patch("cosmotech.coal.postgresql.runner.get_runner_metadata")
    @patch("cosmotech.coal.postgresql.runner.generate_postgresql_full_uri")
    @patch("cosmotech.coal.postgresql.runner.dbapi.connect")
    def test_send_runner_metadata_to_postgresql(
        self, mock_connect, mock_generate_uri, mock_get_runner_metadata, mock_get_api_client
    ):
        """Runner metadata is fetched, the table is created and one row is upserted."""
        # Arrange
        # get_api_client returns a tuple whose first item is a context manager
        api_client_cm = MagicMock()
        entered_api_client = MagicMock()
        api_client_cm.__enter__.return_value = entered_api_client
        mock_get_api_client.return_value = (api_client_cm, "Test Connection")

        # Runner payload returned by the API layer
        runner_payload = {
            "id": "test-runner-id",
            "name": "Test Runner",
            "lastRunId": "test-run-id",
            "runTemplateId": "test-template-id",
        }
        mock_get_runner_metadata.return_value = runner_payload

        # URI handed back by the URI builder
        uri = "postgresql://user:password@host:5432/db"
        mock_generate_uri.return_value = uri

        # Connection and cursor, both used as context managers
        connection = MagicMock()
        cursor = MagicMock()
        connection.cursor.return_value.__enter__.return_value = cursor
        mock_connect.return_value.__enter__.return_value = connection

        # Call arguments
        organization_id, workspace_id, runner_id = "test-org", "test-workspace", "test-runner-id"
        postgres_host, postgres_port, postgres_db = "localhost", 5432, "testdb"
        postgres_schema, postgres_user, postgres_password = "public", "user", "password"
        table_prefix = "Test_"
        expected_table = f"{postgres_schema}.{table_prefix}RunnerMetadata"

        # Act
        send_runner_metadata_to_postgresql(
            organization_id,
            workspace_id,
            runner_id,
            postgres_host,
            postgres_port,
            postgres_db,
            postgres_schema,
            postgres_user,
            postgres_password,
            table_prefix,
        )

        # Assert
        # The API client was obtained once and the entered client was used
        mock_get_api_client.assert_called_once()
        mock_get_runner_metadata.assert_called_once_with(
            entered_api_client, organization_id, workspace_id, runner_id
        )

        # The URI was built from the connection settings (schema excluded)
        mock_generate_uri.assert_called_once_with(
            postgres_host, postgres_port, postgres_db, postgres_user, postgres_password
        )

        # The connection was opened on that URI in autocommit mode
        mock_connect.assert_called_once_with(uri, autocommit=True)

        # Exactly two statements were executed: table creation, then upsert
        assert cursor.execute.call_count == 2
        executed = cursor.execute.call_args_list

        # Only fragments of the SQL are checked, not the full statements
        create_sql = executed[0][0][0]
        assert "CREATE TABLE IF NOT EXISTS" in create_sql
        assert expected_table in create_sql

        upsert_positional = executed[1][0]
        assert "INSERT INTO" in upsert_positional[0]
        assert expected_table in upsert_positional[0]
        assert upsert_positional[1] == (
            runner_payload["id"],
            runner_payload["name"],
            runner_payload["lastRunId"],
            runner_payload["runTemplateId"],
        )

        # One commit after each statement
        assert connection.commit.call_count == 2

0 commit comments

Comments
 (0)