Skip to content

Commit 498b0bf

Browse files
authored
Merge pull request #306 from powerapi-ng/feat/database-disconnect
feat(database): Add `disconnect` method
2 parents 9bc4013 + ec30793 commit 498b0bf

File tree

12 files changed

+119
-87
lines changed

12 files changed

+119
-87
lines changed

src/powerapi/database/base_db.py

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,86 +26,92 @@
2626
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
2727
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
2828
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29-
from typing import List, Type
30-
from powerapi.report import Report
29+
3130
from powerapi.exception import PowerAPIExceptionWithMessage
31+
from powerapi.report import Report
3232

3333

3434
class DBError(PowerAPIExceptionWithMessage):
3535

3636
"""
37-
Error raised when an error occuried when using a database
37+
Error raised when an error occurred when using a database.
3838
"""
3939
def __init__(self, msg: str):
4040
PowerAPIExceptionWithMessage.__init__(self, msg)
4141

4242

4343
class IterDB:
4444
"""
45-
IterDB class
46-
47-
This class allows to browse a database as an iterable
45+
This class define the interface of a database results iterator.
4846
"""
4947

50-
def __init__(self, db, report_type, stream_mode):
48+
def __init__(self, db, report_type: type[Report], stream_mode: bool):
5149
"""
50+
:param db: Database instance
51+
:param report_type: Report type to convert the database results into
52+
:param stream_mode: Define if the iterator should stop when there is no data
5253
"""
5354
self.db = db
54-
self.stream_mode = stream_mode
5555
self.report_type = report_type
56+
self.stream_mode = stream_mode
5657

5758
def __iter__(self):
5859
"""
60+
Return an iterator for the database results.
5961
"""
6062
raise NotImplementedError()
6163

6264
def __next__(self) -> Report:
6365
"""
66+
Return and consume the next database result available.
6467
"""
6568
raise NotImplementedError()
6669

6770

6871
class BaseDB:
6972
"""
70-
Abstract class which define every common function for database uses.
71-
72-
This class define every common function that need to be implemented
73-
by each DB module. A database module correspond to a kind of BDD.
74-
For example, Mongodb, influxdb, csv are different kind of BDD.
73+
This class define the interface needed to fetch/save reports from/to a database.
7574
"""
76-
def __init__(self, report_type: Type[Report], exceptions: List[Type[Exception]] = None, asynchrone: bool = False):
77-
self.exceptions = exceptions or []
78-
self.asynchrone = asynchrone
75+
76+
def __init__(self, report_type: type[Report], exceptions: list[type[Exception]] = None, is_async: bool = False):
77+
"""
78+
:param report_type: The type of report expected
79+
:param exceptions: List of exception type raised by the database module
80+
:param is_async: Whether the database use asyncio or not
81+
"""
7982
self.report_type = report_type
83+
self.exceptions = exceptions or []
84+
self.is_async = is_async
8085

8186
def connect(self):
8287
"""
83-
Function that allow to load the database. Depending of the type,
84-
different process can happen.
88+
Connect to the database.
89+
"""
90+
raise NotImplementedError()
8591

86-
.. note:: Need to be overrided
92+
def disconnect(self):
93+
"""
94+
Disconnect from the database.
8795
"""
8896
raise NotImplementedError()
8997

90-
def iter(self, stream_mode: bool) -> IterDB:
98+
def iter(self, stream_mode: bool = False) -> IterDB:
9199
"""
92-
Create the iterator for get the data
93-
:param stream_mode: Define if we read in stream mode
100+
Create and returns a database results iterator.
101+
:param stream_mode: Define if we read continuously (streaming) or stop when no data is available
94102
"""
95103
raise NotImplementedError()
96104

97105
def save(self, report: Report):
98106
"""
99-
Allow to save a json input in the db
100-
101-
:param report: Report
107+
Save a report to the database.
108+
:param report: Report to be saved
102109
"""
103110
raise NotImplementedError()
104111

105-
def save_many(self, reports: List[Report]):
112+
def save_many(self, reports: list[Report]):
106113
"""
107-
Allow to save a batch of data
108-
109-
:param reports: Batch of Serialized Report
114+
Save multiple reports to the database. (batch mode)
115+
:param reports: List of Report to be saved
110116
"""
111117
raise NotImplementedError()

src/powerapi/database/csvdb.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,17 +245,20 @@ def clean_files(self):
245245
# Override from BaseDB #
246246
########################
247247

248-
def iter(self, stream_mode: bool) -> CsvIterDB:
248+
def iter(self, stream_mode: bool = False) -> CsvIterDB:
249249
"""
250250
Create the iterator for get the data
251251
"""
252252
return CsvIterDB(self, self.filenames, self.report_type, stream_mode)
253253

254254
def connect(self):
255255
"""
256-
Override from BaseDB.
256+
Connect to the csv database.
257+
"""
257258

258-
Nothing to do with CSV, because it's just files operations.
259+
def disconnect(self):
260+
"""
261+
Disconnect from the csv database.
259262
"""
260263

261264
def save(self, report: Report):

src/powerapi/database/file_db.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,12 @@ def connect(self):
112112
if not os.path.exists(self.filename):
113113
raise FileBadDBError(self.filename)
114114

115-
def iter(self, stream_mode: bool) -> FileIterDB:
115+
def disconnect(self):
116+
"""
117+
Disconnect from the file database.
118+
"""
119+
120+
def iter(self, stream_mode: bool = False) -> FileIterDB:
116121
"""
117122
Create the iterator for get the data
118123
"""

src/powerapi/database/influxdb2.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,7 @@ def _ping_client(self):
9494

9595
def connect(self):
9696
"""
97-
Override from BaseDB.
98-
99-
Create the connection to the influxdb database with the current
100-
configuration (url:port/bucket_name/org), then check if the connection has
101-
been created without failure.
102-
97+
Connect to the influxdb2 database.
10398
"""
10499
# close connection if reload
105100
if self.client is not None:
@@ -123,6 +118,11 @@ def connect(self):
123118

124119
self.buckets_api.create_bucket(bucket_name=self.bucket_name)
125120

121+
def disconnect(self):
122+
"""
123+
Disconnect from the influxdb2 database.
124+
"""
125+
126126
def get_db_by_name(self, db_name: str):
127127
"""
128128
Get the database (bucket) with the given name

src/powerapi/database/mongodb.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,12 @@ def connect(self):
145145

146146
self.collection = self.mongo_client[self.db_name][self.collection_name]
147147

148-
def iter(self, stream_mode: bool) -> MongoIterDB:
148+
def disconnect(self):
149+
"""
150+
Disconnect from the mongodb database.
151+
"""
152+
153+
def iter(self, stream_mode: bool = False) -> MongoIterDB:
149154
"""
150155
Create the iterator for get the data
151156
"""

src/powerapi/database/opentsdb.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ def connect(self):
9191
if not self.client.is_connected() and not self.client.is_alive():
9292
raise CantConnectToOpenTSDBException('connexion error')
9393

94+
def disconnect(self):
95+
"""
96+
Disconnect from the OpenTSDB database.
97+
"""
98+
9499
def save(self, report: PowerReport):
95100
"""
96101
Override from BaseDB

src/powerapi/database/prometheus_db.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ def connect(self):
7373
"""
7474
start_http_server(port=self.port, addr=self.address)
7575

76+
def disconnect(self):
77+
"""
78+
Disconnect from the Prometheus database.
79+
"""
80+
7681

7782
class PrometheusDB(BasePrometheusDB):
7883
"""

src/powerapi/database/socket_db.py

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,36 @@
3131
from typing import Type, List
3232
import json
3333

34-
3534
from powerapi.utils import JsonStream
3635
from powerapi.report import Report
3736
from .base_db import IterDB, BaseDB, DBError
3837

39-
BUFFER_SIZE = 4096
40-
SOCKET_TIMEOUT = 0.5
38+
39+
class IterSocketDB(IterDB):
40+
"""
41+
iterator connected to a socket that receive report from a sensor
42+
"""
43+
44+
def __init__(self, report_type, stream_mode, queue):
45+
"""
46+
"""
47+
IterDB.__init__(self, None, report_type, stream_mode)
48+
49+
self.queue = queue
50+
51+
def __aiter__(self):
52+
return self
53+
54+
async def __anext__(self):
55+
try:
56+
json_str = await asyncio.wait_for(self.queue.get(), 2)
57+
# json = self.queue.get_nowait()
58+
# self.queue.get()
59+
report = self.report_type.from_json(json.loads(json_str))
60+
return report
61+
# except Empty:
62+
except asyncio.TimeoutError:
63+
return None
4164

4265

4366
class SocketDB(BaseDB):
@@ -46,23 +69,31 @@ class SocketDB(BaseDB):
4669
"""
4770

4871
def __init__(self, report_type: Type[Report], port: int):
49-
BaseDB.__init__(self, report_type, asynchrone=True)
72+
BaseDB.__init__(self, report_type, is_async=True)
5073
self.queue = None
5174
self.port = port
5275
self.server = None
5376

5477
async def connect(self):
78+
"""
79+
Connect to the socket database.
80+
"""
5581
self.queue = asyncio.Queue()
5682
self.server = await asyncio.start_server(self._gen_server_callback(), host='127.0.0.1', port=self.port)
5783

84+
async def disconnect(self):
85+
"""
86+
Disconnect from the socket database.
87+
"""
88+
5889
async def stop(self):
5990
"""
6091
stop server connection
6192
"""
6293
self.server.close()
6394
await self.server.wait_closed()
6495

65-
def iter(self, stream_mode):
96+
def iter(self, stream_mode: bool = False) -> IterSocketDB:
6697
return IterSocketDB(self.report_type, stream_mode, self.queue)
6798

6899
def _gen_server_callback(self):
@@ -91,30 +122,3 @@ def save(self, report: Report):
91122

92123
def save_many(self, reports: List[Report]):
93124
raise DBError('Socket db don\'t support save_many method')
94-
95-
96-
class IterSocketDB(IterDB):
97-
"""
98-
iterator connected to a socket that receive report from a sensor
99-
"""
100-
101-
def __init__(self, report_type, stream_mode, queue):
102-
"""
103-
"""
104-
IterDB.__init__(self, None, report_type, stream_mode)
105-
106-
self.queue = queue
107-
108-
def __aiter__(self):
109-
return self
110-
111-
async def __anext__(self):
112-
try:
113-
json_str = await asyncio.wait_for(self.queue.get(), 2)
114-
# json = self.queue.get_nowait()
115-
# self.queue.get()
116-
report = self.report_type.from_json(json.loads(json_str))
117-
return report
118-
# except Empty:
119-
except asyncio.TimeoutError:
120-
return None

src/powerapi/database/virtiofs_db.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ def connect(self):
7979
if not os.path.exists(self.root_directory_name):
8080
raise DirectoryDoesNotExistForVirtioFS(self.root_directory_name)
8181

82+
def disconnect(self):
83+
"""
84+
Disconnect from the virtiosfs database.
85+
"""
86+
8287
def save(self, report: Report):
8388
directory_name = self._generate_vm_directory_name(report.target)
8489
if directory_name is None:

src/powerapi/puller/handlers.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def _connect(self):
7272

7373
def _pull_database(self):
7474
try:
75-
if self.state.asynchrone:
75+
if self.state.database.is_async:
7676
report = self.loop.run_until_complete(anext(self.state.database_it))
7777
if report is None:
7878
raise StopIteration()
@@ -95,7 +95,7 @@ def run(self):
9595
9696
:param None msg: None.
9797
"""
98-
if self.state.asynchrone:
98+
if self.state.database.is_async:
9999
self.loop = asyncio.new_event_loop()
100100
asyncio.set_event_loop(self.loop)
101101
self.state.loop = self.loop
@@ -200,7 +200,7 @@ def pull_db(self):
200200

201201
def _database_connection(self):
202202
try:
203-
if not self.state.asynchrone:
203+
if not self.state.database.is_async:
204204
self.state.database.connect()
205205
self.state.database_it = self.state.database.iter(stream_mode=self.state.stream_mode)
206206

0 commit comments

Comments
 (0)