11import os
22import logging
33from types import NoneType
4+ import bson .int64
45import pymongo
56import pandas as pd
67import numpy as np
78import pickle
9+ # from bson import Decimal128, int64
810import bson
911from push_file_to_lz import push_file_to_lz
1012from utils import get_table_dir
1416 TYPE_KEY ,
1517 DTYPE_KEY ,
1618 TYPES_TO_CONVERT_TO_STR ,
17- COLUMN_RENAMING_FILE_NAME ,
19+ COLUMN_RENAMING_FILE_NAME
1820)
1921import schemas
2022from file_utils import FileType , append_to_file , read_from_file
2325logger = logging .getLogger (f"{ __name__ } " )
2426
2527def _converter_template (obj , type_name , raw_convert_func , default_value = None ):
26- original_type = type (obj )
28+ original_type = type (obj )
29+ logger .debug (f"Converting { obj } of type { original_type } to { type_name } ." )
2730 try :
2831 return raw_convert_func (obj )
2932 except (ValueError , TypeError ):
@@ -47,11 +50,15 @@ def to_string(obj) -> str:
4750
4851
def to_numpy_int64(obj) -> np.int64:
    """
    Convert *obj* to ``numpy.int64`` via the shared converter template.

    Handles ``bson.Decimal128`` explicitly (truncating toward zero) and
    rejects list/dict values, which ``np.int64`` would otherwise accept
    without raising.

    Args:
        obj: the value to convert (scalar, string, or ``bson.Decimal128``).

    Returns:
        np.int64: the converted value, or the template's default value when
        conversion fails.
    """

    def raw_to_numpy_int64(value) -> np.int64:
        if isinstance(value, bson.Decimal128):
            # np.int64 cannot consume a decimal.Decimal directly; go through
            # int(), which truncates toward zero.
            return np.int64(int(value.to_decimal()))
        # There's a rare case where converting a list of ints to numpy.int64
        # won't raise any error, hence covering containers here separately.
        if isinstance(value, (list, dict)):
            raise ValueError
        return np.int64(value)

    # NOTE(review): dropped the unconditional per-value f-string debug log —
    # this function runs once per cell via df.apply, and eager f-string
    # formatting there is pure overhead even when DEBUG is disabled.
    return _converter_template(obj, "numpy.int64", raw_to_numpy_int64)
@@ -92,10 +99,11 @@ def do_nothing(obj):
# Maps a column's expected item type to the converter applied to each cell.
# bson.int64.Int64 shares the numpy.int64 converter, and bson.Decimal128 is
# widened to float64 to avoid losing fractional values.
TYPE_TO_CONVERT_FUNCTION_MAP = {
    str: to_string,
    np.int64: to_numpy_int64,
    bson.int64.Int64: to_numpy_int64,
    np.bool_: to_numpy_bool,
    np.float64: to_numpy_float64,
    bson.Decimal128: to_numpy_float64,
    pd.Timestamp: to_pandas_timestamp,
}
100108
101109COLUMN_DTYPE_CONVERSION_MAP = {
@@ -104,7 +112,7 @@ def do_nothing(obj):
104112 # nullable fix
105113 "bool" : "boolean" ,
106114 # nullable fix
107- "int64" : "Int64" ,
115+ "int64" : "Int64"
108116}
109117
110118
@@ -133,6 +141,35 @@ def process_column_name(column_name: str) -> str:
133141 return str (column_name ).replace (" " , "_" )[:128 ]
134142
135143
144+ def _get_first_valid_id (df : pd .DataFrame , column_name : str ):
145+ """
146+ Get the first non-null item from given DataFrame column.
147+ This is useful when reading data in init sync, and a few (or even just one)
148+ documents have an extra column, making most items of this column to be null.
149+ In this case we really want to find the actual non-null item, and derive
150+ data type based on it.
151+
152+ Args:
153+ df (pd.DataFrame): The DataFrame object
154+ column_name (str): The name of the column
155+
156+ Returns:
157+ Any: the first non-null item in given DataFrame column
158+ """
159+ first_valid_index = (
160+ df [column_name ].first_valid_index () or 0
161+ ) # in case of first_valid_index() return None, let it be zero
162+ # first_valid_item = df[column_name][first_valid_index]
163+ first_valid_index_id = df ['_id' ][first_valid_index ]
164+ # logger.debug(
165+ # f"get first item {first_valid_index_id} of type {type(first_valid_index)} in column {column_name}"
166+ # )
167+ # logger.debug(
168+ # f"get first item {first_valid_item} of type {type(first_valid_item)} in column {column_name}"
169+ # )
170+ # return first_valid_item
171+ return first_valid_index_id
172+
136173def _get_first_item (df : pd .DataFrame , column_name : str ):
137174 """
138175 Get the first non-null item from given DataFrame column.
@@ -157,7 +194,6 @@ def _get_first_item(df: pd.DataFrame, column_name: str):
157194 )
158195 return first_valid_item
159196
160-
161197def init_table_schema (table_name : str ):
162198 # determine if the internal schema file exist
163199 table_dir = get_table_dir (table_name )
@@ -180,15 +216,28 @@ def init_table_schema(table_name: str):
180216 # else, init schema from collection
181217 client = pymongo .MongoClient (os .getenv ("MONGO_CONN_STR" ))
182218 db = client [os .getenv ("MONGO_DB_NAME" )]
219+ batch_size = int (os .getenv ("INIT_LOAD_BATCH_SIZE" ))
183220 collection = db [table_name ]
184221 schema_of_this_table = {}
185222 column_renaming_of_this_table = {}
186- with collection .find ().sort ({"_id" : 1 }).limit (1 ) as cursor :
187- df = pd .DataFrame (list (cursor ))
223+ with collection .find ().sort ({"_id" : 1 }).limit (batch_size ) as cursor :
224+ fetched_data = list (cursor )
225+ print (f"fetched_data: { fetched_data } " )
226+ df = pd .DataFrame (fetched_data )
188227 for col_name in df .keys ().values :
189- first_item = _get_first_item (df , col_name )
228+ get_id = _get_first_valid_id (df , col_name )
229+ # Fetch the exact value from mongodb using the _id, dumping into df changes the data type.
230+ # projection = {col_name: 1, "_id": 0} if col_name != "_id" else {"_id": 1}
231+ # data = list(collection.find({"_id": get_id}, (projection)))[0].get(col_name)
232+ # logger.debug(
233+ # f"get first item {data} of type {type(data)} in column {col_name}"
234+ # )
235+ data = next (item .get (col_name ) for item in fetched_data if item .get ('_id' ) == get_id )
236+ logger .debug (f"get first item { data } of type { type (data )} in column { col_name } " )
190237 column_dtype = df [col_name ].dtype
191- schema_of_this_column = init_column_schema (column_dtype , first_item )
238+ # column_dtype = type(data)
239+ # schema_of_this_column = init_column_schema(column_dtype, first_item)
240+ schema_of_this_column = init_column_schema (column_dtype , data )
192241 processed_col_name = process_column_name (col_name )
193242 if processed_col_name != col_name :
194243 column_renaming_of_this_table [col_name ] = processed_col_name
@@ -205,6 +254,7 @@ def process_dataframe(table_name_param: str, df: pd.DataFrame):
205254 current_dtype = df [col_name ].dtype
206255 current_first_item = _get_first_item (df , col_name )
207256 current_item_type = type (current_first_item )
257+
208258
209259 processed_col_name = schemas .find_column_renaming (table_name , col_name )
210260 schema_of_this_column = schemas .get_table_column_schema (table_name , col_name )
@@ -248,6 +298,7 @@ def process_dataframe(table_name_param: str, df: pd.DataFrame):
248298 # Set the current column name for logging
249299 current_column_name = col_name
250300 df [col_name ] = df [col_name ].apply (conversion_fcn )
301+ print (df [col_name ])
251302 break
252303 # for index, item in enumerate(df[col_name]):
253304 # print(f"Row {index}: Value={item}, Type={type(item)}")
@@ -257,13 +308,22 @@ def process_dataframe(table_name_param: str, df: pd.DataFrame):
257308 logger .debug (
258309 f"schema_of_this_column[DTYPE_KEY]={ schema_of_this_column [DTYPE_KEY ]} "
259310 )
311+
312+ if expected_type == bson .int64 .Int64 and current_dtype == "float64" :
313+ # Convert to int64
314+ logger .debug (
315+ f"Converting column { col_name } from float64 to Int64"
316+ )
317+ df [col_name ] = df [col_name ].astype ("Int64" )
318+
260319 if current_dtype != schema_of_this_column [DTYPE_KEY ]:
261320 try :
262321 logger .debug (
263322 f"different column dtype detected: current_dtype={ current_dtype } , item type from schema={ schema_of_this_column [DTYPE_KEY ]} "
264323 )
265324 df [col_name ] = df [col_name ].astype (schema_of_this_column [DTYPE_KEY ])
266325
326+
267327 except (ValueError , TypeError ) as e :
268328 logger .warning (
269329 f"An { e .__class__ .__name__ } was caught when trying to convert "
0 commit comments