Skip to content

Commit 976976b

Browse files
gkorlandDvirDukhan
andauthored
Add Pipeline support (#123)
* Add Pipeline support * Broke the Transaction and Pipeline tests to seperate files. Co-authored-by: DvirDukhan <[email protected]> Co-authored-by: DvirDukhan <[email protected]>
1 parent 5b14170 commit 976976b

File tree

7 files changed

+868
-200
lines changed

7 files changed

+868
-200
lines changed

src/main/java/com/redislabs/redisgraph/RedisGraphContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ public interface RedisGraphContext extends RedisGraph {
1616
* @return Redis transactional object, over the connection context, with graph API capabilities
1717
*/
1818
RedisGraphTransaction multi();
19+
20+
/**
21+
* Returns a Redis pipeline object, over the connection context, with graph API capabilities
22+
* @return Redis pipeline object, over the connection context, with graph API capabilities
23+
*/
24+
RedisGraphPipeline pipelined();
1925

2026
/**
2127
* Perform watch over given Redis keys
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package com.redislabs.redisgraph;
2+
3+
import redis.clients.jedis.Response;
4+
import redis.clients.jedis.commands.BasicRedisPipeline;
5+
import redis.clients.jedis.commands.BinaryRedisPipeline;
6+
import redis.clients.jedis.commands.BinaryScriptingCommandsPipeline;
7+
import redis.clients.jedis.commands.ClusterPipeline;
8+
import redis.clients.jedis.commands.MultiKeyBinaryRedisPipeline;
9+
import redis.clients.jedis.commands.MultiKeyCommandsPipeline;
10+
import redis.clients.jedis.commands.RedisPipeline;
11+
import redis.clients.jedis.commands.ScriptingCommandsPipeline;
12+
13+
import java.io.Closeable;
14+
import java.util.List;
15+
import java.util.Map;
16+
17+
/**
18+
* An interface which aligned to Jedis Pipeline interface
19+
*/
20+
public interface RedisGraphPipeline extends
21+
MultiKeyBinaryRedisPipeline,
22+
MultiKeyCommandsPipeline, ClusterPipeline,
23+
BinaryScriptingCommandsPipeline, ScriptingCommandsPipeline,
24+
BasicRedisPipeline, BinaryRedisPipeline, RedisPipeline, Closeable {
25+
26+
/**
27+
* Execute a Cypher query.
28+
* @param graphId a graph to perform the query on
29+
* @param query Cypher query
30+
* @return a response which builds the result set with the query answer.
31+
*/
32+
Response<ResultSet> query(String graphId, String query);
33+
34+
/**
35+
* Execute a Cypher read-only query.
36+
* @param graphId a graph to perform the query on
37+
* @param query Cypher query
38+
* @return a response which builds the result set with the query answer.
39+
*/
40+
Response<ResultSet> readOnlyQuery(String graphId, String query);
41+
42+
/**
43+
* Execute a Cypher query with timeout.
44+
* @param graphId a graph to perform the query on
45+
* @param query Cypher query
46+
* @param timeout
47+
* @return a response which builds the result set with the query answer.
48+
*/
49+
Response<ResultSet> query(String graphId, String query, long timeout);
50+
51+
/**
52+
* Execute a Cypher read-only query with timeout.
53+
* @param graphId a graph to perform the query on
54+
* @param query Cypher query
55+
* @param timeout
56+
* @return a response which builds the result set with the query answer.
57+
*/
58+
Response<ResultSet> readOnlyQuery(String graphId, String query, long timeout);
59+
60+
/**
61+
* Execute a Cypher query with arguments
62+
* @param graphId a graph to perform the query on
63+
* @param query Cypher query
64+
* @param args
65+
* @return a response which builds the result set with the query answer.
66+
* @deprecated use {@link #query(String, String, Map)} instead.
67+
*/
68+
@Deprecated
69+
Response<ResultSet> query(String graphId, String query, Object ...args);
70+
71+
/**
72+
* Executes a cypher query with parameters.
73+
* @param graphId a graph to perform the query on.
74+
* @param query Cypher query.
75+
* @param params parameters map.
76+
* @return a response which builds the result set with the query answer.
77+
*/
78+
Response<ResultSet> query(String graphId, String query, Map<String, Object> params);
79+
80+
/**
81+
* Executes a cypher read-only query with parameters.
82+
* @param graphId a graph to perform the query on.
83+
* @param query Cypher query.
84+
* @param params parameters map.
85+
* @return a response which builds the result set with the query answer.
86+
*/
87+
Response<ResultSet> readOnlyQuery(String graphId, String query, Map<String, Object> params);
88+
89+
/**
90+
* Executes a cypher query with parameters and timeout.
91+
* @param graphId a graph to perform the query on.
92+
* @param query Cypher query.
93+
* @param params parameters map.
94+
* @param timeout
95+
* @return a response which builds the result set with the query answer.
96+
*/
97+
Response<ResultSet> query(String graphId, String query, Map<String, Object> params, long timeout);
98+
99+
/**
100+
* Executes a cypher read-only query with parameters and timeout.
101+
* @param graphId a graph to perform the query on.
102+
* @param query Cypher query.
103+
* @param params parameters map.
104+
* @param timeout
105+
* @return a response which builds the result set with the query answer.
106+
*/
107+
Response<ResultSet> readOnlyQuery(String graphId, String query, Map<String, Object> params, long timeout);
108+
109+
/**
110+
* Invokes stored procedures without arguments
111+
* @param graphId a graph to perform the query on
112+
* @param procedure procedure name to invoke
113+
* @return a response which builds result set with the procedure data
114+
*/
115+
Response<ResultSet> callProcedure(String graphId, String procedure);
116+
117+
/**
118+
* Invokes stored procedure with arguments
119+
* @param graphId a graph to perform the query on
120+
* @param procedure procedure name to invoke
121+
* @param args procedure arguments
122+
* @return a response which builds result set with the procedure data
123+
*/
124+
Response<ResultSet> callProcedure(String graphId, String procedure, List<String> args);
125+
126+
/**
127+
* Invoke a stored procedure
128+
* @param graphId a graph to perform the query on
129+
* @param procedure - procedure to execute
130+
* @param args - procedure arguments
131+
* @param kwargs - procedure output arguments
132+
* @return a response which builds result set with the procedure data
133+
*/
134+
Response<ResultSet> callProcedure(String graphId, String procedure, List<String> args , Map<String, List<String>> kwargs);
135+
136+
/**
137+
* Deletes the entire graph
138+
* @param graphId graph to delete
139+
* @return a response which builds the delete running time statistics
140+
*/
141+
Response<String> deleteGraph(String graphId);
142+
143+
144+
/**
145+
* Synchronize pipeline by reading all responses. This operation close the pipeline. Whenever
146+
* possible try to avoid using this version and use Pipeline.sync() as it won't go through all the
147+
* responses and generate the right response type (usually it is a waste of time).
148+
* @return A list of all the responses in the order you executed them.
149+
*/
150+
List<Object> syncAndReturnAll();
151+
152+
/**
153+
* Synchronize pipeline by reading all responses. This operation close the pipeline. In order to
154+
* get return values from pipelined commands, capture the different Response&lt;?&gt; of the
155+
* commands you execute.
156+
*/
157+
public void sync();
158+
}

src/main/java/com/redislabs/redisgraph/impl/api/ContextedRedisGraph.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.redislabs.redisgraph.impl.resultset.ResultSetImpl;
99
import redis.clients.jedis.Client;
1010
import redis.clients.jedis.Jedis;
11+
import redis.clients.jedis.Pipeline;
1112
import redis.clients.jedis.util.SafeEncoder;
1213
import redis.clients.jedis.exceptions.JedisDataException;
1314
import java.util.List;
@@ -148,6 +149,19 @@ public RedisGraphTransaction multi() {
148149
transaction.setRedisGraphCaches(caches);
149150
return transaction;
150151
}
152+
153+
/**
154+
* Creates a new RedisGraphPipeline pipeline object
155+
* @return new RedisGraphPipeline
156+
*/
157+
@Override
158+
public RedisGraphPipeline pipelined() {
159+
Jedis jedis = getConnection();
160+
Client client = jedis.getClient();
161+
RedisGraphPipeline pipeline = new RedisGraphPipeline(client, this);
162+
pipeline.setRedisGraphCaches(caches);
163+
return pipeline;
164+
}
151165

152166
/**
153167
* Perfrom watch over given Redis keys

0 commit comments

Comments
 (0)