Skip to content

Commit f5e60d0

Browse files
author
Sergio García Prado
committed
ISSUE #99
* Add initial implementation of the `LmdbDatabaseClient`.
1 parent e7ca7e1 commit f5e60d0

File tree

3 files changed

+299
-8
lines changed

3 files changed

+299
-8
lines changed
Lines changed: 117 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,136 @@
1+
from __future__ import (
2+
annotations,
3+
)
4+
5+
from pathlib import (
6+
Path,
7+
)
18
from typing import (
29
Any,
3-
AsyncIterator,
10+
Optional,
11+
Union,
12+
)
13+
14+
from lmdb import (
15+
Environment,
416
)
517

618
from minos.common import (
719
DatabaseClient,
820
DatabaseOperation,
21+
MinosAvroDatabaseProtocol,
22+
MinosBinaryProtocol,
23+
)
24+
25+
from .operations import (
26+
LmdbDatabaseOperation,
27+
LmdbDatabaseOperationType,
928
)
1029

1130

1231
class LmdbDatabaseClient(DatabaseClient):
1332
"""TODO"""
1433

34+
_environment: Optional[Environment]
35+
36+
def __init__(
37+
self,
38+
path: Optional[Union[str, Path]] = None,
39+
max_tables: int = 100,
40+
map_size: int = int(1e9),
41+
protocol: type[MinosBinaryProtocol] = MinosAvroDatabaseProtocol,
42+
*args,
43+
**kwargs,
44+
):
45+
super().__init__(*args, **kwargs)
46+
if path is None:
47+
path = ".lmdb"
48+
49+
self._path = path
50+
self._max_tables = max_tables
51+
self._map_size = map_size
52+
self._protocol = protocol
53+
self._tables = {}
54+
55+
self._prefetched = None
56+
57+
self._environment = None
58+
59+
async def _setup(self) -> None:
60+
await super()._setup()
61+
self._create_environment()
62+
63+
async def _destroy(self) -> None:
64+
await super()._destroy()
65+
self._close_environment()
66+
67+
def _create_environment(self) -> None:
68+
self._environment = Environment(str(self._path), max_dbs=self._max_tables, map_size=self._map_size)
69+
70+
def _close_environment(self) -> None:
71+
if self._environment is not None:
72+
self._environment.close()
73+
1574
async def _is_valid(self, **kwargs) -> bool:
16-
pass
75+
return True
1776

1877
async def _reset(self, **kwargs) -> None:
19-
pass
78+
self._prefetched = None
79+
self._environment.sync()
80+
81+
async def _fetch_all(self, *args, **kwargs) -> Any:
82+
prefetched = self._prefetched
83+
self._prefetched = None
84+
yield prefetched
2085

2186
async def _execute(self, operation: DatabaseOperation) -> None:
22-
pass
87+
if not isinstance(operation, LmdbDatabaseOperation):
88+
raise ValueError(f"The operation must be a {LmdbDatabaseOperation!r} instance. Obtained: {operation!r}")
89+
90+
mapper = {
91+
LmdbDatabaseOperationType.CREATE: self._create,
92+
LmdbDatabaseOperationType.READ: self._read,
93+
LmdbDatabaseOperationType.UPDATE: self._update,
94+
LmdbDatabaseOperationType.DELETE: self._delete,
95+
}
96+
97+
fn = mapper[operation.type_]
98+
99+
fn(table=operation.table, key=operation.key, value=operation.value)
100+
101+
# noinspection PyUnusedLocal
102+
def _create(self, table: str, key: str, value: Any, **kwargs) -> None:
103+
table = self._get_table(table)
104+
with self._environment.begin(write=True) as transaction:
105+
encoded = self._protocol.encode(value)
106+
transaction.put(key.encode(), encoded, db=table)
107+
108+
# noinspection PyUnusedLocal
109+
def _read(self, table: str, key: str, **kwargs):
110+
table = self._get_table(table)
111+
with self._environment.begin(db=table) as transaction:
112+
value_binary = transaction.get(key.encode())
113+
if value_binary is not None:
114+
value = self._protocol.decode(value_binary)
115+
else:
116+
value = None
117+
118+
self._prefetched = value
119+
120+
# noinspection PyUnusedLocal
121+
def _delete(self, table: str, key: str, **kwargs) -> None:
122+
table = self._get_table(table)
123+
with self._environment.begin(write=True, db=table) as transaction:
124+
transaction.delete(key.encode())
125+
126+
# noinspection PyUnusedLocal
127+
def _update(self, table: str, key: str, value: Any, **kwargs) -> None:
128+
table = self._get_table(table)
129+
with self._environment.begin(write=True, db=table) as transaction:
130+
encoded = self._protocol.encode(value)
131+
transaction.put(key.encode(), encoded, db=table, overwrite=True)
23132

24-
def _fetch_all(self, *args, **kwargs) -> AsyncIterator[Any]:
25-
pass
133+
def _get_table(self, table: str) -> Any:
134+
if table not in self._tables:
135+
self._tables[table] = self._environment.open_db(table.encode())
136+
return self._tables[table]
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,33 @@
1+
from enum import (
2+
Enum,
3+
)
4+
from typing import (
5+
Any,
6+
Optional,
7+
)
8+
19
from minos.common import (
210
DatabaseOperation,
311
)
412

513

14+
class LmdbDatabaseOperationType(str, Enum):
15+
"""TODO"""
16+
17+
CREATE = "create"
18+
READ = "read"
19+
UPDATE = "update"
20+
DELETE = "delete"
21+
22+
623
class LmdbDatabaseOperation(DatabaseOperation):
724
"""TODO"""
25+
26+
def __init__(
27+
self, type_: LmdbDatabaseOperationType, table: str, key: str, value: Optional[Any] = None, *args, **kwargs
28+
):
29+
super().__init__(*args, **kwargs)
30+
self.type_ = type_
31+
self.table = table
32+
self.key = key
33+
self.value = value

packages/plugins/minos-database-lmdb/tests/test_lmdb/test_clients.py

Lines changed: 156 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,170 @@
1+
import shutil
12
import unittest
3+
from pathlib import (
4+
Path,
5+
)
26

37
from minos.common import (
8+
DatabaseClient,
49
DatabaseOperation,
510
)
611
from minos.plugins.lmdb import (
12+
LmdbDatabaseClient,
713
LmdbDatabaseOperation,
814
)
15+
from minos.plugins.lmdb.operations import (
16+
LmdbDatabaseOperationType,
17+
)
18+
from tests.utils import (
19+
BASE_PATH,
20+
)
921

1022

11-
class TestLmdbDatabaseClient(unittest.TestCase):
23+
class TestLmdbDatabaseClient(unittest.IsolatedAsyncioTestCase):
24+
def setUp(self) -> None:
25+
super().setUp()
26+
self.path = BASE_PATH / "order.lmdb"
27+
1228
def test_subclass(self) -> None:
13-
self.assertTrue(issubclass(LmdbDatabaseOperation, DatabaseOperation))
29+
self.assertTrue(issubclass(LmdbDatabaseClient, DatabaseClient))
30+
31+
def tearDown(self) -> None:
32+
shutil.rmtree(self.path, ignore_errors=True)
33+
shutil.rmtree(".lmdb", ignore_errors=True)
34+
35+
async def test_constructor_default_path(self):
36+
async with LmdbDatabaseClient():
37+
self.assertTrue(Path(".lmdb").exists())
38+
39+
async def test_is_valid(self):
40+
async with LmdbDatabaseClient(self.path) as client:
41+
self.assertTrue(await client.is_valid())
42+
43+
async def test_execute_raises_unsupported(self):
44+
class _DatabaseOperation(DatabaseOperation):
45+
"""For testing purposes."""
46+
47+
async with LmdbDatabaseClient(self.path) as client:
48+
with self.assertRaises(ValueError):
49+
await client.execute(_DatabaseOperation())
50+
51+
async def test_execute_create_text(self):
52+
create_op = LmdbDatabaseOperation(LmdbDatabaseOperationType.CREATE, "TestOne", "first", "Text Value")
53+
read_op = LmdbDatabaseOperation(LmdbDatabaseOperationType.READ, "TestOne", "first")
54+
55+
async with LmdbDatabaseClient(self.path) as client:
56+
await client.execute(create_op)
57+
await client.execute(read_op)
58+
59+
self.assertEqual("Text Value", await client.fetch_one())
60+
61+
async def test_execute_create_int(self):
62+
create_op = LmdbDatabaseOperation(LmdbDatabaseOperationType.CREATE, "TestOne", "first", 123)
63+
read_op = LmdbDatabaseOperation(LmdbDatabaseOperationType.READ, "TestOne", "first")
64+
65+
async with LmdbDatabaseClient(self.path) as client:
66+
await client.execute(create_op)
67+
await client.execute(read_op)
68+
69+
self.assertEqual(123, await client.fetch_one())
70+
71+
async def test_execute_create_dict(self):
72+
create_op = LmdbDatabaseOperation(
73+
LmdbDatabaseOperationType.CREATE, "TestOne", "first", {"key_one": "hello", "key_two": "minos"}
74+
)
75+
read_op = LmdbDatabaseOperation(LmdbDatabaseOperationType.READ, "TestOne", "first")
76+
77+
async with LmdbDatabaseClient(self.path) as client:
78+
await client.execute(create_op)
79+
await client.execute(read_op)
80+
81+
self.assertEqual({"key_one": "hello", "key_two": "minos"}, await client.fetch_one())
82+
83+
async def test_execute_create_multi_dict(self):
84+
create_op = LmdbDatabaseOperation(
85+
LmdbDatabaseOperationType.CREATE,
86+
"TestOne",
87+
"first",
88+
{"key_one": "hello", "key_two": {"sub_key": "this is a sub text"}},
89+
)
90+
read_op = LmdbDatabaseOperation(LmdbDatabaseOperationType.READ, "TestOne", "first")
91+
92+
async with LmdbDatabaseClient(self.path) as client:
93+
await client.execute(create_op)
94+
await client.execute(read_op)
95+
96+
self.assertEqual(
97+
{"key_one": "hello", "key_two": {"sub_key": "this is a sub text"}}, await client.fetch_one()
98+
)
99+
100+
async def test_execute_create_list(self):
101+
create_op = LmdbDatabaseOperation(LmdbDatabaseOperationType.CREATE, "TestOne", "first", ["hello", "minos"])
102+
read_op = LmdbDatabaseOperation(LmdbDatabaseOperationType.READ, "TestOne", "first")
103+
104+
async with LmdbDatabaseClient(self.path) as client:
105+
await client.execute(create_op)
106+
await client.execute(read_op)
107+
108+
self.assertEqual(["hello", "minos"], await client.fetch_one())
109+
110+
async def test_execute_create_multi_table(self):
111+
create_op_1 = LmdbDatabaseOperation(LmdbDatabaseOperationType.CREATE, "TestOne", "first", "Text Value")
112+
create_op_2 = LmdbDatabaseOperation(
113+
LmdbDatabaseOperationType.CREATE, "TestTwo", "first_double", "Text Double Value"
114+
)
115+
create_op_3 = LmdbDatabaseOperation(LmdbDatabaseOperationType.CREATE, "TestTwo", "first", "Text Value Diff")
116+
117+
read_op_1 = LmdbDatabaseOperation(LmdbDatabaseOperationType.READ, "TestOne", "first")
118+
read_op_2 = LmdbDatabaseOperation(LmdbDatabaseOperationType.READ, "TestTwo", "first_double")
119+
read_op_3 = LmdbDatabaseOperation(LmdbDatabaseOperationType.READ, "TestTwo", "first")
120+
121+
async with LmdbDatabaseClient(self.path) as client:
122+
await client.execute(create_op_1)
123+
await client.execute(create_op_2)
124+
await client.execute(create_op_3)
125+
126+
await client.execute(read_op_1)
127+
self.assertEqual("Text Value", await client.fetch_one())
128+
129+
await client.execute(read_op_2)
130+
self.assertEqual("Text Double Value", await client.fetch_one())
131+
132+
await client.execute(read_op_3)
133+
self.assertEqual("Text Value Diff", await client.fetch_one())
134+
135+
async def test_execute_delete(self):
136+
create_op_1 = LmdbDatabaseOperation(LmdbDatabaseOperationType.CREATE, "TestOne", "first", "Text Value")
137+
create_op_2 = LmdbDatabaseOperation(LmdbDatabaseOperationType.CREATE, "TestOne", "second", "Text Second Value")
138+
delete_op_1 = LmdbDatabaseOperation(LmdbDatabaseOperationType.DELETE, "TestOne", "first")
139+
read_op_1 = LmdbDatabaseOperation(LmdbDatabaseOperationType.READ, "TestOne", "second")
140+
read_op_2 = LmdbDatabaseOperation(LmdbDatabaseOperationType.READ, "TestOne", "first")
141+
142+
async with LmdbDatabaseClient(self.path) as client:
143+
await client.execute(create_op_1)
144+
await client.execute(create_op_2)
145+
await client.execute(delete_op_1)
146+
147+
await client.execute(read_op_1)
148+
self.assertEqual("Text Second Value", await client.fetch_one())
149+
150+
await client.execute(read_op_2)
151+
self.assertEqual(None, await client.fetch_one())
152+
153+
async def test_execute_update(self):
154+
create_op = LmdbDatabaseOperation(LmdbDatabaseOperationType.CREATE, "TestOne", "first", "Text Value")
155+
update_op = LmdbDatabaseOperation(LmdbDatabaseOperationType.UPDATE, "TestOne", "first", "Updated Text Value")
156+
read_op = LmdbDatabaseOperation(LmdbDatabaseOperationType.READ, "TestOne", "first")
157+
158+
async with LmdbDatabaseClient(self.path) as client:
159+
await client.execute(create_op)
160+
await client.execute(read_op)
161+
162+
self.assertEqual("Text Value", await client.fetch_one())
163+
164+
await client.execute(update_op)
165+
await client.execute(read_op)
166+
167+
self.assertEqual("Updated Text Value", await client.fetch_one())
14168

15169

16170
if __name__ == "__main__":

0 commit comments

Comments
 (0)