Skip to content

Commit 4a5ac7a

Browse files
committed
Add Dataset feature to s3.to_csv #141 #170
1 parent 3c459ce commit 4a5ac7a

File tree

9 files changed

+1060
-160
lines changed

9 files changed

+1060
-160
lines changed

.pylintrc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -555,10 +555,10 @@ max-attributes=7
555555
max-bool-expr=5
556556

557557
# Maximum number of branches for function / method body.
558-
max-branches=12
558+
max-branches=15
559559

560560
# Maximum number of locals for function / method body.
561-
max-locals=25
561+
max-locals=30
562562

563563
# Maximum number of parents for a class (see R0901).
564564
max-parents=7

awswrangler/_data_types.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ def pyarrow2sqlalchemy( # pylint: disable=too-many-branches,too-many-return-sta
219219

220220

221221
def pyarrow_types_from_pandas(
222-
df: pd.DataFrame, index: bool, ignore_cols: Optional[List[str]] = None
222+
df: pd.DataFrame, index: bool, ignore_cols: Optional[List[str]] = None, index_left: bool = False
223223
) -> Dict[str, pa.DataType]:
224224
"""Extract the related Pyarrow data types from any Pandas DataFrame."""
225225
# Handle exception data types (e.g. Int64, Int32, string)
@@ -251,18 +251,23 @@ def pyarrow_types_from_pandas(
251251
if (name not in df.columns) and (index is True):
252252
indexes.append(name)
253253

254+
# Merging Index
255+
sorted_cols: List[str] = indexes + list(df.columns) if index_left is True else list(df.columns) + indexes
256+
254257
# Filling schema
255258
columns_types: Dict[str, pa.DataType]
256-
columns_types = {n: cols_dtypes[n] for n in list(df.columns) + indexes} # add cols + indexes
259+
columns_types = {n: cols_dtypes[n] for n in sorted_cols}
257260
_logger.debug(f"columns_types: {columns_types}")
258261
return columns_types
259262

260263

261-
def athena_types_from_pandas(df: pd.DataFrame, index: bool, dtype: Optional[Dict[str, str]] = None) -> Dict[str, str]:
264+
def athena_types_from_pandas(
265+
df: pd.DataFrame, index: bool, dtype: Optional[Dict[str, str]] = None, index_left: bool = False
266+
) -> Dict[str, str]:
262267
"""Extract the related Athena data types from any Pandas DataFrame."""
263268
casts: Dict[str, str] = dtype if dtype else {}
264269
pa_columns_types: Dict[str, Optional[pa.DataType]] = pyarrow_types_from_pandas(
265-
df=df, index=index, ignore_cols=list(casts.keys())
270+
df=df, index=index, ignore_cols=list(casts.keys()), index_left=index_left
266271
)
267272
athena_columns_types: Dict[str, str] = {}
268273
for k, v in pa_columns_types.items():
@@ -275,11 +280,17 @@ def athena_types_from_pandas(df: pd.DataFrame, index: bool, dtype: Optional[Dict
275280

276281

277282
def athena_types_from_pandas_partitioned(
278-
df: pd.DataFrame, index: bool, partition_cols: Optional[List[str]] = None, dtype: Optional[Dict[str, str]] = None
283+
df: pd.DataFrame,
284+
index: bool,
285+
partition_cols: Optional[List[str]] = None,
286+
dtype: Optional[Dict[str, str]] = None,
287+
index_left: bool = False,
279288
) -> Tuple[Dict[str, str], Dict[str, str]]:
280289
"""Extract the related Athena data types from any Pandas DataFrame considering possible partitions."""
281290
partitions: List[str] = partition_cols if partition_cols else []
282-
athena_columns_types: Dict[str, str] = athena_types_from_pandas(df=df, index=index, dtype=dtype)
291+
athena_columns_types: Dict[str, str] = athena_types_from_pandas(
292+
df=df, index=index, dtype=dtype, index_left=index_left
293+
)
283294
columns_types: Dict[str, str] = {}
284295
partitions_types: Dict[str, str] = {}
285296
for k, v in athena_columns_types.items():
@@ -296,10 +307,12 @@ def pyarrow_schema_from_pandas(
296307
"""Extract the related Pyarrow Schema from any Pandas DataFrame."""
297308
casts: Dict[str, str] = {} if dtype is None else dtype
298309
ignore: List[str] = [] if ignore_cols is None else ignore_cols
299-
ignore = ignore + list(casts.keys())
300-
columns_types: Dict[str, Optional[pa.DataType]] = pyarrow_types_from_pandas(df=df, index=index, ignore_cols=ignore)
310+
ignore_plus = ignore + list(casts.keys())
311+
columns_types: Dict[str, Optional[pa.DataType]] = pyarrow_types_from_pandas(
312+
df=df, index=index, ignore_cols=ignore_plus
313+
)
301314
for k, v in casts.items():
302-
if k in df.columns:
315+
if (k in df.columns) and (k not in ignore):
303316
columns_types[k] = athena2pyarrow(v)
304317
columns_types = {k: v for k, v in columns_types.items() if v is not None}
305318
_logger.debug(f"columns_types: {columns_types}")

0 commit comments

Comments
 (0)