Skip to content

Commit 89efa9d

Browse files
fix(interactive): Support Add Vertex and Edge Operations in Interactive (#4591)
<!-- Thanks for your contribution! please review https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before opening an issue. --> ## What do these changes do? Support the following Cypher queries to add vertex and edge data by the Interactive Python SDK: 1. Add Vertex ``` CREATE (charlie:Person {name: 'Charlie', id: 211, age: 32}) ``` 2. Add Edge ``` CREATE (charlie:Person {id: 211})-[:ACTED_IN]->(movie:Movie {name: 'Wall Street'}); ``` Continuous `Add` operations will be batched, and leverage the batch api provided by Python SDK to implement. <!-- Please give a short brief about these changes. --> ## Related issue number <!-- Are there any issues opened that will be resolved by merging this change? --> Fixes --------- Co-authored-by: xiaolei.zl <[email protected]>
1 parent 2312e3c commit 89efa9d

File tree

30 files changed

+1816
-73
lines changed

30 files changed

+1816
-73
lines changed

.github/workflows/interactive.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,13 @@ jobs:
341341
${GITHUB_WORKSPACE}/flex/interactive/examples/modern_graph/graph.yaml ${INTERACTIVE_ENGINE_HOME}/compiler/src/test/resources/statistics/modern_statistics.json \
342342
"MATCH(n) return count(n);" /tmp/physical_plan_gen_config.yaml
343343
344+
- name: Tests cypher write
345+
env:
346+
INTERACTIVE_WORKSPACE: /tmp/interactive_workspace
347+
run: |
348+
cd ${GITHUB_WORKSPACE}/flex/tests/hqps
349+
bash hqps_cypher_write_test.sh ${INTERACTIVE_WORKSPACE} modern_graph ./interactive_config_test.yaml
350+
344351
- name: Run End-to-End cypher adhoc ldbc query test
345352
env:
346353
GS_TEST_DIR: ${{ github.workspace }}/gstest

flex/engines/graph_db/database/graph_db_operations.cc

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,17 @@ namespace gs {
2929
bool check_primary_key_value_valid(
3030
const rapidjson::Value& vertex_json,
3131
const std::string& pk_filed_name = "primary_key_values") {
32-
if (!vertex_json.HasMember(pk_filed_name) ||
33-
!vertex_json[pk_filed_name].IsArray() ||
34-
vertex_json[pk_filed_name].Size() != 1 ||
35-
!vertex_json[pk_filed_name][0].HasMember("value")) {
36-
return false;
32+
LOG(INFO) << "check_primary_key_value_valid for " << pk_filed_name << " "
33+
<< gs::rapidjson_stringify(vertex_json);
34+
if (vertex_json.HasMember(pk_filed_name)) {
35+
if (vertex_json[pk_filed_name].IsArray()) {
36+
return vertex_json[pk_filed_name].Size() == 1 &&
37+
vertex_json[pk_filed_name][0].HasMember("value");
38+
}
39+
return true;
3740
}
38-
return true;
41+
LOG(INFO) << "check_primary_key_value_valid failed";
42+
return false;
3943
}
4044

4145
Result<std::string> GraphDBOperations::CreateVertex(
@@ -260,12 +264,17 @@ VertexData GraphDBOperations::inputVertex(const rapidjson::Value& vertex_json,
260264
GraphDBSession& session) {
261265
VertexData vertex;
262266
std::string label = jsonToString(vertex_json["label"]);
263-
if (!check_primary_key_value_valid(vertex_json)) {
264-
throw std::runtime_error("primary_key_values is invalid");
267+
if (!check_primary_key_value_valid(vertex_json, "primary_key_values") &&
268+
!check_primary_key_value_valid(vertex_json, "primary_key_value")) {
269+
throw std::runtime_error("primary_key_values/primary_key_value is invalid");
265270
}
266271

267-
vertex.pk_value =
268-
Any(jsonToString(vertex_json["primary_key_values"][0]["value"]));
272+
if (vertex_json.HasMember("primary_key_values")) {
273+
vertex.pk_value =
274+
Any(jsonToString(vertex_json["primary_key_values"][0]["value"]));
275+
} else {
276+
vertex.pk_value = Any(jsonToString(vertex_json["primary_key_value"]));
277+
}
269278
std::unordered_set<std::string> property_names;
270279
std::vector<std::string> property_names_arr;
271280
for (auto& property : vertex_json["properties"].GetArray()) {
@@ -294,16 +303,27 @@ EdgeData GraphDBOperations::inputEdge(const rapidjson::Value& edge_json,
294303
std::string src_label = jsonToString(edge_json["src_label"]);
295304
std::string dst_label = jsonToString(edge_json["dst_label"]);
296305
std::string edge_label = jsonToString(edge_json["edge_label"]);
297-
if (!check_primary_key_value_valid(edge_json, "src_primary_key_values")) {
298-
throw std::runtime_error("src_primary_key_values is invalid");
306+
if (!check_primary_key_value_valid(edge_json, "src_primary_key_values") &&
307+
!check_primary_key_value_valid(edge_json, "src_primary_key_value")) {
308+
throw std::runtime_error(
309+
"src_primary_key_values/src_primary_key_value is invalid");
299310
}
300-
if (!check_primary_key_value_valid(edge_json, "dst_primary_key_values")) {
311+
if (!check_primary_key_value_valid(edge_json, "dst_primary_key_values") &&
312+
!check_primary_key_value_valid(edge_json, "dst_primary_key_value")) {
301313
throw std::runtime_error("dst_primary_key_values is invalid");
302314
}
303-
edge.src_pk_value =
304-
Any(jsonToString(edge_json["src_primary_key_values"][0]["value"]));
305-
edge.dst_pk_value =
306-
Any(jsonToString(edge_json["dst_primary_key_values"][0]["value"]));
315+
if (edge_json.HasMember("src_primary_key_values")) {
316+
edge.src_pk_value =
317+
Any(jsonToString(edge_json["src_primary_key_values"][0]["value"]));
318+
} else {
319+
edge.src_pk_value = Any(jsonToString(edge_json["src_primary_key_value"]));
320+
}
321+
if (edge_json.HasMember("dst_primary_key_values")) {
322+
edge.dst_pk_value =
323+
Any(jsonToString(edge_json["dst_primary_key_values"][0]["value"]));
324+
} else {
325+
edge.dst_pk_value = Any(jsonToString(edge_json["dst_primary_key_value"]));
326+
}
307327
// Check that all parameters in the parameter
308328
if (edge_json["properties"].Size() > 1) {
309329
throw std::runtime_error(
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
#!/bin/bash
2+
# Copyright 2020 Alibaba Group Holding Limited.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
set -e
16+
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
17+
FLEX_HOME=${SCRIPT_DIR}/../../
18+
BULK_LOADER=${FLEX_HOME}/build/bin/bulk_loader
19+
SERVER_BIN=${FLEX_HOME}/build/bin/interactive_server
20+
GIE_HOME=${FLEX_HOME}/../interactive_engine/
21+
22+
#
23+
if [ $# -ne 3 ]; then
24+
echo "Receives: $# args, need 3 args"
25+
echo "Usage: $0 <INTERACTIVE_WORKSPACE> <GRAPH_NAME> <ENGINE_CONFIG>"
26+
exit 1
27+
fi
28+
29+
INTERACTIVE_WORKSPACE=$1
30+
GRAPH_NAME=$2
31+
ENGINE_CONFIG_PATH=$3
32+
# if ENGINE_CONFIG_PATH is not absolute path, convert it to absolute path
33+
if [[ ! ${ENGINE_CONFIG_PATH} == /* ]]; then
34+
ENGINE_CONFIG_PATH=$(cd $(dirname ${ENGINE_CONFIG_PATH}) && pwd)/$(basename ${ENGINE_CONFIG_PATH})
35+
fi
36+
37+
38+
if [ ! -d ${INTERACTIVE_WORKSPACE} ]; then
39+
echo "INTERACTIVE_WORKSPACE: ${INTERACTIVE_WORKSPACE} not exists"
40+
exit 1
41+
fi
42+
43+
if [ ! -d ${INTERACTIVE_WORKSPACE}/data/${GRAPH_NAME} ]; then
44+
echo "GRAPH: ${GRAPH_NAME} not exists"
45+
exit 1
46+
fi
47+
if [ ! -f ${INTERACTIVE_WORKSPACE}/data/${GRAPH_NAME}/graph.yaml ]; then
48+
echo "GRAPH_SCHEMA_FILE: ${BULK_LOAD_FILE} not exists"
49+
exit 1
50+
fi
51+
if [ ! -f ${ENGINE_CONFIG_PATH} ]; then
52+
echo "ENGINE_CONFIG: ${ENGINE_CONFIG_PATH} not exists"
53+
exit 1
54+
fi
55+
56+
GRAPH_SCHEMA_YAML=${INTERACTIVE_WORKSPACE}/data/${GRAPH_NAME}/graph.yaml
57+
GRAPH_CSR_DATA_DIR=${INTERACTIVE_WORKSPACE}/data/${GRAPH_NAME}/indices
58+
rm -rf ${GRAPH_CSR_DATA_DIR}/wal
59+
60+
RED='\033[0;31m'
61+
GREEN='\033[0;32m'
62+
NC='\033[0m' # No Color
63+
err() {
64+
echo -e "${RED}[$(date +'%Y-%m-%d %H:%M:%S')] -ERROR- $* ${NC}" >&2
65+
}
66+
67+
info() {
68+
echo -e "${GREEN}[$(date +'%Y-%m-%d %H:%M:%S')] -INFO- $* ${NC}"
69+
}
70+
71+
72+
kill_service(){
73+
info "Kill Service first"
74+
ps -ef | grep "com.alibaba.graphscope.GraphServer" | awk '{print $2}' | xargs kill -9 || true
75+
ps -ef | grep "interactive_server" | awk '{print $2}' | xargs kill -9 || true
76+
sleep 3
77+
sed -i "s/default_graph: .*/default_graph: modern_graph/g" ${ENGINE_CONFIG_PATH}
78+
rm -rf ${INTERACTIVE_WORKSPACE}/METADATA || (err "rm METADATA failed")
79+
rm ${INTERACTIVE_WORKSPACE}/data/1 || (err "rm builtin graph failed")
80+
info "Kill Service success"
81+
rm -rf /tmp/neo4j-* || true
82+
# clean the wal
83+
rm -rf ${GRAPH_CSR_DATA_DIR}/wal || true
84+
rm -rf ${GRAPH_CSR_DATA_DIR}/runtime || true
85+
}
86+
87+
# kill service when exit
88+
trap kill_service EXIT
89+
90+
91+
# start engine service and load ldbc graph
92+
start_engine_service(){
93+
# suppose graph has been loaded, check ${GRAPH_CSR_DATA_DIR} exists
94+
rm -rf ${INTERACTIVE_WORKSPACE}/metadata/
95+
rm ${INTERACTIVE_WORKSPACE}/data/1 || true
96+
if [ ! -d ${GRAPH_CSR_DATA_DIR} ]; then
97+
err "GRAPH_CSR_DATA_DIR not found"
98+
exit 1
99+
fi
100+
101+
#check SERVER_BIN exists
102+
if [ ! -f ${SERVER_BIN} ]; then
103+
err "SERVER_BIN not found"
104+
exit 1
105+
fi
106+
sed -i "s/default_graph: .*/default_graph: ${GRAPH_NAME}/g" ${ENGINE_CONFIG_PATH}
107+
108+
cmd="${SERVER_BIN} -c ${ENGINE_CONFIG_PATH} "
109+
cmd="${cmd} -w ${INTERACTIVE_WORKSPACE} --enable-admin-service true --start-compiler true"
110+
111+
info "Start engine service with command: ${cmd}"
112+
${cmd} &
113+
sleep 5
114+
#check interactive_server is running, if not, exit
115+
ps -ef | grep "interactive_server" | grep -v grep
116+
117+
info "Start engine service success"
118+
}
119+
120+
121+
start_compiler_service(){
122+
echo "try to start compiler service"
123+
pushd ${GIE_HOME}/compiler
124+
cmd="make run graph.schema=${GRAPH_SCHEMA_YAML} config.path=${ENGINE_CONFIG_PATH}"
125+
echo "Start compiler service with command: ${cmd}"
126+
${cmd} &
127+
sleep 5
128+
# check if Graph Server is running, if not exist
129+
ps -ef | grep "com.alibaba.graphscope.GraphServer" | grep -v grep
130+
info "Start compiler service success"
131+
popd
132+
}
133+
134+
run_cypher_write_test(){
135+
echo "run cypher write test"
136+
pushd ${GIE_HOME}/compiler
137+
cmd="mvn test -Dskip.ir.core=true -Dtest=com.alibaba.graphscope.cypher.integration.modern.ModernGraphWriteTest"
138+
echo "Run cypher write test with command: ${cmd}"
139+
${cmd}
140+
info "Run cypher write test success"
141+
popd
142+
}
143+
144+
145+
kill_service
146+
start_engine_service
147+
start_compiler_service
148+
run_cypher_write_test
149+
150+
151+
152+
kill_service

interactive_engine/compiler/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,7 @@
362362
<exclude>**/IrPatternTest.java</exclude>
363363
<exclude>**/MovieTest.java</exclude>
364364
<exclude>**/GraphAlgoTest.java</exclude>
365+
<exclude>**/ModernGraphWriteTest.java</exclude>
365366
</excludes>
366367
</configuration>
367368
</plugin>

interactive_engine/compiler/src/main/antlr4/CypherGS.g4

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,14 @@ CALL : ( 'C' | 'c' ) ( 'A' | 'a' ) ( 'L' | 'l' ) ( 'L' | 'l' ) ;
5252
YIELD : ( 'Y' | 'y' ) ( 'I' | 'i' ) ( 'E' | 'e' ) ( 'L' | 'l' ) ( 'D' | 'd' ) ;
5353

5454
oC_RegularQuery
55-
: ( ( oC_ReadingClause ) SP? )+ ( SP oC_Return )
55+
: ( ( oC_ReadingClause | oC_UpdatingClause ) SP? )* SP? oC_ReadingClause ( SP oC_Return )
56+
| ( ( oC_ReadingClause | oC_UpdatingClause ) SP? )* SP? oC_UpdatingClause ( SP oC_Return )?
5657
;
5758

59+
oC_UpdatingClause
60+
: oC_Create
61+
;
62+
5863
oC_ReadingClause
5964
: oC_Match
6065
| oC_With
@@ -63,6 +68,11 @@ oC_ReadingClause
6368
| oC_UnionCallSubQuery
6469
;
6570

71+
CREATE : ( 'C' | 'c' ) ( 'R' | 'r' ) ( 'E' | 'e' ) ( 'A' | 'a' ) ( 'T' | 't' ) ( 'E' | 'e' ) ;
72+
73+
oC_Create
74+
: CREATE SP? oC_Pattern ;
75+
6676
oC_SubQuery
6777
: ( ( oC_Match | oC_With | oC_Unwind ) SP? )* ( SP? oC_Return ) ;
6878

interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/HttpExecutionClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ public HttpExecutionClient(Configs graphConfig, ChannelFetcher<URI> channelFetch
5353
HiactorConfig.INTERACTIVE_QUERY_ENDPOINT.get(graphConfig));
5454
}
5555

56+
public Session getSession() {
57+
return session;
58+
}
59+
5660
@Override
5761
public void submit(
5862
ExecutionRequest request,

interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/type/ExecutionResponseListener.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@
1616

1717
package com.alibaba.graphscope.common.client.type;
1818

19-
import com.alibaba.graphscope.gaia.proto.IrResult;
20-
2119
/**
2220
* listener to handle response from remote engine service
2321
*/
24-
public interface ExecutionResponseListener {
25-
void onNext(IrResult.Record record);
22+
public interface ExecutionResponseListener<T> {
23+
void onNext(T record);
2624

2725
void onCompleted();
2826

0 commit comments

Comments
 (0)