Skip to content

Commit 33e27ee

Browse files
authored
feat(interactive): Support open graph from oss (#4587)
As titled.
1 parent 9050851 commit 33e27ee

File tree

4 files changed

+190
-36
lines changed

4 files changed

+190
-36
lines changed

flex/bin/interactive_server.cc

Lines changed: 85 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
#include "flex/otel/otel.h"
2424
#include "flex/storages/rt_mutable_graph/loading_config.h"
2525
#include "flex/utils/service_utils.h"
26+
#ifdef BUILD_WITH_OSS
27+
#include "flex/utils/remote/oss_storage.h"
28+
#endif
2629

2730
#include <yaml-cpp/yaml.h>
2831
#include <boost/program_options.hpp>
@@ -103,6 +106,68 @@ void config_log_level(int log_level, int verbose_level) {
103106
}
104107
}
105108
}
109+
110+
#ifdef BUILD_WITH_OSS
111+
112+
Status unzip(const std::string& zip_file, const std::string& dest_dir) {
113+
std::string cmd = "unzip -o " + zip_file + " -d " + dest_dir;
114+
boost::process::child process(cmd);
115+
process.wait();
116+
if (process.exit_code() != 0) {
117+
return Status(StatusCode::IO_ERROR,
118+
"Fail to unzip file: " + zip_file +
119+
", error code: " + std::to_string(process.exit_code()));
120+
}
121+
return Status::OK();
122+
}
123+
124+
std::string download_data_from_oss(const std::string& graph_name,
125+
const std::string& remote_data_path,
126+
const std::string& local_data_dir) {
127+
if (std::filesystem::exists(local_data_dir)) {
128+
LOG(INFO) << "Data directory exists";
129+
} else {
130+
LOG(INFO) << "Data directory not exists, create directory";
131+
std::filesystem::create_directories(local_data_dir);
132+
}
133+
gs::OSSConf conf;
134+
conf.load_conf_from_env();
135+
gs::OSSRemoteStorageDownloader downloader(conf);
136+
auto open_res = downloader.Open();
137+
if (!open_res.ok()) {
138+
LOG(FATAL) << "Fail to open oss client: " << open_res.error_message();
139+
}
140+
141+
auto data_dir_zip_path = local_data_dir + "/data.zip";
142+
// if data_path start with conf.bucket_name_, remove it
143+
std::string data_path_no_bucket = remote_data_path;
144+
if (data_path_no_bucket.find(conf.bucket_name_) == 0) {
145+
data_path_no_bucket =
146+
data_path_no_bucket.substr(conf.bucket_name_.size() + 1);
147+
}
148+
LOG(INFO) << "Download data from oss: " << data_path_no_bucket << " to "
149+
<< data_dir_zip_path;
150+
151+
auto download_res = downloader.Get(data_path_no_bucket, data_dir_zip_path);
152+
if (!download_res.ok()) {
153+
LOG(FATAL) << "Fail to download data from oss: "
154+
<< download_res.error_message();
155+
}
156+
if (std::filesystem::exists(data_dir_zip_path)) {
157+
LOG(INFO) << "Data zip file exists, start to unzip";
158+
auto unzip_res = gs::unzip(data_dir_zip_path, local_data_dir);
159+
if (!unzip_res.ok()) {
160+
LOG(FATAL) << "Fail to unzip data file: " << unzip_res.error_message();
161+
}
162+
// remove zip file
163+
std::filesystem::remove(data_dir_zip_path);
164+
} else {
165+
LOG(FATAL) << "Data zip file not exists: " << data_dir_zip_path;
166+
}
167+
return local_data_dir;
168+
}
169+
#endif
170+
106171
} // namespace gs
107172

108173
/**
@@ -214,18 +279,32 @@ int main(int argc, char** argv) {
214279
return -1;
215280
}
216281
graph_schema_path = vm["graph-config"].as<std::string>();
217-
if (!vm.count("data-path")) {
218-
LOG(ERROR) << "data-path is required";
219-
return -1;
220-
}
221-
data_path = vm["data-path"].as<std::string>();
222-
223282
auto schema_res = gs::Schema::LoadFromYaml(graph_schema_path);
224283
if (!schema_res.ok()) {
225284
LOG(FATAL) << "Fail to load graph schema from yaml file: "
226285
<< graph_schema_path;
227286
}
228287

288+
if (!vm.count("data-path")) {
289+
LOG(FATAL) << "data-path is required";
290+
}
291+
data_path = vm["data-path"].as<std::string>();
292+
293+
auto remote_path = schema_res.value().GetRemotePath();
294+
295+
// If data_path starts with oss, download the data from oss
296+
if (!remote_path.empty() && remote_path.find("oss://") == 0) {
297+
#ifdef BUILD_WITH_OSS
298+
auto down_dir = gs::download_data_from_oss(
299+
"default_graph", remote_path.substr(6), data_path);
300+
LOG(INFO) << "Download data from oss to local path: " << remote_path
301+
<< ", " << data_path;
302+
CHECK(down_dir == data_path);
303+
#else
304+
LOG(FATAL) << "OSS is not supported in this build";
305+
#endif
306+
}
307+
229308
// The schema is loaded just to get the plugin dir and plugin list
230309
if (service_config.enable_adhoc_handler) {
231310
gs::init_codegen_proxy(vm, engine_config_file, graph_schema_path);

flex/storages/rt_mutable_graph/schema.cc

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,8 @@ void Schema::Serialize(std::unique_ptr<grape::LocalIOAdaptor>& writer) const {
413413
arc << v_primary_keys_ << vproperties_ << vprop_names_ << vprop_storage_
414414
<< eproperties_ << eprop_names_ << ie_strategy_ << oe_strategy_
415415
<< ie_mutability_ << oe_mutability_ << sort_on_compactions_ << max_vnum_
416-
<< v_descriptions_ << e_descriptions_ << description_ << version_;
416+
<< v_descriptions_ << e_descriptions_ << description_ << version_
417+
<< remote_path_ << name_ << id_;
417418
CHECK(writer->WriteArchive(arc));
418419
}
419420

@@ -426,7 +427,8 @@ void Schema::Deserialize(std::unique_ptr<grape::LocalIOAdaptor>& reader) {
426427
arc >> v_primary_keys_ >> vproperties_ >> vprop_names_ >> vprop_storage_ >>
427428
eproperties_ >> eprop_names_ >> ie_strategy_ >> oe_strategy_ >>
428429
ie_mutability_ >> oe_mutability_ >> sort_on_compactions_ >> max_vnum_ >>
429-
v_descriptions_ >> e_descriptions_ >> description_ >> version_;
430+
v_descriptions_ >> e_descriptions_ >> description_ >> version_ >>
431+
remote_path_ >> name_ >> id_;
430432
has_multi_props_edge_ = false;
431433
for (auto& eprops : eproperties_) {
432434
if (eprops.second.size() > 1) {
@@ -1199,11 +1201,25 @@ static Status parse_schema_from_yaml_node(const YAML::Node& graph_node,
11991201
LOG(WARNING) << "store_type is not set properly, use default value: "
12001202
<< "mutable_csr";
12011203
}
1204+
if (graph_node["name"]) {
1205+
schema.SetGraphName(graph_node["name"].as<std::string>());
1206+
}
1207+
1208+
if (graph_node["id"]) {
1209+
VLOG(1) << "Got id: " << graph_node["id"].as<std::string>();
1210+
schema.SetGraphId(graph_node["id"].as<std::string>());
1211+
} else {
1212+
VLOG(1) << "id is not set";
1213+
}
12021214

12031215
if (graph_node["description"]) {
12041216
schema.SetDescription(graph_node["description"].as<std::string>());
12051217
}
12061218

1219+
if (graph_node["remote_path"]) {
1220+
schema.SetRemotePath(graph_node["remote_path"].as<std::string>());
1221+
}
1222+
12071223
// check whether a version field is specified for the schema, if
12081224
// specified, we will use it to check the compatibility of the schema.
12091225
// If not specified, we will use the default version.
@@ -1387,6 +1403,10 @@ void Schema::SetDescription(const std::string& description) {
13871403
description_ = description;
13881404
}
13891405

1406+
void Schema::SetRemotePath(const std::string& remote_path) {
1407+
remote_path_ = remote_path;
1408+
}
1409+
13901410
void Schema::SetVersion(const std::string& version) { version_ = version; }
13911411
std::string Schema::GetVersion() const { return version_; }
13921412

flex/storages/rt_mutable_graph/schema.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,10 +263,22 @@ class Schema {
263263

264264
std::string GetPluginDir() const;
265265

266+
inline void SetGraphName(const std::string& name) { name_ = name; }
267+
268+
inline void SetGraphId(const std::string& id) { id_ = id; }
269+
270+
inline std::string GetGraphName() const { return name_; }
271+
272+
inline std::string GetGraphId() const { return id_; }
273+
266274
std::string GetDescription() const;
267275

268276
void SetDescription(const std::string& description);
269277

278+
void SetRemotePath(const std::string& remote_path);
279+
280+
inline std::string GetRemotePath() const { return remote_path_; }
281+
270282
void SetVersion(const std::string& version);
271283

272284
std::string GetVersion() const;
@@ -283,6 +295,7 @@ class Schema {
283295

284296
uint32_t generate_edge_label(label_t src, label_t dst, label_t edge) const;
285297

298+
std::string name_, id_;
286299
IdIndexer<std::string, label_t> vlabel_indexer_;
287300
IdIndexer<std::string, label_t> elabel_indexer_;
288301
std::vector<std::vector<PropertyType>> vproperties_;
@@ -310,6 +323,7 @@ class Schema {
310323
std::string description_;
311324
std::string version_;
312325
std::string compiler_path_;
326+
std::string remote_path_; // The path to the data on the remote storage
313327
bool has_multi_props_edge_;
314328
};
315329

k8s/dockerfiles/interactive-entrypoint.sh

Lines changed: 69 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -41,55 +41,83 @@ EOF
4141

4242
function prepare_workspace() {
4343
#receive args
44+
if [ $# -lt 1 ] || [ $# -gt 2 ]; then
45+
echo "Usage: prepare_workspace <workspace> [graph_yaml]"
46+
echo " workspace: the path to the workspace"
47+
echo " graph_yaml: the path to the graph yaml file. If not provided, will use the default graph"
48+
echo " if specified, we assume the remote_path is set in the graph yaml"
49+
exit 1
50+
fi
4451
local workspace=$1
45-
if [ -z "${workspace}" ]; then
46-
workspace="/tmp/interactive_workspace"
52+
local graph_yaml=""
53+
if [ $# -eq 2 ]; then
54+
graph_yaml=$2
4755
fi
56+
echo "workspace: ${workspace}, graph_yaml: ${graph_yaml}"
4857
#if workspace is not exist, create it
49-
if [ ! -d "${workspace}" ]; then
50-
mkdir -p ${workspace}
51-
mkdir -p ${workspace}/conf/
52-
else
53-
if [ -f "${workspace}/conf/interactive_config.yaml" ]; then
54-
echo "Workspace ${workspace} already exists"
55-
return 0
56-
fi
58+
mkdir -p ${workspace}/conf/
59+
mkdir -p ${workspace}/METADATA
60+
mkdir -p ${workspace}/log
61+
if [ -f "${workspace}/conf/interactive_config.yaml" ]; then
62+
echo "Workspace ${workspace} already exists"
63+
return 0
5764
fi
5865
# prepare interactive_config.yaml
5966
engine_config_path="${workspace}/conf/interactive_config.yaml"
6067
cp ${DEFAULT_INTERACTIVE_CONFIG_FILE} $engine_config_path
6168
#make sure the line which start with default_graph is changed to default_graph: ${DEFAULT_GRAPH_NAME}
6269
sed -i "s/default_graph:.*/default_graph: ${DEFAULT_GRAPH_NAME}/" $engine_config_path
63-
echo "Using default graph: ${DEFAULT_GRAPH_NAME} to start the service"
64-
65-
# copy the builtin graph
6670
builtin_graph_dir="${workspace}/data/${DEFAULT_GRAPH_NAME}"
6771
mkdir -p $builtin_graph_dir
68-
builtin_graph_import_path="${builtin_graph_dir}/import.yaml"
6972
builtin_graph_schema_path="${builtin_graph_dir}/graph.yaml"
7073
builtin_graph_data_path="${builtin_graph_dir}/indices"
71-
cp /opt/flex/share/${DEFAULT_GRAPH_NAME}/graph.yaml ${builtin_graph_schema_path}
72-
cp /opt/flex/share/${DEFAULT_GRAPH_NAME}/bulk_load.yaml ${builtin_graph_import_path}
73-
export FLEX_DATA_DIR=/opt/flex/share/gs_interactive_default_graph/
74-
builtin_graph_loader_cmd="${BULK_LOADER_BINARY_PATH} -g ${builtin_graph_schema_path} -d ${builtin_graph_data_path} -l ${builtin_graph_import_path}"
75-
echo "Loading builtin graph: ${DEFAULT_GRAPH_NAME} with command: $builtin_graph_loader_cmd"
76-
eval $builtin_graph_loader_cmd || (echo "Failed to load builtin graph: ${DEFAULT_GRAPH_NAME}" && exit 1)
77-
echo "Successfully loaded builtin graph: ${DEFAULT_GRAPH_NAME}"
74+
if [ -z "${graph_yaml}" ]; then
75+
# construct the builtin graph.
76+
builtin_graph_import_path="${builtin_graph_dir}/import.yaml"
77+
cp /opt/flex/share/${DEFAULT_GRAPH_NAME}/graph.yaml ${builtin_graph_schema_path}
78+
cp /opt/flex/share/${DEFAULT_GRAPH_NAME}/bulk_load.yaml ${builtin_graph_import_path}
79+
export FLEX_DATA_DIR=/opt/flex/share/gs_interactive_default_graph/
80+
builtin_graph_loader_cmd="${BULK_LOADER_BINARY_PATH} -g ${builtin_graph_schema_path} -d ${builtin_graph_data_path} -l ${builtin_graph_import_path}"
81+
echo "Loading builtin graph: ${DEFAULT_GRAPH_NAME} with command: $builtin_graph_loader_cmd"
82+
eval $builtin_graph_loader_cmd || (echo "Failed to load builtin graph: ${DEFAULT_GRAPH_NAME}" && exit 1)
83+
echo "Successfully loaded builtin graph: ${DEFAULT_GRAPH_NAME}"
84+
else
85+
if [ ! -f "${graph_yaml}" ]; then
86+
echo "Graph yaml file ${graph_yaml} does not exist"
87+
exit 1
88+
fi
89+
# check if remote_path is set in the graph yaml
90+
remote_path=$(grep "remote_path" ${graph_yaml} | cut -d':' -f2 | tr -d ' ')
91+
if [ -z "${remote_path}" ]; then
92+
echo "remote_path is not set in the graph yaml"
93+
exit 1
94+
fi
95+
cp ${graph_yaml} ${builtin_graph_schema_path}
96+
# The indices will be downloaded to ${builtin_graph_data_path} when starting the service
97+
fi
7898
}
7999

80100
function launch_service() {
81101
#expect 1 arg
82-
if [ $# -ne 1 ]; then
83-
echo "Usage: launch_service <workspace>"
102+
if [ $# -lt 1 ] || [ $# -gt 2 ]; then
103+
echo "Usage: launch_service <workspace> [graph_yaml]"
84104
exit 1
85105
fi
86106
local workspace=$1
107+
if [ $# -eq 2 ]; then
108+
local graph_yaml=$2
109+
fi
87110
engine_config_path="${workspace}/conf/interactive_config.yaml"
88111
# start the service
89112
start_cmd="${INTERACTIVE_SERVER_BIN} -c ${engine_config_path}"
90113
start_cmd="${start_cmd} -w ${workspace}"
91-
start_cmd="${start_cmd} --enable-admin-service true"
92114
start_cmd="${start_cmd} --start-compiler true"
115+
if [ -n "${graph_yaml}" ]; then
116+
start_cmd="${start_cmd} -g ${graph_yaml}"
117+
start_cmd="${start_cmd} --data-path ${workspace}/data/${DEFAULT_GRAPH_NAME}/indices"
118+
else
119+
start_cmd="${start_cmd} --enable-admin-service true"
120+
fi
93121
echo "Starting the service with command: $start_cmd"
94122
if $ENABLE_COORDINATOR; then start_cmd="${start_cmd} &"; fi
95123
eval $start_cmd
@@ -153,6 +181,7 @@ EOF
153181

154182
ENABLE_COORDINATOR=false
155183
WORKSPACE=/tmp/interactive_workspace
184+
CUSTOM_GRAPH_YAML=""
156185
while [[ $# -gt 0 ]]; do
157186
case $1 in
158187
-w | --workspace)
@@ -164,6 +193,15 @@ while [[ $# -gt 0 ]]; do
164193
WORKSPACE=$1
165194
shift
166195
;;
196+
-g | --graph)
197+
shift
198+
if [[ $# -eq 0 || $1 == -* ]]; then
199+
echo "Option -g requires an argument." >&2
200+
exit 1
201+
fi
202+
CUSTOM_GRAPH_YAML=$1
203+
shift
204+
;;
167205
-c | --enable-coordinator)
168206
ENABLE_COORDINATOR=true
169207
shift
@@ -189,8 +227,11 @@ while [[ $# -gt 0 ]]; do
189227
esac
190228
done
191229

230+
# When a custom graph yaml is specified, we only need to start the query
231+
# service, coordinator and admin service should not be started
232+
prepare_workspace $WORKSPACE ${CUSTOM_GRAPH_YAML}
233+
launch_service $WORKSPACE ${CUSTOM_GRAPH_YAML}
234+
if [ -z "${CUSTOM_GRAPH_YAML}" ]; then
235+
launch_coordinator $PORT_MAPPING
236+
fi
192237

193-
prepare_workspace $WORKSPACE
194-
launch_service $WORKSPACE
195-
# Note that the COORDINATOR_CONFIG_FILE should be inside the container
196-
launch_coordinator $PORT_MAPPING

0 commit comments

Comments
 (0)