Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions schema_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime
import os
import json
import logging
#17June2025 - doesnt work for 3.9 and lesser Python versions
#from types import NoneType
Expand Down Expand Up @@ -54,10 +55,10 @@ def _converter_template(obj, type_name, raw_convert_func, default_value=None):


def to_string(obj) -> str:
if isinstance(obj, list) or isinstance(obj, dict):
return to_json_string(obj)
return _converter_template(
# obj, "string", lambda o: str(o) if o is not None and not pd.isna(o) else None
# Force convert to string as otherwise it will be Binary if values are NULL
obj, "string", lambda o: str(o)
obj, "string", lambda o: str(o) if o is not None and not pd.isna(o) else ''
)


Expand All @@ -69,7 +70,7 @@ def raw_to_numpy_int64(obj) -> np.int64:
if isinstance(obj, bson.Decimal128):
return np.int64(obj.to_decimal())
if isinstance(obj, list) or isinstance(obj, dict):
raise ValueError
return to_json_string(obj)
if obj is not None and not pd.isna(obj):
return np.int64(obj)
else:
Expand Down Expand Up @@ -98,32 +99,49 @@ def to_numpy_float64(obj) -> np.float64:

def to_pandas_timestamp(obj) -> pd.Timestamp:
# return _converter_template(obj, "pandas.Timestamp", lambda o: pd.Timestamp(o))
return _converter_template(obj, "pandas.Timestamp", lambda o: pd.to_datetime(o, utc=True) if o is not None and not pd.isna(o) else None)
return _converter_template(obj, "pandas.Timestamp", lambda o: pd.to_datetime(o, utc=True).isoformat() if o is not None and not pd.isna(o) else '')


def do_nothing(obj):
original_type = type(obj)
logger.info(f'Did not convert "{obj}" of type {original_type}.')
return obj

# for column in expected_columns:
# if column not in df.columns:
# df[column] = None # or another appropriate default value
def to_datetime_iso(obj) -> str:
if not isinstance(obj, datetime):
return obj
return _converter_template(obj, "string", lambda o: o.isoformat() if o is not None and not pd.isna(o) else '')


def to_json_string(obj) -> str:
if isinstance(obj, list):
return _converter_template(obj, "string", lambda o: json.dumps([ob for ob in o]))
return _converter_template(obj, "string", lambda o: json.dumps(o, default=str, cls=CustomJSONEncoder) if o is not None and not pd.isna(o) else '')


class CustomJSONEncoder(json.JSONEncoder):
def default(self, obj):
conversion_fcn = TYPE_TO_CONVERT_FUNCTION_MAP.get(type(obj))
if conversion_fcn:
return conversion_fcn(obj)
return super(CustomJSONEncoder, self).default(obj)

TYPE_TO_CONVERT_FUNCTION_MAP = {
str: to_string,
int: to_numpy_int64,
float: to_numpy_float64,
bool: to_numpy_bool,
datetime: to_pandas_timestamp,
datetime: to_datetime_iso,
bson.ObjectId: to_string,
bson.Decimal128: to_numpy_float64,
np.int32: to_numpy_int64,
np.int64: to_numpy_int64,
bson.int64.Int64: to_numpy_int64,
np.bool_: to_numpy_bool,
np.float64: to_numpy_float64,
pd.Timestamp: to_pandas_timestamp
dict: to_json_string,
list: to_json_string,
pd.Timestamp: to_pandas_timestamp
}

COLUMN_DTYPE_CONVERSION_MAP = {
Expand Down