@@ -778,21 +778,13 @@ def import_sync_CDS(context):
778778 'error_msg' : 'generate src view sql error: %s' % e ,
779779 'task_status_code' : 500
780780 }
781- sql_template = "SELECT %%(fields)s FROM `%(src_table)s` %(filters)s %(sorts)s" % {
782- 'src_table' : src_table_name ,
783- 'filters' : filter_clause ,
784- 'sorts' : sort_clause
785- }
781+ sql_template = f"SELECT `_id` FROM `{ src_table_name } ` { filter_clause } { sort_clause } "
786782 start , step = 0 , 10000
787783 while True :
788784 if server_only and (start + step ) > SRC_ROWS_LIMIT :
789785 step = SRC_ROWS_LIMIT - start
790- sql = (sql_template + " LIMIT %(offset)s, %(limit)s " ) % {
791- 'fields' : '`_id`' ,
792- 'offset' : start ,
793- 'limit' : step
794- }
795- logger .debug ('fetch src sql: %s' , sql )
786+ sql = sql_template + (" LIMIT {offset}, {limit}" .format (offset = start , limit = step ))
787+ logger .debug ('fetch src rows-id sql: %s' , sql )
796788 try :
797789 rows = src_dtable_db_api .query (sql , convert = False , server_only = server_only )
798790 except Exception as e :
@@ -816,11 +808,8 @@ def import_sync_CDS(context):
816808 dst_rows_id_set = set ()
817809 start , step = 0 , 10000
818810 while is_sync and True :
819- sql = "SELECT _id FROM `%(dst_table)s` LIMIT %(offset)s, %(limit)s" % {
820- 'dst_table' : dst_table_name ,
821- 'offset' : start ,
822- 'limit' : step
823- }
811+ sql = f"SELECT `_id` FROM `{ dst_table_name } ` LIMIT { start } , { step } "
812+ logger .debug ('fetch dst rows-id sql: %s' , sql )
824813 try :
825814 rows = dst_dtable_db_api .query (sql , convert = False , server_only = (not to_archive ))
826815 except Exception as e :
@@ -847,15 +836,6 @@ def import_sync_CDS(context):
847836
848837 query_columns = ', ' .join (['_id' ] + ["`%s`" % col ['name' ] for col in final_columns ])
849838
850- sql_template = sql_template % {'fields' : query_columns }
851- logger .debug ('sql_template: %s' , sql_template )
852-
853- if 'WHERE' in sql_template :
854- where_index = sql_template .find ('WHERE' )
855- fetch_src_sql = sql_template [:where_index ] + ' WHERE (%s) AND _id IN (%%(rows_id)s)' % sql_template [where_index + len ('WHERE' ):]
856- else :
857- fetch_src_sql = sql_template + ' WHERE _id IN (%(rows_id)s)'
858-
859839 step = 10000
860840 for i in range (0 , len (to_be_appended_rows_id_list ), step ):
861841 logger .debug ('to_be_appended_rows_id_list i: %s, step: %s' , i , step )
@@ -866,7 +846,11 @@ def import_sync_CDS(context):
866846 break
867847 step_to_be_appended_rows_id_list .append (to_be_appended_rows_id_list [i + j ])
868848 step_row_sort_dict [to_be_appended_rows_id_list [i + j ]] = j
869- sql = (fetch_src_sql % {'rows_id' : ', ' .join (["'%s'" % row_id for row_id in step_to_be_appended_rows_id_list ])}) + ' LIMIT ' + str (step )
849+ rows_id_str = ', ' .join (["'%s'" % row_id for row_id in step_to_be_appended_rows_id_list ])
850+ if filter_clause :
851+ sql = f"SELECT { query_columns } FROM `{ src_table_name } ` WHERE (({ filter_clause [len ('WHERE' ):]} ) AND `_id` IN ({ rows_id_str } )) LIMIT { step } "
852+ else :
853+ sql = f"SELECT { query_columns } FROM `{ src_table_name } ` WHERE `_id` IN ({ rows_id_str } ) LIMIT { step } "
870854 try :
871855 src_rows = src_dtable_db_api .query (sql , convert = False , server_only = server_only )
872856 except Exception as e :
@@ -885,17 +869,11 @@ def import_sync_CDS(context):
885869 # fetch src to-be-updated-rows and dst to-be-updated-rows, update to dst table, step by step
886870 to_be_updated_rows_id_list = list (to_be_updated_rows_id_set )
887871 step = 10000
888- sql_template = "SELECT %(fields)s FROM `%%(table)s` WHERE _id IN (%%(rows_id)s) LIMIT %%(limit)s" % {
889- 'fields' : query_columns
890- }
891872 for i in range (0 , len (to_be_updated_rows_id_list ), step ):
892873 logger .debug ('to_be_updated_rows_id_list i: %s step: %s' , i , step )
893874 ## fetch src to-be-updated-rows
894- sql = sql_template % {
895- 'rows_id' : ', ' .join (["'%s'" % row_id for row_id in to_be_updated_rows_id_list [i : i + step ]]),
896- 'table' : src_table_name ,
897- 'limit' : step
898- }
875+ rows_id_str = ', ' .join (["'%s'" % row_id for row_id in to_be_updated_rows_id_list [i : i + step ]])
876+ sql = f"SELECT { query_columns } FROM `{ src_table_name } ` WHERE _id IN ({ rows_id_str } ) LIMIT { step } "
899877 try :
900878 src_rows = src_dtable_db_api .query (sql , convert = False , server_only = server_only )
901879 except Exception as e :
@@ -907,11 +885,7 @@ def import_sync_CDS(context):
907885 }
908886
909887 ## fetch src to-be-updated-rows
910- sql = sql_template % {
911- 'rows_id' : ', ' .join (["'%s'" % row_id for row_id in to_be_updated_rows_id_list [i : i + step ]]),
912- 'table' : dst_table_name ,
913- 'limit' : step
914- }
888+ sql = f"SELECT { query_columns } FROM `{ dst_table_name } ` WHERE _id IN ({ rows_id_str } ) LIMIT { step } "
915889 try :
916890 dst_rows = dst_dtable_db_api .query (sql , convert = False , server_only = (not to_archive ))
917891 except Exception as e :
@@ -924,11 +898,13 @@ def import_sync_CDS(context):
924898
925899 ## update
926900 to_be_updated_rows , _ , _ = generate_synced_rows (src_rows , src_columns , final_columns , dst_rows = dst_rows , to_archive = to_archive )
901+ logger .debug ('step src update-rows: %s to-be-updated-rows: %s' , len (to_be_updated_rows_id_list [i : i + step ]), len (to_be_updated_rows ))
927902 error_resp = update_dst_rows (dst_dtable_uuid , dst_table_name , to_be_updated_rows , dst_dtable_db_api , dst_dtable_server_api , to_archive )
928903 if error_resp :
929904 return error_resp
930905
931906 # delete dst to-be-deleted-rows
907+ logger .debug ('will delete %s rows' , len (to_be_deleted_rows_id_set ))
932908 delete_dst_rows (dst_dtable_uuid , dst_table_name , list (to_be_deleted_rows_id_set ), dst_dtable_db_api , dst_dtable_server_api , to_archive )
933909
934910 return {
0 commit comments