Skip to content

Commit 6a0b097

Browse files
committed
feat: add strategies to free LZMA decompressors
1 parent 3d0c3e2 commit 6a0b097

File tree

11 files changed

+302
-14
lines changed

11 files changed

+302
-14
lines changed

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,19 @@ adheres to [Semantic Versioning](https://semver.org/).
99

1010
[unreleased]: https://github.com/rogdham/bigxml/compare/v0.3.1...HEAD
1111

12+
### :rocket: Added
13+
14+
- Advanced users may use the new `block_read_strategy` argument of `XZFile`/`xz.open` to
15+
customize the strategy for freeing block readers, and implement a different tradeoff
16+
between memory consumption and read speed when alternating reads between several
17+
blocks; the following strategies are provided: `RollingBlockReadStrategy` and
18+
`KeepBlockReadStrategy`
19+
1220
### :bug: Fixes
1321

1422
- Free memory after a block is fully read
23+
- Free memory of LZMA decompressors when many blocks are partially read; this is a
24+
tradeoff defaulting to keeping the last 8 LZMA decompressors used
1525
- Typing: use `BinaryIO` instead of `IO[bytes]`
1626

1727
## [0.3.1] - 2021-12-26

src/xz/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from xz.common import XZError
3838
from xz.file import XZFile
3939
from xz.open import xz_open
40+
from xz.strategy import KeepBlockReadStrategy, RollingBlockReadStrategy
4041

4142
# pylint: disable=redefined-builtin
4243
open = xz_open
@@ -45,6 +46,8 @@
4546

4647
__all__ = (
4748
"__version__",
49+
"KeepBlockReadStrategy",
50+
"RollingBlockReadStrategy",
4851
"XZError",
4952
"XZFile",
5053
"open",

src/xz/block.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,13 @@
1010
parse_xz_index,
1111
)
1212
from xz.io import IOAbstract, IOCombiner, IOStatic
13-
from xz.typing import _LZMAFiltersType, _LZMAPresetType
13+
from xz.strategy import KeepBlockReadStrategy
14+
from xz.typing import (
15+
Optional,
16+
_BlockReadStrategyType,
17+
_LZMAFiltersType,
18+
_LZMAPresetType,
19+
)
1420

1521

1622
class BlockRead:
@@ -122,12 +128,14 @@ def __init__(
122128
uncompressed_size: int,
123129
preset: _LZMAPresetType = None,
124130
filters: _LZMAFiltersType = None,
131+
block_read_strategy: Optional[_BlockReadStrategyType] = None,
125132
):
126133
super().__init__(uncompressed_size)
127134
self.fileobj = fileobj
128135
self.check = check
129136
self.preset = preset
130137
self.filters = filters
138+
self.block_read_strategy = block_read_strategy or KeepBlockReadStrategy()
131139
self.unpadded_size = unpadded_size
132140
self.operation: Union[BlockRead, BlockWrite, None] = None
133141

@@ -139,6 +147,8 @@ def _read(self, size: int) -> bytes:
139147
# enforce read mode
140148
if not isinstance(self.operation, BlockRead):
141149
self._write_end()
150+
self.clear()
151+
self.block_read_strategy.on_create(self)
142152
self.operation = BlockRead(
143153
self.fileobj,
144154
self.check,
@@ -147,13 +157,14 @@ def _read(self, size: int) -> bytes:
147157
)
148158

149159
# read data
160+
self.block_read_strategy.on_read(self)
150161
try:
151162
data = self.operation.decompress(self._pos, size)
152163
except LZMAError as ex:
153164
raise XZError(f"block: error while decompressing: {ex}") from ex
154165

155166
if self._pos + len(data) == self._length:
156-
self.operation = None # free memory
167+
self.clear()
157168

158169
return data
159170

@@ -163,6 +174,7 @@ def writable(self) -> bool:
163174
def _write(self, data: bytes) -> int:
164175
# enforce write mode
165176
if not isinstance(self.operation, BlockWrite):
177+
self.clear()
166178
self.operation = BlockWrite(
167179
self.fileobj,
168180
self.check,
@@ -179,10 +191,15 @@ def _write_after(self) -> None:
179191
self.unpadded_size, uncompressed_size = self.operation.finish()
180192
if uncompressed_size != self.uncompressed_size:
181193
raise XZError("block: compressor uncompressed size")
182-
self.operation = None # free memory
194+
self.clear()
183195

184196
def _truncate(self, size: int) -> None:
185197
# thanks to the writable method, we are sure that length is zero
186198
# so we don't need to handle the case of truncating in middle of the block
187199
self.seek(size)
188200
self.write(b"")
201+
202+
def clear(self) -> None:
    """Drop the current operation, if any, to free its memory.

    When the operation being dropped is a BlockRead, the block read
    strategy is notified through its on_delete hook before the operation
    is released.
    """
    dropping_reader = isinstance(self.operation, BlockRead)
    if dropping_reader:
        self.block_read_strategy.on_delete(self)
    self.operation = None  # free memory

src/xz/file.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,14 @@
55

66
from xz.common import DEFAULT_CHECK, XZError
77
from xz.io import IOCombiner, IOProxy
8+
from xz.strategy import RollingBlockReadStrategy
89
from xz.stream import XZStream
9-
from xz.typing import _LZMAFilenameType, _LZMAFiltersType, _LZMAPresetType
10+
from xz.typing import (
11+
_BlockReadStrategyType,
12+
_LZMAFilenameType,
13+
_LZMAFiltersType,
14+
_LZMAPresetType,
15+
)
1016
from xz.utils import parse_mode, proxy_property
1117

1218

@@ -28,7 +34,8 @@ def __init__(
2834
*,
2935
check: int = -1,
3036
preset: _LZMAPresetType = None,
31-
filters: _LZMAFiltersType = None
37+
filters: _LZMAFiltersType = None,
38+
block_read_strategy: Optional[_BlockReadStrategyType] = None,
3239
) -> None:
3340
"""Open an XZ file in binary mode.
3441
@@ -52,6 +59,11 @@ def __init__(
5259
5360
For more information about the check/preset/filters arguments,
5461
refer to the documentation of the lzma module.
62+
63+
The block_read_strategy argument allows specifying a strategy
64+
for freeing block readers, and implement a different tradeoff
65+
between memory consumption and read speed when alternating reads
66+
between several blocks.
5567
"""
5668
self._close_fileobj = False
5769
self._close_check_empty = False
@@ -60,6 +72,14 @@ def __init__(
6072

6173
self.mode, self._readable, self._writable = parse_mode(mode)
6274

75+
# create strategy
76+
if block_read_strategy is None:
77+
self.block_read_strategy: _BlockReadStrategyType = (
78+
RollingBlockReadStrategy()
79+
)
80+
else:
81+
self.block_read_strategy = block_read_strategy
82+
6383
# get fileobj
6484
if isinstance(filename, (str, bytes, os.PathLike)):
6585
# pylint: disable=consider-using-with, unspecified-encoding
@@ -143,7 +163,7 @@ def _init_parse(self) -> None:
143163
raise XZError("file: invalid size")
144164
self.fileobj.seek(-4, SEEK_CUR)
145165
if any(self.fileobj.read(4)):
146-
streams.append(XZStream.parse(self.fileobj))
166+
streams.append(XZStream.parse(self.fileobj, self.block_read_strategy))
147167
else:
148168
self.fileobj.seek(-4, SEEK_CUR) # stream padding
149169

@@ -161,6 +181,7 @@ def _create_fileobj(self) -> XZStream:
161181
self.check,
162182
self.preset,
163183
self.filters,
184+
self.block_read_strategy,
164185
)
165186

166187
def change_stream(self) -> None:

src/xz/open.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from xz.file import XZFile
66
from xz.typing import (
7+
_BlockReadStrategyType,
78
_LZMAFilenameType,
89
_LZMAFiltersType,
910
_LZMAPresetType,
@@ -22,6 +23,7 @@ def __init__(
2223
check: int = -1,
2324
preset: _LZMAPresetType = None,
2425
filters: _LZMAFiltersType = None,
26+
block_read_strategy: Optional[_BlockReadStrategyType] = None,
2527
encoding: Optional[str] = None,
2628
errors: Optional[str] = None,
2729
newline: Optional[str] = None,
@@ -32,6 +34,7 @@ def __init__(
3234
check=check,
3335
preset=preset,
3436
filters=filters,
37+
block_read_strategy=block_read_strategy,
3538
)
3639
super().__init__(
3740
cast(BinaryIO, self.xz_file),
@@ -45,6 +48,9 @@ def __init__(
4548
filters: _LZMAFiltersType = proxy_property("filters", "xz_file")
4649
stream_boundaries: List[int] = proxy_property("stream_boundaries", "xz_file")
4750
block_boundaries: List[int] = proxy_property("block_boundaries", "xz_file")
51+
block_read_strategy: _BlockReadStrategyType = proxy_property(
52+
"block_read_strategy", "xz_file"
53+
)
4854

4955
@wraps(XZFile.change_stream)
5056
def change_stream(self) -> None:
@@ -66,6 +72,7 @@ def xz_open(
6672
check: int = -1,
6773
preset: _LZMAPresetType = None,
6874
filters: _LZMAFiltersType = None,
75+
block_read_strategy: Optional[_BlockReadStrategyType] = None,
6976
# text-mode kwargs
7077
encoding: Optional[str] = None,
7178
errors: Optional[str] = None,
@@ -83,6 +90,7 @@ def xz_open(
8390
check: int = -1,
8491
preset: _LZMAPresetType = None,
8592
filters: _LZMAFiltersType = None,
93+
block_read_strategy: Optional[_BlockReadStrategyType] = None,
8694
# text-mode kwargs
8795
encoding: Optional[str] = None,
8896
errors: Optional[str] = None,
@@ -100,6 +108,7 @@ def xz_open(
100108
check: int = -1,
101109
preset: _LZMAPresetType = None,
102110
filters: _LZMAFiltersType = None,
111+
block_read_strategy: Optional[_BlockReadStrategyType] = None,
103112
# text-mode kwargs
104113
encoding: Optional[str] = None,
105114
errors: Optional[str] = None,
@@ -116,6 +125,7 @@ def xz_open(
116125
check: int = -1,
117126
preset: _LZMAPresetType = None,
118127
filters: _LZMAFiltersType = None,
128+
block_read_strategy: Optional[_BlockReadStrategyType] = None,
119129
# text-mode kwargs
120130
encoding: Optional[str] = None,
121131
errors: Optional[str] = None,
@@ -145,6 +155,7 @@ def xz_open(
145155
check=check,
146156
preset=preset,
147157
filters=filters,
158+
block_read_strategy=block_read_strategy,
148159
encoding=encoding,
149160
errors=errors,
150161
newline=newline,
@@ -163,4 +174,5 @@ def xz_open(
163174
check=check,
164175
preset=preset,
165176
filters=filters,
177+
block_read_strategy=block_read_strategy,
166178
)

src/xz/strategy.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import time
2+
from typing import TYPE_CHECKING, Dict
3+
4+
if TYPE_CHECKING: # pragma: no cover
5+
# avoid circular dependency
6+
from xz.block import XZBlock
7+
8+
9+
class KeepBlockReadStrategy:
    """Block read strategy whose hooks all do nothing.

    Because no hook ever asks a block to clear itself, this strategy never
    frees a block reader on its own.
    """

    def on_create(self, block: "XZBlock") -> None:
        """Hook for the creation of a block reader; intentionally a no-op."""

    def on_delete(self, block: "XZBlock") -> None:
        """Hook for the removal of a block reader; intentionally a no-op."""

    def on_read(self, block: "XZBlock") -> None:
        """Hook for a read on a block; intentionally a no-op."""
18+
19+
20+
class RollingBlockReadStrategy:
    """Block read strategy keeping only the most recently used readers.

    Every tracked block is associated with the ``time.monotonic()`` value of
    its last use. When creating a reader makes the number of tracked blocks
    exceed ``max_block_read_nb``, the least recently used other block is
    asked to ``clear()`` itself and is dropped from the tracking table.
    """

    def __init__(self, max_block_read_nb: int = 8) -> None:
        """Create the strategy.

        max_block_read_nb is the number of tracked blocks above which the
        least recently used one gets cleared.
        """
        # block -> time.monotonic() timestamp of its last use
        self.block_reads: Dict["XZBlock", float] = {}
        self.max_block_read_nb = max_block_read_nb

    def _freshly_used(self, block: "XZBlock") -> None:
        """Record that block has just been used."""
        self.block_reads[block] = time.monotonic()

    def on_create(self, block: "XZBlock") -> None:
        """Track block and evict the least recently used other block if needed."""
        self._freshly_used(block)
        if len(self.block_reads) <= self.max_block_read_nb:
            return

        # Never pick the block being created: clearing it would free nothing,
        # and an older block would be kept in its place.
        candidates = (
            tracked for tracked in self.block_reads if tracked is not block
        )
        to_clear = min(candidates, key=self.block_reads.__getitem__, default=None)
        if to_clear is None:
            return

        to_clear.clear()  # calls on_delete when the block holds a reader
        # A block without a reader does not call on_delete from clear();
        # drop it here so a stale entry cannot block all later evictions.
        self.block_reads.pop(to_clear, None)

    def on_delete(self, block: "XZBlock") -> None:
        """Stop tracking block; blocks that are not tracked are ignored."""
        self.block_reads.pop(block, None)

    def on_read(self, block: "XZBlock") -> None:
        """Mark block as freshly used."""
        self._freshly_used(block)

src/xz/stream.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@
1212
round_up,
1313
)
1414
from xz.io import IOCombiner, IOProxy
15-
from xz.typing import _LZMAFiltersType, _LZMAPresetType
15+
from xz.typing import (
16+
Optional,
17+
_BlockReadStrategyType,
18+
_LZMAFiltersType,
19+
_LZMAPresetType,
20+
)
1621

1722

1823
class XZStream(IOCombiner[XZBlock]):
@@ -22,12 +27,14 @@ def __init__(
2227
check: int,
2328
preset: _LZMAPresetType = None,
2429
filters: _LZMAFiltersType = None,
30+
block_read_strategy: Optional[_BlockReadStrategyType] = None,
2531
) -> None:
2632
super().__init__()
2733
self.fileobj = fileobj
2834
self._check = check
2935
self.preset = preset
3036
self.filters = filters
37+
self.block_read_strategy = block_read_strategy
3138

3239
@property
3340
def check(self) -> int:
@@ -44,7 +51,11 @@ def _fileobj_blocks_end_pos(self) -> int:
4451
)
4552

4653
@classmethod
47-
def parse(cls, fileobj: BinaryIO) -> "XZStream":
54+
def parse(
55+
cls,
56+
fileobj: BinaryIO,
57+
block_read_strategy: Optional[_BlockReadStrategyType] = None,
58+
) -> "XZStream":
4859
"""Parse one XZ stream from a fileobj.
4960
5061
fileobj position should be right at the end of the stream when calling
@@ -70,6 +81,7 @@ def parse(cls, fileobj: BinaryIO) -> "XZStream":
7081
check,
7182
unpadded_size,
7283
uncompressed_size,
84+
block_read_strategy=block_read_strategy,
7385
)
7486
)
7587
block_start = block_end
@@ -84,7 +96,7 @@ def parse(cls, fileobj: BinaryIO) -> "XZStream":
8496
header_start_pos = fileobj.seek(-12, SEEK_CUR)
8597

8698
stream_fileobj = IOProxy(fileobj, header_start_pos, footer_end_pos)
87-
stream = cls(stream_fileobj, check)
99+
stream = cls(stream_fileobj, check, block_read_strategy=block_read_strategy)
88100
for block in blocks:
89101
stream._append(block)
90102
return stream
@@ -102,6 +114,7 @@ def _create_fileobj(self) -> XZBlock:
102114
0,
103115
self.preset,
104116
self.filters,
117+
self.block_read_strategy,
105118
)
106119

107120
def _write_before(self) -> None:

0 commit comments

Comments
 (0)