Skip to content

Commit a6c472c

Browse files
committed
fix
1 parent 3fea826 commit a6c472c

File tree

3 files changed

+43
-6
lines changed

3 files changed

+43
-6
lines changed

cpp/wedpr-computing/ppc-mpc/src/Common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ struct JobStatus
5858
const int MPC_SUCCESS = 0;
5959
const int MPC_RUNNING = 1;
6060
const int MPC_DUPLICATED = 2;
61+
const int MPC_KILLED = 3;
6162
const int MPC_FAILED = -1;
6263

6364
const std::string PATH_SEPARATOR = "/";

cpp/wedpr-computing/ppc-mpc/src/MPCService.cpp

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,25 @@ using namespace ppc::tools;
4747
using namespace ppc::storage;
4848
using namespace ppc::rpc;
4949

50-
bool MPCService::addJobIfNotExist(const JobInfo& jobInfo)
50+
bool MPCService::addJobIfNotRunning(const JobInfo& jobInfo)
5151
{
5252
std::lock_guard<std::mutex> l(x_job2Status);
5353
auto it = m_job2Status.find(jobInfo.jobId);
5454
if (it != m_job2Status.end())
5555
{
5656
MPC_LOG(INFO) << LOG_DESC("[MPCService][addJob]")
5757
<< "job already exists"
58-
<< LOG_KV("jobId", jobInfo.jobId);
59-
return false;
58+
<< LOG_KV("jobId", jobInfo.jobId)
59+
<< LOG_KV("code", it->code)
60+
<< LOG_KV("message", it->message);
61+
62+
if (it->second.code == MPC_RUNNING)
63+
{
64+
// job is running
65+
return false;
66+
}
67+
68+
m_job2Status.erase(it);
6069
}
6170

6271
JobStatus jobStatus;
@@ -145,6 +154,31 @@ void MPCService::onFailed(const std::string &jobId, const std::string &msg)
145154
;
146155
}
147156

157+
158+
void MPCService::onKill(const std::string &jobId, const std::string &msg)
159+
{
160+
std::lock_guard<std::mutex> l(x_job2Status);
161+
auto it = m_job2Status.find(jobId);
162+
if (it == m_job2Status.end())
163+
{
164+
MPC_LOG(ERROR) << LOG_DESC("[MPCService][onKill]")
165+
<< "job does not exist"
166+
<< LOG_KV("jobId", jobId);
167+
return;
168+
}
169+
170+
JobStatus &jobStatus = it->second;
171+
jobStatus.code = MPC_KILLED;
172+
jobStatus.message = msg;
173+
jobStatus.timeCostMs = utcSteadyTime() - jobStatus.startTimeMs;
174+
175+
MPC_LOG(INFO) << LOG_DESC("[MPCService][onKill]")
176+
<< "job is killed"
177+
<< LOG_KV("jobId", jobId)
178+
<< LOG_KV("timeCostMs", jobStatus.timeCostMs)
179+
;
180+
}
181+
148182
void MPCService::doRun(const JobInfo& jobInfo)
149183
{
150184
auto startT = utcSteadyTime();
@@ -263,7 +297,7 @@ void MPCService::runMpcRpc(Json::Value const& request, RespFunc func)
263297
try
264298
{
265299
auto jobInfo = paramsToJobInfo(request);
266-
auto r = addJobIfNotExist(jobInfo);
300+
auto r = addJobIfNotRunning(jobInfo);
267301

268302
if (r)
269303
{
@@ -301,7 +335,7 @@ void MPCService::asyncRunMpcRpc(Json::Value const& request, RespFunc func)
301335
try
302336
{
303337
auto jobInfo = paramsToJobInfo(request);
304-
auto r = addJobIfNotExist(jobInfo);
338+
auto r = addJobIfNotRunning(jobInfo);
305339

306340
if (r)
307341
{
@@ -362,6 +396,7 @@ void MPCService::doKill(Json::Value const& request, Json::Value& response)
362396
if (outExitStatus == 0)
363397
{
364398
message = "Kill mpc job successfully";
399+
onKill(jobId, message);
365400
MPC_LOG(INFO) << LOG_DESC("[MPCService][doKill]") << LOG_DESC(message);
366401
response["code"] = MPC_SUCCESS;
367402
response["message"] = "success";

cpp/wedpr-computing/ppc-mpc/src/MPCService.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class MPCService: std::enable_shared_from_this<MPCService>
5959

6060
void onSuccess(const std::string &jobId, const std::string &msg);
6161
void onFailed(const std::string &jobId, const std::string &msg);
62+
void onKill(const std::string &jobId, const std::string &msg);
6263

6364
void execCommand(const std::string cmd, int& outExitStatus, std::string& outResult);
6465

@@ -74,7 +75,7 @@ class MPCService: std::enable_shared_from_this<MPCService>
7475
m_threadPool = threadPool;
7576
}
7677

77-
bool addJobIfNotExist(const JobInfo& jobInfo);
78+
bool addJobIfNotRunning(const JobInfo& jobInfo);
7879

7980
bool queryJobStatus(const std::string &jobId, JobStatus &jobStatus);
8081

0 commit comments

Comments
 (0)