2424#include " flex/engines/graph_db/database/graph_db.h"
2525#include " flex/engines/http_server/options.h"
2626
27+ #ifdef BUILD_WITH_OSS
28+ #include < boost/process.hpp>
29+ #include " flex/utils/remote/oss_storage.h"
30+ #endif
31+
2732namespace bpo = boost::program_options;
2833
2934static std::string work_dir;
@@ -44,6 +49,73 @@ void signal_handler(int signal) {
4449 }
4550}
4651
52+ #ifdef BUILD_WITH_OSS
53+
54+ void check_oss_object_not_exist (std::string& data_path,
55+ std::string& object_path,
56+ gs::OSSConf& oss_conf) {
57+ auto pos = data_path.find (" /" , 6 );
58+ if (pos == std::string::npos) {
59+ LOG (FATAL) << " Invalid data path: " << data_path;
60+ }
61+ oss_conf.bucket_name_ = data_path.substr (6 , pos - 6 );
62+ object_path = data_path.substr (pos + 1 );
63+ oss_conf.load_conf_from_env ();
64+ // check whether the object exists
65+ auto oss_reader = std::make_shared<gs::OSSRemoteStorageDownloader>(oss_conf);
66+ if (!oss_reader || !oss_reader->Open ().ok ()) {
67+ LOG (FATAL) << " Failed to open oss reader" ;
68+ }
69+ std::vector<std::string> path_list;
70+ auto status = oss_reader->List (object_path, path_list);
71+ if (status.ok () && path_list.size () > 0 ) {
72+ LOG (FATAL) << " Object already exists: " << object_path
73+ << " , list size: " << path_list.size ()
74+ << " , please remove the object and try again." ;
75+ }
76+ // use a random directory
77+ data_path = " /tmp/" + std::to_string (time (nullptr ));
78+ }
79+
80+ int32_t upload_data_dir_to_oss (const std::filesystem::path& data_dir_path,
81+ const std::string& object_path,
82+ const gs::OSSConf& oss_conf) {
83+ // zip the data directory
84+ std::string zip_file = data_dir_path.string () + " .zip" ;
85+ std::string zip_cmd = " zip -r " + zip_file + " " + data_dir_path.string ();
86+ boost::process::child zip_process (zip_cmd);
87+ zip_process.wait ();
88+
89+ int res = zip_process.exit_code ();
90+ if (res != 0 ) {
91+ LOG (ERROR) << " Failed to zip data directory: " << zip_cmd
92+ << " , code: " << res;
93+ return -1 ;
94+ }
95+
96+ auto oss_writer = std::make_shared<gs::OSSRemoteStorageUploader>(oss_conf);
97+ if (!oss_writer || !oss_writer->Open ().ok ()) {
98+ LOG (ERROR) << " Failed to open oss writer" ;
99+ return -1 ;
100+ }
101+ auto status = oss_writer->Put (zip_file, object_path, false );
102+ if (!status.ok ()) {
103+ LOG (ERROR) << " Failed to upload data to oss: " << status.ToString ();
104+ return -1 ;
105+ }
106+ status = oss_writer->Close ();
107+ if (!status.ok ()) {
108+ LOG (ERROR) << " Failed to close oss writer: " << status.ToString ();
109+ return -1 ;
110+ }
111+ LOG (INFO) << " Successfully uploaded data to oss: " << object_path
112+ << " , it is in zip format" ;
113+ std::filesystem::remove (zip_file);
114+ std::filesystem::remove_all (data_dir_path);
115+ return 0 ;
116+ }
117+ #endif
118+
47119int main (int argc, char ** argv) {
48120 bpo::options_description desc (" Usage:" );
49121 /* *
@@ -90,6 +162,17 @@ int main(int argc, char** argv) {
90162 }
91163
92164 std::string data_path = " " ;
165+ /* *
166+ * If the data path is an oss path, the data will be uploaded to oss after
167+ * loading to a temporary directory. To improve the performance of the
168+ * performance, bulk_loader will zip the data directory before uploading.
169+ * The data path should be in the format of oss://bucket_name/object_path
170+ */
171+ #ifdef BUILD_WITH_OSS
172+ bool upload_to_oss = false ;
173+ std::string object_path = " " ;
174+ auto oss_conf = gs::OSSConf ();
175+ #endif
93176 std::string bulk_load_config_path = " " ;
94177 std::string graph_schema_path = " " ;
95178
@@ -141,6 +224,16 @@ int main(int argc, char** argv) {
141224 vm[" use-mmap-vector" ].as <bool >());
142225 }
143226
227+ if (data_path.find (" oss://" ) == 0 ) {
228+ #ifdef BUILD_WITH_OSS
229+ upload_to_oss = true ;
230+ check_oss_object_not_exist (data_path, object_path, oss_conf);
231+ #else
232+ LOG (ERROR) << " OSS is not supported in this build" ;
233+ return -1 ;
234+ #endif
235+ }
236+
144237 std::filesystem::path data_dir_path (data_path);
145238 if (!std::filesystem::exists (data_dir_path)) {
146239 std::filesystem::create_directory (data_dir_path);
@@ -185,5 +278,11 @@ int main(int argc, char** argv) {
185278 t += grape::GetCurrentTime ();
186279 LOG (INFO) << " Finished bulk loading in " << t << " seconds." ;
187280
281+ #ifdef BUILD_WITH_OSS
282+ if (upload_to_oss) {
283+ return upload_data_dir_to_oss (data_dir_path, object_path, oss_conf);
284+ }
285+ #endif
286+
188287 return 0 ;
189288}
0 commit comments