Skip to content

Commit 60cd144

Browse files
Fix Redis Query for RediSearch 2.8+ (dapr#3303)
Signed-off-by: joshvanl <[email protected]> Co-authored-by: Alessandro (Ale) Segala <[email protected]> Signed-off-by: Alessandro (Ale) Segala <[email protected]>
1 parent 007d0e0 commit 60cd144

File tree

4 files changed

+152
-16
lines changed

4 files changed

+152
-16
lines changed

state/redis/redis_query.go

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -295,20 +295,83 @@ func (q *Query) execute(ctx context.Context, client rediscomponent.RedisClient)
295295
if err != nil {
296296
return nil, "", err
297297
}
298-
arr, ok := ret.([]interface{})
298+
299+
res, ok, err := parseQueryResponsePost28(ret)
300+
if err != nil {
301+
return nil, "", err
302+
}
303+
if !ok {
304+
res, err = parseQueryResponsePre28(ret)
305+
if err != nil {
306+
return nil, "", err
307+
}
308+
}
309+
310+
// set next query token only if limit is specified
311+
var token string
312+
if q.limit > 0 && len(res) > 0 {
313+
token = strconv.FormatInt(q.offset+int64(len(res)), 10)
314+
}
315+
316+
return res, token, err
317+
}
318+
319+
// parseQueryResponsePost28 parses the query Do response from redisearch 2.8+.
320+
func parseQueryResponsePost28(ret any) ([]state.QueryItem, bool, error) {
321+
aarr, ok := ret.(map[any]any)
299322
if !ok {
300-
return nil, "", fmt.Errorf("invalid output")
323+
return nil, false, nil
324+
}
325+
326+
var res []state.QueryItem
327+
arr := aarr["results"].([]any)
328+
if len(arr) == 0 {
329+
return nil, false, errors.New("invalid output")
301330
}
302-
// arr[0] = number of matching elements in DB (ignoring pagination)
331+
for i := 0; i < len(arr); i++ {
332+
inner, ok := arr[i].(map[any]any)
333+
if !ok {
334+
return nil, false, fmt.Errorf("invalid output")
335+
}
336+
exattr, ok := inner["extra_attributes"].(map[any]any)
337+
if !ok {
338+
return nil, false, fmt.Errorf("invalid output")
339+
}
340+
item := state.QueryItem{
341+
Key: inner["id"].(string),
342+
}
343+
if data, ok := exattr["$.data"].(string); ok {
344+
item.Data = []byte(data)
345+
} else {
346+
item.Error = fmt.Sprintf("%#v is not string", exattr["$.data"])
347+
}
348+
if etag, ok := exattr["$.version"].(string); ok {
349+
item.ETag = &etag
350+
}
351+
res = append(res, item)
352+
}
353+
354+
return res, true, nil
355+
}
356+
357+
// parseQueryResponsePre28 parses the query Do response from redisearch 2.8-.
358+
func parseQueryResponsePre28(ret any) ([]state.QueryItem, error) {
359+
arr, ok := ret.([]any)
360+
if !ok {
361+
return nil, errors.New("invalid output")
362+
}
363+
364+
// arr[0] = number of matching elements in DB (ignoring pagination
303365
// arr[2n] = key
304366
// arr[2n+1][0] = "$.data"
305367
// arr[2n+1][1] = value
306368
// arr[2n+1][2] = "$.version"
307369
// arr[2n+1][3] = etag
308370
if len(arr)%2 != 1 {
309-
return nil, "", fmt.Errorf("invalid output")
371+
return nil, fmt.Errorf("invalid output")
310372
}
311-
res := []state.QueryItem{}
373+
374+
var res []state.QueryItem
312375
for i := 1; i < len(arr); i += 2 {
313376
item := state.QueryItem{
314377
Key: arr[i].(string),
@@ -322,11 +385,6 @@ func (q *Query) execute(ctx context.Context, client rediscomponent.RedisClient)
322385
}
323386
res = append(res, item)
324387
}
325-
// set next query token only if limit is specified
326-
var token string
327-
if q.limit > 0 && len(res) > 0 {
328-
token = strconv.FormatInt(q.offset+int64(len(res)), 10)
329-
}
330388

331-
return res, token, err
389+
return res, nil
332390
}

tests/certification/state/redis/components/docker/default/redisstatestore.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,16 @@ spec:
1313
value: 5m
1414
- name: timeout
1515
value: 100s
16+
- name: queryIndexes
17+
value: |
18+
[
19+
{
20+
"name": "tquery",
21+
"indexes": [
22+
{
23+
"key": "qmsg",
24+
"type": "TEXT"
25+
}
26+
]
27+
}
28+
]

tests/certification/state/redis/docker-compose.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
version: "3.3"
22
services:
33
redis:
4+
healthcheck:
5+
test: ["CMD", "redis-cli","ping"]
46
image: 'redislabs/redisearch:latest'
57
ports:
68
- '6379:6379'
7-
command: redis-server

tests/certification/state/redis/redis_test.go

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/stretchr/testify/require"
2626

2727
"github.com/dapr/components-contrib/state"
28+
"github.com/dapr/components-contrib/state/query"
2829
state_redis "github.com/dapr/components-contrib/state/redis"
2930
"github.com/dapr/components-contrib/tests/certification/embedded"
3031
"github.com/dapr/components-contrib/tests/certification/flow"
@@ -248,12 +249,74 @@ func TestRedis(t *testing.T) {
248249
return nil
249250
}
250251

252+
type qValue struct {
253+
Qmsg string `json:"qmsg"`
254+
}
255+
256+
// Query test
257+
queryTest := func(ctx flow.Context) error {
258+
err := stateStore.Multi(context.Background(), &state.TransactionalStateRequest{
259+
Operations: []state.TransactionalStateOperation{
260+
state.SetRequest{
261+
Key: "qKey1",
262+
Value: qValue{Qmsg: "test1"},
263+
},
264+
state.SetRequest{
265+
Key: "qKey2",
266+
Value: qValue{Qmsg: "test2"},
267+
},
268+
state.SetRequest{
269+
Key: "qKey3",
270+
Value: qValue{Qmsg: "test3"},
271+
},
272+
},
273+
Metadata: map[string]string{
274+
"contentType": "application/json",
275+
},
276+
})
277+
require.NoError(t, err)
278+
resp, err := stateStore.Query(context.Background(), &state.QueryRequest{
279+
Query: query.Query{
280+
QueryFields: query.QueryFields{
281+
Filters: map[string]any{
282+
"OR": []any{
283+
map[string]any{
284+
"EQ": map[string]any{
285+
"qmsg": "test1",
286+
},
287+
},
288+
map[string]any{
289+
"EQ": map[string]any{
290+
"qmsg": "test2",
291+
},
292+
},
293+
},
294+
},
295+
},
296+
Filter: &query.OR{
297+
Filters: []query.Filter{
298+
&query.EQ{Key: "qmsg", Val: "test1"},
299+
&query.EQ{Key: "qmsg", Val: "test2"},
300+
},
301+
},
302+
},
303+
Metadata: map[string]string{
304+
"queryIndexName": "tquery",
305+
"contentType": "application/json",
306+
},
307+
})
308+
require.NoError(t, err)
309+
require.Len(t, resp.Results, 2)
310+
assert.Equal(t, "qKey1", resp.Results[0].Key)
311+
assert.Equal(t, "qKey2", resp.Results[1].Key)
312+
assert.JSONEq(t, `{"qmsg":"test1"}`, string(resp.Results[0].Data))
313+
assert.JSONEq(t, `{"qmsg":"test2"}`, string(resp.Results[1].Data))
314+
return nil
315+
}
316+
251317
testForStateStoreNotConfigured := func(ctx flow.Context) error {
252318
client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort))
253-
// require.Error(t, err)
254-
if err != nil {
255-
panic(err)
256-
}
319+
require.NoError(t, err)
257320
defer client.Close()
258321

259322
err = client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key1", []byte("redisCert"), nil)
@@ -289,6 +352,7 @@ func TestRedis(t *testing.T) {
289352
embedded.WithStates(stateRegistry),
290353
)).
291354
Step("Run basic test", basicTest).
355+
Step("Run test for Query", queryTest).
292356
Step("Run TTL related test", timeToLiveTest).
293357
Step("interrupt network",
294358
network.InterruptNetwork(10*time.Second, nil, nil, "6379:6379")).

0 commit comments

Comments
 (0)