Commit 50cd912

make sync arch worker
1 parent a24c27c commit 50cd912

File tree

storage-app/src/shared/archive_helpers.py
storage-app/src/shared/worker_services.py

2 files changed: +178 -10 lines

storage-app/src/shared/archive_helpers.py

Lines changed: 159 additions & 2 deletions
@@ -68,9 +68,9 @@ def run(self):
 
             buffer, read_size = task
             buffer.seek(0)
-            v = buffer.read(read_size)
+            data = buffer.read(read_size)
 
-            dest.write(v)
+            dest.write(data)
 
             del buffer
 

@@ -248,6 +248,7 @@ def produce_sync(self):
         for file in self.object_set:
             self.queue.put((self._get_file_name(file), file.read()))
             self.iter_count += 1
+            file.close()
             if not self.iter_count % GC_FREQ: gc_collect()
 
         self.queue.put(None)
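
Note on this hunk: each GridOut is now closed as soon as its bytes are queued, so buffers can be reclaimed without waiting for the periodic `gc_collect()` sweep. A minimal stdlib-only sketch of the same producer pattern; `GC_FREQ`'s value and the `None` sentinel convention are assumptions read off this diff:

import gc
from queue import Queue

GC_FREQ = 100  # assumed value; the real constant is defined elsewhere in the module

def produce(files, queue: Queue) -> None:
    # Drain an iterable of file-like objects into a queue, closing each
    # handle as soon as it is consumed and nudging the collector periodically.
    for count, file in enumerate(files, start=1):
        queue.put((file.name, file.read()))
        file.close()                 # release the handle's buffers immediately
        if count % GC_FREQ == 0:
            gc.collect()
    queue.put(None)                  # sentinel: tells the consumer to stop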
@@ -283,3 +284,159 @@ def _get_file_name(self, file: GridOut) -> str:
         if extension: name += f".{extension}"
 
         return name
+
+
+class SyncZipping():
+    DUMP_THRESHOLD: int = 10 << 20
+
+    def __init__(
+        self,
+        dest_name: str,
+        object_set: Cursor,
+        additional: list[tuple[str, bytes]]
+    ):
+        self.object_set = object_set
+        self.additional = additional
+        self.file_list = []
+        self._local_dir_end = 0
+        self._archive_id = None
+        self.dest = SyncDataBase \
+            .get_fs_bucket(TEMP_BUCKET) \
+            .open_upload_stream(
+                dest_name,
+                metadata={"created_at": datetime.now().isoformat()}
+            )
+
+    def dest_write(self, buffer, read_size):
+        buffer.seek(0)
+        data = buffer.read(read_size)
+        self.dest.write(data)
+
+    def tell(self) -> int: return self._local_dir_end
+
+    def result(self) -> Optional[str]: return self._archive_id
+
+    def _dump_buffer(self, buffer: BytesIO, zip_buffer: ZipFile):
+        dest_offset = self.tell()
+
+        new_list = zip_buffer.filelist
+        for zinfo in new_list: zinfo.header_offset += dest_offset
+
+        self.file_list += new_list
+        self._local_dir_end += buffer.tell()
+
+        self.dest_write(buffer, buffer.tell())
+
+        zip_buffer.close()
+
+    def _finalize(self):
+        self._write_end_record(end_buffer := BytesIO())
+        self.dest_write(end_buffer, end_buffer.tell())
+
+        self._write_cent_dir(
+            self.tell() + end_buffer.tell(),
+            self.tell(),
+            len(self.file_list),
+            cent_dir_buffer := BytesIO()
+        )
+        self.dest_write(cent_dir_buffer, cent_dir_buffer.tell())
+
+        self._archive_id = self.dest._id
+
+        self.dest.close()
+
+        SyncDataBase.close_connection()
+
+    def _write_end_record(self, buffer: BytesIO):
+        for zinfo in self.file_list:
+            dt = zinfo.date_time
+
+            dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
+            dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
+            extra = []
+
+            assert zinfo.file_size <= ZIP64_LIMIT and zinfo.compress_size <= ZIP64_LIMIT
+
+            file_size = zinfo.file_size
+            compress_size = zinfo.compress_size
+
+            if zinfo.header_offset > ZIP64_LIMIT:
+                extra.append(zinfo.header_offset)
+                header_offset = 0xffffffff
+            else: header_offset = zinfo.header_offset
+
+            extra_data = zinfo.extra
+            min_version = 0
+
+            if extra:
+                extra_data = _Extra.strip(extra_data, (1,))
+                extra_data = pack_data("<HH" + "Q" * len(extra), 1, 8 * len(extra), *extra) + extra_data
+
+                min_version = ZIP64_VERSION
+
+            extract_version = max(min_version, zinfo.extract_version)
+            create_version = max(min_version, zinfo.create_version)
+
+            filename, flag_bits = zinfo._encodeFilenameFlags()
+
+            centdir = pack_data(
+                CENTRAL_STRUCT,
+                CENTRAL_STRING,
+                create_version,
+                zinfo.create_system,
+                extract_version,
+                zinfo.reserved,
+                flag_bits,
+                zinfo.compress_type,
+                dostime,
+                dosdate,
+                zinfo.CRC,
+                compress_size,
+                file_size,
+                len(filename),
+                len(extra_data),
+                len(zinfo.comment),
+                0,
+                zinfo.internal_attr,
+                zinfo.external_attr,
+                header_offset
+            )
+
+            buffer.write(centdir + filename + extra_data + zinfo.comment)
+
+    def _write_cent_dir(self, pos: int, start_dir: int, d_size: int, buffer: BytesIO):
+        cent_dir = pos - start_dir
+
+        if d_size > ZIP_FILECOUNT_LIMIT or pos > ZIP64_LIMIT:
+            pack = (END_64_STRUCT, END_64_STRING, 44, 45, 45, 0, 0, d_size, d_size, 0, pos)
+            buffer.write(pack_data(*pack))
+            buffer.write(pack_data(END_64_STRUCT_LOC, END_64_STRING_LOC, 0, pos, 1))
+            cent_dir = min(cent_dir, 0xFFFFFFFF)
+            start_dir = min(start_dir, 0xFFFFFFFF)
+            d_size = min(d_size, 0xFFFF)
+
+        buffer.write(pack_data(END_STRUCT, END_STRING, 0, 0, d_size, d_size, cent_dir, start_dir, 0))
+
+    def run(self):
+        buffer = BytesIO()
+        zip_buffer: ZipFile = ZipFile(buffer, "w", ZIP_DEFLATED)
+
+        for file in self.object_set:
+            f_name, ext = str(file._id), file.metadata.get("file_extension", "")
+            if ext: f_name += f".{ext}"
+
+            f_data = file.read()
+
+            zip_buffer.writestr(f_name, f_data)
+
+            if buffer.tell() > self.DUMP_THRESHOLD:
+                self._dump_buffer(buffer, zip_buffer)
+                buffer = BytesIO()
+                zip_buffer = ZipFile(buffer, "w", ZIP_DEFLATED)
+
+        for f_name, f_data in self.additional: zip_buffer.writestr(f_name, f_data)
+
+        if buffer.tell(): self._dump_buffer(buffer, zip_buffer)
+
+        self.object_set.close()
+        self._finalize()
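
Note on this hunk: SyncZipping streams the archive instead of holding it in memory. It fills an in-memory ZipFile until DUMP_THRESHOLD (10 MiB), flushes only the local-entry bytes to the GridFS upload stream (the per-segment central directory that ZipFile.close() appends is discarded), shifts each ZipInfo.header_offset by the bytes already written, and at the end emits the accumulated central directory records itself (_write_end_record, despite its name) followed by the end-of-central-directory record (_write_cent_dir). The sketch below demonstrates the same concatenation trick with only the stdlib, skipping the ZIP64 branches the commit handles, so entries and offsets must stay under 4 GiB; make_segment and combine are illustrative names, not part of the commit:

import struct
from io import BytesIO
from zipfile import (
    ZipFile, ZipInfo, ZIP_DEFLATED,
    structCentralDir, stringCentralDir,
    structEndArchive, stringEndArchive,
)

def make_segment(files: dict[str, bytes]) -> tuple[bytes, list[ZipInfo]]:
    # Write files into an in-memory zip and return only the local-entry
    # bytes (everything before the central directory) plus their ZipInfos.
    buf = BytesIO()
    zf = ZipFile(buf, "w", ZIP_DEFLATED)
    for name, data in files.items():
        zf.writestr(name, data)
    entries_end = buf.tell()          # central directory not written yet
    infos = list(zf.filelist)
    zf.close()                        # appends a central dir we discard
    return buf.getvalue()[:entries_end], infos

def combine(segments: list[tuple[bytes, list[ZipInfo]]]) -> bytes:
    # Concatenate segments, shifting each header_offset by the bytes
    # already emitted, then write one central directory by hand.
    out, all_infos = BytesIO(), []
    for blob, infos in segments:
        base = out.tell()
        for zi in infos:
            zi.header_offset += base
        all_infos += infos
        out.write(blob)

    start_dir = out.tell()
    for zi in all_infos:
        y, m, d, hh, mm, ss = zi.date_time
        dosdate = (y - 1980) << 9 | m << 5 | d
        dostime = hh << 11 | mm << 5 | ss // 2
        try:                          # mirror ZipInfo._encodeFilenameFlags
            name, flags = zi.filename.encode("ascii"), zi.flag_bits
        except UnicodeEncodeError:
            name, flags = zi.filename.encode("utf-8"), zi.flag_bits | 0x800
        out.write(struct.pack(
            structCentralDir, stringCentralDir,
            zi.create_version, zi.create_system, zi.extract_version,
            zi.reserved, flags, zi.compress_type, dostime, dosdate,
            zi.CRC, zi.compress_size, zi.file_size,
            len(name), 0, 0, 0,
            zi.internal_attr, zi.external_attr, zi.header_offset,
        ))
        out.write(name)

    dir_size = out.tell() - start_dir
    count = len(all_infos)
    out.write(struct.pack(structEndArchive, stringEndArchive,
                          0, 0, count, count, dir_size, start_dir, 0))
    return out.getvalue()

# Round-trip check: two independently produced segments read back as one zip.
blob = combine([make_segment({"a.txt": b"alpha"}), make_segment({"b.txt": b"beta"})])
with ZipFile(BytesIO(blob)) as zf:
    assert zf.read("a.txt") == b"alpha" and zf.read("b.txt") == b"beta"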

storage-app/src/shared/worker_services.py

Lines changed: 19 additions & 8 deletions
@@ -15,7 +15,7 @@
 import requests
 from .hasher import VHash, IHash
 from queue import Queue
-from .archive_helpers import FileProducer, ZipConsumer, ZipWriter
+from .archive_helpers import FileProducer, ZipConsumer, ZipWriter, SyncZipping
 from celery import Task
 
 
@@ -33,7 +33,6 @@ def to_value(self):
 
 
 class Zipper:
-    written: bool = False
     archive_extension: str = "zip"
 
     def __init__(
@@ -48,9 +47,25 @@ def __init__(
 
         self._get_annotation(bucket_name, file_ids)
 
-    def archive_objects(self) -> Optional[bool]:
+    def archive_objects(self):
+        json_data: Any = ("annotation.json", dumps(self.annotation, indent=4).encode("utf-8"))
+        object_set = SyncDataBase \
+            .get_fs_bucket(self.bucket_name) \
+            .find(
+                {"_id": {"$in": [get_object_id(str(object_id)) for object_id in self.file_ids]}},
+                no_cursor_timeout=True
+            ) \
+            .batch_size(200)
+
+        zipper = SyncZipping(f"{self.bucket_name}_dataset", object_set, [json_data])
+        zipper.run()
+
+        assert (result_id := zipper.result()), "Archive was not written"
+        self._archive_id = result_id
+
+
+    def _archive_objects(self):
         json_data: Any = ("annotation.json", dumps(self.annotation, indent=4).encode("utf-8"))
-        # object_set = Bucket(self.bucket_name).get_download_objects(self.file_ids)
         object_set = SyncDataBase \
             .get_fs_bucket(self.bucket_name) \
             .find(
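
Note on the query above: no_cursor_timeout=True keeps the server-side cursor alive while large files are compressed, and batch_size(200) bounds how many GridFS documents are fetched per round trip. Such cursors are not reaped by the server, so they must be closed explicitly (SyncZipping.run does this via object_set.close()). A hedged PyMongo sketch of the same call shape; db handle and helper name are assumptions:

from gridfs import GridFSBucket

def iter_files(db, bucket_name: str, ids: list):
    # A no_cursor_timeout cursor leaks on the server unless closed explicitly.
    cursor = GridFSBucket(db, bucket_name).find(
        {"_id": {"$in": ids}}, no_cursor_timeout=True
    ).batch_size(200)
    try:
        yield from cursor
    finally:
        cursor.close()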
@@ -86,13 +101,9 @@ def archive_objects(self) -> Optional[bool]:
         consumer.join()
         writer.join()
 
-        self.written = True
-
         assert (result_id := writer.result()), "Archive was not written"
         self._archive_id = result_id
 
-        return self.written
-
     def _get_annotation(self, bucket_name: str, file_ids: list[str]) -> Any:
         url: str = APP_BACKEND_URL + "/api/files/annotation/"
         payload_token = emit_token({"minutes": 1}, SECRET_KEY, SECRET_ALGO)
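
Overall, archive_objects no longer returns a status flag: it raises AssertionError if nothing was written and stores the GridFS id in self._archive_id, while the old threaded producer/consumer/writer pipeline survives unchanged as _archive_objects. A hypothetical caller of the new synchronous path; the task name, Zipper's constructor signature, and the _archive_id access are assumptions based on this diff:

# Illustrative only: nothing below is part of the commit.
from celery import Celery
from shared.worker_services import Zipper  # import path assumed

app = Celery("storage")

@app.task
def produce_dataset(bucket_name: str, file_ids: list[str]) -> str:
    zipper = Zipper(bucket_name, file_ids)
    zipper.archive_objects()        # AssertionError if the archive was not written
    return str(zipper._archive_id)  # id of the zip in the temp bucket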
