|
1 | 1 | package nl.melp.redis; |
2 | 2 |
|
| 3 | +import nl.melp.redis.protocol.Encoder; |
| 4 | +import nl.melp.redis.protocol.Parser; |
| 5 | + |
3 | 6 | import java.io.*; |
4 | 7 | import java.net.Socket; |
5 | 8 | import java.util.Arrays; |
|
12 | 15 | * Effectively a complete Redis client implementation. |
13 | 16 | */ |
14 | 17 | public class Redis { |
15 | | - /** |
16 | | - * Implements the encoding (writing) side. |
17 | | - */ |
18 | | - static class Encoder { |
19 | | - /** |
20 | | - * CRLF is used a lot. |
21 | | - */ |
22 | | - private static byte[] CRLF = new byte[]{'\r', '\n'}; |
23 | | - |
24 | | - /** |
25 | | - * This stream we will write to. |
26 | | - */ |
27 | | - private final OutputStream out; |
28 | | - |
29 | | - /** |
30 | | - * Construct the encoder with the passed outputstream the encoder will write to. |
31 | | - * |
32 | | - * @param out Will be used to write all encoded data to. |
33 | | - */ |
34 | | - Encoder(OutputStream out) { |
35 | | - this.out = out; |
36 | | - } |
37 | | - |
38 | | - /** |
39 | | - * Write a byte array in the "RESP Bulk String" format. |
40 | | - * |
41 | | - * @param value The byte array to write. |
42 | | - * @throws IOException Propagated from the output stream. |
43 | | - * @link https://redis.io/topics/protocol#resp-bulk-strings |
44 | | - */ |
45 | | - void write(byte[] value) throws IOException { |
46 | | - out.write('$'); |
47 | | - out.write(Long.toString(value.length).getBytes()); |
48 | | - out.write(CRLF); |
49 | | - out.write(value); |
50 | | - out.write(CRLF); |
51 | | - } |
52 | | - |
53 | | - /** |
54 | | - * Write a long value in the "RESP Integers" format. |
55 | | - * |
56 | | - * @param val The value to write. |
57 | | - * @throws IOException Propagated from the output stream. |
58 | | - * @link https://redis.io/topics/protocol#resp-integers |
59 | | - */ |
60 | | - void write(long val) throws IOException { |
61 | | - out.write(':'); |
62 | | - out.write(Long.toString(val).getBytes()); |
63 | | - out.write(CRLF); |
64 | | - } |
65 | | - |
66 | | - /** |
67 | | - * Write a list of objects in the "RESP Arrays" format. |
68 | | - * |
69 | | - * @param list A list of objects that contains Strings, Longs, Integers and (recursively) Lists. |
70 | | - * @throws IOException Propagated from the output stream. |
71 | | - * @throws IllegalArgumentException If the list contains unencodable objects. |
72 | | - * @link https://redis.io/topics/protocol#resp-arrays |
73 | | - */ |
74 | | - void write(List<?> list) throws IOException, IllegalArgumentException { |
75 | | - out.write('*'); |
76 | | - out.write(Long.toString(list.size()).getBytes()); |
77 | | - out.write(CRLF); |
78 | | - |
79 | | - for (Object o : list) { |
80 | | - if (o instanceof byte[]) { |
81 | | - write((byte[]) o); |
82 | | - } else if (o instanceof String) { |
83 | | - write(((String) o).getBytes()); |
84 | | - } else if (o instanceof Long) { |
85 | | - write((Long) o); |
86 | | - } else if (o instanceof Integer) { |
87 | | - write(((Integer) o).longValue()); |
88 | | - } else if (o instanceof List) { |
89 | | - write((List<?>) o); |
90 | | - } else { |
91 | | - throw new IllegalArgumentException("Unexpected type " + o.getClass().getCanonicalName()); |
92 | | - } |
93 | | - } |
94 | | - } |
95 | | - |
96 | | - void flush() throws IOException { |
97 | | - out.flush(); |
98 | | - } |
99 | | - } |
100 | | - |
101 | | - /** |
102 | | - * Implements the parser (reader) side of protocol. |
103 | | - */ |
104 | | - static class Parser { |
105 | | - /** |
106 | | - * Thrown whenever data could not be parsed. |
107 | | - */ |
108 | | - static class ProtocolException extends IOException { |
109 | | - ProtocolException(String msg) { |
110 | | - super(msg); |
111 | | - } |
112 | | - } |
113 | | - |
114 | | - /** |
115 | | - * Thrown whenever an error string is decoded. |
116 | | - */ |
117 | | - static class ServerError extends IOException { |
118 | | - ServerError(String msg) { |
119 | | - super(msg); |
120 | | - } |
121 | | - } |
122 | | - |
123 | | - /** |
124 | | - * The input stream used to read the data from. |
125 | | - */ |
126 | | - private final InputStream input; |
127 | | - |
128 | | - /** |
129 | | - * Constructor. |
130 | | - * |
131 | | - * @param input The stream to read the data from. |
132 | | - */ |
133 | | - Parser(InputStream input) { |
134 | | - this.input = input; |
135 | | - } |
136 | | - |
137 | | - /** |
138 | | - * Parse incoming data from the stream. |
139 | | - * <p> |
140 | | - * Based on each of the markers which will identify the type of data being sent, the parsing |
141 | | - * is delegated to the type-specific methods. |
142 | | - * |
143 | | - * @return The parsed object |
144 | | - * @throws IOException Propagated from the stream |
145 | | - * @throws ProtocolException In case unexpected bytes are encountered. |
146 | | - */ |
147 | | - Object parse() throws IOException, ProtocolException { |
148 | | - Object ret; |
149 | | - int read = this.input.read(); |
150 | | - switch (read) { |
151 | | - case '+': |
152 | | - ret = this.parseSimpleString(); |
153 | | - break; |
154 | | - case '-': |
155 | | - throw new ServerError(new String(this.parseSimpleString())); |
156 | | - case ':': |
157 | | - ret = this.parseNumber(); |
158 | | - break; |
159 | | - case '$': |
160 | | - ret = this.parseBulkString(); |
161 | | - break; |
162 | | - case '*': |
163 | | - long len = this.parseNumber(); |
164 | | - if (len == -1) { |
165 | | - ret = null; |
166 | | - } else { |
167 | | - List<Object> arr = new LinkedList<>(); |
168 | | - for (long i = 0; i < len; i++) { |
169 | | - arr.add(this.parse()); |
170 | | - } |
171 | | - ret = arr; |
172 | | - } |
173 | | - break; |
174 | | - case -1: |
175 | | - return null; |
176 | | - default: |
177 | | - throw new ProtocolException("Unexpected input: " + (byte) read); |
178 | | - } |
179 | | - |
180 | | - return ret; |
181 | | - } |
182 | | - |
183 | | - /** |
184 | | - * Parse "RESP Bulk string" as a String object. |
185 | | - * |
186 | | - * @return The parsed response |
187 | | - * @throws IOException Propagated from underlying stream. |
188 | | - */ |
189 | | - private byte[] parseBulkString() throws IOException, ProtocolException { |
190 | | - final long expectedLength = parseNumber(); |
191 | | - if (expectedLength == -1) { |
192 | | - return null; |
193 | | - } |
194 | | - if (expectedLength > Integer.MAX_VALUE) { |
195 | | - throw new ProtocolException("Unsupported value length for bulk string"); |
196 | | - } |
197 | | - final int numBytes = (int) expectedLength; |
198 | | - final byte[] buffer = new byte[numBytes]; |
199 | | - int read = 0; |
200 | | - while (read < expectedLength) { |
201 | | - read += input.read(buffer, read, numBytes - read); |
202 | | - } |
203 | | - if (input.read() != '\r') { |
204 | | - throw new ProtocolException("Expected CR"); |
205 | | - } |
206 | | - if (input.read() != '\n') { |
207 | | - throw new ProtocolException("Expected LF"); |
208 | | - } |
209 | | - |
210 | | - return buffer; |
211 | | - } |
212 | | - |
213 | | - /** |
214 | | - * Parse "RESP Simple String" |
215 | | - * |
216 | | - * @return Resultant string |
217 | | - * @throws IOException Propagated from underlying stream. |
218 | | - */ |
219 | | - private byte[] parseSimpleString() throws IOException { |
220 | | - return scanCr(1024); |
221 | | - } |
222 | | - |
223 | | - private long parseNumber() throws IOException { |
224 | | - return Long.valueOf(new String(scanCr(1024))); |
225 | | - } |
226 | | - |
227 | | - private byte[] scanCr(int size) throws IOException { |
228 | | - int idx = 0; |
229 | | - int ch; |
230 | | - byte[] buffer = new byte[size]; |
231 | | - while ((ch = input.read()) != '\r') { |
232 | | - buffer[idx++] = (byte) ch; |
233 | | - if (idx == size) { |
234 | | - // increase buffer size. |
235 | | - size *= 2; |
236 | | - buffer = java.util.Arrays.copyOf(buffer, size); |
237 | | - } |
238 | | - } |
239 | | - if (input.read() != '\n') { |
240 | | - throw new ProtocolException("Expected LF"); |
241 | | - } |
242 | | - |
243 | | - return Arrays.copyOfRange(buffer, 0, idx); |
244 | | - } |
245 | | - } |
246 | 18 |
|
247 | 19 | /** |
248 | 20 | * Used for writing the data to the server. |
@@ -378,12 +150,52 @@ public interface FailableConsumer<T, E extends Throwable> { |
378 | 150 | * @throws IOException Propagated |
379 | 151 | */ |
380 | 152 | public static void run(FailableConsumer<Redis, IOException> callback, String addr, int port) throws IOException { |
381 | | - try (Socket s = new Socket(addr, port)) { |
382 | | - run(callback, s); |
| 153 | + try (Managed redis = connect(addr, port)) { |
| 154 | + callback.accept(redis); |
383 | 155 | } |
384 | 156 | } |
385 | 157 |
|
| 158 | + /** |
| 159 | + * Utility method to run a single command on an existing socket. |
| 160 | + * |
| 161 | + * Note that this does not close the connection! |
| 162 | + * |
| 163 | + * @param callback The callback to perform with redis. |
| 164 | + * @param s Connection socket |
| 165 | + * @throws IOException Propagated |
| 166 | + */ |
386 | 167 | public static void run(FailableConsumer<Redis, IOException> callback, Socket s) throws IOException { |
387 | 168 | callback.accept(new Redis(s)); |
388 | 169 | } |
| 170 | + |
| 171 | + /** |
| 172 | + * Autocloseable implementation of Redis. |
| 173 | + */ |
| 174 | + public abstract static class Managed extends Redis implements AutoCloseable { |
| 175 | + Managed(Socket s) throws IOException { |
| 176 | + super(s); |
| 177 | + } |
| 178 | + |
| 179 | + abstract public void close() throws IOException; |
| 180 | + } |
| 181 | + |
| 182 | + /** |
| 183 | + * Create a "managed" connection, i.e. one that is cleanly closed (with a QUIT call), implemented as |
| 184 | + * an Autoclosable. |
| 185 | + * |
| 186 | + * @param host Redis host |
| 187 | + * @param port Redis port |
| 188 | + * @return The Autoclosable implementation |
| 189 | + * @throws IOException Propagated |
| 190 | + */ |
| 191 | + public static Managed connect(String host, int port) throws IOException { |
| 192 | + Socket s = new Socket(host, port); |
| 193 | + return new Managed(s) { |
| 194 | + @Override |
| 195 | + public void close() throws IOException { |
| 196 | + call("QUIT"); |
| 197 | + s.close(); |
| 198 | + } |
| 199 | + }; |
| 200 | + } |
389 | 201 | } |
0 commit comments