@@ -1,5 +1,6 @@
 import logging
 import random
+import string
 
 import boto3
 import pandas as pd
@@ -15,6 +16,8 @@
     extract_cloudformation_outputs,
     get_df,
     get_df_category,
+    get_time_str_with_random_suffix,
+    path_generator
 )
 
 logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(levelname)s][%(name)s][%(funcName)s] %(message)s")
@@ -27,6 +30,11 @@ def cloudformation_outputs():
     yield extract_cloudformation_outputs()
 
 
+@pytest.fixture(scope="function")
+def path(bucket):
+    yield from path_generator(bucket)
+
+
 @pytest.fixture(scope="module")
 def bucket(cloudformation_outputs):
     if "BucketName" in cloudformation_outputs:
@@ -63,6 +71,15 @@ def glue_database(cloudformation_outputs):
     yield cloudformation_outputs["GlueDatabaseName"]
 
 
+@pytest.fixture(scope="function")
+def glue_table(glue_database):
+    name = f"tbl_{get_time_str_with_random_suffix()}"
+    print(f"Table name: {name}")
+    wr.catalog.delete_table_if_exists(database=glue_database, table=name)
+    yield name
+    wr.catalog.delete_table_if_exists(database=glue_database, table=name)
+
+
 @pytest.fixture(scope="module")
 def external_schema(cloudformation_outputs, parameters, glue_database):
     region = cloudformation_outputs.get("Region")
@@ -529,3 +546,60 @@ def test_null(parameters, db_type):
     df2 = wr.db.read_sql_table(table=table, schema=schema, con=engine)
     df["id"] = df["id"].astype("Int64")
     assert pd.concat(objs=[df, df], ignore_index=True).equals(df2)
+
+
+def test_redshift_spectrum_long_string(path, glue_table, glue_database, external_schema):
+    df = pd.DataFrame({
+        "id": [1, 2],
+        "col_str": [
+            ''.join(random.choice(string.ascii_letters) for _ in range(300)),
+            ''.join(random.choice(string.ascii_letters) for _ in range(300))
+        ]
+    })
+    paths = wr.s3.to_parquet(
+        df=df,
+        path=path,
+        database=glue_database,
+        table=glue_table,
+        mode="overwrite",
+        index=False,
+        dataset=True,
+    )["paths"]
+    wr.s3.wait_objects_exist(paths=paths, use_threads=False)
+    engine = wr.catalog.get_engine(connection="aws-data-wrangler-redshift")
+    with engine.connect() as con:
+        cursor = con.execute(f"SELECT * FROM {external_schema}.{glue_table}")
+        rows = cursor.fetchall()
+        assert len(rows) == len(df.index)
+        for row in rows:
+            assert len(row) == len(df.columns)
+
+
+def test_redshift_copy_unload_long_string(path, parameters):
+    df = pd.DataFrame({
+        "id": [1, 2],
+        "col_str": [
+            ''.join(random.choice(string.ascii_letters) for _ in range(300)),
+            ''.join(random.choice(string.ascii_letters) for _ in range(300))
+        ]
+    })
+    engine = wr.catalog.get_engine(connection="aws-data-wrangler-redshift")
+    wr.db.copy_to_redshift(
+        df=df,
+        path=path,
+        con=engine,
+        schema="public",
+        table="test_redshift_copy_unload_long_string",
+        mode="overwrite",
+        varchar_lengths={"col_str": 300},
+        iam_role=parameters["redshift"]["role"],
+    )
+    df2 = wr.db.unload_redshift(
+        sql="SELECT * FROM public.test_redshift_copy_unload_long_string",
+        con=engine,
+        iam_role=parameters["redshift"]["role"],
+        path=path,
+        keep_files=False,
+    )
+    assert len(df2.index) == 2
+    assert len(df2.columns) == 2
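
The pattern the second test exercises looks roughly like this outside the test harness. A minimal sketch, assuming the same Glue connection name ("aws-data-wrangler-redshift") used in the tests; the staging path, IAM role ARN, and table name are placeholders. The varchar_lengths argument is the point of the test: it widens the column past the library's default VARCHAR length (256), which would otherwise be too small for 300-character strings.

import pandas as pd
import awswrangler as wr

# Placeholder staging path and IAM role ARN -- substitute your own.
path = "s3://my-bucket/staging/"
iam_role = "arn:aws:iam::123456789012:role/my-redshift-role"

df = pd.DataFrame({"id": [1], "col_str": ["x" * 300]})

engine = wr.catalog.get_engine(connection="aws-data-wrangler-redshift")
wr.db.copy_to_redshift(
    df=df,
    path=path,
    con=engine,
    schema="public",
    table="my_table",  # placeholder table name
    mode="overwrite",
    varchar_lengths={"col_str": 300},  # widen past the 256-character default
    iam_role=iam_role,
)
df2 = wr.db.unload_redshift(
    sql="SELECT * FROM public.my_table",
    con=engine,
    iam_role=iam_role,
    path=path,
    keep_files=False,
)
assert df2["col_str"].str.len().max() == 300  # long strings survived the round trip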