Skip to content

Commit f3a1a02

Browse files
authored
fix(interactive): Support Http Gremlin Service (#4394)
<!-- Thanks for your contribution! please review https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before opening an issue. --> ## What do these changes do? 1. Support sending Gremlin queries via HTTP requests. Clients can submit queries using curl as shown below: ``` curl -X POST \ -H "Connection: close" \ -H "Content-Type: application/json" \ -d '{"gremlin": "g.V().limit(2)"}' \ http://localhost:8182/gremlin ``` The response will be in JSON format like: ``` {"requestId":"bb66ad80-336d-4541-8bd3-de0a40451436","status":{"message":"","code":200,"attributes":{"@type":"g:Map","@value":[]}},"result":{"data":{"@type":"g:List","@value":[{"@type":"g:Vertex","@value":{"id":{"@type":"g:Int64","@value":1},"label":"person"}},{"@type":"g:Vertex","@value":{"id":{"@type":"g:Int64","@value":2},"label":"person"}}]},"meta":{"@type":"g:Map","@value":[]}}} ``` If error occurs, the error message with the error code will be returned in json format like: ``` {"requestId":"1882cbfd-a0b4-419d-924e-2ca3a63e209a","status":{"message":"ErrorCode: GREMLIN_INVALID_RESULT\nMessage: getKeyName fail code is TableNotExistError, msg is entity label_id 2 is not found\nEC: 03-0108\nQueryId: 9132153615669087179\n","code":500,"attributes":{"@type":"g:Map","@value":[]}},"result":{"data":null,"meta":{"@type":"g:Map","@value":[]}}} ``` 2. The HTTP API also supports streaming results using HTTP Chunked Transfer Encoding. You can use curl or other SDKs to check the streamed results. 3. If the Gremlin service requires authentication, the request must include the Authorization header, as shown below: ``` curl -X POST \ -H "Authorization: Basic YWRtaW46YWRtaW4=" \ -H "Connection: close" \ -H "Content-Type: application/json" \ -d '{"gremlin": "g.V().limit(5)"}' \ http://localhost:8182/gremlin ``` The expected format for the Authorization header is: ``` Authorization: Basic <Base64(user:password)> ``` In this example, `YWRtaW46YWRtaW4=` is the Base64 encoding of `admin:admin`. <!-- Please give a short brief about these changes. --> ## Related issue number <!-- Are there any issues opened that will be resolved by merging this change? --> Fixes
1 parent 89671f8 commit f3a1a02

File tree

5 files changed

+462
-6
lines changed

5 files changed

+462
-6
lines changed

interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/auth/IrAuthenticationHandler.java

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import io.netty.channel.Channel;
2929
import io.netty.channel.ChannelHandler;
3030
import io.netty.channel.ChannelHandlerContext;
31+
import io.netty.handler.codec.http.FullHttpMessage;
32+
import io.netty.handler.codec.http.HttpResponseStatus;
3133
import io.netty.util.Attribute;
3234
import io.netty.util.AttributeMap;
3335

@@ -42,6 +44,7 @@
4244
import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
4345
import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
4446
import org.apache.tinkerpop.gremlin.server.handler.AbstractAuthenticationHandler;
47+
import org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtils;
4548
import org.apache.tinkerpop.gremlin.server.handler.SaslAuthenticationHandler;
4649
import org.apache.tinkerpop.gremlin.server.handler.StateKey;
4750
import org.slf4j.Logger;
@@ -50,6 +53,7 @@
5053
import java.net.InetAddress;
5154
import java.net.InetSocketAddress;
5255
import java.net.SocketAddress;
56+
import java.nio.charset.Charset;
5357
import java.util.Base64;
5458
import java.util.HashMap;
5559
import java.util.Map;
@@ -205,9 +209,70 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw
205209
ctx.writeAndFlush(error);
206210
}
207211
}
212+
} else if (msg instanceof FullHttpMessage) { // add Authentication for HTTP requests
213+
FullHttpMessage request = (FullHttpMessage) msg;
214+
215+
if (!authenticator.requireAuthentication()) {
216+
ctx.fireChannelRead(request);
217+
return;
218+
}
219+
220+
String errorMsg =
221+
"Invalid HTTP Header for Authentication. Expected format: 'Authorization: Basic"
222+
+ " <Base64(user:password)>'";
223+
224+
if (!request.headers().contains("Authorization")) {
225+
sendError(ctx, errorMsg, request);
226+
return;
227+
}
228+
229+
String authorizationHeader = request.headers().get("Authorization");
230+
if (!authorizationHeader.startsWith("Basic ")) {
231+
sendError(ctx, errorMsg, request);
232+
return;
233+
}
234+
235+
String authorization;
236+
byte[] decodedUserPass;
237+
try {
238+
authorization = authorizationHeader.substring("Basic ".length());
239+
decodedUserPass = BASE64_DECODER.decode(authorization);
240+
} catch (Exception e) {
241+
sendError(ctx, errorMsg, request);
242+
return;
243+
}
244+
245+
authorization = new String(decodedUserPass, Charset.forName("UTF-8"));
246+
String[] split = authorization.split(":");
247+
if (split.length != 2) {
248+
sendError(
249+
ctx,
250+
"Invalid username or password after decoding the Base64 Authorization"
251+
+ " header.",
252+
request);
253+
return;
254+
}
255+
256+
Map<String, String> credentials = new HashMap();
257+
credentials.put("username", split[0]);
258+
credentials.put("password", split[1]);
259+
String address = ctx.channel().remoteAddress().toString();
260+
if (address.startsWith("/") && address.length() > 1) {
261+
address = address.substring(1);
262+
}
263+
264+
credentials.put("address", address);
265+
266+
try {
267+
AuthenticatedUser user = authenticator.authenticate(credentials);
268+
ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
269+
ctx.fireChannelRead(request);
270+
} catch (AuthenticationException e) {
271+
sendError(ctx, e.getMessage(), request);
272+
}
208273
} else {
209274
logger.warn(
210-
"{} only processes RequestMessage instances - received {} - channel closing",
275+
"{} received invalid request message {} - channel closing",
211276
this.getClass().getSimpleName(),
212277
msg.getClass());
213278
ctx.close();
@@ -226,4 +291,17 @@ private InetAddress getRemoteInetAddress(final ChannelHandlerContext ctx) {
226291

227292
return ((InetSocketAddress) genericSocketAddr).getAddress();
228293
}
294+
295+
private void sendError(
296+
final ChannelHandlerContext ctx, String errorMsg, FullHttpMessage request) {
297+
HttpHandlerUtils.sendError(ctx, HttpResponseStatus.UNAUTHORIZED, errorMsg, false);
298+
if (request.refCnt() > 0) {
299+
boolean fullyReleased = request.release();
300+
if (!fullyReleased) {
301+
logger.warn(
302+
"http request message was not fully released, may cause a"
303+
+ " memory leak");
304+
}
305+
}
306+
}
229307
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
*
3+
* * Copyright 2020 Alibaba Group Holding Limited.
4+
* *
5+
* * Licensed under the Apache License, Version 2.0 (the "License");
6+
* * you may not use this file except in compliance with the License.
7+
* * You may obtain a copy of the License at
8+
* *
9+
* * http://www.apache.org/licenses/LICENSE-2.0
10+
* *
11+
* * Unless required by applicable law or agreed to in writing, software
12+
* * distributed under the License is distributed on an "AS IS" BASIS,
13+
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* * See the License for the specific language governing permissions and
15+
* * limitations under the License.
16+
*
17+
*/
18+
19+
package com.alibaba.graphscope.gremlin.plugin.processor;
20+
21+
import io.netty.buffer.ByteBuf;
22+
import io.netty.buffer.Unpooled;
23+
import io.netty.channel.ChannelFuture;
24+
import io.netty.channel.ChannelFutureListener;
25+
import io.netty.channel.ChannelHandlerContext;
26+
import io.netty.handler.codec.http.DefaultFullHttpResponse;
27+
import io.netty.handler.codec.http.FullHttpResponse;
28+
import io.netty.handler.codec.http.HttpResponseStatus;
29+
import io.netty.handler.codec.http.HttpVersion;
30+
31+
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
32+
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
33+
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
34+
import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer;
35+
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
36+
import org.apache.tinkerpop.gremlin.server.Context;
37+
import org.apache.tinkerpop.gremlin.server.GraphManager;
38+
import org.apache.tinkerpop.gremlin.server.Settings;
39+
import org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtils;
40+
import org.javatuples.Pair;
41+
42+
import java.nio.charset.StandardCharsets;
43+
import java.util.Objects;
44+
import java.util.concurrent.ScheduledExecutorService;
45+
import java.util.concurrent.atomic.AtomicReference;
46+
47+
/**
48+
* Maintain the gremlin execution context for http request.
49+
*/
50+
public class HttpContext extends Context {
51+
private final Pair<String, MessageTextSerializer<?>> serializer;
52+
private final boolean keepAlive;
53+
private final AtomicReference<Boolean> finalResponse;
54+
55+
public HttpContext(
56+
RequestMessage requestMessage,
57+
ChannelHandlerContext ctx,
58+
Settings settings,
59+
GraphManager graphManager,
60+
GremlinExecutor gremlinExecutor,
61+
ScheduledExecutorService scheduledExecutorService,
62+
Pair<String, MessageTextSerializer<?>> serializer,
63+
boolean keepAlive) {
64+
super(
65+
requestMessage,
66+
ctx,
67+
settings,
68+
graphManager,
69+
gremlinExecutor,
70+
scheduledExecutorService);
71+
this.serializer = Objects.requireNonNull(serializer);
72+
this.keepAlive = keepAlive;
73+
this.finalResponse = new AtomicReference<>(false);
74+
}
75+
76+
/**
77+
* serialize the response message to http response and write to http channel.
78+
* @param responseMessage
79+
*/
80+
@Override
81+
public void writeAndFlush(final ResponseMessage responseMessage) {
82+
try {
83+
if (finalResponse.compareAndSet(
84+
false, responseMessage.getStatus().getCode().isFinalResponse())) {
85+
ByteBuf byteBuf =
86+
Unpooled.wrappedBuffer(
87+
serializer
88+
.getValue1()
89+
.serializeResponseAsString(responseMessage)
90+
.getBytes(StandardCharsets.UTF_8));
91+
FullHttpResponse response =
92+
new DefaultFullHttpResponse(
93+
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
94+
ChannelFuture channelFuture =
95+
this.getChannelHandlerContext().writeAndFlush(response);
96+
ResponseStatusCode statusCode = responseMessage.getStatus().getCode();
97+
if (!keepAlive && statusCode.isFinalResponse()) {
98+
channelFuture.addListener(ChannelFutureListener.CLOSE);
99+
}
100+
}
101+
} catch (Exception e) {
102+
HttpHandlerUtils.sendError(
103+
this.getChannelHandlerContext(),
104+
HttpResponseStatus.INTERNAL_SERVER_ERROR,
105+
e.getMessage(),
106+
keepAlive);
107+
}
108+
}
109+
}

0 commit comments

Comments
 (0)