-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathloadCSV.py
More file actions
313 lines (274 loc) · 11.2 KB
/
loadCSV.py
File metadata and controls
313 lines (274 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
import sys
import re
import mysql.connector
from antlr4 import *
from antlr4.InputStream import InputStream
from lib.ClusterConfigGrammar.ClusterConfigLexer import ClusterConfigLexer
from lib.ClusterConfigGrammar.ClusterConfigParser import ClusterConfigParser
from lib.ClusterConfigGrammar.ClusterConfigListener import ClusterConfigListener
from lib.CSVGrammar.csvfileLexer import csvfileLexer
from lib.CSVGrammar.csvfileParser import csvfileParser
from lib.CSVGrammar.csvfileListener import csvfileListener
from lib.ClusterConfigLoader import ClusterConfigLoader
from lib.csvfileLoader import csvfileLoader
from connectionLoader import connectionLoader
def main(clustername, csvname):
    """Parse the cluster config and CSV file, then load the rows into the nodes.

    The cluster configuration and the CSV file are parsed with the
    ANTLR-generated lexers/parsers imported at the top of this file. The
    partition method named in ``clustercfg['partition']['method']``
    (``'notpartition'``, ``'range'`` or ``'hash'``) selects which partitioning
    function builds the list of connection loaders. The load then runs in
    three phases over every loader: establish connection, load data, commit
    and close.

    Each phase has its own error handler so that a failure is reported under
    the phase it actually happened in. A failure while loading triggers a
    rollback on every loader.

    Args:
        clustername: Path of the cluster configuration file.
        csvname: Path of the CSV file to load.

    Returns:
        None. Progress and errors are reported with ``print``.
    """
    # Use antlr4 to parse clustercfg
    cluster_input = FileStream(clustername)
    cluster_lexer = ClusterConfigLexer(cluster_input)
    cluster_stream = CommonTokenStream(cluster_lexer)
    cluster_parser = ClusterConfigParser(cluster_stream)
    cluster_tree = cluster_parser.config()
    cluster_loader = ClusterConfigLoader()
    ParseTreeWalker().walk(cluster_loader, cluster_tree)
    clustercfg = cluster_loader.getCFG()

    # Use antlr4 to parse csvfile
    csv_input = FileStream(csvname)
    csv_lexer = csvfileLexer(csv_input)
    csv_stream = CommonTokenStream(csv_lexer)
    csv_parser = csvfileParser(csv_stream)
    csv_tree = csv_parser.rows()
    csv_loader = csvfileLoader()
    ParseTreeWalker().walk(csv_loader, csv_tree)
    csvfile = csv_loader.getCSV()

    # Get partition method and get list of connectionLoaders
    conn_list = None
    if 'partition' in clustercfg and 'method' in clustercfg['partition']:
        partmtd = clustercfg['partition']['method']
        if partmtd == 'notpartition':
            conn_list = noPartitioning(clustercfg, csvfile)
        elif partmtd == 'range':
            conn_list = rangePartitioning(clustercfg, csvfile)
        elif partmtd == 'hash':
            conn_list = hashPartitioning(clustercfg, csvfile)

    if not conn_list:
        print("Connection list could not be established.")
        return

    # BaseException is kept from the original handlers so that an interrupt
    # during a phase is still reported (and rolled back) the same way.

    # Phase 1: open every connection before touching any data.
    try:
        for conn in conn_list:
            conn.establishConnection()
    except BaseException as e:
        print("Could not establish connection:")
        print(str(e))
        return

    # Phase 2: load the rows; on any failure undo the work on every node.
    try:
        for conn in conn_list:
            conn.loadData()
    except BaseException as e:
        print("Could not load Data:")
        print(str(e))
        for conn in conn_list:
            # Guard each rollback so one failing node neither stops the
            # remaining rollbacks nor gets reported as a connection error.
            try:
                conn.rollback()
            except Exception as rollback_err:
                print("Could not roll back:")
                print(str(rollback_err))
        return

    # Phase 3: make the load permanent and release the connections.
    try:
        for conn in conn_list:
            conn.commit()
            conn.closeConnection()
        print("Tables successfully loaded.")
    except BaseException as e:
        print("Error when commiting:")
        print(str(e))
# No partitioning method so insert everything into all tables
def noPartitioning(clustercfg, csvfile):
    """Build one connection loader per configured node, each given every row.

    Nodes returned by ``getNodeInfo`` whose ``nodeid`` (as a string) is not a
    key of ``clustercfg`` are skipped.

    Args:
        clustercfg: Parsed cluster configuration, indexed like a dict.
        csvfile: Parsed CSV rows; passed unchanged to every loader.

    Returns:
        A list of ``connectionLoader`` objects in catalog order. The list is
        empty when the catalog lookup yields no nodes, which the caller treats
        as "no connections".
    """
    nodes = getNodeInfo(clustercfg)
    if not nodes:
        # getNodeInfo returns None on failure or when no rows match.
        return []
    return [
        connectionLoader(node, csvfile, clustercfg)
        for node in nodes
        if str(node['nodeid']) in clustercfg  # weed out nodes not in clustercfg
    ]
# Range partitioning
def rangePartitioning(clustercfg, csvfile):
    """Split the CSV rows across nodes by per-node value ranges.

    The partition column named in ``clustercfg['partition']['column']`` is
    located among the column names of the first node's table. The rows are
    sorted numerically on that column, and each configured node receives the
    slice that ``getRangeSlice`` computes from the node's ``param1`` (low) and
    ``param2`` (high) entries in ``clustercfg``.

    Args:
        clustercfg: Parsed cluster configuration, indexed like a dict.
        csvfile: Parsed CSV rows (sequences indexable by column number).

    Returns:
        A list of ``connectionLoader`` objects, or ``None`` when the nodes,
        the columns or the partition column cannot be determined, when a
        node's low bound is not below its high bound, or when
        ``getRangeSlice`` reports an error.

    Raises:
        ValueError: If a value in the partition column is not numeric.
    """
    nodes = getNodeInfo(clustercfg)  # For connecting to nodes.
    if not nodes:
        return None

    # For figuring out which column number to range by.
    columns = getColumns(nodes[0])
    if not columns:
        return None

    wanted = clustercfg['partition'].get('column')
    if wanted not in columns:
        print("Partition column {} is not a column in this table.".format(wanted))
        return None
    colnum = columns.index(wanted)

    # Sort with float() so the ordering agrees with the float() comparisons
    # that getRangeSlice performs on the same column.
    csvfile = sorted(csvfile, key=lambda row: float(row[colnum]))

    conn_list = []
    for node in nodes:
        node_key = str(node['nodeid'])
        if node_key not in clustercfg:
            continue  # Node is in the catalog but not in clustercfg.

        low = clustercfg[node_key]['param1']
        high = clustercfg[node_key]['param2']
        if not float(low) < float(high):
            return None  # Inverted or empty range definition.

        (startrow, endrow) = getRangeSlice(low, high, colnum, csvfile)
        if startrow is None or endrow is None:
            return None
        conn_list.append(
            connectionLoader(node, csvfile[startrow:endrow], clustercfg)
        )
    return conn_list
# returns the beginning and ending index for slice for given range.
def getRangeSlice(low, high, colnum, csvfile):
    """Return slice indices for the rows whose column value is in (low, high].

    ``csvfile`` is expected to be sorted ascending on column ``colnum`` (the
    caller sorts it first). The start index is the first row whose value is
    strictly greater than ``low``; the end index is one past the last row at
    or after the start whose value is less than or equal to ``high``.

    When no row falls inside the range the result is an empty slice
    (``start == end``) rather than a slice containing an out-of-range row.

    Args:
        low: Exclusive lower bound, anything ``float()`` accepts
            (for example ``'-inf'``).
        high: Inclusive upper bound, anything ``float()`` accepts
            (for example ``'+inf'``).
        colnum: Index of the column to compare within each row.
        csvfile: Sequence of rows, each indexable by ``colnum``.

    Returns:
        ``(startrow, endrow)`` suitable for ``csvfile[startrow:endrow]``, or
        ``(None, None)`` when a bound or a column value cannot be converted
        or read.
    """
    try:
        low_bound = float(low)
        high_bound = float(high)
        values = [float(row[colnum]) for row in csvfile]
    except Exception as e:
        print("Problem with range parameters:")
        print(str(e))
        return (None, None)

    total = len(values)
    # First row strictly above the lower bound; `total` means "no such row".
    startrow = next(
        (i for i, value in enumerate(values) if value > low_bound),
        total,
    )
    # One past the last row (from startrow on) that is within the upper bound.
    endrow = startrow
    for i in range(startrow, total):
        if values[i] <= high_bound:
            endrow = i + 1
    return (startrow, endrow)
# Hash partitioning
def hashPartitioning(clustercfg, csvfile):
    """Split the CSV rows across nodes by ``value mod node_count + 1``.

    The candidate nodes and the partition column index come from
    ``catalogCompliance``. Each row is assigned to partition number
    ``int(row[column]) % len(candidates) + 1`` and handed to the candidate
    whose ``nodeid`` equals that number.

    Args:
        clustercfg: Parsed cluster configuration, indexed like a dict.
        csvfile: Parsed CSV rows (sequences indexable by column number).

    Returns:
        A list with one ``connectionLoader`` per candidate node (a node with
        no matching rows receives an empty row list), an empty list when there
        are no candidates or no partition column, or ``None`` when the catalog
        lookup yields no nodes.

    Raises:
        ValueError: If a value in the partition column is not an integer.
    """
    nodes = getNodeInfo(clustercfg)
    if not nodes:
        return None

    candidates, colNumber = catalogCompliance(clustercfg, nodes)
    node_count = len(candidates)
    if node_count == 0 or colNumber is None:
        return []

    # Bucket every row once instead of rescanning the file for each node.
    buckets = {}
    for row in csvfile:
        part = (int(row[colNumber]) % node_count) + 1
        buckets.setdefault(part, []).append(row)

    return [
        connectionLoader(node, buckets.get(node['nodeid'], []), clustercfg)
        for node in candidates
    ]
# Takes the clustercfg dictionary and returns a list of dictionaries containing info on each node with the table from clustercfg.
def getNodeInfo(clustercfg):
    """Fetch the catalog rows describing the nodes that hold the table.

    Connects to the catalog database described by ``getCatalogParams`` and
    selects every ``DTABLES`` row whose ``tname`` equals
    ``clustercfg['tablename']``.

    Args:
        clustercfg: Parsed cluster configuration, indexed like a dict.

    Returns:
        A non-empty list of dictionaries (one per catalog row), or ``None``
        when the catalog parameters are unavailable, the query fails, or no
        row matches. Errors are printed rather than raised.
    """
    catalog = getCatalogParams(clustercfg)
    if not catalog:
        return None

    catalog_query = (
        "SELECT * "
        "FROM DTABLES "
        "WHERE tname = %s;"
    )
    # Pre-bind so the cleanup below is safe even if connect() itself fails.
    connection = None
    cursor = None
    results = None
    try:
        connection = mysql.connector.connect(**catalog)
        cursor = connection.cursor(dictionary=True)
        cursor.execute(catalog_query, (clustercfg['tablename'],))
        # Get a list of dictionaries.
        results = cursor.fetchall()
    except mysql.connector.Error as err:
        print(err)
    except Exception as err:
        print(str(err))
    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()

    return results if results else None
def getCatalogParams(clustercfg):
    """Build the catalog connection parameters from the cluster config.

    Reads ``hostname``, ``username`` and ``passwd`` from
    ``clustercfg['catalog']`` and splits the hostname URL with ``parseURL``.

    Args:
        clustercfg: Parsed cluster configuration, indexed like a dict.

    Returns:
        A dict with the keys ``host``, ``port``, ``database``, ``user`` and
        ``password``, or ``None`` when a required entry is missing or the URL
        cannot be parsed.
    """
    try:
        catalog_cfg = clustercfg['catalog']
        parsed = parseURL(catalog_cfg['hostname'])
        if parsed is None:
            return None
        (host, port, database) = parsed
        return {
            'host': host,
            'port': port,
            'database': database,
            'user': catalog_cfg['username'],
            'password': catalog_cfg['passwd'],
        }
    except Exception:
        # Best effort: any malformed configuration yields None, but
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        return None
def getColumns(node):
    """Return the column names of the node's table.

    Connects to the database described by ``node['nodeurl']``,
    ``node['nodeuser']`` and ``node['nodepasswd']`` and reads the column names
    of ``node['tname']`` from the cursor description.

    Args:
        node: Dict with the keys ``nodeuser``, ``nodepasswd``, ``nodeurl``
            and ``tname``.

    Returns:
        A list of column names, or ``None`` when the connection or the query
        fails (diagnostics are printed).
    """
    connection = None
    cursor = None
    try:
        user = node['nodeuser']
        passwd = node['nodepasswd']
        (host, port, database) = parseURL(node['nodeurl'])
        conn_params = {
            'user': user,
            'passwd': passwd,
            'host': host,
            'port': port,
            'database': database
        }
        connection = mysql.connector.connect(**conn_params)
        cursor = connection.cursor()
        # Only the result metadata is needed, so ask for zero rows.
        # NOTE(review): the table name is interpolated because identifiers
        # cannot be bound as parameters; it is read from the node dict.
        cursor.execute("SELECT * FROM {} LIMIT 0;".format(node['tname']))
        column_names = [col[0] for col in cursor.description]
        # Drain the (empty) result set so the cursor can be closed cleanly.
        cursor.fetchall()
        return column_names
    except Exception as e:
        if cursor is not None:
            print(cursor.statement)
        print("Could not get columns in getColumns()")
        print(str(e))
        print("Arg:")
        print(node)
        return None
    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()
# Grabs the address, port, and database from the hostname url.
def parseURL(url):
    """Split a ``...//host:port/database`` URL into its three parts.

    The host may be a dotted IP address or a host name (letters, digits,
    ``_``, ``.`` and ``-``). Everything after the first ``/`` that follows the
    port is returned as the database part and may be empty.

    Args:
        url: URL string such as ``jdbc:mysql://10.0.0.1:3306/mydb``.

    Returns:
        A ``(host, port, database)`` tuple of strings, or ``None`` when the
        URL does not have the expected shape.
    """
    # Raw string: '\d' and '\.' are regex escapes, not Python string escapes.
    hostmatch = re.search(
        r'^.*//([\w.\-]+):(\d+)/(.*)$', url, flags=re.IGNORECASE
    )
    if hostmatch is None:
        return None
    return hostmatch.groups()
# A check to see that the partition column requested exists in the table, and that the nodes are partitioned for the same type
def catalogCompliance(clustercfg, nodes):
    """Check that the requested partition column exists in the table.

    The column names are read from the first node's table via ``getColumns``
    and searched for ``clustercfg['partition']['column']``.

    Args:
        clustercfg: Parsed cluster configuration, indexed like a dict.
        nodes: List of node dictionaries for the table.

    Returns:
        ``(candidates, colNumber)``. When the column exists, ``candidates``
        holds every node (in reverse of the given order, as before) and
        ``colNumber`` is the column's index. Otherwise the result is
        ``([], None)``.
    """
    if not nodes:
        return [], None

    partCol = getColumns(nodes[0])
    if partCol is None:
        # getColumns already printed why the columns could not be read.
        return [], None

    wanted = clustercfg['partition']['column']
    try:
        colNumber = partCol.index(wanted)
    except ValueError:
        print("The available columns for this table are: {0}. \n {1} is not a column in this table.".format(partCol, wanted))
        return [], None

    # Every node is currently accepted as a candidate; no per-node
    # partition-method check is applied.
    candidates = list(reversed(nodes))
    return candidates, colNumber
if __name__ == '__main__':
    # Optional positional arguments: cluster config path, then CSV path.
    # Each falls back to a default file name when not given.
    clustername = sys.argv[1] if len(sys.argv) > 1 else 'clustercfg'
    csvname = sys.argv[2] if len(sys.argv) > 2 else 'csvfile'
    main(clustername, csvname)