Skip to content

Commit e229ccd

Browse files
committed
feat: zip stream now yields predictable chunk sizes
1 parent 0a1c4a1 commit e229ccd

File tree

7 files changed

+24
-21
lines changed

7 files changed

+24
-21
lines changed

rocrate/memory_buffer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,6 @@ def read(self, size=-1):
2626
data = self._buffer[:size]
2727
self._buffer = self._buffer[size:]
2828
return data
29+
30+
def __len__(self):
31+
return len(self._buffer)

rocrate/model/data_entity.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class DataEntity(Entity):
3030
def write(self, base_path):
3131
pass
3232

33-
def stream(self) -> Generator[tuple[str, bytes], None, None]:
33+
def stream(self, chunk_size=8192) -> Generator[tuple[str, bytes], None, None]:
3434
""" Stream the data from the source. Each chunk of the content is yielded as a tuple
3535
containing the name of the destination file relative to the crate and the chunk of data.
3636
The destination file name is required because a DataEntity can be a file or a

rocrate/model/dataset.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,15 @@ def write(self, base_path):
8282
else:
8383
self._copy_folder(base_path)
8484

85-
def stream(self) -> Generator[tuple[str, bytes], None, None]:
85+
def stream(self, chunk_size=8192) -> Generator[tuple[str, bytes], None, None]:
8686
if self.source is None:
8787
return
8888
elif is_url(str(self.source)):
89-
yield from self._stream_folder_from_url()
89+
yield from self._stream_folder_from_url(chunk_size)
9090
else:
91-
yield from self._stream_folder_from_path()
91+
yield from self._stream_folder_from_path(chunk_size)
9292

93-
def _stream_folder_from_path(self) -> Generator[tuple[str, bytes], None, None]:
93+
def _stream_folder_from_path(self, chunk_size=8192):
9494
if not Path(str(self.source)).exists():
9595
raise FileNotFoundError(
9696
errno.ENOENT, os.strerror(errno.ENOENT), str(self.source)
@@ -102,10 +102,10 @@ def _stream_folder_from_path(self) -> Generator[tuple[str, bytes], None, None]:
102102
source = root / name
103103
dest = source.relative_to(Path(self.source).parent)
104104
with open(source, 'rb') as f:
105-
for chunk in f:
105+
while chunk := f.read(chunk_size):
106106
yield str(dest), chunk
107107

108-
def _stream_folder_from_url(self) -> Generator[tuple[str, bytes], None, None]:
108+
def _stream_folder_from_url(self, chunk_size=8192):
109109
if not self.fetch_remote:
110110
if self.validate_url:
111111
with urlopen(self.source) as _:
@@ -121,7 +121,6 @@ def _stream_folder_from_url(self) -> Generator[tuple[str, bytes], None, None]:
121121
rel_out_path = Path(self.id) / part
122122

123123
with urlopen(part_uri) as response:
124-
chunk_size = 8192
125124
while chunk := response.read(chunk_size):
126125
yield str(rel_out_path), chunk
127126
except KeyError:

rocrate/model/file.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def _stream_from_stream(self, stream):
9393
if self.record_size:
9494
self._jsonld['contentSize'] = str(size)
9595

96-
def _stream_from_url(self, url) -> Generator[tuple[str, bytes], None, None]:
96+
def _stream_from_url(self, url, chunk_size=8192):
9797
if self.fetch_remote or self.validate_url:
9898
if self.validate_url:
9999
if url.startswith("http"):
@@ -109,30 +109,29 @@ def _stream_from_url(self, url) -> Generator[tuple[str, bytes], None, None]:
109109
size = 0
110110
self._jsonld['contentUrl'] = str(url)
111111
with urllib.request.urlopen(url) as response:
112-
chunk_size = 8192
113112
while chunk := response.read(chunk_size):
114113
yield self.id, chunk
115114
size += len(chunk)
116115

117116
if self.record_size:
118117
self._jsonld['contentSize'] = str(size)
119118

120-
def _stream_from_file(self, path):
119+
def _stream_from_file(self, path, chunk_size=8192):
121120
size = 0
122121
with open(path, 'rb') as f:
123-
for chunk in f:
122+
while chunk := f.read(chunk_size):
124123
yield self.id, chunk
125124
size += len(chunk)
126125
if self.record_size:
127126
self._jsonld['contentSize'] = str(size)
128127

129-
def stream(self) -> Generator[tuple[str, bytes], None, None]:
128+
def stream(self, chunk_size=8192) -> Generator[tuple[str, bytes], None, None]:
130129
if isinstance(self.source, (BytesIO, StringIO)):
131130
yield from self._stream_from_stream(self.source)
132131
elif is_url(str(self.source)):
133-
yield from self._stream_from_url(self.source)
132+
yield from self._stream_from_url(self.source, chunk_size)
134133
elif self.source is None:
135134
# Allows recording a File entity whose @id does not exist, see #73
136135
warnings.warn(f"No source for {self.id}")
137136
else:
138-
yield from self._stream_from_file(self.source)
137+
yield from self._stream_from_file(self.source, chunk_size)

rocrate/model/metadata.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def generate(self):
7575
context = context[0]
7676
return {'@context': context, '@graph': graph}
7777

78-
def stream(self) -> Generator[tuple[str, bytes], None, None]:
78+
def stream(self, chunk_size=8192) -> Generator[tuple[str, bytes], None, None]:
7979
content = self.generate()
8080
yield self.id, str.encode(json.dumps(content, indent=4, sort_keys=True), encoding='utf-8')
8181

rocrate/model/preview.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def is_object_list(a):
9191
out_html = src.render(crate=self.crate, context=context_entities, data=data_entities)
9292
return out_html
9393

94-
def stream(self) -> Generator[tuple[str, bytes], None, None]:
94+
def stream(self, chunk_size=8192) -> Generator[tuple[str, bytes], None, None]:
9595
if self.source:
9696
yield from super().stream()
9797
else:

rocrate/rocrate.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -480,24 +480,26 @@ def write_zip(self, out_path):
480480
shutil.rmtree(tmp_dir)
481481
return archive
482482

483-
def stream_zip(self):
483+
def stream_zip(self, chunk_size=8192):
484484
""" Create a stream of bytes representing the RO-Crate as a ZIP file. """
485485
with MemoryBuffer() as buffer:
486486
with zipfile.ZipFile(buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
487487
for writeable_entity in self.data_entities + self.default_entities:
488488
current_file_path, current_out_file = None, None
489-
for path, chunk in writeable_entity.stream():
489+
for path, chunk in writeable_entity.stream(chunk_size=chunk_size):
490490
if path != current_file_path:
491491
if current_out_file:
492492
current_out_file.close()
493493
current_file_path = path
494494
current_out_file = archive.open(path, mode='w')
495495
current_out_file.write(chunk)
496-
yield buffer.read()
496+
while len(buffer) >= chunk_size:
497+
yield buffer.read(chunk_size)
497498
if current_out_file:
498499
current_out_file.close()
499500

500-
yield buffer.read()
501+
while chunk := buffer.read(chunk_size):
502+
yield chunk
501503

502504
def add_workflow(
503505
self, source=None, dest_path=None, fetch_remote=False, validate_url=False, properties=None,

0 commit comments

Comments
 (0)