@@ -237,7 +237,7 @@ def _find_terminator(body, sep, quoting, quotechar, lineterminator):
 
     @staticmethod
     def _read_csv_once(session_primitives: "SessionPrimitives", bucket_name: str, key_path: str,
-                       **pd_additional_kwargs):
+                       **pd_additional_kwargs) -> pd.DataFrame:
         """
         Read a single CSV file from Amazon S3 using optimized strategies.
 
@@ -256,7 +256,7 @@ def _read_csv_once(session_primitives: "SessionPrimitives", bucket_name: str, ke
         if pd_additional_kwargs.get('compression', 'infer') == 'infer':
             pd_additional_kwargs['compression'] = infer_compression(key_path, compression='infer')
 
-        dataframe = pd.read_csv(buff, **pd_additional_kwargs)
+        dataframe: pd.DataFrame = pd.read_csv(buff, **pd_additional_kwargs)
         buff.close()
         return dataframe
 
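
Because the object is downloaded into an in-memory `BytesIO` buffer, pandas cannot infer compression from a filename on its own, so the hunk above infers it from the S3 key path instead. A minimal sketch of that step, assuming `infer_compression` is the helper imported from `pandas.io.common` (the paths below are invented):

```python
# Sketch: infer compression from a key path the way the hunk above does.
from pandas.io.common import infer_compression

print(infer_compression("prefix/file0.csv.gz", compression="infer"))   # 'gzip'
print(infer_compression("prefix/file0.csv.bz2", compression="infer"))  # 'bz2'
print(infer_compression("prefix/file0.csv", compression="infer"))      # None -> no compression
```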
@@ -1613,16 +1613,18 @@ def read_csv_list(
         logger.debug(f"procs_cpu_bound: {procs_cpu_bound}")
         session_primitives = self._session.primitives
         if len(paths) == 1:
-            path = paths[0]
+            path: str = paths[0]
+            bucket_name: str
+            key_path: str
             bucket_name, key_path = Pandas._parse_path(path)
             logger.debug(f"path: {path}")
             df: pd.DataFrame = self._read_csv_once(session_primitives=self._session.primitives,
                                                    bucket_name=bucket_name,
                                                    key_path=key_path,
                                                    **pd_additional_kwargs)
         else:
-            procs = []
-            receive_pipes = []
+            procs: list = []
+            receive_pipes: list = []
             logger.debug(f"len(paths): {len(paths)}")
             for path in paths:
                 receive_pipe, send_pipe = mp.Pipe()
@@ -1639,7 +1641,7 @@ def read_csv_list(
             dfs: List[pd.DataFrame] = []
             for i in range(len(procs)):
                 logger.debug(f"Waiting pipe number: {i}")
-                df_received = receive_pipes[i].recv()
+                df_received: pd.DataFrame = receive_pipes[i].recv()
                 dfs.append(df_received)
                 logger.debug(f"Waiting proc number: {i}")
                 procs[i].join()
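
Note that this loop receives from each pipe before joining the corresponding process. The ordering matters: `Connection.send()` blocks once the pipe buffer fills, so joining a child that is still sending a large DataFrame would deadlock. A self-contained sketch of the same fan-out/fan-in pattern (the `fetch` worker and paths are illustrative, not part of the library):

```python
import multiprocessing as mp

import pandas as pd


def fetch(send_pipe, path: str) -> None:
    # Stand-in for the real S3 read; the child pickles the frame into the pipe.
    send_pipe.send(pd.DataFrame({"source": [path]}))
    send_pipe.close()


if __name__ == "__main__":
    paths = ["s3://bucket/a.csv", "s3://bucket/b.csv"]  # hypothetical
    procs, receive_pipes = [], []
    for path in paths:
        receive_pipe, send_pipe = mp.Pipe()
        proc = mp.Process(target=fetch, args=(send_pipe, path))
        proc.start()
        procs.append(proc)
        receive_pipes.append(receive_pipe)
    dfs = [pipe.recv() for pipe in receive_pipes]  # recv() first, then join()
    for proc, pipe in zip(procs, receive_pipes):
        proc.join()
        pipe.close()
    print(pd.concat(dfs, ignore_index=True))
```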
@@ -1689,3 +1691,128 @@ def read_csv_prefix(
                                   max_result_size=max_result_size,
                                   procs_cpu_bound=procs_cpu_bound,
                                   **pd_additional_kwargs)
+
+    @staticmethod
+    def _read_fwf(session_primitives: "SessionPrimitives", bucket_name: str, key_path: str,
+                  **pd_additional_kwargs) -> pd.DataFrame:
+        """
+        Read a single fixed-width formatted file from Amazon S3 using optimized strategies.
+
+        :param session_primitives: SessionPrimitives()
+        :param bucket_name: S3 bucket name
+        :param key_path: S3 key path (w/o bucket)
+        :param pd_additional_kwargs: Additional parameters forwarded to pandas.read_fwf
+        :return: Pandas DataFrame
+        """
+        buff = BytesIO()
+        session: Session = session_primitives.session
+        client_s3 = session.boto3_session.client(service_name="s3", use_ssl=True, config=session.botocore_config)
+        client_s3.download_fileobj(Bucket=bucket_name, Key=key_path, Fileobj=buff)
+        buff.seek(0)
+        if pd_additional_kwargs.get('compression', 'infer') == 'infer':
+            pd_additional_kwargs['compression'] = infer_compression(key_path, compression='infer')
+        dataframe: pd.DataFrame = pd.read_fwf(buff, **pd_additional_kwargs)
+        buff.close()
+        return dataframe
+
+    @staticmethod
+    def _read_fwf_remote(send_pipe: mp.connection.Connection, session_primitives: "SessionPrimitives",
+                         bucket_name: str, key_path: str, **pd_additional_kwargs):
+        df: pd.DataFrame = Pandas._read_fwf(session_primitives=session_primitives,
+                                            bucket_name=bucket_name,
+                                            key_path=key_path,
+                                            **pd_additional_kwargs)
+        send_pipe.send(df)
+        send_pipe.close()
+
+    def read_fwf(self, path: str, **pd_additional_kwargs) -> pd.DataFrame:
+        """
+        Read a single fixed-width formatted file from Amazon S3 using optimized strategies.
+
+        :param path: Amazon S3 path (e.g. s3://bucket_name/key_name)
+        :param pd_additional_kwargs: Additional parameters forwarded to pandas.read_fwf
+        :return: Pandas DataFrame
+        """
+        bucket_name, key_path = self._parse_path(path)
+        dataframe: pd.DataFrame = self._read_fwf(session_primitives=self._session.primitives,
+                                                 bucket_name=bucket_name,
+                                                 key_path=key_path,
+                                                 **pd_additional_kwargs)
+        return dataframe
+
+    def read_fwf_list(
+            self,
+            paths: List[str],
+            procs_cpu_bound: Optional[int] = None,
+            **pd_additional_kwargs,
+    ) -> pd.DataFrame:
+        """
+        Read a list of fixed-width formatted files from Amazon S3 using optimized strategies.
+
+        :param paths: List of Amazon S3 paths (e.g. ['s3://bucket_name/key_name1', 's3://bucket_name/key_name2'])
+        :param procs_cpu_bound: Number of cores used for CPU bound tasks
+        :param pd_additional_kwargs: Additional parameters forwarded to pandas.read_fwf
+        :return: Pandas DataFrame
+        """
+        procs_cpu_bound = procs_cpu_bound if procs_cpu_bound is not None else self._session.procs_cpu_bound if self._session.procs_cpu_bound is not None else 1
+        logger.debug(f"procs_cpu_bound: {procs_cpu_bound}")
+        session_primitives = self._session.primitives
+        if len(paths) == 1:
+            path: str = paths[0]
+            bucket_name: str
+            key_path: str
+            bucket_name, key_path = Pandas._parse_path(path)
+            logger.debug(f"path: {path}")
+            df: pd.DataFrame = self._read_fwf(session_primitives=self._session.primitives,
+                                              bucket_name=bucket_name,
+                                              key_path=key_path,
+                                              **pd_additional_kwargs)
+        else:
+            procs: list = []
+            receive_pipes: list = []
+            logger.debug(f"len(paths): {len(paths)}")
+            for path in paths:
+                receive_pipe, send_pipe = mp.Pipe()
+                bucket_name, key_path = Pandas._parse_path(path)
+                logger.debug(f"launching path: {path}")
+                proc = mp.Process(target=self._read_fwf_remote,
+                                  args=(send_pipe, session_primitives, bucket_name, key_path),
+                                  kwargs=pd_additional_kwargs)
+                proc.daemon = False
+                proc.start()
+                procs.append(proc)
+                receive_pipes.append(receive_pipe)
+                utils.wait_process_release(processes=procs, target_number=procs_cpu_bound)
+            dfs: List[pd.DataFrame] = []
+            for i in range(len(procs)):
+                logger.debug(f"Waiting pipe number: {i}")
+                df_received: pd.DataFrame = receive_pipes[i].recv()
+                dfs.append(df_received)
+                logger.debug(f"Waiting proc number: {i}")
+                procs[i].join()
+                logger.debug(f"Closing proc number: {i}")
+                receive_pipes[i].close()
+            logger.debug(f"Concatenating all {len(paths)} DataFrames...")
+            df = pd.concat(objs=dfs, ignore_index=True, sort=False)
+            logger.debug("Concatenation done!")
+        return df
+
+    def read_fwf_prefix(
+            self,
+            path_prefix: str,
+            procs_cpu_bound: Optional[int] = None,
+            **pd_additional_kwargs,
+    ) -> pd.DataFrame:
+        """
+        Read all fixed-width formatted files from a given Amazon S3 prefix using optimized strategies.
+
+        :param path_prefix: Amazon S3 prefix (e.g. s3://bucket_name/prefix)
+        :param procs_cpu_bound: Number of cores used for CPU bound tasks
+        :param pd_additional_kwargs: Additional parameters forwarded to pandas.read_fwf
+        :return: Pandas DataFrame
+        """
+        paths: List[str] = self._session.s3.list_objects(path=path_prefix)
+        paths = [p for p in paths if not p.endswith("/")]
+        return self.read_fwf_list(paths=paths,
+                                  procs_cpu_bound=procs_cpu_bound,
+                                  **pd_additional_kwargs)
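
Hypothetical usage of the new readers, mirroring the existing `read_csv*` call pattern; the bucket name and column specification below are invented, and the extra keyword arguments are forwarded verbatim to `pandas.read_fwf`:

```python
import awswrangler

session = awswrangler.Session()

# Single fixed-width file; widths/names pass straight through to pandas.read_fwf.
df = session.pandas.read_fwf(path="s3://my-bucket/fwf/file0.txt",
                             widths=[8, 4], names=["id", "code"])

# Every non-"directory" object under a prefix, read in parallel and concatenated.
df_all = session.pandas.read_fwf_prefix(path_prefix="s3://my-bucket/fwf/",
                                        widths=[8, 4], names=["id", "code"])
```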