Skip to content

Commit ca70194

Browse files
committed
Merge pull request #7 from citrusleaf/query_changes
Lua parameters to a query are a list
2 parents 1060d15 + f01c6ad commit ca70194

File tree

3 files changed

+80
-17
lines changed

3 files changed

+80
-17
lines changed

src/main/query/apply.c

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,28 +26,35 @@
2626

2727
AerospikeQuery * AerospikeQuery_Apply(AerospikeQuery * self, PyObject * args, PyObject * kwds)
2828
{
29-
as_error err;
3029

31-
// Initialize error
30+
// Python function arguments
31+
PyObject * py_module = NULL;
32+
PyObject * py_function = NULL;
33+
PyObject * py_args = NULL;
34+
PyObject * py_policy = NULL;
35+
36+
// Python function keyword arguments
37+
static char * kwlist[] = {"module", "function", "arguments", "policy", NULL};
38+
39+
if ( PyArg_ParseTupleAndKeywords(args, kwds, "OO|OO:apply", kwlist, &py_module, &py_function, &py_args, &py_policy) == false ){
40+
return NULL;
41+
}
42+
43+
// Aerospike error object
44+
as_error err;
45+
// Initialize error object
3246
as_error_init(&err);
3347

34-
int nargs = (int) PyTuple_Size(args);
48+
if ( !self || !self->client->as ){
49+
as_error_update(&err, AEROSPIKE_ERR_PARAM, "Invalid query object");
50+
goto CLEANUP;
51+
}
3552

3653
// Aerospike API Arguments
3754
char * module = NULL;
3855
char * function = NULL;
3956
as_arraylist * arglist = NULL;
4057

41-
// too few args
42-
if ( nargs < 2 ) {
43-
as_error_update(&err, AEROSPIKE_ERR_CLIENT, "udf module and function names are required.");
44-
goto CLEANUP;
45-
}
46-
47-
// Python Arguments
48-
PyObject * py_module = PyTuple_GetItem(args, 0);
49-
PyObject * py_function = PyTuple_GetItem(args, 1);
50-
5158
if ( PyString_Check(py_module) ) {
5259
module = PyString_AsString(py_module);
5360
}
@@ -64,10 +71,13 @@ AerospikeQuery * AerospikeQuery_Apply(AerospikeQuery * self, PyObject * args, Py
6471
goto CLEANUP;
6572
}
6673

67-
if ( nargs > 2 ) {
68-
arglist = as_arraylist_new(nargs-2, 0);
69-
for ( int i = 2; i < nargs; i++ ) {
70-
PyObject * py_val = PyTuple_GetItem(args, i);
74+
if ( py_args && PyList_Check(py_args) ){
75+
Py_ssize_t size = PyList_Size(py_args);
76+
77+
arglist = as_arraylist_new(size, 0);
78+
79+
for ( int i = 0; i < size; i++ ) {
80+
PyObject * py_val = PyList_GetItem(py_args, (Py_ssize_t)i);
7181
as_val * val = NULL;
7282
pyobject_to_val(&err, py_val, &val);
7383
if ( err.code != AEROSPIKE_OK ) {

test/stream_example.lua

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,40 @@ end
1717
function count_less()
1818
return stream : map(one) : reduce(add);
1919
end
20+
21+
local function having_ge_threshold(bin_having, ge_threshold)
22+
return function(rec)
23+
if rec[bin_having] < ge_threshold then
24+
return false
25+
end
26+
return true
27+
end
28+
end
29+
30+
local function count_bins(group_by_bin)
31+
return function(group, rec)
32+
if rec[group_by_bin] then
33+
local bin_name = rec[group_by_bin]
34+
group[bin_name] = (group[bin_name] or 0) + 1
35+
end
36+
return group
37+
end
38+
end
39+
40+
local function add_values(val1, val2)
41+
return val1 + val2
42+
end
43+
44+
local function reduce_groups(a, b)
45+
return map.merge(a, b, add_values)
46+
end
47+
48+
function group_count(stream, group_by_bin, bin_having, ge_threshold)
49+
if bin_having and ge_threshold then
50+
local myfilter = having_ge_threshold(bin_having, ge_threshold)
51+
return stream : filter(myfilter) : aggregate(map{}, count_bins(group_by_bin)) : reduce(reduce_groups)
52+
else
53+
return stream : aggregate(map{}, count_bins(group_by_bin)) : reduce(reduce_groups)
54+
end
55+
end
56+

test/test_aggregate.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,3 +348,19 @@ def user_callback(value):
348348
query.foreach(user_callback)
349349

350350
assert exception.value[0] == 1L
351+
352+
def test_aggregate_with_arguments_to_lua_function(self):
353+
"""
354+
Invoke apply() with parameter's list for lua function.
355+
"""
356+
query = self.client.query('test', 'demo')
357+
query.where(p.between('test_age', 0, 5))
358+
query.apply('stream_example', 'group_count', ["name", "addr"])
359+
360+
rec = []
361+
def callback(value):
362+
rec.append(value)
363+
364+
query.foreach(callback)
365+
assert rec == [{u'name4': 1, u'name2': 1, u'name3': 1, u'name0': 1, u'name1': 1}]
366+

0 commit comments

Comments
 (0)