 import adbc_driver_manager
 import pyarrow as pa
 from adbc_driver_postgresql import dbapi
+from cosmotech.orchestrator.utils.translate import T
 from pyarrow import Table
 
 from cosmotech.coal.utils.logger import LOGGER
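
The recurring change in this diff swaps inline f-strings for translation-keyed templates fetched through `T`. A minimal sketch of the pattern, assuming `T(key)` returns a plain format string; the toy catalog below is illustrative, not the real one in `cosmotech.orchestrator.utils.translate`:

```python
# Toy stand-in for a translation lookup like T(); illustrative only.
_CATALOG = {
    "coal.logs.postgresql.getting_schema": (
        "Getting schema for table {postgres_schema}.{target_table_name}"
    ),
}

def T(key: str) -> str:
    # Return the message template for the key, falling back to the key itself
    return _CATALOG.get(key, key)

msg = T("coal.logs.postgresql.getting_schema").format(
    postgres_schema="public", target_table_name="orders"
)
print(msg)  # Getting schema for table public.orders
```
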
@@ -64,7 +65,11 @@ def get_postgresql_table_schema(
     Returns:
         PyArrow Schema if table exists, None otherwise
     """
-    LOGGER.debug(f"Getting schema for table {postgres_schema}.{target_table_name}")
+    LOGGER.debug(
+        T("coal.logs.postgresql.getting_schema").format(
+            postgres_schema=postgres_schema, target_table_name=target_table_name
+        )
+    )
 
     postgresql_full_uri = generate_postgresql_full_uri(
         postgres_host,
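
For context, the lookup this function performs is ADBC's table-schema introspection, which raises `ProgrammingError` for a missing table (visible in the `except` branch below). A minimal sketch under those assumptions; the helper name and connection handling are illustrative, not the module's actual code:

```python
# Hedged sketch of the schema lookup this function wraps.
import adbc_driver_manager
from adbc_driver_postgresql import dbapi

def peek_table_schema(uri: str, db_schema: str, table: str):
    """Return the table's PyArrow schema, or None if the table is absent."""
    with dbapi.connect(uri) as conn:
        try:
            return conn.adbc_get_table_schema(table, db_schema_filter=db_schema)
        except adbc_driver_manager.ProgrammingError:
            return None
```
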
@@ -82,17 +87,21 @@ def get_postgresql_table_schema(
             db_schema_filter=postgres_schema,
         )
     except adbc_driver_manager.ProgrammingError:
-        LOGGER.warning(f"Table {postgres_schema}.{target_table_name} not found")
+        LOGGER.warning(
+            T("coal.logs.postgresql.table_not_found").format(
+                postgres_schema=postgres_schema, target_table_name=target_table_name
+            )
+        )
         return None
 
 
 def adapt_table_to_schema(data: pa.Table, target_schema: pa.Schema) -> pa.Table:
     """
     Adapt a PyArrow table to match a target schema with detailed logging.
     """
-    LOGGER.debug(f"Starting schema adaptation for table with {len(data)} rows")
-    LOGGER.debug(f"Original schema: {data.schema}")
-    LOGGER.debug(f"Target schema: {target_schema}")
+    LOGGER.debug(T("coal.logs.postgresql.schema_adaptation_start").format(rows=len(data)))
+    LOGGER.debug(T("coal.logs.postgresql.original_schema").format(schema=data.schema))
+    LOGGER.debug(T("coal.logs.postgresql.target_schema").format(schema=target_schema))
 
     target_fields = {field.name: field.type for field in target_schema}
     new_columns = []
@@ -112,53 +121,55 @@ def adapt_table_to_schema(data: pa.Table, target_schema: pa.Schema) -> pa.Table:
 
             if original_type != target_type:
                 LOGGER.debug(
-                    f"Attempting to cast column '{field_name}' "
-                    f"from {original_type} to {target_type}"
+                    T("coal.logs.postgresql.casting_column").format(
+                        field_name=field_name,
+                        original_type=original_type,
+                        target_type=target_type,
+                    )
                 )
                 try:
                     new_col = pa.compute.cast(col, target_type)
                     new_columns.append(new_col)
-                    type_conversions.append(
-                        f"{field_name}: {original_type} -> {target_type}"
-                    )
+                    type_conversions.append(f"{field_name}: {original_type} -> {target_type}")
                 except pa.ArrowInvalid as e:
                     LOGGER.warning(
-                        f"Failed to cast column '{field_name}' "
-                        f"from {original_type} to {target_type}. "
-                        f"Filling with nulls. Error: {str(e)}"
+                        T("coal.logs.postgresql.cast_failed").format(
+                            field_name=field_name,
+                            original_type=original_type,
+                            target_type=target_type,
+                            error=str(e),
+                        )
                     )
                     new_columns.append(pa.nulls(len(data), type=target_type))
-                    failed_conversions.append(
-                        f"{field_name}: {original_type} -> {target_type}"
-                    )
+                    failed_conversions.append(f"{field_name}: {original_type} -> {target_type}")
             else:
                 new_columns.append(col)
         else:
             # Column doesn't exist - add nulls
-            LOGGER.debug(f"Adding missing column '{field_name}' with null values")
+            LOGGER.debug(T("coal.logs.postgresql.adding_missing_column").format(field_name=field_name))
             new_columns.append(pa.nulls(len(data), type=target_type))
             added_columns.append(field_name)
 
     # Log columns that will be dropped
     dropped_columns = [name for name in data.column_names if name not in target_fields]
     if dropped_columns:
-        LOGGER.debug(f"Dropping extra columns not in target schema: {dropped_columns}")
+        LOGGER.debug(T("coal.logs.postgresql.dropping_columns").format(columns=dropped_columns))
 
     # Create new table
     adapted_table = pa.Table.from_arrays(new_columns, schema=target_schema)
 
     # Log summary of adaptations
-    LOGGER.debug("Schema adaptation summary:")
+    LOGGER.debug(T("coal.logs.postgresql.adaptation_summary"))
     if added_columns:
-        LOGGER.debug(f"- Added columns (filled with nulls): {added_columns}")
+        LOGGER.debug(T("coal.logs.postgresql.added_columns").format(columns=added_columns))
     if dropped_columns:
-        LOGGER.debug(f"- Dropped columns: {dropped_columns}")
+        LOGGER.debug(T("coal.logs.postgresql.dropped_columns").format(columns=dropped_columns))
     if type_conversions:
-        LOGGER.debug(f"- Successful type conversions: {type_conversions}")
+        LOGGER.debug(T("coal.logs.postgresql.successful_conversions").format(conversions=type_conversions))
     if failed_conversions:
-        LOGGER.debug(f"- Failed conversions (filled with nulls): {failed_conversions}")
+        LOGGER.debug(T("coal.logs.postgresql.failed_conversions").format(conversions=failed_conversions))
 
-    LOGGER.debug(f"Final adapted table schema: {adapted_table.schema}")
+    LOGGER.debug(T("coal.logs.postgresql.final_schema").format(schema=adapted_table.schema))
     return adapted_table
 
 
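A standalone illustration of what `adapt_table_to_schema` does, using only `pyarrow` (the sample data and target schema below are made up): matching columns are kept, mismatched types are cast, missing columns are null-filled, and extras are dropped.

```python
import pyarrow as pa
import pyarrow.compute as pc

data = pa.table({"a": [1, 2], "b": ["x", "y"], "extra": [0.1, 0.2]})
target = pa.schema([("a", pa.float64()), ("b", pa.string()), ("c", pa.int32())])

columns = []
for field in target:
    if field.name in data.column_names:
        col = data.column(field.name)
        # Cast only when the source type differs from the target type
        columns.append(pc.cast(col, field.type) if col.type != field.type else col)
    else:
        # Missing target column: fill with nulls, as the function above does
        columns.append(pa.nulls(len(data), type=field.type))

adapted = pa.Table.from_arrays(columns, schema=target)  # "extra" is dropped
print(adapted.schema)
```
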
@@ -175,9 +186,11 @@ def send_pyarrow_table_to_postgresql(
     force_encode: bool = False,
 ) -> int:
     LOGGER.debug(
-        f"Preparing to send data to PostgreSQL table '{postgres_schema}.{target_table_name}'"
+        T("coal.logs.postgresql.preparing_send").format(
+            postgres_schema=postgres_schema, target_table_name=target_table_name
+        )
     )
-    LOGGER.debug(f"Input table has {len(data)} rows")
+    LOGGER.debug(T("coal.logs.postgresql.input_rows").format(rows=len(data)))
 
     # Get existing schema if table exists
     existing_schema = get_postgresql_table_schema(
@@ -192,14 +205,14 @@ def send_pyarrow_table_to_postgresql(
     )
 
     if existing_schema is not None:
-        LOGGER.debug(f"Found existing table with schema: {existing_schema}")
+        LOGGER.debug(T("coal.logs.postgresql.found_existing_table").format(schema=existing_schema))
         if not replace:
-            LOGGER.debug("Adapting incoming data to match existing schema")
+            LOGGER.debug(T("coal.logs.postgresql.adapting_data"))
             data = adapt_table_to_schema(data, existing_schema)
         else:
-            LOGGER.debug("Replace mode enabled - skipping schema adaptation")
+            LOGGER.debug(T("coal.logs.postgresql.replace_mode"))
     else:
-        LOGGER.debug("No existing table found - will create new table")
+        LOGGER.debug(T("coal.logs.postgresql.no_existing_table"))
 
     # Proceed with ingestion
     total = 0
@@ -212,18 +225,12 @@ def send_pyarrow_table_to_postgresql(
         force_encode,
     )
 
-    LOGGER.debug("Connecting to PostgreSQL database")
+    LOGGER.debug(T("coal.logs.postgresql.connecting"))
     with dbapi.connect(postgresql_full_uri, autocommit=True) as conn:
         with conn.cursor() as curs:
-            LOGGER.debug(
-                f"Ingesting data with mode: {'replace' if replace else 'create_append'}"
-            )
-            total += curs.adbc_ingest(
-                target_table_name,
-                data,
-                "replace" if replace else "create_append",
-                db_schema_name=postgres_schema,
-            )
+            mode = "replace" if replace else "create_append"
+            LOGGER.debug(T("coal.logs.postgresql.ingesting_data").format(mode=mode))
+            total += curs.adbc_ingest(target_table_name, data, mode, db_schema_name=postgres_schema)
 
-    LOGGER.debug(f"Successfully ingested {total} rows")
+    LOGGER.debug(T("coal.logs.postgresql.ingestion_success").format(rows=total))
     return total
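
The ingestion itself is a single `adbc_ingest` call; a minimal usage sketch with placeholder connection details and names (assuming the ADBC PostgreSQL driver is installed and the server is reachable):

```python
import pyarrow as pa
from adbc_driver_postgresql import dbapi

uri = "postgresql://user:password@localhost:5432/db"  # placeholder URI
table = pa.table({"id": [1, 2], "label": ["a", "b"]})

with dbapi.connect(uri, autocommit=True) as conn:
    with conn.cursor() as curs:
        # "create_append" creates the table when missing, otherwise appends;
        # "replace" would drop and recreate it instead
        count = curs.adbc_ingest("my_table", table, "create_append", db_schema_name="public")

print(count)  # number of rows ingested
```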