-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathmodels.py
More file actions
223 lines (193 loc) · 7.69 KB
/
models.py
File metadata and controls
223 lines (193 loc) · 7.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
"""Models used by the maintenance NApp.
This module defines the models for the maintenance window itself and
the scheduler.
"""
import datetime
from enum import IntEnum
from uuid import uuid4
import pytz
from apscheduler.jobstores.base import JobLookupError
from apscheduler.schedulers.background import BackgroundScheduler
from kytos.core import KytosEvent, log
from kytos.core.interface import TAG, UNI
from kytos.core.link import Link
# Timestamp layout shared by the parsing (strptime) and serialization
# (strftime) of window start/end values; it ends with a UTC offset (%z),
# e.g. "2024-01-31T13:45:00+0000".
TIME_FMT = "%Y-%m-%dT%H:%M:%S%z"
class Status(IntEnum):
    """Lifecycle states of a maintenance window.

    Being an IntEnum, every member compares equal to its plain integer
    value.
    """

    # Initial state of a window.
    PENDING = 0
    # State set when the window is started.
    RUNNING = 1
    # State set when the window is ended.
    FINISHED = 2
class MaintenanceWindow:
    """A maintenance window over switches, links and client interfaces.

    The items given to the window are split by type into three private
    lists; the ``items`` property joins them back into a single list.
    """

    def __init__(self, start, end, controller, **kwargs):
        """Create an instance of MaintenanceWindow.

        Args:
            start(datetime): when the maintenance will begin
            end(datetime): when the maintenance will finish
            controller: controller object; its ``switches`` mapping and
                ``buffers.app`` queue are used when events are emitted
            items: list of items that will be maintained;
                each item can be either a switch, a link or a client
                interface (an empty list is used when omitted)
            mw_id: identifier of the window; a random hex UUID is
                generated when it is missing or falsy
            description: text describing the window (may be None)
            status: status of the window, Status.PENDING by default
        """
        self.controller = controller
        # pylint: disable=invalid-name
        self.id = kwargs.get('mw_id') or uuid4().hex
        self.description = kwargs.get('description')
        self.start = start
        self.end = end
        self._switches = []
        self._links = []
        self._unis = []
        given_items = kwargs.get('items')
        # Goes through the property setter, which sorts items by type.
        self.items = [] if given_items is None else given_items
        self.status = kwargs.get('status', Status.PENDING)

    @property
    def items(self):
        """Return switches, links and UNIs (in this order) as one list."""
        return self._switches + self._links + self._unis

    @items.setter
    def items(self, items):
        """Replace all items, sorting each one into its typed list.

        UNI instances and Link instances go to their own lists; anything
        else is kept in the switches list.
        """
        self._switches, self._unis, self._links = [], [], []
        for item in items:
            if isinstance(item, UNI):
                bucket = self._unis
            elif isinstance(item, Link):
                bucket = self._links
            else:
                bucket = self._switches
            bucket.append(item)

    def as_dict(self):
        """Return this maintenance window as a dictionary.

        Start and end are rendered with TIME_FMT and a missing
        description becomes an empty string. Each item is serialized
        with its own ``as_dict()`` when that works, otherwise the item
        itself is used.
        """
        window = {
            'id': self.id,
            'description': self.description or '',
            'status': self.status,
            'start': self.start.strftime(TIME_FMT),
            'end': self.end.strftime(TIME_FMT),
        }
        serialized = []
        for item in self.items:
            try:
                entry = item.as_dict()
            except (AttributeError, TypeError):
                # Items without a usable as_dict() are stored as they are.
                entry = item
            serialized.append(entry)
        window['items'] = serialized
        return window

    @classmethod
    def from_dict(cls, mw_dict, controller):
        """Create a maintenance window from a dictionary of attributes.

        The keys 'start', 'end' and 'items' are mandatory (KeyError is
        raised otherwise); 'id', 'description' and 'status' are optional.
        """
        window_id = mw_dict.get('id')
        begin = cls.str_to_datetime(mw_dict['start'])
        finish = cls.str_to_datetime(mw_dict['end'])
        return cls(
            begin,
            finish,
            controller,
            items=mw_dict['items'],
            mw_id=window_id,
            description=mw_dict.get('description'),
            status=mw_dict.get('status', Status.PENDING),
        )

    def update(self, mw_dict):
        """Update a maintenance window with the data from a dictionary.

        Only 'start', 'end', 'items' and 'description' are considered.
        Nothing is changed when the date validation fails.

        Raises:
            ValueError: if the resulting start is before the current
                time or the resulting end is before the start.
        """
        def resolve(field):
            # A date absent from mw_dict keeps the window's current value.
            try:
                return self.str_to_datetime(mw_dict[field])
            except KeyError:
                return getattr(self, field)

        new_start = resolve('start')
        new_end = resolve('end')
        current_time = datetime.datetime.now(pytz.utc)
        if new_start < current_time:
            raise ValueError('Start in the past not allowed.')
        if new_end < new_start:
            raise ValueError('End before start not allowed.')
        self.start, self.end = new_start, new_end
        for field in ('items', 'description'):
            if field in mw_dict:
                setattr(self, field, mw_dict[field])

    @staticmethod
    def intf_from_dict(intf_id, controller):
        """Return what the controller finds for the interface intf_id."""
        return controller.get_interface_by_id(intf_id)

    @staticmethod
    def uni_from_dict(uni_dict, controller):
        """Create UNI instance from a dictionary.

        Reads 'interface_id' and 'tag' from uni_dict. Returns None when
        either the interface or the tag turns out falsy.
        """
        interface = MaintenanceWindow.intf_from_dict(
            uni_dict['interface_id'], controller)
        tag = TAG.from_dict(uni_dict['tag'])
        if not (interface and tag):
            return None
        return UNI(interface, tag)

    @staticmethod
    def link_from_dict(link_dict, controller):
        """Create a link instance from a dictionary.

        Both endpoints are looked up in the controller by their 'id'.
        When 'metadata' is present it is added to the link, and a truthy
        's_vlan' metadata entry is replaced by the TAG built from it.
        """
        side_a = controller.get_interface_by_id(
            link_dict['endpoint_a']['id'])
        side_b = controller.get_interface_by_id(
            link_dict['endpoint_b']['id'])
        new_link = Link(side_a, side_b)
        if 'metadata' in link_dict:
            new_link.extend_metadata(link_dict['metadata'])
        raw_vlan = new_link.get_metadata('s_vlan')
        if raw_vlan:
            new_link.update_metadata('s_vlan', TAG.from_dict(raw_vlan))
        return new_link

    @staticmethod
    def str_to_datetime(str_date):
        """Parse a TIME_FMT string and return the datetime in UTC."""
        parsed = datetime.datetime.strptime(str_date, TIME_FMT)
        return parsed.astimezone(pytz.utc)

    def maintenance_event(self, operation):
        """Create events to start/end a maintenance.

        One event per non-empty item kind is put in the controller's app
        buffer. Stored switch entries are used as keys into
        ``controller.switches``; entries not found there are left out.

        Args:
            operation: word used in the event name ('start' and 'end'
                are the values passed by start_mw and end_mw).
        """
        def publish(name, content):
            event = KytosEvent(name=name, content=content)
            self.controller.buffers.app.put(event)

        if self._switches:
            looked_up = (self.controller.switches.get(dpid, None)
                         for dpid in self._switches)
            found = [switch for switch in looked_up if switch]
            publish(f'kytos/maintenance.{operation}_switch',
                    {'switches': found})
        if self._unis:
            publish(f'kytos/maintenance.{operation}_uni',
                    {'unis': self._unis})
        if self._links:
            publish(f'kytos/maintenance.{operation}_link',
                    {'links': self._links})

    def start_mw(self):
        """Mark the window as running and emit the 'start' events."""
        self.status = Status.RUNNING
        self.maintenance_event('start')

    def end_mw(self):
        """Mark the window as finished and emit the 'end' events."""
        self.status = Status.FINISHED
        self.maintenance_event('end')
class Scheduler:
    """Scheduler for a maintenance window.

    Each window gets two jobs in the underlying scheduler, identified by
    '<window id>-start' and '<window id>-end'.
    """

    def __init__(self):
        """Create a background scheduler set to UTC and start it."""
        self.scheduler = BackgroundScheduler(timezone=pytz.utc)
        self.scheduler.start()

    def add(self, maintenance):
        """Add the jobs that start and end a maintenance window.

        Both jobs use the 'date' trigger: start_mw is set to run at the
        window's start and end_mw at the window's end.
        """
        self.scheduler.add_job(
            maintenance.start_mw,
            'date',
            id=f'{maintenance.id}-start',
            run_date=maintenance.start,
        )
        self.scheduler.add_job(
            maintenance.end_mw,
            'date',
            id=f'{maintenance.id}-end',
            run_date=maintenance.end,
        )

    def remove(self, maintenance):
        """Remove the jobs that start and end a maintenance window.

        A job that the scheduler no longer knows is only logged; the
        other job is still attempted.
        """
        for phase in ('start', 'end'):
            try:
                self.scheduler.remove_job(f'{maintenance.id}-{phase}')
            except JobLookupError:
                log.info(f'Job to {phase} {maintenance.id} already removed.')