forked from rabjohnston/E81ProjectPredictiveDialer
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcalling_list.py
More file actions
executable file
·87 lines (56 loc) · 2.36 KB
/
calling_list.py
File metadata and controls
executable file
·87 lines (56 loc) · 2.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from os import listdir
import pandas as pd
import logging as log
from callstats import CallStats, QueuedStats
class CallingList:
    """Container for the calls and queued calls used by a simulation run.

    The two lists are either handed in by the caller or built by ``load()``
    followed by ``parse()`` from a CSV file.  Calls are consumed one at a
    time from the head of the list; queued calls are never consumed, they
    are handed out round-robin.
    """

    def __init__(self, calls=None, queued_calls=None):
        """Create a calling list, optionally pre-populated.

        Args:
            calls: Sequence of call records, or ``None`` for an empty list.
                The object is stored as-is (not copied), so ``get_call()``
                removes items from the caller's own list.
            queued_calls: Sequence of queued-call records, or ``None`` for
                an empty list.  Also stored without copying.
        """
        # Placeholder until load() replaces it with a DataFrame.  A dict has
        # no ``itertuples``, so parse() fails with AttributeError if load()
        # has not been called first.
        self._df = {}

        if calls is None:
            self._calls = []
            log.info('Calls: None')
        else:
            self._calls = calls
            log.info('Calls size: %s', len(calls))

        self._queued_calls = [] if queued_calls is None else queued_calls

        # Index of the queued call that get_queued_call() returns next.
        self._next_queued_call = 0

    def load(self, filename):
        """Read the simulation CSV at *filename* into memory.

        The data is only stored; call ``parse()`` afterwards to turn the
        rows into call records.
        """
        log.info('Loading simulation file: %s', filename)
        # ``infer_datetime_format`` was dropped on purpose: it only applies
        # when ``parse_dates`` is enabled (it never was here) and newer
        # pandas releases warn about it as deprecated.
        self._df = pd.read_csv(filename)

    def parse(self):
        """Rebuild the call and queued-call lists from the loaded data.

        Any previously held calls are discarded and the round-robin
        position is reset.  Every row produces one ``CallStats``; rows
        whose ``Queued`` column equals 1 additionally produce a
        ``QueuedStats`` built from the same fields.
        """
        log.info('Parsing simulation file.')

        # Reset our storage
        self._calls = []
        self._queued_calls = []
        self._next_queued_call = 0

        for row in self._df.itertuples():
            # Both record types take the same positional fields, so collect
            # them once per row.
            fields = (
                row.CallStartDateTime,
                row.OutcomeCode,
                row.OffsetConnect,
                row.OffsetDisconnect,
                row.CallEndDateTime,
                row.UniqueId,
                row.CauseCode,
                row.QueuedStartDateTime,
                row.QueuedEndDateTime,
                row.Queued,
                row.TransferredToAgent,
            )

            self._calls.append(CallStats(*fields))

            if row.Queued == 1:
                self._queued_calls.append(QueuedStats(*fields))

    def get_call(self):
        """Remove and return the first remaining call, or ``None`` if empty."""
        if len(self._calls) == 0:
            return None
        # NOTE: pop(0) shifts the remaining items; kept because the list may
        # be owned by the caller (see __init__) and must be updated in place.
        return self._calls.pop(0)

    def get_number_calls(self) -> int:
        """Return how many calls have not yet been handed out."""
        return len(self._calls)

    def get_queued_call(self):
        """Return the next queued call, cycling back to the first at the end.

        Returns ``None`` when there are no queued calls at all.  Queued
        calls are not removed, so the same records are returned again on
        later cycles.
        """
        total = len(self._queued_calls)
        if total == 0:
            return None

        # If we've used up all of our queued calls then start at the beginning
        if self._next_queued_call >= total:
            self._next_queued_call = 0

        queued_call = self._queued_calls[self._next_queued_call]

        # Move along one for the next time
        self._next_queued_call += 1
        return queued_call