Skip to content

Commit 06059bf

Browse files
author
DvirDukhan
committed
Modified implementation to work in multi threaded mode
1 parent e1e9bfd commit 06059bf

File tree

6 files changed

+373
-339
lines changed

6 files changed

+373
-339
lines changed
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
package com.redislabs.redisgraph;
2+
3+
import com.redislabs.redisgraph.impl.GraphCache;
4+
import com.redislabs.redisgraph.impl.ResultSetImpl;
5+
import org.apache.commons.text.translate.AggregateTranslator;
6+
import org.apache.commons.text.translate.CharSequenceTranslator;
7+
import org.apache.commons.text.translate.LookupTranslator;
8+
import redis.clients.jedis.BinaryClient;
9+
import redis.clients.jedis.Jedis;
10+
import redis.clients.jedis.JedisPool;
11+
import redis.clients.jedis.commands.ProtocolCommand;
12+
import redis.clients.jedis.util.Pool;
13+
14+
import java.io.Closeable;
15+
import java.util.*;
16+
import java.util.concurrent.ConcurrentHashMap;
17+
import java.util.stream.Collectors;
18+
19+
20+
/**
21+
*
22+
*/
23+
public class RedisGraph implements Closeable {
24+
25+
26+
27+
private final Pool<Jedis> client;
28+
private final Map<String, GraphCache> graphCaches = new ConcurrentHashMap<>();
29+
30+
31+
32+
private static final CharSequenceTranslator ESCAPE_CHYPER;
33+
static {
34+
final Map<CharSequence, CharSequence> escapeJavaMap = new HashMap<>();
35+
escapeJavaMap.put("\'", "\\'");
36+
escapeJavaMap.put("\"", "\\\"");
37+
ESCAPE_CHYPER = new AggregateTranslator(new LookupTranslator(Collections.unmodifiableMap(escapeJavaMap)));
38+
}
39+
40+
/**
41+
* Creates a client running on the local machine
42+
43+
*/
44+
public RedisGraph() {
45+
this("localhost", 6379);
46+
}
47+
48+
/**
49+
* Creates a client running on the specific host/post
50+
*
51+
* @param host Redis host
52+
* @param port Redis port
53+
*/
54+
public RedisGraph(String host, int port) {
55+
this( new JedisPool(host, port));
56+
}
57+
58+
/**
59+
* Creates a client using provided Jedis pool
60+
*
61+
* @param jedis bring your own Jedis pool
62+
*/
63+
public RedisGraph( Pool<Jedis> jedis) {
64+
65+
this.client = jedis;
66+
}
67+
68+
@Override
69+
public void close(){
70+
this.client.close();
71+
}
72+
73+
74+
/**
75+
* Execute a Cypher query with arguments
76+
*
77+
* @param graphId a graph to perform the query on
78+
* @param query Cypher query
79+
* @param args
80+
* @return a result set
81+
*/
82+
public ResultSet query(String graphId, String query, Object ...args) {
83+
if(args.length > 0) {
84+
for(int i=0; i<args.length; ++i) {
85+
if(args[i] instanceof String) {
86+
args[i] = "\'" + ESCAPE_CHYPER.translate((String)args[i]) + "\'";
87+
}
88+
}
89+
query = String.format(query, args);
90+
}
91+
graphCaches.putIfAbsent(graphId, new GraphCache(graphId, this));
92+
93+
try (Jedis conn = getConnection()) {
94+
return new ResultSetImpl(sendCompactCommand(conn, Command.QUERY, graphId, query).getObjectMultiBulkReply(), graphCaches.get(graphId));
95+
}
96+
}
97+
98+
/**
99+
* Invokes stored procedures without arguments
100+
* @param graphId a graph to perform the query on
101+
* @param procedure procedure name to invoke
102+
* @return result set with the procedure data
103+
*/
104+
public ResultSet callProcedure(String graphId, String procedure ){
105+
return callProcedure(graphId, procedure, new ArrayList<>(), new HashMap<>());
106+
}
107+
108+
109+
/**
110+
* Invokes stored procedure with arguments
111+
* @param graphId a graph to perform the query on
112+
* @param procedure procedure name to invoke
113+
* @param args procedure arguments
114+
* @return result set with the procedure data
115+
*/
116+
public ResultSet callProcedure(String graphId, String procedure, List<String> args ){
117+
return callProcedure(graphId, procedure, args, new HashMap<>());
118+
}
119+
120+
121+
/**
122+
* Deletes the entire graph
123+
*
124+
* @return delete running time statistics
125+
*/
126+
public String deleteGraph(String graphId) {
127+
//clear local state
128+
graphCaches.remove(graphId);
129+
try (Jedis conn = getConnection()) {
130+
return sendCommand(conn, Command.DELETE, graphId).getBulkReply();
131+
}
132+
133+
}
134+
135+
136+
/**
137+
* Sends command - will be replaced with sendCompactCommand once graph.delete support --compact flag
138+
* @param conn - connection
139+
* @param provider - command type
140+
* @param args - command arguments
141+
* @return
142+
*/
143+
private BinaryClient sendCommand(Jedis conn, ProtocolCommand provider, String ...args) {
144+
BinaryClient binaryClient = conn.getClient();
145+
binaryClient.sendCommand(provider, args);
146+
return binaryClient;
147+
}
148+
149+
150+
/**
151+
* Sends the command with --COMPACT flag
152+
* @param conn - connection
153+
* @param provider - command type
154+
* @param args - command arguments
155+
* @return
156+
*/
157+
private BinaryClient sendCompactCommand(Jedis conn, ProtocolCommand provider, String ...args) {
158+
BinaryClient binaryClient = conn.getClient();
159+
String[] t = new String[args.length +1];
160+
System.arraycopy(args, 0 , t, 0, args.length);
161+
t[args.length]="--COMPACT";
162+
binaryClient.sendCommand(provider, t);
163+
return binaryClient;
164+
}
165+
166+
private Jedis getConnection() {
167+
return this.client.getResource();
168+
}
169+
170+
171+
/**
172+
* Invoke a stored procedure
173+
* @param graphId a graph to perform the query on
174+
* @param procedure - procedure to execute
175+
* @param args - procedure arguments
176+
* @param kwargs
177+
* @return
178+
*/
179+
public ResultSet callProcedure(String graphId, String procedure, List<String> args , Map<String, List<String>> kwargs ){
180+
181+
args = args.stream().map( s -> Utils.quoteString(s)).collect(Collectors.toList());
182+
StringBuilder q = new StringBuilder();
183+
q.append(String.format("CALL %s(%s)", procedure, String.join(",", args)));
184+
List<String> y = kwargs.getOrDefault("y", null);
185+
if(y != null){
186+
q.append(String.join(",", y));
187+
}
188+
return query(graphId, q.toString());
189+
}
190+
}

0 commit comments

Comments
 (0)