Skip to content

Commit 8abd636

Browse files
authored
fix(interactive): make the server exit cleanly (#4566)
Fixes
1 parent 915e5f1 commit 8abd636

File tree

9 files changed

+69
-25
lines changed

9 files changed

+69
-25
lines changed

flex/bin/interactive_server.cc

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,6 @@ std::string parse_codegen_dir(const bpo::variables_map& vm) {
5454
return codegen_dir;
5555
}
5656

57-
void blockSignal(int sig) {
58-
sigset_t set;
59-
sigemptyset(&set);
60-
sigaddset(&set, sig);
61-
if (pthread_sigmask(SIG_BLOCK, &set, NULL) != 0) {
62-
perror("pthread_sigmask");
63-
}
64-
}
65-
6657
// When graph_schema is not specified, codegen proxy will use the running graph
6758
// schema in graph_db_service
6859
void init_codegen_proxy(const bpo::variables_map& vm,

flex/bin/rt_server.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@
1313
* limitations under the License.
1414
*/
1515

16-
#include "grape/util.h"
17-
1816
#include "flex/engines/graph_db/database/graph_db.h"
1917
#include "flex/engines/http_server/graph_db_service.h"
2018
#include "flex/engines/http_server/options.h"
19+
#include "flex/utils/service_utils.h"
20+
#include "grape/util.h"
2121

2222
#include <boost/program_options.hpp>
2323
#include <seastar/core/alien.hh>
@@ -77,6 +77,9 @@ int main(int argc, char** argv) {
7777
setenv("TZ", "Asia/Shanghai", 1);
7878
tzset();
7979

80+
gs::blockSignal(SIGINT);
81+
gs::blockSignal(SIGTERM);
82+
8083
double t0 = -grape::GetCurrentTime();
8184
auto& db = gs::GraphDB::get();
8285
std::string graph_schema_path = data_path + "/graph.yaml";
@@ -107,6 +110,7 @@ int main(int argc, char** argv) {
107110
service_config.start_compiler = false;
108111
service_config.set_sharding_mode(vm["sharding-mode"].as<std::string>());
109112
server::GraphDBService::get().init(service_config);
113+
110114
server::GraphDBService::get().run_and_wait_for_exit();
111115

112116
return 0;

flex/engines/graph_db/app/cypher_app_utils.cc

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,16 @@ bool generate_plan(const std::string& query, const std::string& statistics,
110110

111111
// call compiler to generate plan
112112
{
113+
int pipefd[2];
114+
if (pipe(pipefd) == -1) {
115+
LOG(ERROR) << "pipe failed!" << strerror(errno);
116+
exit(EXIT_FAILURE);
117+
}
118+
113119
pid_t pid = fork();
114120

115121
if (pid == -1) {
116-
std::cerr << "Fork failed!" << std::endl;
122+
LOG(ERROR) << "fork failed!" << strerror(errno);
117123
return false;
118124
} else if (pid == 0) {
119125
const char* const args[] = {
@@ -127,22 +133,45 @@ bool generate_plan(const std::string& query, const std::string& statistics,
127133
"temp.cypher.yaml",
128134
nullptr // execvp expects a null-terminated array
129135
};
136+
137+
close(pipefd[0]);
138+
139+
if (dup2(pipefd[1], STDERR_FILENO) == -1) {
140+
LOG(ERROR) << "dup2 failed!" << strerror(errno);
141+
exit(EXIT_FAILURE);
142+
}
143+
144+
close(pipefd[1]);
145+
130146
execvp(args[0], const_cast<char* const*>(args));
131147

132148
std::cerr << "Exec failed!" << std::endl;
133149
return false;
134150
} else {
151+
close(pipefd[1]);
152+
153+
ssize_t count;
154+
constexpr size_t BUFFSIZ = 4096;
155+
char buffer[BUFFSIZ];
156+
std::string error_message;
157+
while ((count = read(pipefd[0], buffer, sizeof(buffer) - 1)) > 0) {
158+
buffer[count] = '\0';
159+
error_message += buffer;
160+
}
161+
135162
int status;
136163
waitpid(pid, &status, 0);
137164
if (WIFEXITED(status)) {
138165
VLOG(1) << "Child exited with status " << WEXITSTATUS(status)
139166
<< std::endl;
140167
}
168+
close(pipefd[0]);
141169

142170
{
143171
std::ifstream file(output_file, std::ios::binary);
144172

145173
if (!file.is_open()) {
174+
LOG(ERROR) << "Compiler message: " << error_message;
146175
return false;
147176
}
148177

@@ -157,6 +186,7 @@ bool generate_plan(const std::string& query, const std::string& statistics,
157186

158187
file.close();
159188
if (!plan.ParseFromString(std::string(buffer))) {
189+
LOG(ERROR) << "Compiler message: " << error_message;
160190
return false;
161191
}
162192
}

flex/engines/graph_db/database/graph_db.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ GraphDB::~GraphDB() {
5959
compact_thread_.join();
6060
}
6161
if (contexts_ != nullptr) {
62-
showAppMetrics();
62+
// showAppMetrics();
6363
for (int i = 0; i < thread_num_; ++i) {
6464
contexts_[i].~SessionLocalContext();
6565
}
@@ -478,16 +478,16 @@ void GraphDB::openWalAndCreateContexts(const GraphDBConfig& config,
478478
data_dir);
479479
}
480480
VLOG(1) << "Using wal uri: " << wal_uri;
481+
482+
auto wal_parser = WalParserFactory::CreateWalParser(wal_uri);
483+
ingestWals(*wal_parser, data_dir, thread_num_);
481484
for (int i = 0; i < thread_num_; ++i) {
482485
new (&contexts_[i])
483486
SessionLocalContext(*this, data_dir, i, allocator_strategy,
484487
WalWriterFactory::CreateWalWriter(wal_uri));
485488
contexts_[i].logger->open(wal_uri, i);
486489
}
487490

488-
auto wal_parser = WalParserFactory::CreateWalParser(wal_uri);
489-
ingestWals(*wal_parser, data_dir, thread_num_);
490-
491491
initApps(graph_.schema().GetPlugins());
492492
VLOG(1) << "Successfully restore load plugins";
493493
}

flex/engines/graph_db/database/wal/wal.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ std::string get_wal_uri_scheme(const std::string& uri) {
2929
scheme = uri.substr(0, pos);
3030
}
3131
if (scheme.empty()) {
32-
LOG(INFO) << "No scheme found in wal uri: " << uri
33-
<< ", using default scheme: file";
32+
VLOG(1) << "No scheme found in wal uri: " << uri
33+
<< ", using default scheme: file";
3434
scheme = "file";
3535
}
3636
return scheme;

flex/engines/graph_db/runtime/utils/cypher_runner_impl.cc

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,21 @@ bool CypherRunnerImpl::gen_plan(const GraphDB& db, const std::string& query,
3939
}
4040

4141
physical::PhysicalPlan plan;
42-
if (!generate_plan(query, statistics, compiler_path, compiler_yaml, tmp_dir,
43-
plan)) {
44-
LOG(ERROR) << "Generate plan failed for query: " << query;
45-
return false;
42+
{
43+
// avoid multiple threads to generate plan for the same query
44+
std::unique_lock<std::mutex> lock(mutex_);
45+
if (plan_cache.get(query, plan_str)) {
46+
return true;
47+
}
48+
if (!generate_plan(query, statistics, compiler_path, compiler_yaml, tmp_dir,
49+
plan)) {
50+
LOG(ERROR) << "Generate plan failed for query: " << query;
51+
return false;
52+
}
53+
plan_str = plan.SerializeAsString();
54+
plan_cache.put(query, plan_str);
4655
}
47-
plan_str = plan.SerializeAsString();
48-
plan_cache.put(query, plan_str);
56+
4957
return true;
5058
}
5159

flex/engines/graph_db/runtime/utils/cypher_runner_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class CypherRunnerImpl {
7171
CypherRunnerImpl(const CypherRunnerImpl&) = delete;
7272
CypherRunnerImpl& operator=(const CypherRunnerImpl&) = delete;
7373
PlanCache plan_cache_;
74+
std::mutex mutex_;
7475
};
7576
} // namespace runtime
7677
} // namespace gs

flex/storages/rt_mutable_graph/mutable_property_fragment.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ void MutablePropertyFragment::Open(const std::string& work_dir,
197197
// We will reserve the at least 4096 slots for each vertex label
198198
size_t vertex_capacity =
199199
std::max(lf_indexers_[i].capacity(), (size_t) 4096);
200-
if (vertex_capacity >= lf_indexers_[i].capacity()) {
200+
if (vertex_capacity > lf_indexers_[i].capacity()) {
201201
lf_indexers_[i].reserve(vertex_capacity);
202202
}
203203
vertex_data_[i].resize(vertex_capacity);

flex/utils/service_utils.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,16 @@ namespace gs {
4646
static constexpr const char* CODEGEN_BIN = "load_plan_and_gen.sh";
4747

4848
/// Util functions.
49+
50+
inline void blockSignal(int sig) {
51+
sigset_t set;
52+
sigemptyset(&set);
53+
sigaddset(&set, sig);
54+
if (pthread_sigmask(SIG_BLOCK, &set, NULL) != 0) {
55+
perror("pthread_sigmask");
56+
}
57+
}
58+
4959
inline int64_t GetCurrentTimeStamp() {
5060
return std::chrono::duration_cast<std::chrono::milliseconds>(
5161
std::chrono::system_clock::now().time_since_epoch())

0 commit comments

Comments
 (0)