Skip to content

Commit 1d26596

Browse files
xinhaoyuan authored and copybara-github committed
Extend Command for more control over a single execution. #Centipede
This refines the original Execute() into ExecuteAsync() + Wait(), with RequestStop() to interrupt the execution. This allows Centipede to properly handle Command timeout (moved to CentipedeCallbacks::RunBatchForBinary) no matter if it is using a fork server or not. PiperOrigin-RevId: 781159715
1 parent 3cb8569 commit 1d26596

File tree

5 files changed

+175
-87
lines changed

5 files changed

+175
-87
lines changed

centipede/centipede_callbacks.cc

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#include <cstddef>
1919
#include <cstdlib>
2020
#include <filesystem> // NOLINT
21+
#include <memory>
22+
#include <optional>
2123
#include <string>
2224
#include <string_view>
2325
#include <system_error> // NOLINT
@@ -146,7 +148,7 @@ std::string CentipedeCallbacks::ConstructRunnerFlags(
146148
Command &CentipedeCallbacks::GetOrCreateCommandForBinary(
147149
std::string_view binary) {
148150
for (auto &cmd : commands_) {
149-
if (cmd.path() == binary) return cmd;
151+
if (cmd->path() == binary) return *cmd;
150152
}
151153
// We don't want to collect coverage for extra binaries. It won't be used.
152154
bool disable_coverage =
@@ -165,25 +167,51 @@ Command &CentipedeCallbacks::GetOrCreateCommandForBinary(
165167
absl::StrCat("LLVM_PROFILE_FILE=",
166168
WorkDir{env_}.SourceBasedCoverageRawProfilePath()));
167169

168-
// Allow for the time it takes to fork a subprocess etc.
169-
const auto amortized_timeout =
170-
env_.timeout_per_batch == 0
171-
? absl::InfiniteDuration()
172-
: absl::Seconds(env_.timeout_per_batch) + absl::Seconds(5);
173170
Command::Options cmd_options;
174171
cmd_options.env_add = std::move(env);
175172
cmd_options.env_remove = EnvironmentVariablesToUnset();
176173
cmd_options.stdout_file = execute_log_path_;
177174
cmd_options.stderr_file = execute_log_path_;
178-
cmd_options.timeout = amortized_timeout;
179175
cmd_options.temp_file_path = temp_input_file_path_;
180-
Command &cmd =
181-
commands_.emplace_back(Command{binary, std::move(cmd_options)});
176+
Command &cmd = *commands_.emplace_back(
177+
std::make_unique<Command>(binary, std::move(cmd_options)));
182178
if (env_.fork_server) cmd.StartForkServer(temp_dir_, Hash(binary));
183179

184180
return cmd;
185181
}
186182

183+
int CentipedeCallbacks::RunBatchForBinary(std::string_view binary) {
184+
auto &cmd = GetOrCreateCommandForBinary(binary);
185+
const absl::Duration amortized_timeout =
186+
env_.timeout_per_batch == 0
187+
? absl::InfiniteDuration()
188+
: absl::Seconds(env_.timeout_per_batch) + absl::Seconds(5);
189+
const auto deadline = absl::Now() + amortized_timeout;
190+
int exit_code = EXIT_SUCCESS;
191+
const bool should_clean_up = [&] {
192+
if (!cmd.ExecuteAsync()) return true;
193+
const std::optional<int> ret = cmd.Wait(deadline);
194+
if (!ret.has_value()) return true;
195+
exit_code = *ret;
196+
return false;
197+
}();
198+
if (should_clean_up) {
199+
exit_code = [&] {
200+
if (!cmd.is_executing()) return EXIT_FAILURE;
201+
LOG(ERROR) << "Cleaning up the batch execution.";
202+
cmd.RequestStop();
203+
const auto ret = cmd.Wait(absl::Now() + absl::Seconds(60));
204+
if (ret.has_value()) return *ret;
205+
LOG(ERROR) << "Batch execution cleanup failed to end in 60s.";
206+
return EXIT_FAILURE;
207+
}();
208+
commands_.erase(
209+
std::find_if(commands_.begin(), commands_.end(),
210+
[=](const auto &cmd) { return cmd->path() == binary; }));
211+
}
212+
return exit_code;
213+
}
214+
187215
int CentipedeCallbacks::ExecuteCentipedeSancovBinaryWithShmem(
188216
std::string_view binary, const std::vector<ByteArray> &inputs,
189217
BatchResult &batch_result) {
@@ -212,12 +240,11 @@ int CentipedeCallbacks::ExecuteCentipedeSancovBinaryWithShmem(
212240
}
213241

214242
// Run.
215-
Command &cmd = GetOrCreateCommandForBinary(binary);
216-
int retval = cmd.Execute();
243+
const int exit_code = RunBatchForBinary(binary);
217244
inputs_blobseq_.ReleaseSharedMemory(); // Inputs are already consumed.
218245

219246
// Get results.
220-
batch_result.exit_code() = retval;
247+
batch_result.exit_code() = exit_code;
221248
const bool read_success = batch_result.Read(outputs_blobseq_);
222249
LOG_IF(ERROR, !read_success) << "Failed to read batch result!";
223250
outputs_blobseq_.ReleaseSharedMemory(); // Outputs are already consumed.
@@ -229,7 +256,7 @@ int CentipedeCallbacks::ExecuteCentipedeSancovBinaryWithShmem(
229256
// * Will be logged by the caller.
230257
// * some outputs were not written because the outputs_blobseq_ overflown.
231258
// * Logged by the following code.
232-
if (retval == 0 && read_success &&
259+
if (exit_code == 0 && read_success &&
233260
batch_result.num_outputs_read() != num_inputs_written) {
234261
LOG(INFO) << "Read " << batch_result.num_outputs_read() << "/"
235262
<< num_inputs_written
@@ -239,7 +266,7 @@ int CentipedeCallbacks::ExecuteCentipedeSancovBinaryWithShmem(
239266

240267
if (env_.print_runner_log) PrintExecutionLog();
241268

242-
if (retval != EXIT_SUCCESS) {
269+
if (exit_code != EXIT_SUCCESS) {
243270
ReadFromLocalFile(execute_log_path_, batch_result.log());
244271
ReadFromLocalFile(failure_description_path_,
245272
batch_result.failure_description());
@@ -257,7 +284,7 @@ int CentipedeCallbacks::ExecuteCentipedeSancovBinaryWithShmem(
257284
std::filesystem::remove(failure_signature_path_);
258285
}
259286
VLOG(1) << __FUNCTION__ << " took " << (absl::Now() - start_time);
260-
return retval;
287+
return exit_code;
261288
}
262289

263290
// See also: `DumpSeedsToDir()`.
@@ -366,19 +393,18 @@ MutationResult CentipedeCallbacks::MutateViaExternalBinary(
366393
<< VV(num_inputs_written) << VV(inputs.size());
367394

368395
// Execute.
369-
Command &cmd = GetOrCreateCommandForBinary(binary);
370-
int retval = cmd.Execute();
396+
const int exit_code = RunBatchForBinary(binary);
371397
inputs_blobseq_.ReleaseSharedMemory(); // Inputs are already consumed.
372398

373-
if (retval != EXIT_SUCCESS) {
374-
LOG(WARNING) << "Custom mutator failed with exit code: " << retval;
399+
if (exit_code != EXIT_SUCCESS) {
400+
LOG(WARNING) << "Custom mutator failed with exit code: " << exit_code;
375401
}
376-
if (env_.print_runner_log || retval != EXIT_SUCCESS) {
402+
if (env_.print_runner_log || exit_code != EXIT_SUCCESS) {
377403
PrintExecutionLog();
378404
}
379405

380406
MutationResult result;
381-
result.exit_code() = retval;
407+
result.exit_code() = exit_code;
382408
result.Read(num_mutants, outputs_blobseq_);
383409
outputs_blobseq_.ReleaseSharedMemory(); // Outputs are already consumed.
384410

centipede/centipede_callbacks.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@ class CentipedeCallbacks {
161161
// Returns a Command object with matching `binary` from commands_,
162162
// creates one if needed.
163163
Command &GetOrCreateCommandForBinary(std::string_view binary);
164+
// Runs a batch with the command `binary` and returns the exit code.
165+
int RunBatchForBinary(std::string_view binary);
164166

165167
// Prints the execution log from the last executed binary.
166168
void PrintExecutionLog() const;
@@ -182,7 +184,8 @@ class CentipedeCallbacks {
182184
SharedMemoryBlobSequence inputs_blobseq_;
183185
SharedMemoryBlobSequence outputs_blobseq_;
184186

185-
std::vector<Command> commands_;
187+
// Need unique_ptr indirection because Command is not movable/copyable.
188+
std::vector<std::unique_ptr<Command>> commands_;
186189
};
187190

188191
// Abstract class for creating/destroying CentipedeCallbacks objects.

centipede/command.cc

Lines changed: 82 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616

1717
#include <errno.h>
1818
#include <fcntl.h>
19+
#include <spawn.h>
1920
#include <sys/poll.h>
2021
#include <sys/stat.h>
2122
#include <sys/types.h>
23+
#include <sys/wait.h>
2224
#include <unistd.h>
2325
#ifdef __APPLE__
2426
#include <inttypes.h>
@@ -30,6 +32,7 @@
3032
#include <cstdlib>
3133
#include <filesystem> // NOLINT
3234
#include <fstream>
35+
#include <optional>
3336
#include <string>
3437
#include <string_view>
3538
#include <system_error> // NOLINT
@@ -55,6 +58,12 @@
5558
#include "./centipede/util.h"
5659
#include "./common/logging.h"
5760

61+
#if !defined(_MSC_VER)
62+
// Needed to pass the current environment to posix_spawn, which needs an
63+
// explicit envp without an option to inherit implicitly.
64+
extern char **environ;
65+
#endif
66+
5867
namespace fuzztest::internal {
5968
namespace {
6069

@@ -141,8 +150,16 @@ struct Command::ForkServerProps {
141150
// the deleter is instantiated, the special member functions must be defined
142151
// out-of-line here, now that ForkServerProps is complete (that's by-the-book
143152
// PIMPL).
144-
Command::Command(Command &&other) noexcept = default;
145-
Command::~Command() = default;
153+
Command::~Command() {
154+
if (is_executing()) {
155+
LOG(WARNING)
156+
<< "Destructing Command object for " << path() << " with "
157+
<< (fork_server_ ? absl::StrCat("fork server PID ", fork_server_->pid_)
158+
: absl::StrCat("PID ", pid_))
159+
<< " still running. Requesting it to stop without waiting for it...";
160+
RequestStop();
161+
}
162+
}
146163

147164
Command::Command(std::string_view path, Options options)
148165
: path_(path), options_(std::move(options)) {}
@@ -308,88 +325,101 @@ absl::Status Command::VerifyForkServerIsHealthy() {
308325
return absl::OkStatus();
309326
}
310327

311-
int Command::Execute() {
328+
bool Command::ExecuteAsync() {
329+
CHECK(!is_executing());
312330
VLOG(1) << "Executing command '" << command_line_ << "'...";
313331

314-
int exit_code = EXIT_SUCCESS;
315-
316332
if (fork_server_ != nullptr) {
317-
VLOG(1) << "Sending execution request to fork server: "
318-
<< VV(options_.timeout);
333+
VLOG(1) << "Sending execution request to fork server";
319334

320335
if (const auto status = VerifyForkServerIsHealthy(); !status.ok()) {
321336
LogProblemInfo(absl::StrCat("Fork server should be running, but isn't: ",
322337
status.message()));
323-
return EXIT_FAILURE;
338+
return false;
324339
}
325340

326341
// Wake up the fork server.
327342
char x = ' ';
328343
CHECK_EQ(1, write(fork_server_->pipe_[0], &x, 1));
344+
} else {
345+
CHECK_EQ(pid_, -1);
346+
std::vector<std::string> argv_strs = {"/bin/sh", "-c", command_line_};
347+
std::vector<char *> argv;
348+
argv.reserve(argv_strs.size() + 1);
349+
for (auto &argv_str : argv_strs) {
350+
argv.push_back(argv_str.data());
351+
}
352+
argv.push_back(nullptr);
353+
CHECK_EQ(posix_spawn(&pid_, argv[0], /*file_actions=*/nullptr,
354+
/*attrp=*/nullptr, argv.data(), environ),
355+
0);
356+
}
357+
358+
is_executing_ = true;
359+
return true;
360+
}
329361

362+
std::optional<int> Command::Wait(absl::Time deadline) {
363+
CHECK(is_executing());
364+
int exit_code = EXIT_SUCCESS;
365+
366+
if (fork_server_ != nullptr) {
330367
// The fork server forks, the child is running. Block until some readable
331368
// data appears in the pipe (that is, after the fork server writes the
332369
// execution result to it).
333370
struct pollfd poll_fd = {};
334371
int poll_ret = -1;
335-
auto poll_deadline = absl::Now() + options_.timeout;
336-
bool sigterm_sent = false;
337-
bool try_again = false;
338372
do {
339-
try_again = false;
340373
// NOTE: `poll_fd` has to be reset every time.
341374
poll_fd = {
342375
/*fd=*/fork_server_->pipe_[1], // The file descriptor to wait for.
343376
/*events=*/POLLIN, // Wait until `fd` gets readable data.
344377
};
345378
const int poll_timeout_ms = static_cast<int>(absl::ToInt64Milliseconds(
346-
std::max(poll_deadline - absl::Now(), absl::Milliseconds(1))));
379+
std::max(deadline - absl::Now(), absl::Milliseconds(1))));
347380
poll_ret = poll(&poll_fd, 1, poll_timeout_ms);
348381
// The `poll()` syscall can get interrupted: it sets errno==EINTR in that
349382
// case. We should tolerate that.
350-
if (poll_ret < 0 && errno == EINTR) {
351-
try_again = true;
352-
continue;
353-
}
354-
if (poll_ret == 0 && !sigterm_sent) {
355-
LogProblemInfo(
356-
absl::StrCat("Timeout while waiting for fork server: timeout is ",
357-
absl::FormatDuration(options_.timeout)));
358-
CHECK_NE(fork_server_->pid_, -1);
359-
LOG(INFO) << "Sending SIGTERM to the fork server PID "
360-
<< fork_server_->pid_ << " and waiting for 60s";
361-
kill(fork_server_->pid_, SIGTERM);
362-
sigterm_sent = true;
363-
poll_deadline += absl::Seconds(60);
364-
try_again = true;
365-
continue;
366-
}
367-
} while (try_again);
368-
383+
} while (poll_ret < 0 && errno == EINTR);
369384
if (poll_ret != 1 || (poll_fd.revents & POLLIN) == 0) {
370385
// The fork server errored out or timed out, or some other error occurred,
371386
// e.g. the syscall was interrupted.
372387
if (poll_ret == 0) {
373-
CHECK(sigterm_sent);
374-
LogProblemInfo(
375-
"Fork server did not respond within 60s after SIGTERM was sent");
376-
// TODO: xinhaoyuan - the right thing to do is to either properly
377-
// recover or request early exit.
388+
LogProblemInfo(absl::StrCat(
389+
"Timeout while waiting for fork server: deadline is ", deadline));
378390
} else {
379391
LogProblemInfo(absl::StrCat(
380392
"Error while waiting for fork server: poll() returned ", poll_ret));
381393
}
382-
return EXIT_FAILURE;
394+
return std::nullopt;
383395
}
384396

385397
// The fork server wrote the execution result to the pipe: read it.
386398
CHECK_EQ(sizeof(exit_code),
387399
read(fork_server_->pipe_[1], &exit_code, sizeof(exit_code)));
388400
} else {
389-
VLOG(1) << "Fork server disabled - executing command directly";
390-
// No fork server, use system().
391-
exit_code = system(command_line_.c_str());
401+
CHECK_NE(pid_, -1);
402+
while (true) {
403+
const pid_t r = waitpid(pid_, &exit_code, WNOHANG);
404+
CHECK_NE(r, -1);
405+
if (r == pid_ && (WIFEXITED(exit_code) || WIFSIGNALED(exit_code))) break;
406+
CHECK_EQ(r, 0);
407+
const auto timeout = deadline - absl::Now();
408+
if (timeout > absl::ZeroDuration()) {
409+
const auto duration = std::clamp<useconds_t>(
410+
absl::ToInt64Microseconds(timeout), 0, 100000);
411+
usleep(duration); // NOLINT: early return on SIGCHLD is desired.
412+
continue;
413+
} else {
414+
LogProblemInfo(absl::StrCat(
415+
"Timeout while waiting for the command process: deadline is ",
416+
deadline));
417+
return std::nullopt;
418+
}
419+
}
420+
pid_ = -1;
392421
}
422+
is_executing_ = false;
393423

394424
// When the command is actually a wrapper shell launching the binary(-es)
395425
// (e.g. a Docker container), the shell will preserve a normal exit code
@@ -444,6 +474,17 @@ int Command::Execute() {
444474
return exit_code;
445475
}
446476

477+
void Command::RequestStop() {
478+
CHECK(is_executing());
479+
if (fork_server_) {
480+
CHECK_NE(fork_server_->pid_, -1);
481+
kill(fork_server_->pid_, SIGTERM);
482+
return;
483+
}
484+
CHECK_NE(pid_, -1);
485+
kill(pid_, SIGTERM);
486+
}
487+
447488
std::string Command::ReadRedirectedStdout() const {
448489
std::string ret;
449490
if (!options_.stdout_file.empty()) {

0 commit comments

Comments
 (0)