Skip to content

Commit b9c6223

Browse files
pdetgijzelaerr
authored and committed
Exporting Python UDFs Using the Python Client (#26)
* Add support for locally debugging Python UDFs. * Exporting UDFs as py and bin files to be imported in IDEs * Fixing problem with udf export * Removing traces option from udf debugging * Exporting input data as dict * Correcting Functions Name to meet the paper * Adding Sampling option * Fix some PEP8 violations. * Working testcase and additional formatting. * More PEP8 violations. * Fixing bug
1 parent 2b28cd1 commit b9c6223

File tree

2 files changed

+278
-4
lines changed

2 files changed

+278
-4
lines changed

pymonetdb/sql/cursors.py

Lines changed: 247 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66

77
import logging
88
from collections import namedtuple
9+
import tempfile
10+
import re
11+
import pickle
12+
import pdb
13+
914
from pymonetdb.sql import monetize, pythonize
1015
from pymonetdb.exceptions import ProgrammingError, InterfaceError
1116
from pymonetdb import mapi
@@ -24,6 +29,7 @@ class Cursor(object):
2429
connection are not isolated, i.e., any changes done to the
2530
database by a cursor are immediately visible by the other
2631
cursors"""
32+
2733
def __init__(self, connection):
2834
"""This read-only attribute return a reference to the Connection
2935
object on which the cursor was created."""
@@ -153,10 +159,10 @@ def execute(self, operation, parameters=None):
153159
if parameters:
154160
if isinstance(parameters, dict):
155161
query = operation % dict([(k, monetize.convert(v))
156-
for (k, v) in parameters.items()])
162+
for (k, v) in parameters.items()])
157163
elif type(parameters) == list or type(parameters) == tuple:
158-
query = operation % tuple([monetize.convert(item) for item
159-
in parameters])
164+
query = operation % tuple(
165+
[monetize.convert(item) for item in parameters])
160166
elif isinstance(parameters, str):
161167
query = operation % monetize.convert(parameters)
162168
else:
@@ -185,6 +191,244 @@ def executemany(self, operation, seq_of_parameters):
185191
self.rowcount = count
186192
return count
187193

194+
def __exportparameters(self, ftype, fname, query, quantity_parameters,
                       sample):
    """Export the input parameters of a UDF call to the client process.

    A temporary server-side Python function ``export_parameters`` is
    created that pickles the arguments it is called with. Every
    occurrence of ``fname`` in ``query`` is textually replaced by
    ``export_parameters``, the rewritten query is executed, and the
    pickled dictionary is loaded from the first column of the first
    result row. Used internally by .debug() and .export().

    :param ftype: function type as read from ``functions.type``; the
        value 5 is treated as a table-producing function.
    :param fname: name of the UDF as it appears in ``query``.
    :param query: SQL query that calls the UDF.
    :param quantity_parameters: number of input parameters of the UDF.
    :param sample: -1 to export every row, otherwise the number of rows
        to draw at random (without replacement) from each argument.
    :returns: the unpickled dictionary; it holds the ``argN`` entries
        plus two additional entries supplied by the server.
    :raises Exception: if ``fname`` does not occur in ``query``, if the
        query returns no rows, or if the number of exported entries is
        not ``quantity_parameters + 2``.
    """

    # create a dummy function that only exports its parameters
    # using the pickle module
    if ftype == 5:
        # table producing function
        return_type = "TABLE(s STRING)"
    else:
        return_type = "STRING"

    if sample == -1:
        export_function = """
            CREATE OR REPLACE FUNCTION export_parameters(*)
            RETURNS %s LANGUAGE PYTHON
            {
                import inspect
                import pickle
                frame = inspect.currentframe();
                args, _, _, values = inspect.getargvalues(frame);
                dd = {x: values[x] for x in args};
                del dd['_conn']
                return pickle.dumps(dd);
            };""" % return_type
    else:
        # int() keeps arbitrary text out of the generated server-side
        # code; min() keeps numpy.random.choice(replace=False) from
        # raising when the requested sample exceeds the column length.
        export_function = """
            CREATE OR REPLACE FUNCTION export_parameters(*)
            RETURNS %s LANGUAGE PYTHON
            {
                import inspect
                import pickle
                import numpy
                frame = inspect.currentframe();
                args, _, _, values = inspect.getargvalues(frame);
                dd = {x: values[x] for x in args};
                del dd['_conn']
                argname = "arg1"
                x = numpy.arange(len(dd[argname]))
                x = numpy.random.choice(x, min(%d, len(x)), replace=False)
                for i in range(len(dd)-2):
                    argname = "arg" + str(i + 1)
                    column = dd[argname]
                    aux = []
                    for j in range(len(x)):
                        aux.append(column[x[j]])
                    dd[argname] = aux
                return pickle.dumps(dd);
            };
            """ % (return_type, int(sample))

    if fname not in query:
        raise Exception("Function %s not found in query!" % fname)

    # Redirect the call to the exporting function. The replacement is
    # purely textual, so any other occurrence of fname is rewritten too.
    query = query.replace(fname, 'export_parameters')
    # NOTE(review): presumably limits the result to one row; a query
    # without a terminating ';' is left unchanged -- confirm intent.
    query = query.replace(';', ' sample 1;')

    self.execute(export_function)
    self.execute(query)
    input_data = self.fetchall()
    self.execute('DROP FUNCTION export_parameters;')
    if not input_data:
        raise Exception("Could not load input data!")

    # NOTE(review): this unpickles data produced by the server; only
    # use against servers you trust.
    arguments = pickle.loads(str(input_data[0][0]))

    if len(arguments) != quantity_parameters + 2:
        raise Exception("Incorrect amount of input arguments found!")

    return arguments
268+
269+
def export(self, query, fname, sample=-1, filespath='./'):
    """Export a Python UDF and its input data so it can be run locally.

    Two files are written, both named by plain string concatenation
    with ``filespath`` (so it acts as a prefix, not strictly a
    directory):

    * ``filespath + 'input_data.bin'`` -- a pickled dict mapping each
      UDF parameter name to the data exported for it;
    * ``filespath + fname + '.py'`` -- a script that defines the UDF as
      a regular Python function, loads the pickled data and calls it.

    :param query: SQL query that calls the UDF.
    :param fname: name of the UDF to export.
    :param sample: -1 to export all input rows, otherwise the number of
        rows to sample.
    :param filespath: prefix prepended to both output file names.
    :raises Exception: if no matching function is found on the server
        (errors from exporting the parameters propagate as well).
    """

    # first retrieve UDF information from the server
    self.execute("""
        SELECT func,type
        FROM functions
        WHERE language >= 6 AND language <= 11 AND name='%s';""" % fname)
    data = self.fetchall()
    # Check before indexing so that an unknown function is reported
    # with a clear message rather than an IndexError.
    if not data:
        raise Exception("Function not found!")
    fcode = data[0][0]
    ftype = data[0][1]

    self.execute("""
        SELECT args.name
        FROM args INNER JOIN functions ON args.func_id=functions.id
        WHERE functions.name='%s' AND args.inout=1
        ORDER BY args.number;""" % fname)
    input_names = self.fetchall()
    parameter_list = [str(row[0]) for row in input_names]
    quantity_parameters = len(parameter_list)

    # exporting Python UDF Function
    # The first and last line of the stored code are dropped
    # (presumably the enclosing "{" and "};" -- see how debug() strips
    # them); every remaining line is indented with one tab.
    body_lines = fcode.split('\n')[1:-1]
    python_udf = 'import pickle \n \n \ndef %s(%s): \n' % (
        fname, ','.join(parameter_list))
    python_udf += ''.join('\t' + line + '\n' for line in body_lines)

    # exporting Columns as Binary Files
    arguments = self.__exportparameters(ftype, fname, query,
                                        quantity_parameters, sample)
    result = dict()
    for i, name in enumerate(parameter_list):
        result[name] = arguments["arg%d" % (i + 1)]
    data_path = filespath + 'input_data.bin'
    with open(data_path, 'wb') as data_file:
        pickle.dump(result, data_file)

    # loading Columns in Python & Call Function
    # %r yields a correctly quoted and escaped literal for the path.
    call_arguments = ','.join(
        "input_parameters['%s']" % name for name in parameter_list)
    python_udf += '\ninput_parameters = pickle.load(open(%r,\'rb\'))\n' % (
        data_path,)
    python_udf += '%s(%s)' % (fname, call_arguments)

    with open(filespath + fname + '.py', 'w') as script_file:
        script_file.write(python_udf)
334+
335+
def debug(self, query, fname, sample=-1):
    """Locally debug a Python UDF used in a SQL query with PDB.

    The UDF source and its input arguments are fetched from the server,
    the source is wrapped in a local function ``pyfun`` written to a
    temporary file, and that function is called right after
    ``pdb.set_trace()`` so it can be stepped into.

    NOTE: this relies on ``execfile``, a Python 2 builtin.

    :param query: SQL query that calls the UDF.
    :param fname: name of the UDF to debug.
    :param sample: -1 to use all input rows, otherwise the number of
        rows to sample (for faster data export).
    :returns: whatever the UDF returns when run locally.
    :raises Exception: if the function is not found on the server or an
        expected ``argN`` entry is missing from the exported data.
    """

    # first gather information about the function
    self.execute("""
        SELECT func, type
        FROM functions
        WHERE language>=6 AND language <= 11 AND name='%s';""" % fname)
    data = self.fetchall()
    if len(data) == 0:
        raise Exception("Function not found!")

    # then gather the input arguments of the function
    self.execute("""
        SELECT args.name, args.type
        FROM args
        INNER JOIN functions ON args.func_id=functions.id
        WHERE functions.name='%s' AND args.inout=1
        ORDER BY args.number;""" % fname)
    input_types = self.fetchall()

    fcode = data[0][0]
    ftype = data[0][1]

    # now obtain the input columns
    arguments = self.__exportparameters(ftype, fname, query,
                                        len(input_types), sample)

    # Map the positional "argN" entries back to the declared parameter
    # names and build the signature of the local function.
    arglist = "_columns, _column_types, _conn"
    cleaned_arguments = dict()
    for i in range(len(input_types)):
        argname = "arg%d" % (i + 1)
        if argname not in arguments:
            raise Exception("Argument %d not found!" % (i + 1))
        input_name = str(input_types[i][0])
        cleaned_arguments[input_name] = arguments[argname]
        arglist += ", %s" % input_name
    cleaned_arguments['_columns'] = arguments['_columns']
    cleaned_arguments['_column_types'] = arguments['_column_types']

    class LoopbackObject(object):
        """Stand-in for the ``_conn`` argument of the UDF.

        ``execute`` runs a query through the enclosing cursor and
        returns its columns as a dict keyed by column name.
        """

        def __init__(self, connection):
            self.__conn = connection

        def execute(self, query):
            self.__conn.execute("""
                CREATE OR REPLACE FUNCTION export_parameters(*)
                RETURNS TABLE(s STRING) LANGUAGE PYTHON
                {
                    import inspect
                    import pickle
                    frame = inspect.currentframe();
                    args, _, _, values = inspect.getargvalues(frame);
                    dd = {x: values[x] for x in args};
                    del dd['_conn']
                    del dd['_columns']
                    del dd['_column_types']
                    return pickle.dumps(dd);
                };""")
            # run the query once with LIMIT 1 just to learn the
            # column names from the cursor description
            self.__conn.execute("""
                SELECT *
                FROM (%s) AS xx
                LIMIT 1""" % query)
            query_description = self.__conn.description
            self.__conn.execute("""
                SELECT *
                FROM export_parameters ( (%s) );""" % query)
            data = self.__conn.fetchall()
            arguments = pickle.loads(str(data[0][0]))
            self.__conn.execute('DROP FUNCTION export_parameters;')
            if len(arguments) != len(query_description):
                raise Exception("Incorrect number of parameters!")
            result = dict()
            for i in range(len(arguments)):
                argname = "arg%d" % (i + 1)
                result[query_description[i][0]] = arguments[argname]
            return result

    cleaned_arguments['_conn'] = LoopbackObject(self)

    # strip the enclosing "{" ... "};" from the stored function body
    fcode = fcode.strip()
    fcode = re.sub('^{', '', fcode)
    fcode = re.sub('};$', '', fcode)
    fcode = re.sub('^\n', '', fcode)
    function_definition = "def pyfun(%s):\n %s\n" % (
        arglist, fcode.replace("\n", "\n "))

    # Write the function to a temporary file and run it from there. The
    # file is kept until the call returns, so the debugger can still
    # read the source while stepping through pyfun.
    with tempfile.NamedTemporaryFile() as f:
        f.write(function_definition)
        f.flush()
        # explicit namespace instead of relying on locals() mutation
        namespace = dict()
        execfile(f.name, globals(), namespace)

        pdb.set_trace()
        return namespace['pyfun'](**cleaned_arguments)
431+
188432
def fetchone(self):
189433
"""Fetch the next row of a query result set, returning a
190434
single sequence, or None when no more data is available."""

test/test_capabilities.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import os
1616
from time import time
1717
import unittest
18+
import tempfile
1819

1920
from pymonetdb.exceptions import ProgrammingError
2021
import pymonetdb.sql
@@ -366,7 +367,7 @@ def test_bigresult(self):
366367
self.cursor.execute('select count(*) from types')
367368
r = self.cursor.fetchone()
368369
n = r[0]
369-
self.cursor.arraysize=100000
370+
self.cursor.arraysize = 100000
370371
self.cursor.execute('select * from types, types')
371372
r = self.cursor.fetchall()
372373
self.assertEqual(len(r), n**2)
@@ -393,3 +394,32 @@ def test_multiple_queries(self):
393394
(table1, table2))
394395
result = self.cursor.fetchall()
395396
self.assertEqual(result, [(50, 50)])
397+
398+
def test_debug_udf(self):
    """Create a Python UDF, run it, and check that export() writes it.

    The test is skipped when the server cannot execute Python UDFs.
    """
    # local import: only this test needs it
    import shutil

    self.cursor.execute("""
        CREATE FUNCTION test_python_udf(i INTEGER)
        RETURNS INTEGER
        LANGUAGE PYTHON {
            return i * 2;
        };
        """)
    # test if Python UDFs are enabled on the server
    try:
        self.cursor.execute('SELECT test_python_udf(1);')
    except Exception:
        # report a skip instead of a silent pass
        self.skipTest("Python UDFs are disabled or not compiled in")
    result = self.cursor.fetchall()
    self.assertEqual(result, [(2,)])

    # test python debugging capabilities
    # export() concatenates filespath with the file names, so pass the
    # directory with a trailing separator; the directory is removed
    # afterwards so no exported files are left behind.
    export_dir = tempfile.mkdtemp()
    try:
        prefix = os.path.join(export_dir, '')
        self.cursor.export('SELECT test_python_udf(1)',
                           'test_python_udf', filespath=prefix)
        with open(prefix + "test_python_udf.py") as f:
            code = f.read()
    finally:
        shutil.rmtree(export_dir, ignore_errors=True)
    self.assertIn('test_python_udf(i)', code)
422+
423+
424+
425+

0 commit comments

Comments
 (0)