Skip to content

Commit e1b0431

Browse files
authored
Merge pull request #8 from mongodb-partners/new_listening
Added resilience by storing init_sync_stat and last_parquet_file num…
2 parents bbf4a3f + 45fd585 commit e1b0431

File tree

6 files changed

+167
-67
lines changed

6 files changed

+167
-67
lines changed

constants.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,12 @@
3333

3434
TEMP_PREFIX_DURING_INIT = "Temp_"
3535

36-
INIT_SYNC_CURRENT_SKIP_FILE_NAME = "init_sync_current_skip"
36+
#INIT_SYNC_CURRENT_SKIP_FILE_NAME = "init_sync_current_skip"
37+
# added the two new files to save the initial sync status and last parquet file number
38+
39+
INIT_SYNC_STATUS_FILE_NAME = "_init_sync_status.pkl"
40+
41+
LAST_PARQUET_FILE_NUMBER = "_last_created_parquet.pkl"
3742

3843
INIT_SYNC_LAST_ID_FILE_NAME = "_last_id.pkl"
3944

flags.py

Lines changed: 0 additions & 17 deletions
This file was deleted.

init_sync.py

Lines changed: 82 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,39 +14,70 @@
1414
MONGODB_READING_BATCH_SIZE,
1515
METADATA_FILE_NAME,
1616
DATA_FILES_PATH,
17-
INIT_SYNC_CURRENT_SKIP_FILE_NAME,
17+
# INIT_SYNC_CURRENT_SKIP_FILE_NAME,
18+
# added the two new files to save the initial sync status and last parquet file number
19+
INIT_SYNC_STATUS_FILE_NAME,
20+
LAST_PARQUET_FILE_NUMBER,
1821
INIT_SYNC_LAST_ID_FILE_NAME,
1922
INIT_SYNC_MAX_ID_FILE_NAME,
2023
)
2124
import schema_utils
2225
from utils import get_parquet_full_path_filename, to_string, get_table_dir
2326
from push_file_to_lz import push_file_to_lz
24-
from flags import set_init_flag, clear_init_flag
27+
# not required as now init_sync stat is stored in LZ
28+
#from flags import set_init_flag, clear_init_flag
2529
from file_utils import FileType, read_from_file, write_to_file, delete_file
2630

2731

2832
def init_sync(collection_name: str):
2933
logger = logging.getLogger(f"{__name__}[{collection_name}]")
30-
# skip init_sync if there's already parquet files and no current_skip/last_id file
31-
table_dir = get_table_dir(collection_name)
32-
current_skip_file_path = os.path.join(table_dir, INIT_SYNC_CURRENT_SKIP_FILE_NAME)
33-
last_id_file_path = os.path.join(table_dir, INIT_SYNC_LAST_ID_FILE_NAME)
34-
# needs to exclude the situation of cache or temp parquet files exist but
35-
# not normal numbered parquet files, in which case we shouldn't skip init sync
36-
if (
37-
not os.path.exists(last_id_file_path)
38-
and os.path.exists(table_dir)
39-
and any(
40-
file.endswith(".parquet") and os.path.splitext(file)[0].isnumeric()
41-
for file in os.listdir(table_dir)
42-
)
43-
):
34+
35+
# detect if there's a init_sync_stat file in LZ, and get its value
36+
init_sync_stat_flag = read_from_file(
37+
collection_name, INIT_SYNC_STATUS_FILE_NAME, FileType.PICKLE
38+
)
39+
if init_sync_stat_flag == "Y":
4440
logger.info(
4541
f"init sync for collection {collection_name} has already finished previously. Skipping init sync this time."
4642
)
4743
return
44+
45+
# detect if there's a last_id file, and restore last_id from it
46+
last_id = read_from_file(
47+
collection_name, INIT_SYNC_LAST_ID_FILE_NAME, FileType.PICKLE
48+
)
49+
if (init_sync_stat_flag == "N" and last_id):
50+
logger.info(
51+
f"interrupted init sync detected, continuing with previous _id={last_id}"
52+
)
53+
# skip old logic with LZ file for init_sync_stat
54+
# skip init_sync if there's already parquet files and no current_skip/last_id file
55+
#table_dir = get_table_dir(collection_name)
56+
#current_skip_file_path = os.path.join(table_dir, INIT_SYNC_CURRENT_SKIP_FILE_NAME)
57+
#last_id_file_path = os.path.join(table_dir, INIT_SYNC_LAST_ID_FILE_NAME)
58+
# needs to exclude the situation of cache or temp parquet files exist but
59+
# not normal numbered parquet files, in which case we shouldn't skip init sync
60+
# if (
61+
# not os.path.exists(last_id_file_path)
62+
# and os.path.exists(table_dir)
63+
# and any(
64+
# file.endswith(".parquet") and os.path.splitext(file)[0].isnumeric()
65+
# for file in os.listdir(table_dir)
66+
# )
67+
# ):
68+
4869
logger.info(f"begin init sync for {collection_name}")
49-
set_init_flag(collection_name)
70+
71+
# begin by writing init_sync_stat file with "N" as value
72+
#set_init_flag(collection_name)
73+
if not init_sync_stat_flag:
74+
# writing init_sync_stat file with "N"
75+
init_sync_stat_flag = "N"
76+
logger.info(f"writing init sync stat file with as 'N' for {collection_name}")
77+
write_to_file(
78+
init_sync_stat_flag, collection_name, INIT_SYNC_STATUS_FILE_NAME, FileType.PICKLE
79+
)
80+
5081
db_name = os.getenv("MONGO_DB_NAME")
5182
logger.debug(f"db_name={db_name}")
5283
logger.debug(f"collection={collection_name}")
@@ -77,14 +108,15 @@ def init_sync(collection_name: str):
77108

78109
columns_to_convert_to_str = None
79110

111+
# moved to the beginning to check whether the initial sync is completed
80112
# detect if there's a last_id file, and restore last_id from it
81-
last_id = read_from_file(
82-
collection_name, INIT_SYNC_LAST_ID_FILE_NAME, FileType.PICKLE
83-
)
84-
if last_id:
85-
logger.info(
86-
f"interrupted init sync detected, continuing with previous _id={last_id}"
87-
)
113+
# last_id = read_from_file(
114+
# collection_name, INIT_SYNC_LAST_ID_FILE_NAME, FileType.PICKLE
115+
# )
116+
# if last_id:
117+
# logger.info(
118+
# f"interrupted init sync detected, continuing with previous _id={last_id}"
119+
# )
88120

89121
while last_id is None or last_id < max_id:
90122
# for debug only
@@ -128,7 +160,16 @@ def init_sync(collection_name: str):
128160
logger.info(f"TIME: trans took {trans_end_time-read_end_time:.2f} seconds")
129161

130162
logger.debug("creating parquet file...")
131-
parquet_full_path_filename = get_parquet_full_path_filename(collection_name)
163+
# changed to get last parquet file number from LZ for resilience
164+
#parquet_full_path_filename = get_parquet_full_path_filename(collection_name)
165+
last_parquet_file_num = read_from_file(
166+
collection_name, LAST_PARQUET_FILE_NUMBER, FileType.PICKLE
167+
)
168+
if not last_parquet_file_num:
169+
last_parquet_file_num = 0
170+
171+
parquet_full_path_filename = get_parquet_full_path_filename(collection_name, last_parquet_file_num)
172+
132173
logger.info(f"writing parquet file: {parquet_full_path_filename}")
133174
batch_df.to_parquet(parquet_full_path_filename, index=False)
134175
write_end_time = time.time()
@@ -139,11 +180,10 @@ def init_sync(collection_name: str):
139180
metadata_json_path = os.path.join(
140181
os.path.dirname(os.path.abspath(__file__)), METADATA_FILE_NAME
141182
)
142-
#Diana 143
143183
logger.info("writing metadata file to LZ")
144184
push_file_to_lz(metadata_json_path, collection_name)
185+
# write the current batch to LZ
145186
push_start_time = time.time()
146-
#Diana 147
147187
logger.info("writing parquet file to LZ")
148188
push_file_to_lz(parquet_full_path_filename, collection_name)
149189
push_end_time = time.time()
@@ -156,12 +196,26 @@ def init_sync(collection_name: str):
156196
write_to_file(
157197
last_id, collection_name, INIT_SYNC_LAST_ID_FILE_NAME, FileType.PICKLE
158198
)
199+
# write last parquet file number to file
200+
last_parquet_file_num += 1
201+
logger.info(f"writing last parquet number into file: {last_parquet_file_num}")
202+
write_to_file(
203+
last_parquet_file_num,
204+
collection_name,
205+
LAST_PARQUET_FILE_NUMBER,
206+
FileType.PICKLE,
207+
)
159208

160209
# delete last_id file, as init sync is complete
161210
logger.info("removing the last_id file")
162211
delete_file(collection_name, INIT_SYNC_LAST_ID_FILE_NAME)
163212

164-
clear_init_flag(collection_name)
213+
# set init_sync_stat flag as complete = "Y"
214+
logger.info("Setting init_sync_stat flag as Y")
215+
init_sync_stat_flag = "Y"
216+
write_to_file(
217+
init_sync_stat_flag, collection_name, INIT_SYNC_STATUS_FILE_NAME, FileType.PICKLE
218+
)
165219
logger.info(f"init sync completed for collection {collection_name}")
166220

167221

listening.py

Lines changed: 59 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
DATA_FILES_PATH,
1616
DELTA_SYNC_CACHE_PARQUET_FILE_NAME,
1717
DELTA_SYNC_RESUME_TOKEN_FILE_NAME,
18+
# added the two new files to save the initial sync status and last parquet file number
19+
INIT_SYNC_STATUS_FILE_NAME,
20+
LAST_PARQUET_FILE_NUMBER,
1821
DTYPE_KEY,
1922
TYPE_KEY,
2023
)
21-
from utils import to_string, get_parquet_full_path_filename, get_table_dir
24+
from utils import to_string, get_parquet_full_path_filename, get_temp_parquet_full_path_filename, get_table_dir
2225
from push_file_to_lz import push_file_to_lz
23-
from flags import get_init_flag
26+
#from flags import get_init_flag
2427
from init_sync import init_sync
2528
import schemas
2629
import schema_utils
@@ -57,10 +60,16 @@ def listening(collection_name: str):
5760
Thread(target=init_sync, args=(collection_name,)).start()
5861

5962
logger.info(f"start listening to change stream for collection {collection_name}")
63+
init_sync_stat_flag = None
6064
for change in cursor:
61-
init_flag = get_init_flag(collection_name)
65+
# init_flag = get_init_flag(collection_name)
66+
if not init_sync_stat_flag == "Y":
67+
init_sync_stat_flag = read_from_file(
68+
collection_name, INIT_SYNC_STATUS_FILE_NAME, FileType.PICKLE
69+
)
6270
# do post init flush if this is the first iteration after init is done
63-
if not init_flag and not post_init_flush_done:
71+
#if not init_flag and not post_init_flush_done:
72+
if init_sync_stat_flag == "Y" and not post_init_flush_done:
6473
__post_init_flush(collection_name, logger)
6574
post_init_flush_done = True
6675
# logger.debug(type(change))
@@ -79,7 +88,7 @@ def listening(collection_name: str):
7988
# process df according to internal schema
8089
schema_utils.process_dataframe(collection_name, df)
8190

82-
if init_flag:
91+
if not init_sync_stat_flag == "Y":
8392
logger.debug(
8493
f"collection {collection_name} still initializing, use UPSERT instead of INSERT"
8594
)
@@ -101,33 +110,43 @@ def listening(collection_name: str):
101110
logger.info(f"last_sync_time when first record added: {last_sync_time}")
102111

103112

104-
if init_flag:
113+
if not init_sync_stat_flag == "Y":
105114
if (accumulative_df is not None
106115
and (
107116
(accumulative_df.shape[0] >= int(os.getenv("DELTA_SYNC_BATCH_SIZE")))
108117
)
109118
):
110119
prefix = TEMP_PREFIX_DURING_INIT
111-
parquet_full_path_filename = get_parquet_full_path_filename(
120+
# changed to a diff method just for temp as temp continues in local
121+
# parquet_full_path_filename = get_parquet_full_path_filename(
122+
parquet_full_path_filename = get_temp_parquet_full_path_filename(
112123
collection_name, prefix=prefix
113124
)
114125

115-
logger.info(f"writing parquet file: {parquet_full_path_filename}")
126+
logger.info(f"writing TEMP parquet file: {parquet_full_path_filename}")
116127
accumulative_df.to_parquet(parquet_full_path_filename)
117128
accumulative_df = None
118-
119-
if not init_flag:
129+
else:
120130
if (accumulative_df is not None
121131
and (
122132
(accumulative_df.shape[0] >= int(os.getenv("DELTA_SYNC_BATCH_SIZE")))
123-
# or (time.time() - last_sync_time >= TIME_THRESHOLD_IN_SEC)
124133
or ((time.time() - last_sync_time) >= time_threshold_in_sec)
125134
)
126135
):
127136
prefix = ""
128-
parquet_full_path_filename = get_parquet_full_path_filename(
129-
collection_name, prefix=prefix
130-
)
137+
# parquet_full_path_filename = get_parquet_full_path_filename(
138+
# collection_name, prefix=prefix
139+
# )
140+
# changed to get last parquet file number from LZ for resilience
141+
#parquet_full_path_filename = get_parquet_full_path_filename(collection_name)
142+
last_parquet_file_num = read_from_file(
143+
collection_name, LAST_PARQUET_FILE_NUMBER, FileType.PICKLE
144+
)
145+
if not last_parquet_file_num:
146+
last_parquet_file_num = 0
147+
148+
parquet_full_path_filename = get_parquet_full_path_filename(collection_name, last_parquet_file_num)
149+
131150
logger.info(f"writing parquet file: {parquet_full_path_filename}")
132151
accumulative_df.to_parquet(parquet_full_path_filename)
133152
accumulative_df = None
@@ -143,7 +162,15 @@ def listening(collection_name: str):
143162
DELTA_SYNC_RESUME_TOKEN_FILE_NAME,
144163
FileType.PICKLE,
145164
)
146-
165+
# write last parquet file number to file
166+
last_parquet_file_num += 1
167+
logger.info(f"writing last parquet number into file: {last_parquet_file_num}")
168+
write_to_file(
169+
last_parquet_file_num,
170+
collection_name,
171+
LAST_PARQUET_FILE_NUMBER,
172+
FileType.PICKLE,
173+
)
147174

148175
def __post_init_flush(table_name: str, logger):
149176
if not logger:
@@ -163,7 +190,14 @@ def __post_init_flush(table_name: str, logger):
163190
)
164191
for temp_parquet_filename in temp_parquet_filename_list:
165192
temp_parquet_full_path = os.path.join(table_dir, temp_parquet_filename)
166-
new_parquet_full_path = get_parquet_full_path_filename(table_name)
193+
# changed to get last parquet file number from LZ for resilience
194+
#new_parquet_full_path = get_parquet_full_path_filename(table_name)
195+
last_parquet_file_num = read_from_file(
196+
table_name, LAST_PARQUET_FILE_NUMBER, FileType.PICKLE
197+
)
198+
if not last_parquet_file_num:
199+
last_parquet_file_num = 0
200+
new_parquet_full_path = get_parquet_full_path_filename(table_name, last_parquet_file_num)
167201
logger.debug("renaming temp parquet file")
168202
logger.debug(f"old name: {temp_parquet_full_path}")
169203
logger.debug(f"new name: {new_parquet_full_path}")
@@ -172,3 +206,12 @@ def __post_init_flush(table_name: str, logger):
172206
)
173207
os.rename(temp_parquet_full_path, new_parquet_full_path)
174208
push_file_to_lz(new_parquet_full_path, table_name)
209+
# write last parquet file number to file
210+
last_parquet_file_num += 1
211+
logger.info(f"writing last parquet number into file: {last_parquet_file_num}")
212+
write_to_file(
213+
last_parquet_file_num,
214+
table_name,
215+
LAST_PARQUET_FILE_NUMBER,
216+
FileType.PICKLE,
217+
)

schema_persistence.py

Whitespace-only changes.

utils.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,32 @@ def get_table_dir(table_name: str) -> str:
1010
table_dir = os.path.join(current_dir, DATA_FILES_PATH, table_name + os.sep)
1111
os.makedirs(table_dir, exist_ok=True)
1212
return table_dir
13-
14-
def get_parquet_full_path_filename(table_name: str, prefix: str = "") -> str:
13+
# changed to build the next parquet file name from the last parquet number
# persisted in the LZ (caller passes 0 when no parquet file exists yet)
def get_parquet_full_path_filename(table_name: str, parquet_filename_int_list: int, prefix: str = "") -> str:
    """Return the full path of the NEXT parquet file for *table_name*.

    The caller supplies the number of the last parquet file created
    (restored from the LZ for resilience); this function returns the
    path for that number + 1 inside the table's data directory.

    NOTE(review): despite its name, ``parquet_filename_int_list`` is a
    plain ``int`` (the last file number), kept unchanged here so existing
    callers are not broken — consider renaming to ``last_file_num``.

    :param table_name: collection/table whose data directory is used
    :param parquet_filename_int_list: number of the last parquet file (0 if none)
    :param prefix: optional filename prefix (e.g. temp prefix during init)
    :return: absolute path of the next parquet file to write
    """
    table_dir = get_table_dir(table_name)
    # previous behavior scanned the local directory for the highest
    # numbered parquet file; numbering now comes from the LZ-persisted
    # counter so a restarted process continues where it left off
    return os.path.join(table_dir, prefix + __num_to_filename(parquet_filename_int_list + 1))
27+
28+
# temp files live only on local disk, so numbering still scans the directory
def get_temp_parquet_full_path_filename(table_name: str, prefix: str = "") -> str:
    """Return the full path of the next TEMP parquet file for *table_name*.

    Temp parquet files are never pushed to the LZ, so the next file
    number is derived locally: scan the table directory for files named
    ``<prefix><number>.parquet`` and use the highest number + 1 (or 1
    when none exist).

    :param table_name: collection/table whose data directory is used
    :param prefix: filename prefix identifying temp files
    :return: absolute path of the next temp parquet file to write
    """
    table_dir = get_table_dir(table_name)
    existing_numbers = []
    for filename in os.listdir(table_dir):
        stem, ext = os.path.splitext(filename)
        stem = stem.removeprefix(prefix)
        # only count properly numbered parquet files for this prefix
        if ext == ".parquet" and stem.isnumeric():
            existing_numbers.append(int(stem))
    next_num = max(existing_numbers) + 1 if existing_numbers else 1
    return os.path.join(table_dir, prefix + __num_to_filename(next_num))
2641

0 commit comments

Comments
 (0)