@@ -698,196 +698,6 @@ def delete_dst_rows(dst_dtable_uuid, dst_table_name, to_be_deleted_row_ids, dst_
698698
699699
700700def import_sync_CDS (context ):
701- """
702- import or sync common dataset
703- please check all resources in context before call this function
704-
705- Steps:
706- 1. create or update dst columns
707- 2. fetch src rows, (find rows to be updated and rows to be appended, update and append them), step by step
708- 3. fetch dst rows, (find rows to be deleted, delete them), step by step
709- """
710- src_dtable_uuid = context .get ('src_dtable_uuid' )
711- dst_dtable_uuid = context .get ('dst_dtable_uuid' )
712-
713- src_table_name = context .get ('src_table_name' )
714- src_view_name = context .get ('src_view_name' )
715- src_view_type = context .get ('src_view_type' , 'table' )
716- src_columns = context .get ('src_columns' )
717- src_enable_archive = context .get ('src_enable_archive' , False )
718-
719- dst_table_id = context .get ('dst_table_id' )
720- dst_table_name = context .get ('dst_table_name' )
721- dst_columns = context .get ('dst_columns' )
722-
723- operator = context .get ('operator' )
724- lang = context .get ('lang' , 'en' )
725-
726- to_archive = context .get ('to_archive' , False )
727-
728- src_dtable_server_api = DTableServerAPI (operator , src_dtable_uuid , dtable_server_url )
729- src_dtable_db_api = DTableDBAPI (operator , src_dtable_uuid , INNER_DTABLE_DB_URL )
730- dst_dtable_server_api = DTableServerAPI (operator , dst_dtable_uuid , dtable_server_url )
731- dst_dtable_db_api = DTableDBAPI (operator , dst_dtable_uuid , INNER_DTABLE_DB_URL )
732-
733- server_only = not (to_archive and src_enable_archive and src_view_type == 'archive' )
734- is_sync = bool (dst_table_id )
735- logger .debug ('to_archive: %s src_enable_archive: %s src_view_type: %s' , to_archive , src_enable_archive , src_view_type )
736-
737- src_column_keys_set = {col ['key' ] for col in src_columns }
738-
739- # fetch src rows, find existed rows, not existed rows, update/append rows, step by step
740- start , step = 0 , 10000
741- src_row_ids_set = set ()
742- to_be_updated_columns , to_be_appended_columns = [], []
743- final_columns = []
744- while True :
745- logger .debug ('update/append start: %s step: %s' , start , step )
746- if server_only and (start + step ) > SRC_ROWS_LIMIT :
747- step = SRC_ROWS_LIMIT - start
748- try :
749- res_json = src_dtable_server_api .internal_view_rows (src_table_name , src_view_name , use_dtable_db = True , server_only = server_only , start = start , limit = step )
750- step_src_rows = res_json .get ('rows' , [])
751- src_view_metadata = res_json .get ('metadata' )
752- except Exception as e :
753- logger .error ('request src_dtable: %s view-rows error: %s' , src_dtable_uuid , e )
754- return {
755- 'dst_table_id' : None ,
756- 'error_msg' : 'fetch view rows error' ,
757- 'task_status_code' : 500
758- }
759- if start == 0 : ## generate columns from the columns(archive_metadata) returned from SQL query in internal_view_rows
760- sync_columns = [col for col in src_view_metadata if col ['key' ] in src_column_keys_set ]
761- to_be_updated_columns , to_be_appended_columns , error = generate_synced_columns (sync_columns , dst_columns = dst_columns )
762- if error :
763- return {
764- 'dst_table_id' : None ,
765- 'error_type' : 'generate_synced_columns_error' ,
766- 'error_msg' : str (error ), # generally, this error is caused by client
767- 'task_status_code' : 400
768- }
769- final_columns = (to_be_updated_columns or []) + (to_be_appended_columns or [])
770- ### create or update dst columns
771- dst_table_id , error_resp = create_dst_table_or_update_columns (dst_dtable_uuid , dst_table_id , dst_table_name , to_be_appended_columns , to_be_updated_columns , dst_dtable_server_api , lang )
772- if error_resp :
773- return error_resp
774- row_ids = []
775- step_rows_dict = {}
776- for row in step_src_rows :
777- if row ['_id' ] in src_row_ids_set :
778- continue
779- row_ids .append (row ['_id' ])
780- src_row_ids_set .add (row ['_id' ])
781- step_rows_dict [row ['_id' ]] = row
782- if not row_ids :
783- if not step_src_rows or len (step_src_rows ) < step or (server_only and (start + step ) >= SRC_ROWS_LIMIT ):
784- break
785- start += step
786- continue
787- ## find to-be-appended-rows to-be-updated-rows
788- step_dst_rows = None
789- if dst_table_id :
790- sql = "SELECT _id, %(dst_columns)s FROM `%(dst_table)s` WHERE _id IN %(row_ids)s LIMIT %(rows_count)s" % {
791- 'dst_table' : dst_table_name ,
792- 'dst_columns' : ', ' .join (["`%s`" % col ['name' ] for col in final_columns ]),
793- 'row_ids' : '(%s)' % ', ' .join (["'%s'" % row_id for row_id in row_ids ]),
794- 'rows_count' : len (row_ids )
795- }
796- try :
797- step_dst_rows = dst_dtable_db_api .query (sql , convert = False , server_only = (not to_archive ))
798- except Exception as e :
799- logger .error ('find to-be-updated-rows error: %s' , e )
800- return {
801- 'dst_table_id' : None ,
802- 'error_msg' : 'find to-be-updated-rows error' ,
803- 'task_status_code' : 500
804- }
805- logger .debug ('step_dst_rows: %s' , len (step_dst_rows ))
806- filtered_step_src_rows = [step_rows_dict [row_id ] for row_id in row_ids ]
807- to_be_updated_rows , to_be_appended_rows , _ = generate_synced_rows (filtered_step_src_rows , src_columns , final_columns , dst_rows = step_dst_rows , to_archive = to_archive )
808- logger .debug ('to_be_updated_rows: %s to_be_appended_rows: %s' , len (to_be_updated_rows ), len (to_be_appended_rows ))
809- ## append
810- if to_be_appended_rows :
811- error_resp = append_dst_rows (dst_dtable_uuid , dst_table_name , to_be_appended_rows , dst_dtable_db_api , dst_dtable_server_api , to_archive )
812- if error_resp :
813- return error_resp
814- ## update
815- if to_be_updated_rows :
816- error_resp = update_dst_rows (dst_dtable_uuid , dst_table_name , to_be_updated_rows , dst_dtable_db_api , dst_dtable_server_api , to_archive )
817- if error_resp :
818- return error_resp
819- ## judge whether break
820- if not step_src_rows or len (step_src_rows ) < step or (server_only and (start + step ) >= SRC_ROWS_LIMIT ):
821- break
822- start += step
823-
824- # fetch dst rows, find useless rows, delete rows, step by step
825- dst_row_ids_set = set ()
826- start , step = 0 , 10000
827- ## generate src view WHERE clause
828- try :
829- src_metadata = src_dtable_server_api .get_metadata ()
830- except Exception as e :
831- logger .error ('request src dtable: %s table: %s metadata error: %s' , src_dtable_uuid , src_table_name , e )
832- return {
833- 'dst_table_id' : None ,
834- 'error_msg' : 'request src table metadata error' ,
835- 'task_status_code' : 500
836- }
837- src_table = [table for table in src_metadata ['tables' ] if table ['name' ] == src_table_name ][0 ]
838- src_view = [view for view in src_table ['views' ] if view ['name' ] == src_view_name ][0 ]
839- filters = src_view .get ('filters' , [])
840- filter_conjunction = src_view .get ('filter_conjunction' , 'And' )
841- filter_conditions = {
842- 'filters' : filters ,
843- 'filter_conjunction' : filter_conjunction
844- }
845- sql_generator = BaseSQLGenerator (src_table_name , src_table ['columns' ], filter_conditions = filter_conditions )
846- filter_clause = sql_generator ._filter2sql ()
847- ## delete useless rows step by step
848- while is_sync and True :
849- logger .debug ('delete start: %s step: %s' , start , step )
850- sql = "SELECT _id FROM `%(dst_table_name)s` LIMIT %(start)s, %(limit)s" % {
851- 'dst_table_name' : dst_table_name ,
852- 'start' : start ,
853- 'limit' : step
854- }
855- rows = dst_dtable_db_api .query (sql , convert = False , server_only = (not to_archive ))
856- query_row_ids_set = set ()
857- for row in rows :
858- if row ['_id' ] in dst_row_ids_set :
859- continue
860- if row ['_id' ] in src_row_ids_set :
861- continue
862- query_row_ids_set .add (row ['_id' ])
863- dst_row_ids_set .add (row ['_id' ])
864- if not query_row_ids_set :
865- if len (rows ) < step :
866- break
867- start += step
868- continue
869- sql = "SELECT _id FROM `%(src_table_name)s` WHERE _id IN %(row_ids)s %(view_filter_clause)s LIMIT %(rows_count)s" % {
870- 'src_table_name' : src_table_name ,
871- 'row_ids' : '(%s)' % ', ' .join (["'%s'" % row_id for row_id in query_row_ids_set ]),
872- 'rows_count' : len (query_row_ids_set ),
873- 'view_filter_clause' : 'AND (%s)' % filter_clause [filter_clause .find ('WHERE' ) + len ('WHERE' ):] if filter_clause else ''
874- }
875- existed_rows = src_dtable_db_api .query (sql , convert = False , server_only = server_only )
876- to_be_deleted_row_ids_set = query_row_ids_set - {row ['_id' ] for row in existed_rows }
877- if to_be_deleted_row_ids_set :
878- delete_dst_rows (dst_dtable_uuid , dst_table_name , list (to_be_deleted_row_ids_set ), dst_dtable_db_api , dst_dtable_server_api , to_archive )
879- if len (rows ) < step :
880- break
881- start += step
882-
883- return {
884- 'dst_table_id' : dst_table_id ,
885- 'error_msg' : None ,
886- 'task_status_code' : 200
887- }
888-
889-
890- def new_import_sync_CDS (context ):
891701 """
892702 fetch src/dst rows id, find need append/update/delete rows
893703 """
@@ -918,8 +728,6 @@ def new_import_sync_CDS(context):
918728 is_sync = bool (dst_table_id )
919729 logger .debug ('to_archive: %s src_enable_archive: %s src_view_type: %s' , to_archive , src_enable_archive , src_view_type )
920730
921- src_column_keys_set = {col ['key' ] for col in src_columns }
922-
923731 # fetch create dst table or update dst table columns
924732 # fetch all src view rows id, S
925733 # fetch all dst table rows id, D
0 commit comments