Skip to content

Commit abd1632

Browse files
committed
fix
1 parent 31d8b4c commit abd1632

File tree

2 files changed

+15
-6
lines changed

2 files changed

+15
-6
lines changed

cpp/wedpr-computing/ppc-mpc/src/MPCService.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ bool MPCService::addJobIfNotRunning(const JobInfo& jobInfo)
7676
jobStatus.timeCostMs = 0;
7777

7878
m_job2Status[jobInfo.jobId] = jobStatus;
79+
m_runningJobs.insert(jobInfo.jobId);
7980

8081
MPC_LOG(INFO) << LOG_DESC("[MPCService][addJob]")
8182
<< "job added successfully"
@@ -109,6 +110,7 @@ bool MPCService::queryJobStatus(const std::string &jobId, JobStatus &jobStatus)
109110
void MPCService::onSuccess(const std::string &jobId, const std::string &msg)
110111
{
111112
std::lock_guard<std::mutex> l(x_job2Status);
113+
m_runningJobs.erase(jobId);
112114
auto it = m_job2Status.find(jobId);
113115
if (it == m_job2Status.end())
114116
{
@@ -133,6 +135,7 @@ void MPCService::onSuccess(const std::string &jobId, const std::string &msg)
133135
void MPCService::onFailed(const std::string &jobId, const std::string &msg)
134136
{
135137
std::lock_guard<std::mutex> l(x_job2Status);
138+
m_runningJobs.erase(jobId);
136139
auto it = m_job2Status.find(jobId);
137140
if (it == m_job2Status.end())
138141
{
@@ -157,6 +160,7 @@ void MPCService::onFailed(const std::string &jobId, const std::string &msg)
157160
void MPCService::onKill(const std::string &jobId, const std::string &msg)
158161
{
159162
std::lock_guard<std::mutex> l(x_job2Status);
163+
m_runningJobs.erase(jobId);
160164
auto it = m_job2Status.find(jobId);
161165
if (it == m_job2Status.end())
162166
{
@@ -297,7 +301,6 @@ void MPCService::runMpcRpc(Json::Value const& request, RespFunc func)
297301
{
298302
auto jobInfo = paramsToJobInfo(request);
299303
auto r = addJobIfNotRunning(jobInfo);
300-
301304
if (r)
302305
{
303306
doRun(jobInfo);
@@ -412,13 +415,12 @@ void MPCService::doKill(Json::Value const& request, Json::Value& response)
412415
catch (const std::exception& e)
413416
{
414417
const std::string diagnostic_information = std::string(boost::diagnostic_information(e));
415-
MPC_LOG(INFO) << LOG_DESC("[MPCService][doKill]") << LOG_KV("jobId", jobId) << LOG_DESC("kill mpc job failed:")
418+
MPC_LOG(INFO) << LOG_DESC("[MPCService][doKill]") << LOG_DESC("kill mpc job failed:")
416419
<< LOG_DESC(diagnostic_information);
417420
response["code"] = MPC_FAILED;
418421
response["message"] = diagnostic_information;
419422
}
420423
MPC_LOG(INFO) << LOG_DESC("kill mpc job")
421-
<< LOG_KV("jobId", jobId)
422424
<< LOG_KV("request", response.toStyledString())
423425
<< LOG_KV("timecost(ms)", utcSteadyTime() - startT);
424426
}
@@ -565,7 +567,6 @@ void MPCService::makeCommand(std::string& cmd, const JobInfo& jobInfo)
565567
<< LOG_KV("mpcRootPath", mpcRootPath)
566568
<< LOG_KV("ret", r)
567569
<< LOG_KV("jobId", jobId);
568-
;
569570
}
570571
std::string compileFilePath = mpcRootPath + PATH_SEPARATOR + MPC_ALGORITHM_COMPILER;
571572
int participantCount = jobInfo.participantCount;
@@ -580,7 +581,6 @@ void MPCService::makeCommand(std::string& cmd, const JobInfo& jobInfo)
580581
MPC_LOG(ERROR) << LOG_DESC("[MPCService] compile file not exist")
581582
<< LOG_KV("compileFilePath", compileFilePath)
582583
<< LOG_KV("jobId", jobId);
583-
;
584584

585585
BOOST_THROW_EXCEPTION(MpcCompilerNotExistException()
586586
<< errinfo_comment("compile file not exist:" + compileFilePath));

cpp/wedpr-computing/ppc-mpc/src/MPCService.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@
2626
#include "ppc-tools/src/config/StorageConfig.h"
2727
#include <bcos-utilities/Common.h>
2828
#include <bcos-utilities/ThreadPool.h>
29+
#include <bcos-utilities/Timer.h>
2930
#include <memory>
3031
#include <mutex>
3132
#include <string>
33+
#include <unordered_map>
3234

3335
namespace ppc::mpc
3436
{
@@ -37,7 +39,13 @@ class MPCService: std::enable_shared_from_this<MPCService>
3739
public:
3840
using Ptr = std::shared_ptr<MPCService>;
3941
MPCService() = default;
40-
virtual ~MPCService() = default;
42+
virtual ~MPCService()
43+
{
44+
if (m_threadPool)
45+
{
46+
m_threadPool->stop();
47+
}
48+
}
4149

4250
void runMpcRpc(Json::Value const& request, ppc::rpc::RespFunc func);
4351
void killMpcRpc(Json::Value const& request, ppc::rpc::RespFunc func);
@@ -82,6 +90,7 @@ class MPCService: std::enable_shared_from_this<MPCService>
8290
private:
8391
std::mutex x_job2Status;
8492
std::unordered_map<std::string, JobStatus> m_job2Status;
93+
std::unordered_set<std::string> m_runningJobs;
8594
ppc::tools::MPCConfig m_mpcConfig;
8695
ppc::tools::StorageConfig m_storageConfig;
8796

0 commit comments

Comments
 (0)