Skip to content

Commit 19b19a4

Browse files
authored
Merge pull request ceph#63523 from Matan-B/wip-matanb-crimson-asock-race-fixes
crimson/osd: Admin Socket fixes Reviewed-by: Yingxin Cheng <[email protected]>
2 parents 44c5cfb + 978e9c8 commit 19b19a4

File tree

5 files changed

+228
-204
lines changed

5 files changed

+228
-204
lines changed

src/crimson/admin/admin_socket.cc

Lines changed: 98 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,7 @@ using ceph::common::bad_cmd_get;
3030
using ceph::common::validate_cmd;
3131
using ceph::common::dump_cmd_and_help_to_json;
3232

33-
namespace {
34-
seastar::logger& logger()
35-
{
36-
return crimson::get_logger(ceph_subsys_osd);
37-
}
38-
} // namespace
33+
SET_SUBSYS(asok);
3934

4035
using std::string;
4136
using std::string_view;
@@ -59,15 +54,18 @@ tell_result_t::tell_result_t(std::unique_ptr<Formatter> formatter)
5954

6055
void AdminSocket::register_command(std::unique_ptr<AdminSocketHook>&& hook)
6156
{
57+
LOG_PREFIX(AdminSocket::register_command);
6258
auto prefix = hook->prefix;
6359
auto [it, added] = hooks.emplace(prefix, std::move(hook));
64-
assert(added);
65-
logger().info("register_command(): {})", it->first);
60+
ceph_assert(added);
61+
INFO("register_command(): {})", it->first);
6662
}
6763

6864
auto AdminSocket::parse_cmd(const std::vector<std::string>& cmd)
6965
-> std::variant<parsed_command_t, tell_result_t>
7066
{
67+
LOG_PREFIX(AdminSocket::parse_cmd);
68+
INFO("");
7169
// preliminaries:
7270
// - create the formatter specified by the cmd parameters
7371
// - locate the "op-code" string (the 'prefix' segment)
@@ -79,13 +77,13 @@ auto AdminSocket::parse_cmd(const std::vector<std::string>& cmd)
7977
stringstream errss;
8078
// note that cmdmap_from_json() may throw on syntax issues
8179
if (!cmdmap_from_json(cmd, &cmdmap, errss)) {
82-
logger().error("{}: incoming command error: {}", __func__, errss.str());
80+
ERROR("incoming command error: {}", errss.str());
8381
out.append("error:"s);
8482
out.append(errss.str());
8583
return tell_result_t{-EINVAL, "invalid json", std::move(out)};
8684
}
8785
} catch (const std::runtime_error& e) {
88-
logger().error("{}: incoming command syntax: {}", __func__, cmd);
86+
ERROR("incoming command syntax: {}", cmd);
8987
out.append(string{e.what()});
9088
return tell_result_t{-EINVAL, "invalid json", std::move(out)};
9189
}
@@ -100,30 +98,34 @@ auto AdminSocket::parse_cmd(const std::vector<std::string>& cmd)
10098
cmd_getval(cmdmap, "cmd", prefix);
10199
}
102100
} catch (const bad_cmd_get& e) {
103-
logger().error("{}: invalid syntax: {}", __func__, cmd);
101+
ERROR("invalid syntax: {}", cmd);
104102
out.append(string{e.what()});
105103
return tell_result_t{-EINVAL, "invalid json", std::move(out)};
106104
}
107105

108106
// match the incoming op-code to one of the registered APIs
109-
if (auto found = hooks.find(prefix); found != hooks.end()) {
110-
return parsed_command_t{ cmdmap, format, *found->second };
111-
} else {
107+
auto found = hooks.find(prefix);
108+
if (found == hooks.end()) {
109+
ERROR("unknown command: {}", prefix);
112110
return tell_result_t{-EINVAL,
113111
fmt::format("unknown command '{}'", prefix),
114112
std::move(out)};
115113
}
114+
DEBUG("parsed successfully {} {}", prefix, cmd);
115+
return parsed_command_t{ cmdmap, format, *found->second };
116116
}
117117

118118
seastar::future<> AdminSocket::finalize_response(
119119
seastar::output_stream<char>& out, ceph::bufferlist&& msgs)
120120
{
121+
LOG_PREFIX(AdminSocket::finalize_response);
122+
INFO("");
121123
string outbuf_cont = msgs.to_str();
122124
if (outbuf_cont.empty()) {
123125
outbuf_cont = " {} ";
124126
}
125127
uint32_t response_length = htonl(outbuf_cont.length());
126-
logger().info("asok response length: {}", outbuf_cont.length());
128+
INFO("asok response length: {}", outbuf_cont.length());
127129

128130
return out.write(reinterpret_cast<char*>(&response_length),
129131
sizeof(response_length))
@@ -134,25 +136,32 @@ seastar::future<> AdminSocket::finalize_response(
134136
seastar::future<> AdminSocket::handle_command(crimson::net::ConnectionRef conn,
135137
boost::intrusive_ptr<MCommand> m)
136138
{
139+
LOG_PREFIX(AdminSocket::handle_command);
140+
INFO("");
137141
return execute_command(m->cmd, std::move(m->get_data())).then(
138-
[conn, tid=m->get_tid()](auto result) {
142+
[FNAME, conn, tid=m->get_tid()](auto result) {
139143
auto [ret, err, out] = std::move(result);
140144
auto reply = crimson::make_message<MCommandReply>(ret, err);
141145
reply->set_tid(tid);
142146
reply->set_data(out);
147+
DEBUG("replying with ret {} error {}", ret, err);
143148
return conn->send(std::move(reply));
144149
});
145150
}
146151

147152
seastar::future<> AdminSocket::execute_line(std::string cmdline,
148153
seastar::output_stream<char>& out)
149154
{
150-
return execute_command({std::move(cmdline)}, {}).then([&out, this](auto result) {
155+
LOG_PREFIX(AdminSocket::execute_line);
156+
INFO("");
157+
return execute_command({std::move(cmdline)}, {}).then([FNAME, &out, this](auto result) {
151158
auto [ret, stderr, stdout] = std::move(result);
152159
if (ret < 0) {
160+
ERROR("{}", cpp_strerror(ret));
153161
stdout.append(fmt::format("ERROR: {}\n", cpp_strerror(ret)));
154162
stdout.append(stderr);
155163
}
164+
DEBUG("finalizing response");
156165
return finalize_response(out, std::move(stdout));
157166
});
158167
}
@@ -161,20 +170,24 @@ auto AdminSocket::execute_command(const std::vector<std::string>& cmd,
161170
ceph::bufferlist&& buf)
162171
-> seastar::future<tell_result_t>
163172
{
173+
LOG_PREFIX(AdminSocket::execute_command);
174+
INFO("");
164175
auto maybe_parsed = parse_cmd(cmd);
165176
if (auto* parsed = std::get_if<parsed_command_t>(&maybe_parsed); parsed) {
177+
DEBUG("parsed ok");
166178
stringstream os;
167179
string desc{parsed->hook.desc};
168180
if (!validate_cmd(desc, parsed->params, os)) {
169-
logger().error("AdminSocket::execute_command: "
170-
"failed to validate '{}': {}", cmd, os.str());
181+
ERROR("failed to validate '{}': {}", cmd, os.str());
171182
ceph::bufferlist out;
172183
out.append(os);
173184
return seastar::make_ready_future<tell_result_t>(
174185
tell_result_t{-EINVAL, "invalid command json", std::move(out)});
175186
}
187+
DEBUG("validated {} {}", cmd, os.str());
176188
return parsed->hook.call(parsed->params, parsed->format, std::move(buf));
177189
} else {
190+
DEBUG("failed to parse");
178191
auto& result = std::get<tell_result_t>(maybe_parsed);
179192
return seastar::make_ready_future<tell_result_t>(std::move(result));
180193
}
@@ -188,17 +201,21 @@ struct line_consumer {
188201
typename seastar::input_stream<char>::consumption_result_type;
189202

190203
seastar::future<consumption_result_type> operator()(tmp_buf&& buf) {
204+
LOG_PREFIX(line_consumer::operator());
205+
INFO("");
191206
size_t consumed = 0;
192207
for (auto c : buf) {
193208
consumed++;
194209
if (c == '\0') {
195210
buf.trim_front(consumed);
211+
INFO("stop consuming");
196212
return seastar::make_ready_future<consumption_result_type>(
197213
consumption_result_type::stop_consuming_type(std::move(buf)));
198214
} else {
199215
line.push_back(c);
200216
}
201217
}
218+
INFO("continue consuming");
202219
return seastar::make_ready_future<consumption_result_type>(
203220
seastar::continue_consuming{});
204221
}
@@ -208,88 +225,101 @@ struct line_consumer {
208225
seastar::future<> AdminSocket::handle_client(seastar::input_stream<char>& in,
209226
seastar::output_stream<char>& out)
210227
{
228+
LOG_PREFIX(AdminSocket::handle_client);
229+
INFO("");
211230
auto consumer = seastar::make_shared<line_consumer>();
212-
return in.consume(*consumer).then([consumer, &out, this] {
213-
logger().debug("AdminSocket::handle_client: incoming asok string: {}",
214-
consumer->line);
231+
return in.consume(*consumer).then([FNAME, consumer, &out, this] {
232+
DEBUG("incoming asok string: {}", consumer->line);
215233
return execute_line(consumer->line, out);
216-
}).then([&out] {
234+
}).then([FNAME, &out] {
235+
DEBUG("flush");
217236
return out.flush();
218-
}).finally([&out] {
237+
}).finally([FNAME, &out] {
238+
DEBUG("out close");
219239
return out.close();
220-
}).then([&in] {
240+
}).then([FNAME, &in] {
241+
DEBUG("in close");
221242
return in.close();
222-
}).handle_exception([](auto ep) {
223-
logger().debug("exception on {}: {}", __func__, ep);
243+
}).handle_exception([FNAME](auto ep) {
244+
ERROR("exception on {}", ep);
224245
});
225246
}
226247

227248
seastar::future<> AdminSocket::start(const std::string& path)
228249
{
250+
LOG_PREFIX(AdminSocket::start);
251+
INFO("");
229252
if (path.empty()) {
230-
logger().error(
231-
"{}: Admin Socket socket path missing from the configuration", __func__);
253+
ERROR("socket path missing from the configuration");
232254
return seastar::now();
233255
}
234256

235-
logger().debug("{}: asok socket path={}", __func__, path);
257+
DEBUG("asok socket path={}", path);
236258
auto sock_path = seastar::socket_address{ seastar::unix_domain_addr{ path } };
237259
try {
238260
server_sock = seastar::engine().listen(sock_path);
239261
} catch (const std::system_error& e) {
240262
if (e.code() == std::errc::address_in_use) {
241-
logger().debug("{}: Admin Socket socket path={} already exists, retrying",
242-
__func__, path);
243-
return seastar::remove_file(path).then([this, path] {
263+
DEBUG("socket path={} already exists, retrying", path);
264+
return seastar::remove_file(path).then([FNAME, this, path] {
265+
DEBUG("socket path={} retrying", path);
244266
server_sock.reset();
245267
return start(path);
246268
});
247269
}
248-
logger().error("{}: unable to listen({}): {}", __func__, path, e.what());
270+
ERROR("unable to listen({}): {}", path, e.what());
249271
server_sock.reset();
250272
return seastar::make_ready_future<>();
251273
}
252274
// listen in background
253-
task = seastar::keep_doing([this] {
254-
return seastar::try_with_gate(stop_gate, [this] {
255-
assert(!connected_sock.has_value());
256-
return server_sock->accept().then([this](seastar::accept_result acc) {
275+
task = seastar::keep_doing([FNAME, this] {
276+
return seastar::try_with_gate(stop_gate, [FNAME, this] {
277+
ceph_assert(!connected_sock.has_value());
278+
return server_sock->accept().then([FNAME, this](seastar::accept_result acc) {
257279
connected_sock = std::move(acc.connection);
258280
return seastar::do_with(connected_sock->input(),
259281
connected_sock->output(),
260-
[this](auto& input, auto& output) mutable {
282+
[FNAME, this](auto& input, auto& output) mutable {
283+
DEBUG("handling client");
261284
return handle_client(input, output);
262-
}).finally([this] {
263-
assert(connected_sock.has_value());
285+
}).finally([FNAME, this] {
286+
DEBUG("reset");
287+
ceph_assert(connected_sock.has_value());
264288
connected_sock.reset();
265289
});
266-
}).handle_exception([this](auto ep) {
290+
}).handle_exception([FNAME, this](auto ep) {
267291
if (!stop_gate.is_closed()) {
268-
logger().error("AdminSocket: terminated: {}", ep);
292+
ERROR("terminated: {}", ep);
269293
}
270294
});
271295
});
272-
}).handle_exception_type([](const seastar::gate_closed_exception&) {
273-
}).finally([path] {
296+
}).handle_exception_type([FNAME](const seastar::gate_closed_exception&) {
297+
DEBUG("gate closed");
298+
}).finally([FNAME, path] {
299+
DEBUG("closing: {}", path);
274300
return seastar::remove_file(path);
275301
});
302+
DEBUG("exisited, listening in background");
276303
return seastar::make_ready_future<>();
277304
}
278305

279306
seastar::future<> AdminSocket::stop()
280307
{
308+
LOG_PREFIX(AdminSocket::stop);
309+
INFO("");
281310
if (!server_sock) {
311+
DEBUG("no server socket");
282312
return seastar::now();
283313
}
284314
server_sock->abort_accept();
285315
if (connected_sock) {
286316
connected_sock->shutdown_input();
287317
connected_sock->shutdown_output();
288318
}
289-
return stop_gate.close().then([this] {
290-
assert(task.has_value());
291-
return task->then([] {
292-
logger().info("AdminSocket: stopped");
319+
return stop_gate.close().then([FNAME, this] {
320+
ceph_assert(task.has_value());
321+
return task->then([FNAME] {
322+
INFO("stopped");
293323
return seastar::now();
294324
});
295325
});
@@ -308,6 +338,8 @@ class VersionHook final : public AdminSocketHook {
308338
std::string_view format,
309339
ceph::bufferlist&&) const final
310340
{
341+
LOG_PREFIX(AdminSocket::VersionHook);
342+
INFO("");
311343
unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
312344
f->open_object_section("version");
313345
f->dump_string("version", ceph_version_to_str());
@@ -331,6 +363,8 @@ class GitVersionHook final : public AdminSocketHook {
331363
std::string_view format,
332364
ceph::bufferlist&&) const final
333365
{
366+
LOG_PREFIX(AdminSocket::AdminSocketHook);
367+
INFO("");
334368
unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
335369
f->open_object_section("version");
336370
f->dump_string("git_version", git_version_to_str());
@@ -352,6 +386,8 @@ class HelpHook final : public AdminSocketHook {
352386
std::string_view format,
353387
ceph::bufferlist&&) const final
354388
{
389+
LOG_PREFIX(AdminSocket::HelpHook);
390+
INFO("");
355391
unique_ptr<Formatter> f{Formatter::create(format,
356392
"json-pretty", "json-pretty")};
357393
f->open_object_section("help");
@@ -379,6 +415,8 @@ class GetdescsHook final : public AdminSocketHook {
379415
std::string_view format,
380416
ceph::bufferlist&&) const final
381417
{
418+
LOG_PREFIX(AdminSocket::GetdescsHook);
419+
INFO("");
382420
unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
383421
int cmdnum = 0;
384422
f->open_object_section("command_descriptions");
@@ -405,6 +443,8 @@ class InjectArgsHook final : public AdminSocketHook {
405443
std::string_view format,
406444
ceph::bufferlist&&) const final
407445
{
446+
LOG_PREFIX(AdminSocket::InjectArgsHook);
447+
INFO("");
408448
std::vector<std::string> argv;
409449
if (!cmd_getval(cmdmap, "injected_args", argv)) {
410450
return seastar::make_ready_future<tell_result_t>();
@@ -433,6 +473,8 @@ class ConfigShowHook : public AdminSocketHook {
433473
std::string_view format,
434474
ceph::bufferlist&& input) const final
435475
{
476+
LOG_PREFIX(AdminSocket::ConfigShowHook);
477+
INFO("");
436478
unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
437479
f->open_object_section("config_show");
438480
local_conf().show_config(f.get());
@@ -455,9 +497,11 @@ class ConfigGetHook : public AdminSocketHook {
455497
std::string_view format,
456498
ceph::bufferlist&& input) const final
457499
{
500+
LOG_PREFIX(AdminSocket::ConfigGetHook);
501+
INFO("");
458502
std::string var;
459503
[[maybe_unused]] bool found = cmd_getval(cmdmap, "var", var);
460-
assert(found);
504+
ceph_assert(found);
461505
std::string conf_val;
462506
if (int r = local_conf().get_val(var, &conf_val); r < 0) {
463507
return seastar::make_ready_future<tell_result_t>(
@@ -490,6 +534,8 @@ class ConfigSetHook : public AdminSocketHook {
490534
std::string_view format,
491535
ceph::bufferlist&&) const final
492536
{
537+
LOG_PREFIX(AdminSocket::ConfigSetHook);
538+
INFO("");
493539
std::string var;
494540
std::vector<std::string> new_val;
495541
cmd_getval(cmdmap, "var", var);
@@ -523,6 +569,8 @@ class ConfigHelpHook : public AdminSocketHook {
523569
std::string_view format,
524570
ceph::bufferlist&& input) const final
525571
{
572+
LOG_PREFIX(AdminSocket::ConfigHelpHook);
573+
INFO("");
526574
unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
527575
// Output all
528576
f->open_array_section("options");

0 commit comments

Comments
 (0)