
Commit 3ef2df5

Merge pull request #51 from Cosmo-Tech/AFOS/get_postgres_schema_before_send_if_exists
Afos/get postgres schema before send if exists
2 parents 1b160dc + 1000cec commit 3ef2df5

2 files changed: 191 additions & 8 deletions

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 # Copyright (C) - 2023 - 2025 - Cosmo Tech
 # Licensed under the MIT license.
 
-__version__ = '0.9.2'
+__version__ = '0.9.3'

cosmotech/coal/utils/postgresql.py

Lines changed: 190 additions & 7 deletions
@@ -5,48 +5,231 @@
 # etc., to any person is prohibited unless it has been previously and
 # specifically authorized by written means by Cosmo Tech.
 
+from typing import Optional
+from urllib.parse import quote
+
+import pyarrow as pa
 from adbc_driver_postgresql import dbapi
 from pyarrow import Table
 
+from cosmotech.coal.utils.logger import LOGGER
+
 
 def generate_postgresql_full_uri(
         postgres_host: str,
         postgres_port: str,
         postgres_db: str,
         postgres_user: str,
         postgres_password: str, ) -> str:
+    # Check if password needs percent encoding (contains special characters)
+    # We don't log anything about the password for security
+    encoded_password = quote(postgres_password, safe='')
     return ('postgresql://' +
             f'{postgres_user}'
-            f':{postgres_password}'
+            f':{encoded_password}'
             f'@{postgres_host}'
             f':{postgres_port}'
             f'/{postgres_db}')
 
 
-def send_pyarrow_table_to_postgresql(
-        data: Table,
+def get_postgresql_table_schema(
         target_table_name: str,
         postgres_host: str,
         postgres_port: str,
         postgres_db: str,
         postgres_schema: str,
         postgres_user: str,
         postgres_password: str,
-        replace: bool
-) -> int:
-    total = 0
+) -> Optional[pa.Schema]:
+    """
+    Get the schema of an existing PostgreSQL table using SQL queries.
+
+    Args:
+        target_table_name: Name of the table
+        postgres_host: PostgreSQL host
+        postgres_port: PostgreSQL port
+        postgres_db: PostgreSQL database name
+        postgres_schema: PostgreSQL schema name
+        postgres_user: PostgreSQL username
+        postgres_password: PostgreSQL password
+
+    Returns:
+        PyArrow Schema if table exists, None otherwise
+    """
+    LOGGER.debug(f"Getting schema for table {postgres_schema}.{target_table_name}")
 
     postgresql_full_uri = generate_postgresql_full_uri(postgres_host,
                                                        postgres_port,
                                                        postgres_db,
                                                        postgres_user,
                                                        postgres_password)
+
+    with (dbapi.connect(postgresql_full_uri) as conn):
+        try:
+            catalog = conn.adbc_get_objects(depth="tables",
+                                            catalog_filter=postgres_db,
+                                            db_schema_filter=postgres_schema,
+                                            table_name_filter=target_table_name).read_all().to_pylist()[0]
+            schema = catalog["catalog_db_schemas"][0]
+            table = schema["db_schema_tables"][0]
+            if table["table_name"] == target_table_name:
+                return conn.adbc_get_table_schema(
+                    target_table_name,
+                    db_schema_filter=postgres_schema,
+                )
+        except IndexError:
+            LOGGER.warning(f"Table {postgres_schema}.{target_table_name} not found")
+    return None
+
+
+def adapt_table_to_schema(
+        data: pa.Table,
+        target_schema: pa.Schema
+) -> pa.Table:
+    """
+    Adapt a PyArrow table to match a target schema with detailed logging.
+    """
+    LOGGER.debug(f"Starting schema adaptation for table with {len(data)} rows")
+    LOGGER.debug(f"Original schema: {data.schema}")
+    LOGGER.debug(f"Target schema: {target_schema}")
+
+    target_fields = {field.name: field.type for field in target_schema}
+    new_columns = []
+
+    # Track adaptations for summary
+    added_columns = []
+    dropped_columns = []
+    type_conversions = []
+    failed_conversions = []
+
+    # Process each field in target schema
+    for field_name, target_type in target_fields.items():
+        if field_name in data.column_names:
+            # Column exists - try to cast to target type
+            col = data[field_name]
+            original_type = col.type
+
+            if original_type != target_type:
+                LOGGER.debug(
+                    f"Attempting to cast column '{field_name}' "
+                    f"from {original_type} to {target_type}"
+                )
+                try:
+                    new_col = pa.compute.cast(col, target_type)
+                    new_columns.append(new_col)
+                    type_conversions.append(
+                        f"{field_name}: {original_type} -> {target_type}"
+                    )
+                except pa.ArrowInvalid as e:
+                    LOGGER.warning(
+                        f"Failed to cast column '{field_name}' "
+                        f"from {original_type} to {target_type}. "
+                        f"Filling with nulls. Error: {str(e)}"
+                    )
+                    new_columns.append(pa.nulls(len(data), type=target_type))
+                    failed_conversions.append(
+                        f"{field_name}: {original_type} -> {target_type}"
+                    )
+            else:
+                new_columns.append(col)
+        else:
+            # Column doesn't exist - add nulls
+            LOGGER.debug(f"Adding missing column '{field_name}' with null values")
+            new_columns.append(pa.nulls(len(data), type=target_type))
+            added_columns.append(field_name)
+
+    # Log columns that will be dropped
+    dropped_columns = [
+        name for name in data.column_names
+        if name not in target_fields
+    ]
+    if dropped_columns:
+        LOGGER.debug(
+            f"Dropping extra columns not in target schema: {dropped_columns}"
+        )
+
+    # Create new table
+    adapted_table = pa.Table.from_arrays(
+        new_columns,
+        schema=target_schema
+    )
+
+    # Log summary of adaptations
+    LOGGER.debug("Schema adaptation summary:")
+    if added_columns:
+        LOGGER.debug(f"- Added columns (filled with nulls): {added_columns}")
+    if dropped_columns:
+        LOGGER.debug(f"- Dropped columns: {dropped_columns}")
+    if type_conversions:
+        LOGGER.debug(f"- Successful type conversions: {type_conversions}")
+    if failed_conversions:
+        LOGGER.debug(
+            f"- Failed conversions (filled with nulls): {failed_conversions}"
+        )
+
+    LOGGER.debug(f"Final adapted table schema: {adapted_table.schema}")
+    return adapted_table
+
+
+def send_pyarrow_table_to_postgresql(
+        data: Table,
+        target_table_name: str,
+        postgres_host: str,
+        postgres_port: str,
+        postgres_db: str,
+        postgres_schema: str,
+        postgres_user: str,
+        postgres_password: str,
+        replace: bool
+) -> int:
+    LOGGER.debug(
+        f"Preparing to send data to PostgreSQL table '{postgres_schema}.{target_table_name}'"
+    )
+    LOGGER.debug(f"Input table has {len(data)} rows")
+
+    # Get existing schema if table exists
+    existing_schema = get_postgresql_table_schema(
+        target_table_name,
+        postgres_host,
+        postgres_port,
+        postgres_db,
+        postgres_schema,
+        postgres_user,
+        postgres_password
+    )
+
+    if existing_schema is not None:
+        LOGGER.debug(f"Found existing table with schema: {existing_schema}")
+        if not replace:
+            LOGGER.debug("Adapting incoming data to match existing schema")
+            data = adapt_table_to_schema(data, existing_schema)
+        else:
+            LOGGER.debug("Replace mode enabled - skipping schema adaptation")
+    else:
+        LOGGER.debug("No existing table found - will create new table")
+
+    # Proceed with ingestion
+    total = 0
+    postgresql_full_uri = generate_postgresql_full_uri(
+        postgres_host,
+        postgres_port,
+        postgres_db,
+        postgres_user,
+        postgres_password
+    )
+
+    LOGGER.debug("Connecting to PostgreSQL database")
     with dbapi.connect(postgresql_full_uri, autocommit=True) as conn:
         with conn.cursor() as curs:
+            LOGGER.debug(
+                f"Ingesting data with mode: {'replace' if replace else 'create_append'}"
+            )
             total += curs.adbc_ingest(
                 target_table_name,
                 data,
                 "replace" if replace else "create_append",
-                db_schema_name=postgres_schema)
+                db_schema_name=postgres_schema
+            )
 
+    LOGGER.debug(f"Successfully ingested {total} rows")
     return total
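As a quick sanity check on the new behavior, here is a minimal usage sketch. It assumes only what this diff shows (the function signature and the module path cosmotech/coal/utils/postgresql.py); every connection setting and the table and column names below are made-up placeholders.

import pyarrow as pa

from cosmotech.coal.utils.postgresql import send_pyarrow_table_to_postgresql

results = pa.table({"simulation_id": ["run_1", "run_2"], "score": [0.82, 0.91]})

# All connection settings below are placeholders for illustration.
rows = send_pyarrow_table_to_postgresql(
    data=results,
    target_table_name="simulation_results",
    postgres_host="localhost",
    postgres_port="5432",
    postgres_db="cosmotech",
    postgres_schema="public",
    postgres_user="csm_writer",
    # Special characters are now percent-encoded by generate_postgresql_full_uri
    postgres_password="p@ss:word#1",
    replace=False,
)
print(f"{rows} rows ingested")

With replace=False, a call against an existing table now fetches that table's schema first and adapts the incoming data to it before the create_append ingestion; with replace=True the adaptation step is skipped and the table is replaced.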

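The adaptation rules themselves (cast where possible, null-fill missing columns, drop extras) can be exercised without a database. A small sketch, again assuming the module path from this diff, with invented sample columns:

import pyarrow as pa

from cosmotech.coal.utils.postgresql import adapt_table_to_schema

# Incoming data: "price" arrives as int64, "comment" is not in the target
# schema, and "discount" is missing entirely.
data = pa.table({
    "id": pa.array([1, 2, 3], type=pa.int64()),
    "price": pa.array([10, 20, 30], type=pa.int64()),
    "comment": pa.array(["a", "b", "c"]),
})

# Target schema as it might come back from get_postgresql_table_schema.
target_schema = pa.schema([
    ("id", pa.int64()),
    ("price", pa.float64()),     # int64 -> float64: cast succeeds
    ("discount", pa.float64()),  # absent from data: filled with nulls
])

adapted = adapt_table_to_schema(data, target_schema)
print(adapted.schema)       # matches target_schema; "comment" was dropped
print(adapted.to_pydict())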