88from typing import Optional
99from urllib .parse import quote
1010
11+ import adbc_driver_manager
1112import pyarrow as pa
1213from adbc_driver_postgresql import dbapi
14+ from cosmotech .orchestrator .utils .translate import T
1315from pyarrow import Table
1416
1517from cosmotech .coal .utils .logger import LOGGER
def generate_postgresql_full_uri(
    postgres_host: str,
    postgres_port: str,
    postgres_db: str,
    postgres_user: str,
    postgres_password: str,
    force_encode: bool = False,
) -> str:
    """Build a PostgreSQL connection URI from its individual parts.

    Nothing about the password is ever logged here, for security.
    When ``force_encode`` is True the password is percent-encoded so that
    special characters (``@``, ``/``, ``:`` ...) cannot corrupt the URI;
    otherwise it is inserted verbatim (assumed already URI-safe or
    pre-encoded by the caller).

    Args:
        postgres_host: Database server host name.
        postgres_port: Database server port.
        postgres_db: Database name.
        postgres_user: User name (inserted verbatim).
        postgres_password: Password.
        force_encode: Percent-encode the password before insertion.

    Returns:
        A ``postgresql://user:password@host:port/db`` URI string.
    """
    password_part = quote(postgres_password, safe="") if force_encode else postgres_password
    return f"postgresql://{postgres_user}:{password_part}@{postgres_host}:{postgres_port}/{postgres_db}"
3341
3442
def get_postgresql_table_schema(
    target_table_name: str,
    postgres_host: str,
    postgres_port: str,
    postgres_db: str,
    postgres_schema: str,
    postgres_user: str,
    postgres_password: str,
    force_encode: bool = False,
) -> Optional[pa.Schema]:
    """
    Get the schema of an existing PostgreSQL table using SQL queries.

    Args:
        target_table_name: Name of the table
        postgres_host: PostgreSQL host
        postgres_port: PostgreSQL port
        postgres_db: PostgreSQL database name
        postgres_schema: PostgreSQL schema name
        postgres_user: PostgreSQL username
        postgres_password: PostgreSQL password
        force_encode: Percent-encode the password when building the connection URI

    Returns:
        PyArrow Schema if table exists, None otherwise
    """
    LOGGER.debug(
        T("coal.logs.postgresql.getting_schema").format(
            postgres_schema=postgres_schema, target_table_name=target_table_name
        )
    )

    postgresql_full_uri = generate_postgresql_full_uri(
        postgres_host,
        postgres_port,
        postgres_db,
        postgres_user,
        postgres_password,
        force_encode,
    )

    with dbapi.connect(postgresql_full_uri) as conn:
        try:
            # Ask the ADBC driver for the table schema directly; it raises
            # when the table cannot be resolved in the given schema.
            return conn.adbc_get_table_schema(
                target_table_name,
                db_schema_filter=postgres_schema,
            )
        except adbc_driver_manager.ProgrammingError:
            # A missing table is an expected condition here, not an error:
            # callers use the None return to decide whether to create it.
            LOGGER.warning(
                T("coal.logs.postgresql.table_not_found").format(
                    postgres_schema=postgres_schema, target_table_name=target_table_name
                )
            )
            return None
8396
8497
def adapt_table_to_schema(data: pa.Table, target_schema: pa.Schema) -> pa.Table:
    """
    Adapt a PyArrow table to match a target schema with detailed logging.

    Columns present in both are cast to the target type (filled with nulls
    when the cast fails), columns missing from ``data`` are added as
    all-null columns, and columns absent from ``target_schema`` are dropped.

    Args:
        data: Input table to adapt.
        target_schema: Schema the returned table must conform to.

    Returns:
        A new table whose schema is exactly ``target_schema``.
    """
    LOGGER.debug(T("coal.logs.postgresql.schema_adaptation_start").format(rows=len(data)))
    LOGGER.debug(T("coal.logs.postgresql.original_schema").format(schema=data.schema))
    # Fix: translation key normalized ("postgresql. target_schema" had a stray
    # embedded space, breaking lookup of the dotted key convention used above.
    LOGGER.debug(T("coal.logs.postgresql.target_schema").format(schema=target_schema))

    target_fields = {field.name: field.type for field in target_schema}
    new_columns = []
    added_columns = []
    type_conversions = []
    failed_conversions = []

    for field_name, target_type in target_fields.items():
        if field_name in data.column_names:
            col = data.column(field_name)
            original_type = col.type
            if original_type != target_type:
                LOGGER.debug(
                    T("coal.logs.postgresql.casting_column").format(
                        field_name=field_name,
                        original_type=original_type,
                        target_type=target_type,
                    )
                )
                try:
                    new_col = pa.compute.cast(col, target_type)
                    new_columns.append(new_col)
                    type_conversions.append(f"{field_name}: {original_type} -> {target_type}")
                except pa.ArrowInvalid as e:
                    # Cast is best-effort: a failed conversion degrades to an
                    # all-null column instead of aborting the whole adaptation.
                    LOGGER.warning(
                        T("coal.logs.postgresql.cast_failed").format(
                            field_name=field_name,
                            original_type=original_type,
                            target_type=target_type,
                            error=str(e),
                        )
                    )
                    new_columns.append(pa.nulls(len(data), type=target_type))
                    failed_conversions.append(f"{field_name}: {original_type} -> {target_type}")
            else:
                new_columns.append(col)
        else:
            # Column doesn't exist - add nulls
            LOGGER.debug(T("coal.logs.postgresql.adding_missing_column").format(field_name=field_name))
            new_columns.append(pa.nulls(len(data), type=target_type))
            added_columns.append(field_name)

    # Log columns that will be dropped
    dropped_columns = [name for name in data.column_names if name not in target_fields]
    if dropped_columns:
        LOGGER.debug(T("coal.logs.postgresql.dropping_columns").format(columns=dropped_columns))

    # Create new table
    adapted_table = pa.Table.from_arrays(new_columns, schema=target_schema)

    # Log summary of adaptations
    LOGGER.debug(T("coal.logs.postgresql.adaptation_summary"))
    if added_columns:
        # Fix: key normalized (source had "postgresql. added_columns")
        LOGGER.debug(T("coal.logs.postgresql.added_columns").format(columns=added_columns))
    if dropped_columns:
        # Fix: key normalized (source had "postgresql. dropped_columns")
        LOGGER.debug(T("coal.logs.postgresql.dropped_columns").format(columns=dropped_columns))
    if type_conversions:
        LOGGER.debug(T("coal.logs.postgresql.successful_conversions").format(conversions=type_conversions))
    if failed_conversions:
        LOGGER.debug(T("coal.logs.postgresql.failed_conversions").format(conversions=failed_conversions))

    LOGGER.debug(T("coal.logs.postgresql.final_schema").format(schema=adapted_table.schema))
    return adapted_table
172174
173175
def send_pyarrow_table_to_postgresql(
    data: pa.Table,
    target_table_name: str,
    postgres_host: str,
    postgres_port: str,
    postgres_db: str,
    postgres_schema: str,
    postgres_user: str,
    postgres_password: str,
    replace: bool,
    force_encode: bool = False,
) -> int:
    """
    Send a PyArrow table to a PostgreSQL table, creating or extending it.

    When the target table already exists and ``replace`` is False, the
    incoming data is first adapted to the existing table schema via
    ``adapt_table_to_schema``; when ``replace`` is True the existing table
    is replaced as-is and no adaptation is performed.

    Args:
        data: Table to ingest.
        target_table_name: Name of the destination table.
        postgres_host: PostgreSQL host
        postgres_port: PostgreSQL port
        postgres_db: PostgreSQL database name
        postgres_schema: PostgreSQL schema name
        postgres_user: PostgreSQL username
        postgres_password: PostgreSQL password
        replace: Replace the existing table instead of appending to it.
        force_encode: Percent-encode the password when building the URI.

    Returns:
        Number of rows ingested (as reported by ``adbc_ingest``).
    """
    LOGGER.debug(
        T("coal.logs.postgresql.preparing_send").format(
            postgres_schema=postgres_schema, target_table_name=target_table_name
        )
    )
    LOGGER.debug(T("coal.logs.postgresql.input_rows").format(rows=len(data)))

    # Get existing schema if table exists
    existing_schema = get_postgresql_table_schema(
        target_table_name,
        postgres_host,
        postgres_port,
        postgres_db,
        postgres_schema,
        postgres_user,
        postgres_password,
        force_encode,
    )

    if existing_schema is not None:
        LOGGER.debug(T("coal.logs.postgresql.found_existing_table").format(schema=existing_schema))
        if not replace:
            LOGGER.debug(T("coal.logs.postgresql.adapting_data"))
            data = adapt_table_to_schema(data, existing_schema)
        else:
            LOGGER.debug(T("coal.logs.postgresql.replace_mode"))
    else:
        LOGGER.debug(T("coal.logs.postgresql.no_existing_table"))

    # Proceed with ingestion
    total = 0
    postgresql_full_uri = generate_postgresql_full_uri(
        postgres_host,
        postgres_port,
        postgres_db,
        postgres_user,
        postgres_password,
        force_encode,
    )

    LOGGER.debug(T("coal.logs.postgresql.connecting"))
    # autocommit so the ingest is flushed without an explicit commit
    with dbapi.connect(postgresql_full_uri, autocommit=True) as conn:
        with conn.cursor() as curs:
            mode = "replace" if replace else "create_append"
            LOGGER.debug(T("coal.logs.postgresql.ingesting_data").format(mode=mode))
            total += curs.adbc_ingest(target_table_name, data, mode, db_schema_name=postgres_schema)

    LOGGER.debug(T("coal.logs.postgresql.ingestion_success").format(rows=total))
    return total
0 commit comments