Skip to content

Commit 9008c0a

Browse files
committed
feat: add "csm-data api postgres-send-run-metadata" command
1 parent da8879a commit 9008c0a

File tree

6 files changed

+275
-31
lines changed

6 files changed

+275
-31
lines changed

cosmotech/coal/cli/commands/api/api.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
# Any use, reproduction, translation, broadcasting, transmission, distribution,
55
# etc., to any person is prohibited unless it has been previously and
66
# specifically authorized by written means by Cosmo Tech.
7-
7+
from cosmotech.coal.cli.commands.api.postgres_send_run_metadata import postgres_send_run_metadata
88
from cosmotech.coal.cli.commands.api.rds_load_csv import rds_load_csv
99
from cosmotech.coal.cli.commands.api.rds_send_csv import rds_send_csv
1010
from cosmotech.coal.cli.commands.api.rds_send_store import rds_send_store
@@ -52,3 +52,4 @@ def api(ctx: click.Context):
5252
api.add_command(runtemplate_load_handler, "runtemplate-load-handler")
5353
api.add_command(run_load_data, "run-load-data")
5454
api.add_command(scenariorun_load_data, "scenariorun-load-data")
55+
api.add_command(postgres_send_run_metadata, "postgres-send-run-metadata")
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
# Copyright (C) - 2023 - 2024 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
8+
from adbc_driver_postgresql import dbapi
9+
10+
from cosmotech.coal.cli.utils.click import click
11+
from cosmotech.coal.cosmotech_api.connection import get_api_client
12+
from cosmotech.coal.cosmotech_api.run import get_run_metadata
13+
from cosmotech.coal.utils.logger import LOGGER
14+
from cosmotech.coal.utils.postgresql import generate_postgresql_full_uri
15+
16+
17+
@click.command()
@click.option("--organization-id",
              envvar="CSM_ORGANIZATION_ID",
              help="An organization id for the Cosmo Tech API",
              metavar="o-XXXXXXXX",
              type=str,
              show_envvar=True,
              required=True)
@click.option("--workspace-id",
              envvar="CSM_WORKSPACE_ID",
              help="A workspace id for the Cosmo Tech API",
              metavar="w-XXXXXXXX",
              type=str,
              show_envvar=True,
              required=True)
@click.option("--runner-id",
              envvar="CSM_RUNNER_ID",
              help="A runner id for the Cosmo Tech API",
              metavar="r-XXXXXXXX",
              type=str,
              show_envvar=True,
              required=True)
@click.option("--run-id",
              envvar="CSM_RUN_ID",
              help="A run id for the Cosmo Tech API",
              metavar="run-XXXXXX",
              type=str,
              show_envvar=True,
              required=True)
@click.option("--table-prefix",
              help="Prefix to add to the table name",
              metavar="PREFIX",
              type=str,
              default="Cosmotech_")
@click.option('--postgres-host',
              help='Postgresql host URI',
              envvar="POSTGRES_HOST_URI",
              show_envvar=True,
              required=True)
@click.option('--postgres-port',
              help='Postgresql database port',
              envvar="POSTGRES_HOST_PORT",
              show_envvar=True,
              required=False,
              default=5432)
@click.option('--postgres-db',
              help='Postgresql database name',
              envvar="POSTGRES_DB_NAME",
              show_envvar=True,
              required=True)
@click.option('--postgres-schema',
              help='Postgresql schema name',
              envvar="POSTGRES_DB_SCHEMA",
              show_envvar=True,
              required=True)
@click.option('--postgres-user',
              help='Postgresql connection user name',
              envvar="POSTGRES_USER_NAME",
              show_envvar=True,
              required=True)
@click.option('--postgres-password',
              help='Postgresql connection password',
              envvar="POSTGRES_USER_PASSWORD",
              show_envvar=True,
              required=True)
def postgres_send_run_metadata(
    organization_id,
    workspace_id,
    runner_id,
    run_id,
    table_prefix: str,
    postgres_host,
    postgres_port,
    postgres_db,
    postgres_schema,
    postgres_user,
    postgres_password
):
    """Send metadata fetched from the Cosmo Tech API to a PostgreSQL table

Requires a valid connection to the API to fetch the metadata

The table <schema>.<prefix>RunnerMetadata is created if it does not exist,
then the row matching the metadata id is inserted or updated

This implementation makes use of an API Key
    """
    # This docstring doubles as the command's --help text (click reads it).

    # NOTE(review): the metadata is fetched with get_run_metadata, yet the keys
    # read below ("name", "lastRunId") and the target table name describe a
    # runner - confirm whether get_runner_metadata was the intended source.
    with get_api_client()[0] as api_client:
        runner = get_run_metadata(api_client, organization_id, workspace_id, runner_id, run_id)

    postgresql_full_uri = generate_postgresql_full_uri(postgres_host,
                                                       postgres_port,
                                                       postgres_db,
                                                       postgres_user,
                                                       postgres_password)

    with dbapi.connect(postgresql_full_uri, autocommit=True) as conn:
        with conn.cursor() as curs:
            # Identifiers cannot be bound as parameters, so the table name is
            # interpolated directly from the CLI-provided schema and prefix.
            schema_table = f"{postgres_schema}.{table_prefix}RunnerMetadata"
            sql_create_table = f"""
                CREATE TABLE IF NOT EXISTS {schema_table} (
                  id varchar(32) PRIMARY KEY,
                  name varchar(256),
                  last_run_id varchar(32),
                  run_template_id varchar(32)
                );
            """
            # The ADBC PostgreSQL driver hands the statement to the server
            # as-is, so bind parameters must use PostgreSQL's native "$n"
            # placeholders ("%s" is not substituted client-side).
            sql_upsert = f"""
                INSERT INTO {schema_table} (id, name, last_run_id, run_template_id)
                  VALUES($1, $2, $3, $4)
                  ON CONFLICT (id)
                  DO
                    UPDATE SET name = EXCLUDED.name, last_run_id = EXCLUDED.last_run_id;
            """
            LOGGER.info(f"creating table [cyan bold]{schema_table}[/]")
            curs.execute(sql_create_table)
            conn.commit()
            LOGGER.info("adding/updating runner metadata")
            curs.execute(
                sql_upsert,
                (
                    runner.get("id"),
                    runner.get("name"),
                    runner.get("lastRunId"),
                    runner.get("runTemplateId"),
                ),
            )
            conn.commit()
            LOGGER.info("Runner metadata table has been updated")

cosmotech/coal/cli/commands/store/dump_to_postgresql.py

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@
77

88
from time import perf_counter
99

10-
from adbc_driver_postgresql import dbapi
11-
1210
from cosmotech.coal.cli.utils.click import click
1311
from cosmotech.coal.cli.utils.decorators import web_help
1412
from cosmotech.coal.store.store import Store
1513
from cosmotech.coal.utils.logger import LOGGER
14+
from cosmotech.coal.utils.postgresql import send_pyarrow_table_to_postgresql
1615

1716

1817
@click.command()
@@ -88,39 +87,34 @@ def dump_to_postgresql(
8887
"""
8988
_s = Store(store_location=store_folder)
9089

91-
postgresql_full_uri = (f'postgresql://'
92-
f'{postgres_user}'
93-
f':{postgres_password}'
94-
f'@{postgres_host}'
95-
f':{postgres_port}'
96-
f'/{postgres_db}')
97-
9890
tables = list(_s.list_tables())
9991
if len(tables):
10092
LOGGER.info(f"Sending tables to [green bold]{postgres_db}.{postgres_schema}[/]")
10193
total_rows = 0
10294
_process_start = perf_counter()
103-
with dbapi.connect(postgresql_full_uri, autocommit=True) as conn:
104-
for table_name in tables:
105-
with conn.cursor() as curs:
106-
_s_time = perf_counter()
107-
target_table_name = f"{table_prefix}{table_name}"
108-
LOGGER.info(f" - [yellow]{target_table_name}[/]:")
109-
data = _s.get_table(table_name)
110-
if not len(data):
111-
LOGGER.info(f" -> [cyan bold]0[/] rows (skipping)")
112-
continue
113-
_dl_time = perf_counter()
114-
rows = curs.adbc_ingest(
115-
target_table_name,
116-
data,
117-
"replace" if replace else "create_append",
118-
db_schema_name=postgres_schema)
119-
total_rows += rows
120-
_up_time = perf_counter()
121-
LOGGER.info(f" -> [cyan bold]{rows}[/] rows")
122-
LOGGER.debug(f" -> Load from datastore took [blue]{_dl_time - _s_time:0.3}s[/]")
123-
LOGGER.debug(f" -> Send to postgresql took [blue]{_up_time - _dl_time:0.3}s[/]")
95+
for table_name in tables:
96+
_s_time = perf_counter()
97+
target_table_name = f"{table_prefix}{table_name}"
98+
LOGGER.info(f" - [yellow]{target_table_name}[/]:")
99+
data = _s.get_table(table_name)
100+
if not len(data):
101+
LOGGER.info(f" -> [cyan bold]0[/] rows (skipping)")
102+
continue
103+
_dl_time = perf_counter()
104+
rows = send_pyarrow_table_to_postgresql(data,
105+
target_table_name,
106+
postgres_host,
107+
postgres_port,
108+
postgres_db,
109+
postgres_schema,
110+
postgres_user,
111+
postgres_password,
112+
replace)
113+
total_rows += rows
114+
_up_time = perf_counter()
115+
LOGGER.info(f" -> [cyan bold]{rows}[/] rows")
116+
LOGGER.debug(f" -> Load from datastore took [blue]{_dl_time - _s_time:0.3}s[/]")
117+
LOGGER.debug(f" -> Send to postgresql took [blue]{_up_time - _dl_time:0.3}s[/]")
124118
_process_end = perf_counter()
125119
LOGGER.info(f"Sent [cyan bold]{total_rows}[/] rows "
126120
f"in [blue]{_process_end - _process_start:0.3}s[/]")
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Copyright (C) - 2023 - 2024 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
from typing import Any
8+
from typing import Optional
9+
10+
import cosmotech_api
11+
12+
13+
def get_run_metadata(
    api_client: cosmotech_api.api_client.ApiClient,
    organization_id: str,
    workspace_id: str,
    runner_id: str,
    run_id: str,
    include: Optional[list[str]] = None,
    exclude: Optional[list[str]] = None,
) -> dict[str, Any]:
    """Fetch a run through the Cosmo Tech API and return it as a dictionary.

    :param api_client: API client used to build the ``RunApi`` accessor
    :param organization_id: id of the organization, forwarded to ``get_run``
    :param workspace_id: id of the workspace, forwarded to ``get_run``
    :param runner_id: id of the runner, forwarded to ``get_run``
    :param run_id: id of the run, forwarded to ``get_run``
    :param include: forwarded to ``model_dump`` as its ``include`` argument
    :param exclude: forwarded to ``model_dump`` as its ``exclude`` argument
    :return: the run dumped in JSON mode, keyed by alias, without ``None`` values
    """
    fetched_run: cosmotech_api.Run = cosmotech_api.RunApi(api_client).get_run(
        organization_id,
        workspace_id,
        runner_id,
        run_id,
    )
    dump_options = {
        "by_alias": True,
        "exclude_none": True,
        "include": include,
        "exclude": exclude,
        "mode": 'json',
    }
    return fetched_run.model_dump(**dump_options)
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Copyright (C) - 2023 - 2024 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
from typing import Any
8+
from typing import Optional
9+
10+
import cosmotech_api
11+
12+
13+
def get_runner_metadata(
    api_client: cosmotech_api.api_client.ApiClient,
    organization_id: str,
    workspace_id: str,
    runner_id: str,
    include: Optional[list[str]] = None,
    exclude: Optional[list[str]] = None,
) -> dict[str, Any]:
    """Fetch a runner through the Cosmo Tech API and return it as a dictionary.

    :param api_client: API client used to build the ``RunnerApi`` accessor
    :param organization_id: id of the organization, forwarded to ``get_runner``
    :param workspace_id: id of the workspace, forwarded to ``get_runner``
    :param runner_id: id of the runner, forwarded to ``get_runner``
    :param include: forwarded to ``model_dump`` as its ``include`` argument
    :param exclude: forwarded to ``model_dump`` as its ``exclude`` argument
    :return: the runner dumped in JSON mode, keyed by alias, without ``None`` values
    """
    fetched_runner: cosmotech_api.Runner = cosmotech_api.RunnerApi(api_client).get_runner(
        organization_id,
        workspace_id,
        runner_id,
    )
    dump_options = {
        "by_alias": True,
        "exclude_none": True,
        "include": include,
        "exclude": exclude,
        "mode": 'json',
    }
    return fetched_runner.model_dump(**dump_options)

cosmotech/coal/utils/postgresql.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Copyright (C) - 2023 - 2024 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
8+
from adbc_driver_postgresql import dbapi
9+
from pyarrow import Table
10+
11+
12+
def generate_postgresql_full_uri(
    postgres_host: str,
    postgres_port: str,
    postgres_db: str,
    postgres_user: str,
    postgres_password: str, ) -> str:
    """Build a ``postgresql://user:password@host:port/db`` connection URI.

    The user name and password are percent-encoded so that credentials
    containing URI-reserved characters (``@``, ``:``, ``/``, ``%``, ``#`` ...)
    cannot corrupt the URI structure. Credentials made only of unreserved
    characters are emitted unchanged.

    :param postgres_host: host of the PostgreSQL server, inserted as-is
    :param postgres_port: port of the PostgreSQL server, inserted as-is
    :param postgres_db: name of the database, inserted as-is
    :param postgres_user: connection user name (raw, not pre-encoded)
    :param postgres_password: connection password (raw, not pre-encoded)
    :return: the full connection URI
    """
    # Local import keeps this block self-contained within the module.
    from urllib.parse import quote

    # safe='' so that "/" is encoded too; it would otherwise end the authority part.
    encoded_user = quote(str(postgres_user), safe='')
    encoded_password = quote(str(postgres_password), safe='')
    return ('postgresql://' +
            f'{encoded_user}'
            f':{encoded_password}'
            f'@{postgres_host}'
            f':{postgres_port}'
            f'/{postgres_db}')
24+
25+
26+
def send_pyarrow_table_to_postgresql(
    data: Table,
    target_table_name: str,
    postgres_host: str,
    postgres_port: str,
    postgres_db: str,
    postgres_schema: str,
    postgres_user: str,
    postgres_password: str,
    replace: bool
) -> int:
    """Ingest a pyarrow table into a PostgreSQL table through ADBC.

    A new connection is opened (with autocommit enabled) for each call.

    :param data: table handed to ``adbc_ingest``
    :param target_table_name: name of the destination table
    :param postgres_host: host used to build the connection URI
    :param postgres_port: port used to build the connection URI
    :param postgres_db: database name used to build the connection URI
    :param postgres_schema: passed to ``adbc_ingest`` as ``db_schema_name``
    :param postgres_user: user name used to build the connection URI
    :param postgres_password: password used to build the connection URI
    :param replace: ingest with mode ``"replace"`` when true, ``"create_append"`` otherwise
    :return: the count reported by ``adbc_ingest``
    """
    ingest_mode = "replace" if replace else "create_append"
    connection_uri = generate_postgresql_full_uri(postgres_host,
                                                  postgres_port,
                                                  postgres_db,
                                                  postgres_user,
                                                  postgres_password)

    with dbapi.connect(connection_uri, autocommit=True) as conn, conn.cursor() as curs:
        ingested = curs.adbc_ingest(target_table_name,
                                    data,
                                    ingest_mode,
                                    db_schema_name=postgres_schema)

    return 0 + ingested

0 commit comments

Comments
 (0)