Skip to content

Commit 5f57422

Browse files
committed
order append-update-delete => delete-update-append
1 parent 4b9a501 commit 5f57422

File tree

1 file changed: +38 additions, −38 deletions

dtable_events/common_dataset/common_dataset_sync_utils.py

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -734,9 +734,9 @@ def import_sync_CDS(context):
734734
# to-be-appended-rows-id = S - D
735735
# to-be-updated-rows-id = S & D
736736
# to-be-deleted-rows-id = D - S
737-
# fetch src to-be-append-rows, append to dst table, step by step
737+
# delete dst to-be-deleted-rows, step by step
738738
# fetch src to-be-updated-rows and dst to-be-updated-rows, update to dst table, step by step
739-
# delete dst to-be-deleted-rows
739+
# fetch src to-be-append-rows, append to dst table, step by step
740740

741741
# fetch create dst table or update dst table columns
742742
# use src_columns from context temporary !
@@ -830,42 +830,12 @@ def import_sync_CDS(context):
830830
to_be_deleted_rows_id_set = dst_rows_id_set - src_rows_id_set
831831
logger.debug('to_be_appended_rows_id_set: %s, to_be_updated_rows_id_set: %s, to_be_deleted_rows_id_set: %s', len(to_be_appended_rows_id_set), len(to_be_updated_rows_id_set), len(to_be_deleted_rows_id_set))
832832

833-
# fetch src to-be-append-rows, append to dst table, step by step
834-
## this list is to record the order of src rows
835-
to_be_appended_rows_id_list = [row_id for row_id in src_rows_id_list if row_id in to_be_appended_rows_id_set]
833+
# delete dst to-be-deleted-rows
834+
logger.debug('will delete %s rows', len(to_be_deleted_rows_id_set))
835+
delete_dst_rows(dst_dtable_uuid, dst_table_name, list(to_be_deleted_rows_id_set), dst_dtable_db_api, dst_dtable_server_api, to_archive)
836836

837837
query_columns = ', '.join(['_id'] + ["`%s`" % col['name'] for col in final_columns])
838838

839-
step = 10000
840-
for i in range(0, len(to_be_appended_rows_id_list), step):
841-
logger.debug('to_be_appended_rows_id_list i: %s, step: %s', i, step)
842-
step_to_be_appended_rows_id_list = []
843-
step_row_sort_dict = {}
844-
for j in range(step):
845-
if i + j >= len(to_be_appended_rows_id_list):
846-
break
847-
step_to_be_appended_rows_id_list.append(to_be_appended_rows_id_list[i+j])
848-
step_row_sort_dict[to_be_appended_rows_id_list[i+j]] = j
849-
rows_id_str = ', '.join(["'%s'" % row_id for row_id in step_to_be_appended_rows_id_list])
850-
if filter_clause:
851-
sql = f"SELECT {query_columns} FROM `{src_table_name}` WHERE (({filter_clause[len('WHERE'):]}) AND `_id` IN ({rows_id_str})) LIMIT {step}"
852-
else:
853-
sql = f"SELECT {query_columns} FROM `{src_table_name}` WHERE `_id` IN ({rows_id_str}) LIMIT {step}"
854-
try:
855-
src_rows = src_dtable_db_api.query(sql, convert=False, server_only=server_only)
856-
except Exception as e:
857-
logger.error('fetch to-be-appended-rows error: %s', e)
858-
return {
859-
'dst_table_id': None,
860-
'error_msg': 'fetch to-be-appended-rows error: %s' % e,
861-
'task_status_code': 500
862-
}
863-
src_rows = sorted(src_rows, key=lambda row: step_row_sort_dict[row['_id']])
864-
_, to_be_appended_rows, _ = generate_synced_rows(src_rows, src_columns, final_columns, [], to_archive=to_archive)
865-
error_resp = append_dst_rows(dst_dtable_uuid, dst_table_name, to_be_appended_rows, dst_dtable_db_api, dst_dtable_server_api, to_archive=to_archive)
866-
if error_resp:
867-
return error_resp
868-
869839
# fetch src to-be-updated-rows and dst to-be-updated-rows, update to dst table, step by step
870840
to_be_updated_rows_id_list = list(to_be_updated_rows_id_set)
871841
step = 10000
@@ -903,9 +873,39 @@ def import_sync_CDS(context):
903873
if error_resp:
904874
return error_resp
905875

906-
# delete dst to-be-deleted-rows
907-
logger.debug('will delete %s rows', len(to_be_deleted_rows_id_set))
908-
delete_dst_rows(dst_dtable_uuid, dst_table_name, list(to_be_deleted_rows_id_set), dst_dtable_db_api, dst_dtable_server_api, to_archive)
876+
# fetch src to-be-append-rows, append to dst table, step by step
877+
## this list is to record the order of src rows
878+
to_be_appended_rows_id_list = [row_id for row_id in src_rows_id_list if row_id in to_be_appended_rows_id_set]
879+
880+
step = 10000
881+
for i in range(0, len(to_be_appended_rows_id_list), step):
882+
logger.debug('to_be_appended_rows_id_list i: %s, step: %s', i, step)
883+
step_to_be_appended_rows_id_list = []
884+
step_row_sort_dict = {}
885+
for j in range(step):
886+
if i + j >= len(to_be_appended_rows_id_list):
887+
break
888+
step_to_be_appended_rows_id_list.append(to_be_appended_rows_id_list[i+j])
889+
step_row_sort_dict[to_be_appended_rows_id_list[i+j]] = j
890+
rows_id_str = ', '.join(["'%s'" % row_id for row_id in step_to_be_appended_rows_id_list])
891+
if filter_clause:
892+
sql = f"SELECT {query_columns} FROM `{src_table_name}` WHERE (({filter_clause[len('WHERE'):]}) AND `_id` IN ({rows_id_str})) LIMIT {step}"
893+
else:
894+
sql = f"SELECT {query_columns} FROM `{src_table_name}` WHERE `_id` IN ({rows_id_str}) LIMIT {step}"
895+
try:
896+
src_rows = src_dtable_db_api.query(sql, convert=False, server_only=server_only)
897+
except Exception as e:
898+
logger.error('fetch to-be-appended-rows error: %s', e)
899+
return {
900+
'dst_table_id': None,
901+
'error_msg': 'fetch to-be-appended-rows error: %s' % e,
902+
'task_status_code': 500
903+
}
904+
src_rows = sorted(src_rows, key=lambda row: step_row_sort_dict[row['_id']])
905+
_, to_be_appended_rows, _ = generate_synced_rows(src_rows, src_columns, final_columns, [], to_archive=to_archive)
906+
error_resp = append_dst_rows(dst_dtable_uuid, dst_table_name, to_be_appended_rows, dst_dtable_db_api, dst_dtable_server_api, to_archive=to_archive)
907+
if error_resp:
908+
return error_resp
909909

910910
return {
911911
'dst_table_id': dst_table_id,

Comments (0) — no commit comments