Commit 8a33b12

Merge pull request #13 from mongodb-partners/test-changes-march

Data types and schema enforcement changes

2 parents 4f9697f + 3f6f2dd

File tree

10 files changed: +458 additions, -177 deletions


.DS_Store

Binary file (0 bytes) not shown.

.gitignore

Lines changed: 5 additions & 1 deletion

@@ -1,6 +1,10 @@
 __pycache__/
 .venv/
 .env
+venv
 .vscode/
 data_files/
-*.log
+*.log
+*.log.1
+*.log.*
+*.ipynb

app.py

Lines changed: 5 additions & 2 deletions

@@ -6,12 +6,15 @@
 
 def create_app():
     app = Flask(__name__)
-    Thread(target=mirror).start()
+    thread_name=Thread(target=mirror).start()
 
     @app.route("/")
     def home_page():
+        import threading
+        for thread in threading.enumerate():
+            print(thread.name)
         return "The MongoDB Fabric Mirroring Service is running..."
-
+
     return app
 
 app = create_app()
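Note on the new line 9 above: `Thread.start()` returns `None`, so `thread_name` never actually holds the thread object. A minimal sketch of how a named, usable handle could be kept instead (the `mirror` stub and the thread name are illustrative, not the repo's code):

```python
from threading import Thread

def mirror():  # stand-in for the service's real mirror() worker
    ...

# Create first, start second: start() returns None, but the Thread object
# keeps both the handle and the name that threading.enumerate() will report.
mirror_thread = Thread(target=mirror, name="fabric-mirroring")
mirror_thread.start()
print(mirror_thread.name)  # -> "fabric-mirroring" rather than "Thread-1"
```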

constants.py

Lines changed: 1 addition & 0 deletions

@@ -52,6 +52,7 @@
 
 COLUMN_RENAMING_FILE_NAME = "_column_renaming.pkl"
 
+CONVERSION_LOG_FILE_NAME = "_conversion_log.txt"
 # dict keys for schema
 TYPE_KEY = "type"
 DTYPE_KEY = "dtype"
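This constant names the per-table conversion log that the new `append_to_file` helper in file_utils.py below appears intended to create and extend.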

file_utils.py

Lines changed: 53 additions & 12 deletions

@@ -2,7 +2,6 @@
 import pickle
 from enum import Enum
 from typing import Any
-
 import utils
 from push_file_to_lz import push_file_to_lz, get_file_from_lz, delete_file_from_lz
 
@@ -23,25 +22,48 @@ class FileType(Enum):
 }
 
 
+# def read_from_file(table_name: str, file_name: str, file_type: FileType):
+#     table_path = utils.get_table_dir(table_name)
+#     file_full_path = os.path.join(table_path, file_name)
+#     print("File path ; ", file_full_path)
+#     # always read from LZ first
+#     if get_file_from_lz(table_name, file_name) : # and os.path.exists(file_full_path):
+#         with open(
+#             file_full_path, FILETYPE_TO_READ_MODE_MAP.get(file_type, "r")
+#         ) as file:
+#             if file_type == FileType.PICKLE:
+#                 obj = pickle.load(file)
+#             elif file_type == FileType.TEXT:
+#                 obj = file.read()
+#             else:
+#                 obj = None
+#         return obj
+#     else:
+#         return None
+
+
 def read_from_file(table_name: str, file_name: str, file_type: FileType):
     table_path = utils.get_table_dir(table_name)
     file_full_path = os.path.join(table_path, file_name)
     # always read from LZ first
-    if get_file_from_lz(table_name, file_name) and os.path.exists(file_full_path):
-        with open(
-            file_full_path, FILETYPE_TO_READ_MODE_MAP.get(file_type, "r")
-        ) as file:
-            if file_type == FileType.PICKLE:
-                obj = pickle.load(file)
-            elif file_type == FileType.TEXT:
-                obj = file.read()
-            else:
-                obj = None
+    response_status_code, file_content = get_file_from_lz(table_name, file_name)
+    if response_status_code == 200:
+        if file_type == FileType.PICKLE:
+            obj = pickle.loads(file_content.content)
+            print("Type of object: ", isinstance(obj, bytes))
+            # Check if the result is itself a pickled object (nested)
+            if isinstance(obj, bytes):
+                obj = pickle.loads(obj)
+                print("Unpickled object: ", obj)
             return obj
+
+        elif file_type == FileType.TEXT:
+            return file_content.content.decode('utf-8')
+        else:
+            return None
     else:
         return None
 
-
 def write_to_file(obj: Any, table_name: str, file_name: str, file_type: FileType):
     table_path = utils.get_table_dir(table_name)
     file_full_path = os.path.join(table_path, file_name)
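Two assumptions are baked into the rewritten read path: `get_file_from_lz` now returns a `(status_code, response)` pair whose `.content` attribute carries the raw bytes (a requests-style response), and a payload may arrive pickled twice. A self-contained illustration of why the `isinstance(obj, bytes)` guard unwraps a second layer:

```python
import pickle

payload = {"last_id": "65f0abc123"}  # hypothetical checkpoint value
once = pickle.dumps(payload)         # normal pickling -> bytes
twice = pickle.dumps(once)           # accidental double pickling

obj = pickle.loads(twice)            # first loads() only peels the outer layer
if isinstance(obj, bytes):           # same guard as read_from_file above
    obj = pickle.loads(obj)          # second loads() recovers the real object
assert obj == payload
```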
@@ -54,6 +76,25 @@ def write_to_file(obj: Any, table_name: str, file_name: str, file_type: FileType
     push_file_to_lz(file_full_path, table_name)
 
 
+def append_to_file(obj: Any, table_name: str, file_name: str, file_type: FileType):
+    table_path = utils.get_table_dir(table_name)
+    file_full_path = os.path.join(table_path, file_name)
+    append_mode = "ab" if file_type == FileType.PICKLE else "a"
+
+    # Create file if it doesn't exist
+    if not os.path.exists(file_full_path):
+        with open(file_full_path, FILETYPE_TO_WRITE_MODE_MAP.get(file_type, "w")) as f:
+            f.write(f"\n{'Column Name':<20} | {'Original Value':<20} | {'Converting Value':<20}\n{'-'*70}\n")
+
+    with open(file_full_path, append_mode) as file:
+        if file_type == FileType.PICKLE:
+            pickle.dump(obj, file)
+        elif file_type == FileType.TEXT:
+            file.write(obj)
+    # write to LZ
+    # push_file_to_lz(file_full_path, table_name)
+
+
 def delete_file(table_name: str, file_name: str):
     file_full_path = os.path.join(utils.get_table_dir(table_name), file_name)
     os.remove(file_full_path)
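Paired with the new `CONVERSION_LOG_FILE_NAME` constant, `append_to_file` looks intended to build a human-readable, per-table log of type conversions. A usage sketch; the table name and the logged values are made up:

```python
from constants import CONVERSION_LOG_FILE_NAME  # "_conversion_log.txt"
from file_utils import append_to_file, FileType

# The first call creates the file and writes the column header; every call
# then appends one formatted row.
append_to_file(
    f"{'balance':<20} | {'Decimal128(10.50)':<20} | {'10.50':<20}\n",
    "customers",
    CONVERSION_LOG_FILE_NAME,
    FileType.TEXT,
)
```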

init_sync.py

Lines changed: 17 additions & 8 deletions

@@ -1,3 +1,4 @@
+import pprint
 import pymongo
 from pymongo.collection import Collection
 import time
@@ -6,8 +7,9 @@
 import shutil
 import logging
 import glob
-from bson import ObjectId
+from bson import ObjectId, Decimal128
 import pickle
+import numpy as np
 
 from constants import (
     TYPES_TO_CONVERT_TO_STR,
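The new `Decimal128` and `numpy` imports point at explicit type handling before the Parquet write, since pyarrow cannot serialize `bson.Decimal128` values directly. A hedged sketch of the kind of conversion this enables (column name, values, and output path are invented, not the repo's actual code):

```python
import pandas as pd
from bson import Decimal128

batch_df = pd.DataFrame({"balance": [Decimal128("10.50"), Decimal128("7.25")]})

# Convert Decimal128 cells to plain strings so to_parquet() can serialize them.
batch_df["balance"] = batch_df["balance"].apply(
    lambda v: str(v.to_decimal()) if isinstance(v, Decimal128) else v
)
batch_df.to_parquet("example.parquet", index=False)
```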
@@ -137,6 +139,7 @@ def init_sync(collection_name: str):
 
         read_start_time = time.time()
         batch_df = pd.DataFrame(list(batch_cursor))
+
         read_end_time = time.time()
         if enable_perf_timer:
             logger.info(f"TIME: read took {read_end_time-read_start_time:.2f} seconds")
@@ -171,17 +174,21 @@ def init_sync(collection_name: str):
         parquet_full_path_filename = get_parquet_full_path_filename(collection_name, last_parquet_file_num)
 
         logger.info(f"writing parquet file: {parquet_full_path_filename}")
+
         batch_df.to_parquet(parquet_full_path_filename, index=False)
-        write_end_time = time.time()
+        # os.remove("temp.csv")
+        write_end_time = time.time()
         if enable_perf_timer:
             logger.info(f"TIME: write took {write_end_time-trans_end_time:.2f} seconds")
-        if not last_id:
+
+        #>>># changes to remove write metadata.json here as it will now be written as the first file in mongodb_generic_mirroring.py - 6Mar2025
+        # if not last_id:
             # do not copy, but send the template file directly
-            metadata_json_path = os.path.join(
-                os.path.dirname(os.path.abspath(__file__)), METADATA_FILE_NAME
-            )
-            logger.info("writing metadata file to LZ")
-            push_file_to_lz(metadata_json_path, collection_name)
+            # metadata_json_path = os.path.join(
+            #     os.path.dirname(os.path.abspath(__file__)), METADATA_FILE_NAME
+            # )
+            # logger.info("writing metadata file to LZ")
+            # push_file_to_lz(metadata_json_path, collection_name)
         # write the current batch to LZ
         push_start_time = time.time()
         logger.info("writing parquet file to LZ")
@@ -205,6 +212,8 @@ def init_sync(collection_name: str):
             LAST_PARQUET_FILE_NUMBER,
             FileType.PICKLE,
         )
+        #>>># added sleep to ensure that Fabric picks up one file at a time - 14Mar2025
+        time.sleep(30)
 
     # delete last_id file, as init sync is complete
     logger.info("removing the last_id file")
