Skip to content

Commit 70d338a

Browse files
committed
Additional data type enforcement changes
1 parent 5487315 commit 70d338a

File tree

2 files changed

+58
-16
lines changed

2 files changed

+58
-16
lines changed

.DS_Store

0 Bytes
Binary file not shown.

schema_utils.py

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@
2121
)
2222
import schemas
2323
from file_utils import FileType, append_to_file, read_from_file
24+
from pandas.api.types import (
25+
is_numeric_dtype,
26+
is_string_dtype,
27+
is_object_dtype,
28+
is_datetime64_any_dtype
29+
)
2430

2531

2632
logger = logging.getLogger(f"{__name__}")
@@ -46,7 +52,9 @@ def _converter_template(obj, type_name, raw_convert_func, default_value=None):
4652

4753
def to_string(obj) -> str:
    """Convert *obj* to its string form via the shared converter template.

    NOTE: NULL/NaN values are intentionally stringified as well — an earlier
    variant that preserved None/NaN produced a Binary parquet column whenever
    every value in the column was NULL.
    """
    # Force convert to string as otherwise it will be Binary if values are NULL
    return _converter_template(obj, "string", str)
5159

5260

@@ -112,7 +120,6 @@ def do_nothing(obj):
112120
bson.int64.Int64: to_numpy_int64,
113121
np.bool_: to_numpy_bool,
114122
np.float64: to_numpy_float64,
115-
bson.Decimal128: to_numpy_float64,
116123
pd.Timestamp: to_pandas_timestamp
117124
}
118125

@@ -136,14 +143,22 @@ def init_column_schema(column_dtype, first_item) -> dict:
136143
# if item_type is None:
137144
item_type = str
138145
column_dtype = "object"
139-
146+
#Diana - handling bson.Decimal128: it can't be the target data type because it can't be written to parquet
147+
if isinstance(first_item, bson.Decimal128):
148+
item_type = float
149+
#Otherwise the column dtype would be inferred as object
150+
# column_dtype = "object"
151+
column_dtype = "float64"
140152
#Diana 107 comment and prints added
141153
# if not column_dtype:
142154
#print(f"SU: original column_dtype={column_dtype}")
143155
column_dtype = COLUMN_DTYPE_CONVERSION_MAP.get(column_dtype.__str__(), column_dtype)
144156
#print(f"SU: converted column_dtype={column_dtype}")
145157
schema_of_this_column[DTYPE_KEY] = column_dtype
146158
schema_of_this_column[TYPE_KEY] = item_type
159+
logger.debug(
160+
f"Internal schema file : column_dtype {column_dtype} and item_type {item_type}"
161+
)
147162
return schema_of_this_column
148163

149164

@@ -207,7 +222,7 @@ def _get_first_item(df: pd.DataFrame, column_name: str):
207222
def init_table_schema(table_name: str):
208223
# determine if the internal schema file exist
209224
table_dir = get_table_dir(table_name)
210-
schema_file_path = os.path.join(table_dir, INTERNAL_SCHEMA_FILE_NAME)
225+
#schema_file_path = os.path.join(table_dir, INTERNAL_SCHEMA_FILE_NAME)
211226
schema_of_this_table = read_from_file(
212227
table_name, INTERNAL_SCHEMA_FILE_NAME, FileType.PICKLE
213228
)
@@ -232,7 +247,7 @@ def init_table_schema(table_name: str):
232247
column_renaming_of_this_table = {}
233248
with collection.find().sort({"_id": 1}).limit(batch_size) as cursor:
234249
fetched_data = list(cursor)
235-
print(f"fetched_data: {fetched_data}")
250+
#print(f"fetched_data: {fetched_data}")
236251
df = pd.DataFrame(fetched_data)
237252
for col_name in df.keys().values:
238253
get_id = _get_first_valid_id(df, col_name)
@@ -263,7 +278,7 @@ def process_dataframe(table_name_param: str, df: pd.DataFrame):
263278
for col_name in df.keys().values:
264279
current_dtype = df[col_name].dtype
265280
current_first_item = _get_first_item(df, col_name)
266-
current_item_type = type(current_first_item)
281+
#current_item_type = type(current_first_item)
267282

268283

269284
processed_col_name = schemas.find_column_renaming(table_name, col_name)
@@ -328,23 +343,50 @@ def process_dataframe(table_name_param: str, df: pd.DataFrame):
328343

329344
current_dtype = df[col_name].dtype
330345
#if current_dtype != schema_of_this_column[DTYPE_KEY]:
331-
print(f">>>>>>>>>>current_dtype: {current_dtype}")
332-
DEFAULT_DTYPE = "default_dtype" # Define a default value for missing keys
333-
column_final_dtype = COLUMN_DTYPE_CONVERSION_MAP.get(current_dtype.__str__(), DEFAULT_DTYPE)
334-
print(f">>>>>>>>>>current_final_dtype: {column_final_dtype}")
335-
if column_final_dtype != DEFAULT_DTYPE:
346+
logger.debug(
347+
f">>>>>>>>>>current_dtype: {current_dtype}"
348+
)
349+
##column_final_dtype = COLUMN_DTYPE_CONVERSION_MAP.get(current_dtype.__str__(), DEFAULT_DTYPE)
350+
# Date type needs to be converted to MILLIS from NANOS in all cases
351+
if is_datetime64_any_dtype(df[col_name]):
336352
try:
337353
logger.debug(
338-
f"different column dtype detected: current_dtype={current_dtype}, item type from schema={column_final_dtype}"
354+
f"different column dtype detected: current_dtype={current_dtype}, item type from default=datetime64[ms]"
339355
)
340-
df[col_name] = df[col_name].astype(column_final_dtype)
341-
356+
#df[col_name] = df[col_name].dt.floor('ms')
357+
df[col_name] = df[col_name].dt.tz_localize(None)
358+
df[col_name] = df[col_name].astype("datetime64[ms]")
342359
except (ValueError, TypeError) as e:
343360
logger.warning(
344361
f"An {e.__class__.__name__} was caught when trying to convert "
345-
+ f"the dtype of the column {col_name} from {current_dtype} to {column_final_dtype}"
362+
+ f"the dtype of the column {col_name} from {current_dtype} to datetime64[ms]"
346363
)
347-
364+
365+
current_dtype = df[col_name].dtype
366+
#if current_dtype != schema_of_this_column[DTYPE_KEY]:
367+
logger.debug(
368+
f">>>>>>>>>>current_dtype 1: {current_dtype}, "
369+
f">>>>>>>>>>schema_of_this_column[DTYPE_KEY] 1: {schema_of_this_column[DTYPE_KEY]}, "
370+
f"****is_datetime64_any_dtype(df['col_name']): {is_datetime64_any_dtype(df[col_name])}, "
371+
f"****is_object_dtype(schema_of_this_column[DTYPE_KEY]): {is_object_dtype(schema_of_this_column[DTYPE_KEY])}"
372+
)
373+
##if current_dtype == datetime and schema_of_this_column[DTYPE_KEY] == object:
374+
#print(f"****is_datetime64_any_dtype(df['col_name']): {is_datetime64_any_dtype(df[col_name])}")
375+
#print(f"****is_object_dtype(schema_of_this_column[DTYPE_KEY]): {is_object_dtype(schema_of_this_column[DTYPE_KEY])}")
376+
if is_datetime64_any_dtype(df[col_name]) and is_object_dtype(schema_of_this_column[DTYPE_KEY]):
377+
do_nothing
378+
elif current_dtype.__str__() != schema_of_this_column[DTYPE_KEY].__str__():
379+
try:
380+
logger.debug(
381+
f"different column dtype detected1: current_dtype={current_dtype}, item type from schema 1 ={schema_of_this_column[DTYPE_KEY]}"
382+
)
383+
df[col_name] = df[col_name].astype(schema_of_this_column[DTYPE_KEY])
384+
385+
except (ValueError, TypeError) as e:
386+
logger.warning(
387+
f"An {e.__class__.__name__} was caught when trying to convert "
388+
+ f"the dtype of the column {col_name} from {current_dtype} to {schema_of_this_column[DTYPE_KEY]}"
389+
)
348390
# Check if conversion log file exists before pushing
349391
print("conversion_flag: ", conversion_flag)
350392
conversion_log_path = os.path.join(get_table_dir(table_name), CONVERSION_LOG_FILE_NAME)

0 commit comments

Comments
 (0)