Skip to content

Commit 07d1c47

Browse files
committed
Use sqlite3 in scheduler instead of DbManager.
Cleanup scheduler code.
1 parent 1096a5f commit 07d1c47

File tree

2 files changed

+71
-51
lines changed

2 files changed

+71
-51
lines changed

myDevices/cloud/dbmanager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,6 @@ def test():
112112
cursor = connection.cursor()
113113
except Exception as ex:
114114
error('DbManager failed to initialize: ' + str(ex))
115-
DbManager.CreateTable('scheduled_settings', "id TEXT PRIMARY KEY, data TEXT", ['id', 'data'])
115+
# DbManager.CreateTable('scheduled_items', "id TEXT PRIMARY KEY, data TEXT", ['id', 'data'])
116116
DbManager.CreateTable('disabled_sensors', "id TEXT PRIMARY KEY", ['id'])
117117
DbManager.CreateTable('historical_averages', "id INTEGER PRIMARY KEY, data TEXT, count INTEGER, start TIMESTAMP, end TIMESTAMP, interval TEXT, send TEXT, count_sensor TEXT", ['id', 'data', 'count', 'start', 'end', 'interval', 'send', 'count_sensor'])

myDevices/cloud/scheduler.py

Lines changed: 70 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
from threading import Thread, RLock
2-
from myDevices.utils.logger import exception, info, warn, error, debug, setDebug, logJson
3-
import myDevices.schedule as schedule
4-
from time import sleep
5-
from json import dumps, loads, JSONEncoder
6-
from myDevices.cloud.dbmanager import DbManager
71
from datetime import datetime
2+
from json import JSONEncoder, dumps, loads
3+
from sqlite3 import connect
4+
from threading import RLock, Thread
5+
from time import sleep
6+
7+
import myDevices.schedule as schedule
8+
from myDevices.utils.logger import debug, error, exception, info, logJson, setDebug, warn
89

910

1011
class ScheduleItemEncoder(JSONEncoder):
@@ -96,6 +97,9 @@ def __init__(self, client, name):
9697
9798
client: the client running the scheduler
9899
name: name to use for the scheduler thread"""
100+
self.connection = connect('/etc/myDevices/agent.db', check_same_thread = False)
101+
self.cursor = self.connection.cursor()
102+
self.cursor.execute('CREATE TABLE IF NOT EXISTS scheduled_items (id TEXT PRIMARY KEY, data TEXT)')
99103
Thread.__init__(self, name=name)
100104
self.mutex = RLock()
101105
#failover cases: keep last run and next run times
@@ -104,20 +108,26 @@ def __init__(self, client, name):
104108
self.testIndex = 0
105109
self.client = client
106110
self.running = False
107-
self.tablename = 'scheduled_settings'
108111
#at the end load data
109112
self.load_data()
110113
self.start()
111-
114+
115+
def __del__(self):
116+
"""Delete scheduler object"""
117+
try:
118+
self.connection.close()
119+
except:
120+
exception('Error deleting SchedulerEngine')
121+
112122
def load_data(self):
113123
"""Load saved scheduler data from the database"""
114124
with self.mutex:
115-
results = DbManager.Select(self.tablename)
116-
if results:
117-
for row in results:
118-
#info('Row: ' + str(row))
119-
#for each item already present in db add call AddScheduledItem with insert false
120-
self.add_scheduled_item(loads(row[1]), False)
125+
self.cursor.execute('SELECT * FROM scheduled_items')
126+
results = self.cursor.fetchall()
127+
for row in results:
128+
#info('Row: ' + str(row))
129+
#for each item already present in db add call AddScheduledItem with insert false
130+
self.add_scheduled_item(loads(row[1]), False)
121131
return True
122132

123133
def add_scheduled_item(self, json_data, insert = False):
@@ -126,7 +136,7 @@ def add_scheduled_item(self, json_data, insert = False):
126136
json_data: JSON object representing the scheduled item to add
127137
insert: if True add the item to the database, otherwise just add it to the running scheduler"""
128138
debug('')
129-
retVal = False
139+
result = False
130140
try:
131141
schedule_item = ScheduleItem(json_data)
132142
if schedule_item.id is None:
@@ -137,23 +147,23 @@ def add_scheduled_item(self, json_data, insert = False):
137147
if insert == True:
138148
self.add_database_item(schedule_item.id, dumps(json_data))
139149
info('Setup item: ' + str(schedule_item.to_dict()))
140-
retVal = self.setup(schedule_item)
141-
if retVal == True:
150+
result = self.setup(schedule_item)
151+
if result == True:
142152
self.schedules[schedule_item.id] = schedule_item
143153
else:
144-
retVal = self.update_scheduled_item(json_data)
154+
result = self.update_scheduled_item(json_data)
145155
except:
146156
exception('Error adding scheduled item')
147157
except:
148-
exception('AddScheduledItem Failed')
149-
return retVal
158+
exception('AddScheduledItem failed')
159+
return result
150160

151161
def update_scheduled_item(self, json_data):
152162
"""Update an existing scheduled item
153163
154164
json_data: JSON object representing the scheduled item to update"""
155-
debug('')
156-
retVal = False
165+
debug('Update scheduled item')
166+
result = False
157167
try:
158168
scheduleItemNew = ScheduleItem(json_data)
159169
with self.mutex:
@@ -162,13 +172,14 @@ def update_scheduled_item(self, json_data):
162172
schedule.cancel_job(scheduleItemOld.job)
163173
except KeyError:
164174
debug('Old schedule with id = {} not found'.format(scheduleItemNew.id))
165-
retVal = self.setup(scheduleItemNew)
166-
if retVal == True:
175+
result = self.setup(scheduleItemNew)
176+
debug('Update scheduled item result: {}'.format(result))
177+
if result == True:
167178
self.update_database_item(dumps(json_data), scheduleItemNew.id)
168179
self.schedules[scheduleItemNew.id] = scheduleItemNew
169180
except:
170-
exception('UpdateScheduledItem Failed')
171-
return retVal
181+
exception('UpdateScheduledItem failed')
182+
return result
172183

173184
def setup(self, schedule_item):
174185
"""Setup a job to run a scheduled item
@@ -236,12 +247,12 @@ def process_action(self, schedule_item):
236247
with self.mutex:
237248
schedule.cancel_job(schedule_item.job)
238249

239-
def remove_scheduled_item(self, removeItem):
250+
def remove_scheduled_item(self, remove_item):
240251
"""Remove an item that has been scheduled
241252
242-
removeItem: a JSON object specifying the item to remove"""
253+
remove_item: a JSON object specifying the item to remove"""
243254
debug('')
244-
return self.remove_scheduled_item_by_id(removeItem['id'])
255+
return self.remove_scheduled_item_by_id(remove_item['id'])
245256

246257
def remove_scheduled_item_by_id(self, id):
247258
"""Remove a scheduled item with the specified id
@@ -274,30 +285,29 @@ def remove_schedules(self):
274285

275286
def get_schedules(self):
276287
"""Return a list of all scheduled items"""
277-
jsonSchedules = []
288+
schedules = []
278289
try:
279290
with self.mutex:
280291
for schedule_item in self.schedules:
281-
jsonSchedules.append(self.schedules[schedule_item].to_dict())
292+
schedules.append(self.schedules[schedule_item].to_dict())
282293
except:
283-
exception('GetSchedules Failed')
284-
return jsonSchedules
294+
exception('GetSchedules failed')
295+
return schedules
285296

286-
def update_schedules(self, json_data):
297+
def update_schedules(self, schedule_items):
287298
"""Update all scheduled items
288299
289-
json_data: JSON containing a list of all the new items to schedule"""
300+
schedule_items: list of all the new items to schedule"""
290301
result = True
291-
logJson('UpdateSchedules' + str(json_data), 'schedules')
292-
info('Updating schedules')
302+
logJson('Update schedules: {}'.format(schedule_items))
303+
debug('Updating schedules')
293304
try:
294305
with self.mutex:
295-
jsonSchedules = json_data['Schedules']
296306
self.remove_schedules()
297-
for item in jsonSchedules:
298-
self.add_scheduled_item(item['Schedule'], True)
307+
for item in schedule_items:
308+
self.add_scheduled_item(item, True)
299309
except:
300-
exception('UpdateSchedules Failed')
310+
exception('UpdateSchedules failed')
301311
result = False
302312
return result
303313

@@ -306,48 +316,58 @@ def update_database_item(self, json_data, id):
306316
307317
json_data: JSON containing the scheduled item
308318
id: id of the scheduled item"""
309-
debug('')
319+
debug('Update database item')
310320
result = True
311321
try:
312-
setClause = 'data = ?'
313-
whereClause = 'id = ?'
314322
with self.mutex:
315-
DbManager.Update(self.tablename, setClause, json_data, whereClause, id)
323+
self.cursor.execute('UPDATE scheduled_items SET data = ? WHERE id = ?', (json_data, id))
324+
self.connection.commit()
316325
except:
326+
exception('Error updating database item')
317327
result = False
328+
debug('Update database item result: {}'.format(result))
318329
return result
319330

320331
def add_database_item(self, id, json_data):
321332
"""Add a scheduled item to the database
322333
323334
id: id of the scheduled item
324335
json_data: JSON containing the scheduled item"""
325-
debug('')
336+
debug('Add database item')
326337
result = False
327338
with self.mutex:
328-
result = DbManager.Insert(self.tablename, id, json_data)
339+
self.cursor.execute('INSERT INTO scheduled_items VALUES (?,?)', (id, json_data))
340+
self.connection.commit()
341+
result = self.cursor.lastrowid
342+
debug('Add database item result: {}'.format(result))
329343
return result
330344

331345
def remove_database_item(self, id):
332346
"""Remove a scheduled item from the database
333347
334348
id: id of the scheduled item"""
349+
debug('Remove database item')
335350
result = True
336351
try:
337352
with self.mutex:
338-
DbManager.Delete(self.tablename, id)
353+
self.cursor.execute('DELETE FROM scheduled_items WHERE id = ?', (id,))
354+
self.connection.commit()
339355
except:
340356
result = False
357+
debug('Remove database item result: {}'.format(result))
341358
return result
342359

343360
def remove_all_database_items(self):
344361
"""Remove all scheduled items from the database"""
345362
result = True
363+
debug('Remove all database items')
346364
try:
347365
with self.mutex:
348-
DbManager.DeleteAll(self.tablename)
366+
self.cursor.execute('DELETE FROM scheduled_items')
367+
self.connection.commit()
349368
except:
350369
result = False
370+
debug('Remove all database items result: {}'.format(result))
351371
return result
352372

353373
def stop(self):
@@ -364,7 +384,7 @@ def run(self):
364384
with self.mutex:
365385
schedule.run_pending()
366386
except:
367-
exception("SchedulerEngine run, Unexpected error")
387+
exception("SchedulerEngine run, unexpected error")
368388
sleep(1)
369389

370390

0 commit comments

Comments
 (0)