Skip to content
This repository was archived by the owner on Apr 20, 2020. It is now read-only.

Commit ea52d19

Browse files
committed
Initial RediSearch support: JSON.INDEX ADD, JSON.QGET, JSON.SET ... INDEX
1 parent 0a06793 commit ea52d19

File tree

8 files changed

+255
-48
lines changed

8 files changed

+255
-48
lines changed

src/commands/index.rs

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
use serde_json::Value;
2+
3+
use redismodule::{Context, RedisError, RedisResult, RedisValue};
4+
use redismodule::{NextArg, REDIS_OK};
5+
6+
use redisearch_api::{Document, FieldType};
7+
8+
use crate::error::Error;
9+
use crate::redisjson::{Format, RedisJSON};
10+
use crate::schema::Schema;
11+
use crate::REDIS_JSON_TYPE;
12+
13+
pub mod schema_map {
14+
use crate::schema::Schema;
15+
use std::collections::HashMap;
16+
17+
type SchemaMap = HashMap<String, Schema>;
18+
19+
/// We keep a static map, since it needs to be accessed from multiple Redis module commands
20+
/// and there is no other obvious way to implement this (such as a "user data" pointer maintained
21+
/// for us by Redis).
22+
///
23+
/// The init function should be called only once.
24+
/// Unwrapping the Option is thus safe afterwards.
25+
///
26+
/// Since we have only one thread at the moment, getting a (mutable) reference to the map
27+
/// is also safe. If we add threads, we can simply wrap the map: `Option<Mutex<SchemaMap>`.
28+
///
29+
static mut SCHEMA_MAP: Option<SchemaMap> = None;
30+
31+
pub fn init() {
32+
let map = HashMap::new();
33+
unsafe {
34+
SCHEMA_MAP = Some(map);
35+
}
36+
}
37+
38+
pub fn as_ref() -> &'static SchemaMap {
39+
unsafe { SCHEMA_MAP.as_ref() }.unwrap()
40+
}
41+
42+
pub fn as_mut() -> &'static mut SchemaMap {
43+
unsafe { SCHEMA_MAP.as_mut() }.unwrap()
44+
}
45+
}
46+
47+
///////////////////////////
48+
49+
fn add_field(index_name: &str, field_name: &str, path: String) -> RedisResult {
50+
let map = schema_map::as_mut();
51+
52+
if !map.contains_key(index_name) {
53+
let schema = Schema::new(&index_name);
54+
map.insert(index_name.to_owned(), schema);
55+
}
56+
57+
let schema = map.get_mut(index_name).unwrap();
58+
59+
if schema.fields.contains_key(field_name) {
60+
Err("Field already exists".into())
61+
} else {
62+
schema.index.create_field(field_name);
63+
schema.fields.insert(field_name.to_string(), path);
64+
REDIS_OK
65+
}
66+
}
67+
68+
pub fn add_document(key: &str, index_name: &str, doc: &RedisJSON) -> RedisResult {
69+
// TODO: Index the document with RediSearch:
70+
// 1. Determine the index to use (how?)
71+
// 2. Get the fields from the index along with their associated paths
72+
// 3. Build a RS Document and populate the fields from our doc
73+
// 4. Add the Document to the index
74+
75+
let map = schema_map::as_ref();
76+
77+
map.get(index_name)
78+
.ok_or("ERR no such index".into())
79+
.and_then(|schema| {
80+
let rsdoc = create_document(key, schema, doc)?;
81+
schema.index.add_document(&rsdoc)?;
82+
REDIS_OK
83+
})
84+
}
85+
86+
fn create_document(key: &str, schema: &Schema, doc: &RedisJSON) -> Result<Document, Error> {
87+
let fields = &schema.fields;
88+
89+
let score = 1.0;
90+
let rsdoc = Document::create(key, score);
91+
92+
for (field_name, path) in fields {
93+
let value = doc.get_doc(&path)?;
94+
95+
match value {
96+
Value::String(v) => rsdoc.add_field(field_name, &v, FieldType::FULLTEXT),
97+
Value::Number(v) => rsdoc.add_field(field_name, &v.to_string(), FieldType::NUMERIC),
98+
Value::Bool(v) => rsdoc.add_field(field_name, &v.to_string(), FieldType::TAG),
99+
_ => {}
100+
}
101+
}
102+
103+
Ok(rsdoc)
104+
}
105+
106+
// JSON.INDEX ADD <index> <field> <path>
107+
// JSON.INDEX DEL <index> <field>
108+
// JSON.INDEX INFO <index> <field>
109+
pub fn index<I>(_ctx: &Context, args: I) -> RedisResult
110+
where
111+
I: IntoIterator<Item = String>,
112+
{
113+
let mut args = args.into_iter().skip(1);
114+
115+
let subcommand = args.next_string()?;
116+
let index_name = args.next_string()?;
117+
let field_name = args.next_string()?;
118+
119+
match subcommand.to_uppercase().as_str() {
120+
"ADD" => {
121+
let path = args.next_string()?;
122+
add_field(&index_name, &field_name, path)
123+
}
124+
//"DEL" => {}
125+
//"INFO" => {}
126+
_ => Err("ERR unknown subcommand - try `JSON.INDEX HELP`".into()),
127+
}
128+
}
129+
130+
// JSON.QGET <index> <query> <path>
131+
pub fn qget<I>(ctx: &Context, args: I) -> RedisResult
132+
where
133+
I: IntoIterator<Item = String>,
134+
{
135+
let mut args = args.into_iter().skip(1);
136+
137+
let index_name = args.next_string()?;
138+
let query = args.next_string()?;
139+
let path = args.next().unwrap_or("$".to_string());
140+
141+
let map = schema_map::as_ref();
142+
143+
map.get(&index_name)
144+
.ok_or("ERR no such index".into())
145+
.map(|schema| &schema.index)
146+
.and_then(|index| {
147+
let results: Result<Vec<_>, RedisError> = index
148+
.search(&query)?
149+
.map(|key| {
150+
let key = ctx.open_key_writable(&key);
151+
let value = match key.get_value::<RedisJSON>(&REDIS_JSON_TYPE)? {
152+
Some(doc) => doc.to_string(&path, Format::JSON)?.into(),
153+
None => RedisValue::None,
154+
};
155+
Ok(value)
156+
})
157+
.collect();
158+
159+
Ok(results?.into())
160+
})
161+
}

src/commands/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod index;

src/lib.rs

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,23 @@
22
extern crate redismodule;
33

44
use redismodule::native_types::RedisType;
5-
use redismodule::raw as rawmod;
65
use redismodule::raw::RedisModuleTypeMethods;
7-
use redismodule::{Context, NextArg, RedisError, RedisResult, RedisValue, REDIS_OK};
6+
use redismodule::{raw as rawmod, NextArg};
7+
use redismodule::{Context, RedisError, RedisResult, RedisValue, REDIS_OK};
88
use serde_json::{Number, Value};
9+
910
use std::{i64, usize};
1011

1112
mod array_index;
1213
mod backward;
14+
mod commands;
1315
mod error;
1416
mod nodevisitor;
1517
mod redisjson;
16-
mod schema;
18+
mod schema; // TODO: Remove
1719

1820
use crate::array_index::ArrayIndex;
21+
use crate::commands::index;
1922
use crate::error::Error;
2023
use crate::redisjson::{Format, RedisJSON, SetOptions};
2124

@@ -36,23 +39,6 @@ static REDIS_JSON_TYPE: RedisType = RedisType::new(
3639
},
3740
);
3841

39-
static REDIS_JSON_SCHEMA_TYPE: RedisType = RedisType::new(
40-
"ReJSON-SC",
41-
1,
42-
RedisModuleTypeMethods {
43-
version: redismodule::TYPE_METHOD_VERSION,
44-
45-
rdb_load: Some(schema::type_methods::rdb_load),
46-
rdb_save: Some(schema::type_methods::rdb_save),
47-
aof_rewrite: None, // TODO add support
48-
free: Some(schema::type_methods::free),
49-
50-
// Currently unused by Redis
51-
mem_usage: None,
52-
digest: None,
53-
},
54-
);
55-
5642
///
5743
/// Backwards compatibility convertor for RedisJSON 1.x clients
5844
///
@@ -94,7 +80,7 @@ fn json_del(ctx: &Context, args: Vec<String>) -> RedisResult {
9480
}
9581

9682
///
97-
/// JSON.SET <key> <path> <json> [NX | XX]
83+
/// JSON.SET <key> <path> <json> [NX | XX | FORMAT <format> | INDEX <index>]
9884
///
9985
fn json_set(ctx: &Context, args: Vec<String>) -> RedisResult {
10086
let mut args = args.into_iter().skip(1);
@@ -105,6 +91,8 @@ fn json_set(ctx: &Context, args: Vec<String>) -> RedisResult {
10591

10692
let mut format = Format::JSON;
10793
let mut set_option = SetOptions::None;
94+
let mut index = None;
95+
10896
loop {
10997
if let Some(s) = args.next() {
11098
match s.to_uppercase().as_str() {
@@ -113,19 +101,25 @@ fn json_set(ctx: &Context, args: Vec<String>) -> RedisResult {
113101
"FORMAT" => {
114102
format = Format::from_str(args.next_string()?.as_str())?;
115103
}
104+
"INDEX" => {
105+
index = Some(args.next_string()?);
106+
}
116107
_ => break,
117108
};
118109
} else {
119110
break;
120111
}
121112
}
122113

123-
let key = ctx.open_key_writable(&key);
124-
let current = key.get_value::<RedisJSON>(&REDIS_JSON_TYPE)?;
114+
let redis_key = ctx.open_key_writable(&key);
115+
let current = redis_key.get_value::<RedisJSON>(&REDIS_JSON_TYPE)?;
125116

126117
match (current, set_option) {
127118
(Some(ref mut doc), ref op) => {
128119
if doc.set_value(&value, &path, op, format)? {
120+
if let Some(index) = index {
121+
index::add_document(&key, &index, &doc)?;
122+
}
129123
REDIS_OK
130124
} else {
131125
Ok(RedisValue::None)
@@ -135,7 +129,16 @@ fn json_set(ctx: &Context, args: Vec<String>) -> RedisResult {
135129
(None, _) => {
136130
let doc = RedisJSON::from_str(&value, format)?;
137131
if path == "$" {
138-
key.set_value(&REDIS_JSON_TYPE, doc)?;
132+
redis_key.set_value(&REDIS_JSON_TYPE, doc)?;
133+
134+
if let Some(index) = index {
135+
// FIXME: We need to get the value even though we just set it,
136+
// since the original doc is consumed by set_value.
137+
// Can we do better than this?
138+
let doc = redis_key.get_value(&REDIS_JSON_TYPE)?.unwrap();
139+
index::add_document(&key, &index, doc)?;
140+
}
141+
139142
REDIS_OK
140143
} else {
141144
Err("ERR new objects must be created at the root".into())
@@ -721,10 +724,6 @@ fn json_len<F: Fn(&RedisJSON, &String) -> Result<usize, Error>>(
721724
Ok(length)
722725
}
723726

724-
fn json_createindex(ctx: &Context, args: Vec<String>) -> RedisResult {
725-
Err("Command was not implemented".into())
726-
}
727-
728727
fn json_cache_info(_ctx: &Context, _args: Vec<String>) -> RedisResult {
729728
Err("Command was not implemented".into())
730729
}
@@ -735,6 +734,7 @@ fn json_cache_init(_ctx: &Context, _args: Vec<String>) -> RedisResult {
735734
//////////////////////////////////////////////////////
736735

737736
pub extern "C" fn init(raw_ctx: *mut rawmod::RedisModuleCtx) -> c_int {
737+
crate::commands::index::schema_map::init();
738738
redisearch_api::init(raw_ctx)
739739
}
740740

@@ -743,7 +743,6 @@ redis_module! {
743743
version: 1,
744744
data_types: [
745745
REDIS_JSON_TYPE,
746-
REDIS_JSON_SCHEMA_TYPE
747746
],
748747
init: init,
749748
commands: [
@@ -768,7 +767,8 @@ redis_module! {
768767
["json.debug", json_debug, ""],
769768
["json.forget", json_del, "write"],
770769
["json.resp", json_resp, ""],
771-
["json.createindex", json_createindex, "write"],
770+
["json.index", commands::index::index, "write"],
771+
["json.qget", commands::index::qget, ""],
772772
["json._cacheinfo", json_cache_info, ""],
773773
["json._cacheinit", json_cache_init, "write"],
774774
],

src/redisjson.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ impl RedisJSON {
166166
Ok(res)
167167
}
168168

169+
// FIXME: Implement this by manipulating serde_json::Value values,
170+
// and then using serde to serialize to JSON instead of doing it ourselves with strings.
169171
pub fn to_json(&self, paths: &mut Vec<String>) -> Result<String, Error> {
170172
let mut selector = jsonpath_lib::selector(&self.data);
171173
let mut result = paths.drain(..).fold(String::from("{"), |mut acc, path| {
@@ -331,6 +333,7 @@ impl RedisJSON {
331333
Ok(res.into())
332334
}
333335

336+
// TODO: Rename this to 'get_value', since 'doc' is overloaded.
334337
pub fn get_doc<'a>(&'a self, path: &'a str) -> Result<&'a Value, Error> {
335338
let results = jsonpath_lib::select(&self.data, path)?;
336339
match results.first() {

0 commit comments

Comments
 (0)