@@ -106,6 +106,27 @@ def _get_primary_keys(cursor: "redshift_connector.Cursor", schema: str, table: s
106106 return fields
107107
108108
def _get_table_columns(cursor: "redshift_connector.Cursor", schema: str, table: str) -> list[str]:
    """Return the column names of ``schema.table`` from the SVV_COLUMNS catalog view.

    NOTE(review): ``schema`` and ``table`` are interpolated directly into the SQL,
    matching the file's existing catalog-query style — callers are expected to pass
    trusted identifiers.
    """
    sql = f"SELECT column_name FROM svv_columns\n WHERE table_schema = '{schema}' AND table_name = '{table}'"
    _logger.debug("Executing select query:\n%s", sql)
    cursor.execute(sql)
    # fetchall() yields one row per column, each row holding the single selected
    # value. The previous annotation (tuple[list[str]], i.e. a 1-tuple) did not
    # describe that shape, and joining a one-element row hid the intent.
    result: list[list[str]] = cursor.fetchall()
    return [row[0] for row in result]
116+
117+
def _add_table_columns(
    cursor: "redshift_connector.Cursor", schema: str, table: str, new_columns: dict[str, str]
) -> None:
    """Issue one ALTER TABLE ... ADD COLUMN statement per entry in ``new_columns``.

    Keys are column names, values are Redshift type strings; identifiers are
    quoted via ``_identifier``.
    """
    qualified_table = f"{_identifier(schema)}.{_identifier(table)}"
    for name, redshift_type in new_columns.items():
        # Redshift only allows one ADD COLUMN per ALTER TABLE statement.
        alter_sql = f"ALTER TABLE {qualified_table} \nADD COLUMN {_identifier(name)} {redshift_type};"
        _logger.debug("Executing alter query:\n%s", alter_sql)
        cursor.execute(alter_sql)
129+
109130def _does_table_exist (cursor : "redshift_connector.Cursor" , schema : str | None , table : str ) -> bool :
110131 schema_str = f"TABLE_SCHEMA = '{ schema } ' AND" if schema else ""
111132 sql = (
@@ -128,6 +149,16 @@ def _get_paths_from_manifest(path: str, boto3_session: boto3.Session | None = No
128149 return paths
129150
130151
def _get_parameter_setting(cursor: "redshift_connector.Cursor", parameter_name: str) -> str:
    """Return the current value of a Redshift configuration parameter via ``SHOW``.

    NOTE(review): ``parameter_name`` is interpolated into the statement; it is
    expected to be an internal, hard-coded name (e.g. from this module), not
    user input.
    """
    sql = f"SHOW {parameter_name}"
    _logger.debug("Executing select query:\n%s", sql)
    cursor.execute(sql)
    result = cursor.fetchall()
    # SHOW returns a single row with a single value.
    status = str(result[0][0])
    # Lazy %-style args, consistent with every other _logger call in this file
    # (the original used an eager f-string here).
    _logger.debug("%s='%s'", parameter_name, status)
    return status
161+
131162def _lock (
132163 cursor : "redshift_connector.Cursor" ,
133164 table_names : list [str ],
@@ -267,7 +298,90 @@ def _redshift_types_from_path(
267298 return redshift_types
268299
269300
270- def _create_table ( # noqa: PLR0912,PLR0913,PLR0915
def _get_rsh_columns_types(
    df: pd.DataFrame | None,
    path: str | list[str] | None,
    index: bool,
    dtype: dict[str, str] | None,
    varchar_lengths_default: int,
    varchar_lengths: dict[str, int] | None,
    data_format: Literal["parquet", "orc", "csv"] = "parquet",
    redshift_column_types: dict[str, str] | None = None,
    parquet_infer_sampling: float = 1.0,
    path_suffix: str | None = None,
    path_ignore_suffix: str | list[str] | None = None,
    manifest: bool | None = False,
    use_threads: bool | int = True,
    boto3_session: boto3.Session | None = None,
    s3_additional_kwargs: dict[str, str] | None = None,
) -> dict[str, str]:
    """Resolve the column-name -> Redshift-type mapping for a table load.

    Precedence: infer from ``df`` when it is given; otherwise infer from the
    files at ``path`` (Parquet/ORC carry their own schema) or fall back to the
    caller-supplied ``redshift_column_types`` for other formats.

    Raises
    ------
    TypeError
        When ``manifest`` is True but ``path`` is not a single string.
    ValueError
        When neither ``df`` nor ``path`` is given, or when ``data_format`` is
        not Parquet/ORC and ``redshift_column_types`` was not provided.
    """
    if df is not None:
        # Infer types from the DataFrame (pyarrow -> Redshift conversion).
        redshift_types: dict[str, str] = _data_types.database_types_from_pandas(
            df=df,
            index=index,
            dtype=dtype,
            varchar_lengths_default=varchar_lengths_default,
            varchar_lengths=varchar_lengths,
            converter_func=_data_types.pyarrow2redshift,
        )
        _logger.debug("Converted redshift types from pandas: %s", redshift_types)
    elif path is not None:
        if manifest:
            # A manifest is a single S3 object listing the data files, so a
            # list of paths is not acceptable here.
            if not isinstance(path, str):
                raise TypeError(
                    f"""type: {type(path)} is not a valid type for 'path' when 'manifest' is set to True;
must be a string"""
                )
            # Replace the manifest path with the data paths it enumerates.
            path = _get_paths_from_manifest(
                path=path,
                boto3_session=boto3_session,
            )

        if data_format in ["parquet", "orc"]:
            # Columnar formats embed their schema — read it from the files.
            redshift_types = _redshift_types_from_path(
                path=path,
                data_format=data_format,  # type: ignore[arg-type]
                varchar_lengths_default=varchar_lengths_default,
                varchar_lengths=varchar_lengths,
                parquet_infer_sampling=parquet_infer_sampling,
                path_suffix=path_suffix,
                path_ignore_suffix=path_ignore_suffix,
                use_threads=use_threads,
                boto3_session=boto3_session,
                s3_additional_kwargs=s3_additional_kwargs,
            )
        else:
            # Formats such as CSV have no embedded schema, so the caller must
            # supply the column types explicitly.
            if redshift_column_types is None:
                raise ValueError(
                    "redshift_column_types is None. It must be specified for files formats other than Parquet or ORC."
                )
            redshift_types = redshift_column_types
    else:
        raise ValueError("df and path are None. You MUST pass at least one.")
    return redshift_types
362+
363+
def _add_new_table_columns(
    cursor: "redshift_connector.Cursor", schema: str, table: str, redshift_columns_types: dict[str, str]
) -> None:
    """Add to ``schema.table`` every column of ``redshift_columns_types`` it lacks.

    Keys are column names, values are Redshift type strings.
    """
    # Redshift compares identifiers case-insensitively unless this cluster
    # setting is turned on.
    setting = _get_parameter_setting(cursor=cursor, parameter_name="enable_case_sensitive_identifier")
    is_case_sensitive = setting.lower() in ["on", "true"]

    # On a case-insensitive cluster, lowercase the incoming names before
    # comparing them against the catalog's stored column names.
    if not is_case_sensitive:
        redshift_columns_types = {name.lower(): col_type for name, col_type in redshift_columns_types.items()}

    existing_columns = set(_get_table_columns(cursor=cursor, schema=schema, table=table))
    missing_columns = {
        name: col_type for name, col_type in redshift_columns_types.items() if name not in existing_columns
    }

    _add_table_columns(cursor=cursor, schema=schema, table=table, new_columns=missing_columns)
382+
383+
384+ def _create_table ( # noqa: PLR0913
271385 df : pd .DataFrame | None ,
272386 path : str | list [str ] | None ,
273387 con : "redshift_connector.Connection" ,
@@ -336,49 +450,24 @@ def _create_table( # noqa: PLR0912,PLR0913,PLR0915
336450 return table , schema
337451 diststyle = diststyle .upper () if diststyle else "AUTO"
338452 sortstyle = sortstyle .upper () if sortstyle else "COMPOUND"
339- if df is not None :
340- redshift_types : dict [str , str ] = _data_types .database_types_from_pandas (
341- df = df ,
342- index = index ,
343- dtype = dtype ,
344- varchar_lengths_default = varchar_lengths_default ,
345- varchar_lengths = varchar_lengths ,
346- converter_func = _data_types .pyarrow2redshift ,
347- )
348- _logger .debug ("Converted redshift types from pandas: %s" , redshift_types )
349- elif path is not None :
350- if manifest :
351- if not isinstance (path , str ):
352- raise TypeError (
353- f"""type: { type (path )} is not a valid type for 'path' when 'manifest' is set to True;
354- must be a string"""
355- )
356- path = _get_paths_from_manifest (
357- path = path ,
358- boto3_session = boto3_session ,
359- )
360453
361- if data_format in ["parquet" , "orc" ]:
362- redshift_types = _redshift_types_from_path (
363- path = path ,
364- data_format = data_format , # type: ignore[arg-type]
365- varchar_lengths_default = varchar_lengths_default ,
366- varchar_lengths = varchar_lengths ,
367- parquet_infer_sampling = parquet_infer_sampling ,
368- path_suffix = path_suffix ,
369- path_ignore_suffix = path_ignore_suffix ,
370- use_threads = use_threads ,
371- boto3_session = boto3_session ,
372- s3_additional_kwargs = s3_additional_kwargs ,
373- )
374- else :
375- if redshift_column_types is None :
376- raise ValueError (
377- "redshift_column_types is None. It must be specified for files formats other than Parquet or ORC."
378- )
379- redshift_types = redshift_column_types
380- else :
381- raise ValueError ("df and path are None. You MUST pass at least one." )
454+ redshift_types = _get_rsh_columns_types (
455+ df = df ,
456+ path = path ,
457+ index = index ,
458+ dtype = dtype ,
459+ varchar_lengths_default = varchar_lengths_default ,
460+ varchar_lengths = varchar_lengths ,
461+ parquet_infer_sampling = parquet_infer_sampling ,
462+ path_suffix = path_suffix ,
463+ path_ignore_suffix = path_ignore_suffix ,
464+ use_threads = use_threads ,
465+ boto3_session = boto3_session ,
466+ s3_additional_kwargs = s3_additional_kwargs ,
467+ data_format = data_format ,
468+ redshift_column_types = redshift_column_types ,
469+ manifest = manifest ,
470+ )
382471 _validate_parameters (
383472 redshift_types = redshift_types ,
384473 diststyle = diststyle ,
0 commit comments