Skip to content

Commit 5487315

Browse files
committed
modified schema utils to have dtype changed specifically for 3 scenarios
1 parent 3da0f4c commit 5487315

File tree

2 files changed

+23
-13
lines changed

2 files changed

+23
-13
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
__pycache__/
22
.venv/
33
.env
4+
venv
45
.vscode/
56
data_files/
6-
*.log
7+
*.log
8+
*.log.1
9+
*.ipynb

schema_utils.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def _converter_template(obj, type_name, raw_convert_func, default_value=None):
4646

4747
def to_string(obj) -> str:
4848
return _converter_template(
49-
obj, "string", lambda o: str(o) if o is not None else None
49+
obj, "string", lambda o: str(o) if o is not None and not pd.isna(o) else None
5050
)
5151

5252

@@ -59,8 +59,10 @@ def raw_to_numpy_int64(obj) -> np.int64:
5959
return np.int64(obj.to_decimal())
6060
if isinstance(obj, list) or isinstance(obj, dict):
6161
raise ValueError
62-
63-
return np.int64(obj)
62+
if obj is not None and not pd.isna(obj):
63+
return np.int64(obj)
64+
else:
65+
return None
6466

6567
return _converter_template(obj, "numpy.int64", raw_to_numpy_int64)
6668

@@ -80,12 +82,12 @@ def raw_to_numpy_bool(obj) -> np.bool_:
8082
def to_numpy_float64(obj) -> np.float64:
8183
if isinstance(obj, bson.Decimal128):
8284
obj = str(obj)
83-
return _converter_template(obj, "numpy.float64", lambda o: np.float64(o) if o is not None else None)
85+
return _converter_template(obj, "numpy.float64", lambda o: np.float64(o) if o is not None and not pd.isna(o) else None)
8486

8587

8688
def to_pandas_timestamp(obj) -> pd.Timestamp:
8789
# return _converter_template(obj, "pandas.Timestamp", lambda o: pd.Timestamp(o))
88-
return _converter_template(obj, "pandas.Timestamp", lambda o: pd.to_datetime(o, utc=True) if o is not None else None)
90+
return _converter_template(obj, "pandas.Timestamp", lambda o: pd.to_datetime(o, utc=True) if o is not None and not pd.isna(o) else None)
8991

9092

9193
def do_nothing(obj):
@@ -317,25 +319,30 @@ def process_dataframe(table_name_param: str, df: pd.DataFrame):
317319
f"schema_of_this_column[DTYPE_KEY]={schema_of_this_column[DTYPE_KEY]}"
318320
)
319321

320-
if expected_type == bson.int64.Int64 and current_dtype == "float64":
322+
if (expected_type == bson.int64.Int64 or expected_type == int) and current_dtype == "float64":
321323
# Convert to int64
322324
logger.debug(
323325
f"Converting column {col_name} from float64 to Int64"
324326
)
325327
df[col_name] = df[col_name].astype("Int64")
326328

327-
if current_dtype != schema_of_this_column[DTYPE_KEY]:
329+
current_dtype = df[col_name].dtype
330+
#if current_dtype != schema_of_this_column[DTYPE_KEY]:
331+
print(f">>>>>>>>>>current_dtype: {current_dtype}")
332+
DEFAULT_DTYPE = "default_dtype" # Define a default value for missing keys
333+
column_final_dtype = COLUMN_DTYPE_CONVERSION_MAP.get(current_dtype.__str__(), DEFAULT_DTYPE)
334+
print(f">>>>>>>>>>current_final_dtype: {column_final_dtype}")
335+
if column_final_dtype != DEFAULT_DTYPE:
328336
try:
329337
logger.debug(
330-
f"different column dtype detected: current_dtype={current_dtype}, item type from schema={schema_of_this_column[DTYPE_KEY]}"
338+
f"different column dtype detected: current_dtype={current_dtype}, item type from schema={column_final_dtype}"
331339
)
332-
df[col_name] = df[col_name].astype(schema_of_this_column[DTYPE_KEY])
333-
334-
340+
df[col_name] = df[col_name].astype(column_final_dtype)
341+
335342
except (ValueError, TypeError) as e:
336343
logger.warning(
337344
f"An {e.__class__.__name__} was caught when trying to convert "
338-
+ f"the dtype of the column {col_name} from {current_dtype} to {schema_of_this_column[DTYPE_KEY]}"
345+
+ f"the dtype of the column {col_name} from {current_dtype} to {column_final_dtype}"
339346
)
340347

341348
# Check if conversion log file exists before pushing

0 commit comments

Comments
 (0)