Skip to content

Commit 9416dd4

Browse files
authored
Lazy column family create (#905)
1 parent 3e73165 commit 9416dd4

File tree

8 files changed

+37
-133
lines changed

8 files changed

+37
-133
lines changed

quixstreams/state/memory/partition.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
from quixstreams.state import PartitionTransaction
66
from quixstreams.state.base import PartitionTransactionCache, StorePartition
7-
from quixstreams.state.exceptions import ColumnFamilyDoesNotExist
87
from quixstreams.state.metadata import METADATA_CF_NAME, Marker
98
from quixstreams.state.recovery import ChangelogProducer
109
from quixstreams.utils.json import dumps as json_dumps
@@ -99,9 +98,6 @@ def write(
9998
def recover_from_changelog_message(
10099
self, key: bytes, value: Optional[bytes], cf_name: str, offset: int
101100
) -> None:
102-
if cf_name not in self._state:
103-
raise ColumnFamilyDoesNotExist(f'Column family "{cf_name}" does not exist')
104-
105101
if value:
106102
self._state.setdefault(cf_name, {})[key] = value
107103
else:
Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
from quixstreams.state.exceptions import StateError
22

3-
__all__ = ("ColumnFamilyAlreadyExists",)
4-
5-
6-
class ColumnFamilyAlreadyExists(StateError): ...
3+
__all__ = ("RocksDBCorruptedError",)
74

85

96
class RocksDBCorruptedError(StateError): ...

quixstreams/state/rocksdb/partition.py

Lines changed: 17 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@
1717
PartitionTransactionCache,
1818
StorePartition,
1919
)
20-
from quixstreams.state.exceptions import ColumnFamilyDoesNotExist
2120
from quixstreams.state.metadata import METADATA_CF_NAME, Marker
2221
from quixstreams.state.recovery import ChangelogProducer
2322
from quixstreams.state.serialization import int_from_bytes, int_to_bytes
2423

25-
from .exceptions import ColumnFamilyAlreadyExists, RocksDBCorruptedError
24+
from .exceptions import RocksDBCorruptedError
2625
from .metadata import (
2726
CHANGELOG_OFFSET_KEY,
2827
)
@@ -52,8 +51,6 @@ class RocksDBStorePartition(StorePartition):
5251
:param options: RocksDB options. If `None`, the default options will be used.
5352
"""
5453

55-
additional_column_families: tuple[str, ...] = ()
56-
5754
def __init__(
5855
self,
5956
path: str,
@@ -72,8 +69,6 @@ def __init__(
7269
self._db = self._init_rocksdb()
7370
self._cf_cache: Dict[str, Rdict] = {}
7471
self._cf_handle_cache: Dict[str, ColumnFamily] = {}
75-
for cf_name in self.additional_column_families:
76-
self._ensure_column_family(cf_name)
7772

7873
def recover_from_changelog_message(
7974
self, key: bytes, value: Optional[bytes], cf_name: str, offset: int
@@ -149,7 +144,9 @@ def get(
149144
:param cf_name: rocksdb column family name. Default - "default"
150145
:return: a value if the key is present in the DB. Otherwise, `default`
151146
"""
152-
result = self.get_column_family(cf_name).get(key, default=Marker.UNDEFINED)
147+
result = self.get_or_create_column_family(cf_name).get(
148+
key, default=Marker.UNDEFINED
149+
)
153150

154151
# RDict accept Any type as value but we only write bytes so we should only get bytes back.
155152
return cast(Union[bytes, Literal[Marker.UNDEFINED]], result)
@@ -172,7 +169,7 @@ def iter_items(
172169
Default is "default".
173170
:return: An iterator yielding (key, value) tuples.
174171
"""
175-
cf = self.get_column_family(cf_name=cf_name)
172+
cf = self.get_or_create_column_family(cf_name=cf_name)
176173

177174
# Set iterator bounds to reduce IO by limiting the range of keys fetched
178175
read_opt = ReadOptions()
@@ -219,15 +216,15 @@ def exists(self, key: bytes, cf_name: str = "default") -> bool:
219216
:param cf_name: rocksdb column family name. Default - "default"
220217
:return: `True` if the key is present, `False` otherwise.
221218
"""
222-
cf_dict = self.get_column_family(cf_name)
219+
cf_dict = self.get_or_create_column_family(cf_name)
223220
return key in cf_dict
224221

225222
def get_changelog_offset(self) -> Optional[int]:
226223
"""
227224
Get offset that the changelog is up-to-date with.
228225
:return: offset or `None` if there's no processed offset yet
229226
"""
230-
metadata_cf = self.get_column_family(METADATA_CF_NAME)
227+
metadata_cf = self.get_or_create_column_family(METADATA_CF_NAME)
231228
offset_bytes = metadata_cf.get(CHANGELOG_OFFSET_KEY)
232229
if offset_bytes is None:
233230
return None
@@ -288,18 +285,12 @@ def get_column_family_handle(self, cf_name: str) -> ColumnFamily:
288285
:return: instance of `rocksdict.ColumnFamily`
289286
"""
290287
if (cf_handle := self._cf_handle_cache.get(cf_name)) is None:
291-
try:
292-
cf_handle = self._db.get_column_family_handle(cf_name)
293-
self._cf_handle_cache[cf_name] = cf_handle
294-
except Exception as exc:
295-
if "does not exist" in str(exc):
296-
raise ColumnFamilyDoesNotExist(
297-
f'Column family "{cf_name}" does not exist'
298-
)
299-
raise
288+
self.get_or_create_column_family(cf_name)
289+
cf_handle = self._db.get_column_family_handle(cf_name)
290+
self._cf_handle_cache[cf_name] = cf_handle
300291
return cf_handle
301292

302-
def get_column_family(self, cf_name: str) -> Rdict:
293+
def get_or_create_column_family(self, cf_name: str) -> Rdict:
303294
"""
304295
Get a column family instance.
305296
This method will cache the CF instance to avoid creating them repeatedly.
@@ -310,38 +301,14 @@ def get_column_family(self, cf_name: str) -> Rdict:
310301
if (cf := self._cf_cache.get(cf_name)) is None:
311302
try:
312303
cf = self._db.get_column_family(cf_name)
313-
self._cf_cache[cf_name] = cf
314304
except Exception as exc:
315-
if "does not exist" in str(exc):
316-
raise ColumnFamilyDoesNotExist(
317-
f'Column family "{cf_name}" does not exist'
318-
)
319-
raise
320-
return cf
321-
322-
def create_column_family(self, cf_name: str):
323-
try:
324-
cf = self._db.create_column_family(cf_name, options=self._rocksdb_options)
325-
except Exception as exc:
326-
if "column family already exists" in str(exc).lower():
327-
raise ColumnFamilyAlreadyExists(
328-
f'Column family already exists: "{cf_name}"'
329-
)
330-
raise
331-
332-
self._cf_cache[cf_name] = cf
333-
334-
def drop_column_family(self, cf_name: str):
335-
self._cf_cache.pop(cf_name, None)
336-
self._cf_handle_cache.pop(cf_name, None)
337-
try:
338-
self._db.drop_column_family(cf_name)
339-
except Exception as exc:
340-
if "invalid column family:" in str(exc).lower():
341-
raise ColumnFamilyDoesNotExist(
342-
f'Column family does not exist: "{cf_name}"'
305+
if "does not exist" not in str(exc):
306+
raise
307+
cf = self._db.create_column_family(
308+
cf_name, options=self._rocksdb_options
343309
)
344-
raise
310+
self._cf_cache[cf_name] = cf
311+
return cf
345312

346313
def list_column_families(self) -> List[str]:
347314
return self._db.list_cf(self._path)
@@ -427,9 +394,3 @@ def _update_changelog_offset(self, batch: WriteBatch, offset: int):
427394
int_to_bytes(offset),
428395
self.get_column_family_handle(METADATA_CF_NAME),
429396
)
430-
431-
def _ensure_column_family(self, cf_name: str) -> None:
432-
try:
433-
self.get_column_family(cf_name)
434-
except ColumnFamilyDoesNotExist:
435-
self.create_column_family(cf_name)

quixstreams/state/rocksdb/timestamped.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,6 @@ class TimestampedStorePartition(RocksDBStorePartition):
262262
`TimestampedPartitionTransaction` instances to handle atomic operations for that partition.
263263
"""
264264

265-
additional_column_families = (MIN_ELIGIBLE_TIMESTAMPS_CF_NAME,)
266-
267265
def begin(self) -> TimestampedPartitionTransaction:
268266
# Override the method to specify the correct return type
269267
return TimestampedPartitionTransaction(

quixstreams/state/rocksdb/windowed/partition.py

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,6 @@
22
from typing import Iterator, cast
33

44
from ..partition import RocksDBStorePartition
5-
from .metadata import (
6-
GLOBAL_COUNTER_CF_NAME,
7-
LATEST_DELETED_VALUE_CF_NAME,
8-
LATEST_DELETED_WINDOW_CF_NAME,
9-
LATEST_EXPIRED_WINDOW_CF_NAME,
10-
LATEST_TIMESTAMPS_CF_NAME,
11-
VALUES_CF_NAME,
12-
)
135
from .transaction import WindowedRocksDBPartitionTransaction
146

157
logger = logging.getLogger(__name__)
@@ -24,15 +16,6 @@ class WindowedRocksDBStorePartition(RocksDBStorePartition):
2416
stores the expiration index to delete expired windows.
2517
"""
2618

27-
additional_column_families = (
28-
LATEST_DELETED_VALUE_CF_NAME,
29-
LATEST_EXPIRED_WINDOW_CF_NAME,
30-
LATEST_DELETED_WINDOW_CF_NAME,
31-
LATEST_TIMESTAMPS_CF_NAME,
32-
GLOBAL_COUNTER_CF_NAME,
33-
VALUES_CF_NAME,
34-
)
35-
3619
def iter_keys(self, cf_name: str = "default") -> Iterator[bytes]:
3720
"""
3821
Iterate over all keys in the DB.
@@ -42,7 +25,7 @@ def iter_keys(self, cf_name: str = "default") -> Iterator[bytes]:
4225
:param cf_name: rocksdb column family name. Default - "default"
4326
:return: An iterable of keys
4427
"""
45-
cf_dict = self.get_column_family(cf_name)
28+
cf_dict = self.get_or_create_column_family(cf_name)
4629
return cast(Iterator[bytes], cf_dict.keys())
4730

4831
def begin(self) -> WindowedRocksDBPartitionTransaction:

tests/test_quixstreams/test_state/test_partition.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import pytest
22

3-
from quixstreams.state.exceptions import ColumnFamilyDoesNotExist
43
from quixstreams.state.manager import SUPPORTED_STORES
4+
from quixstreams.state.metadata import Marker
55

66

77
@pytest.mark.parametrize("store_type", SUPPORTED_STORES, indirect=True)
@@ -24,13 +24,12 @@ def test_recover_from_changelog_message_success(self, store_value, store_partiti
2424
assert store_partition.get(b"key") == b"value"
2525
assert store_partition.get_changelog_offset() == 1
2626

27-
def test_recover_from_changelog_message_missing_cf_fails(self, store_partition):
28-
with pytest.raises(
29-
ColumnFamilyDoesNotExist, match='Column family "some_cf" does not exist'
30-
):
31-
store_partition.recover_from_changelog_message(
32-
key=b"key", value=b"value", cf_name="some_cf", offset=1
33-
)
27+
def test_recover_from_changelog_message_missing_cf(self, store_partition):
28+
store_partition.recover_from_changelog_message(
29+
key=b"key", value=b"value", cf_name="some_cf", offset=1
30+
)
31+
assert store_partition.get(b"key") == Marker.UNDEFINED
32+
assert store_partition.get_changelog_offset() == 1
3433

3534
def test_write_changelog_offset(self, store_partition):
3635
assert store_partition.get_changelog_offset() is None

tests/test_quixstreams/test_state/test_rocksdb/test_rocksdb_partition.py

Lines changed: 8 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@
55
import pytest
66
from rocksdict import Rdict
77

8-
from quixstreams.state.exceptions import ColumnFamilyDoesNotExist
98
from quixstreams.state.rocksdb import (
10-
ColumnFamilyAlreadyExists,
119
RocksDBOptions,
1210
RocksDBStorePartition,
1311
)
@@ -113,47 +111,19 @@ def test_db_corrupted_sst_file(self, store_partition_factory, tmp_path):
113111

114112
store_partition_factory(options=RocksDBOptions(on_corrupted_recreate=True))
115113

116-
def test_create_and_get_column_family(self, store_partition: RocksDBStorePartition):
117-
store_partition.create_column_family("cf")
118-
assert store_partition.get_column_family("cf")
114+
def test_get_or_create_column_family(self, store_partition: RocksDBStorePartition):
115+
assert store_partition.get_or_create_column_family("cf")
119116

120-
def test_create_column_family_already_exists(
117+
def test_get_or_create_column_family_cached(
121118
self, store_partition: RocksDBStorePartition
122119
):
123-
store_partition.create_column_family("cf")
124-
with pytest.raises(ColumnFamilyAlreadyExists):
125-
store_partition.create_column_family("cf")
126-
127-
def test_get_column_family_doesnt_exist(
128-
self, store_partition: RocksDBStorePartition
129-
):
130-
with pytest.raises(ColumnFamilyDoesNotExist):
131-
store_partition.get_column_family("cf")
132-
133-
def test_get_column_family_cached(self, store_partition: RocksDBStorePartition):
134-
store_partition.create_column_family("cf")
135-
cf1 = store_partition.get_column_family("cf")
136-
cf2 = store_partition.get_column_family("cf")
120+
cf1 = store_partition.get_or_create_column_family("cf")
121+
cf2 = store_partition.get_or_create_column_family("cf")
137122
assert cf1 is cf2
138123

139-
def test_create_and_drop_column_family(
140-
self, store_partition: RocksDBStorePartition
141-
):
142-
store_partition.create_column_family("cf")
143-
store_partition.drop_column_family("cf")
144-
145-
with pytest.raises(ColumnFamilyDoesNotExist):
146-
store_partition.get_column_family("cf")
147-
148-
def test_drop_column_family_doesnt_exist(
149-
self, store_partition: RocksDBStorePartition
150-
):
151-
with pytest.raises(ColumnFamilyDoesNotExist):
152-
store_partition.drop_column_family("cf")
153-
154124
def test_list_column_families(self, store_partition: RocksDBStorePartition):
155-
store_partition.create_column_family("cf1")
156-
store_partition.create_column_family("cf2")
125+
store_partition.get_or_create_column_family("cf1")
126+
store_partition.get_or_create_column_family("cf2")
157127
cfs = store_partition.list_column_families()
158128
assert "cf1" in cfs
159129
assert "cf2" in cfs
@@ -187,7 +157,7 @@ def test_list_column_families_defaults(
187157
]
188158

189159
def test_ensure_metadata_cf(self, store_partition: RocksDBStorePartition):
190-
assert store_partition.get_column_family("__metadata__")
160+
assert store_partition.get_or_create_column_family("__metadata__")
191161

192162
@pytest.mark.parametrize(
193163
["backwards", "expected"],

tests/test_quixstreams/test_state/test_rocksdb/test_rocksdb_transaction.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ def test_set_get_with_column_family(self, store_partition):
88
key = "key"
99
value = "value"
1010
prefix = b"__key__"
11-
store_partition.create_column_family("cf")
11+
store_partition.get_or_create_column_family("cf")
1212

1313
with store_partition.begin() as tx:
1414
tx.set(key, value, cf_name="cf", prefix=prefix)
@@ -21,7 +21,7 @@ def test_set_delete_get_with_column_family(self, store_partition):
2121
key = "key"
2222
value = "value"
2323
prefix = b"__key__"
24-
store_partition.create_column_family("cf")
24+
store_partition.get_or_create_column_family("cf")
2525

2626
with store_partition.begin() as tx:
2727
tx.set(key, value, cf_name="cf", prefix=prefix)
@@ -35,7 +35,7 @@ def test_set_delete_get_with_column_family(self, store_partition):
3535
def test_set_exists_get_with_column_family(self, store_partition):
3636
key = "key"
3737
value = "value"
38-
store_partition.create_column_family("cf")
38+
store_partition.get_or_create_column_family("cf")
3939
prefix = b"__key__"
4040

4141
with store_partition.begin() as tx:

0 commit comments

Comments
 (0)