Skip to content

Commit 34f237d

Browse files
committed
path update
1 parent 1540968 commit 34f237d

File tree

4 files changed

+122
-62
lines changed

4 files changed

+122
-62
lines changed

cpp/wedpr-computing/ppc-mpc/src/Common.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ struct JobInfo
3939
int selfIndex;
4040
bool isMalicious;
4141
int bitLength;
42-
std::string inputFileName;
43-
std::string outputFileName;
42+
std::string mpcFilePath;
43+
std::string inputFilePath;
44+
std::string outputFilePath;
4445
std::string gatewayEngineEndpoint;
4546
};
4647

@@ -51,6 +52,7 @@ const std::string MPC_RELATIVE_PATH = "/Programs/Source/";
5152
const std::string MPC_ALGORITHM_FILE_SUFFIX = ".mpc";
5253
const std::string MPC_ALGORITHM_COMPILER = "compile.py";
5354
const std::string MPC_PREPARE_FILE = "mpc_prepare.csv";
55+
const std::string MPC_RESULT_FILE = "mpc_result.csv";
5456

5557
enum MpcBinaryType
5658
{

cpp/wedpr-computing/ppc-mpc/src/MPCService.cpp

Lines changed: 113 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
*/
2020

2121
#include "MPCService.h"
22+
#include "Common.h"
2223
#include "ppc-framework/io/DataResourceLoader.h"
2324
#include "ppc-io/src/DataResourceLoaderImpl.h"
2425
#include "ppc-io/src/FileLineReader.h"
@@ -44,95 +45,128 @@ using namespace ppc::tools;
4445
using namespace ppc::storage;
4546
using namespace ppc::rpc;
4647

48+
void MPCService::removeAllFiles(const std::vector<std::string> &files)
49+
{
50+
for (const auto &file : files)
51+
{
52+
if (file.empty())
53+
{
54+
continue;
55+
}
56+
try {
57+
if (boost::filesystem::exists(file))
58+
{
59+
boost::filesystem::remove_all(file);
60+
61+
MPC_LOG(INFO) << LOG_DESC("[MPCService][removeAllFiles]")
62+
<< LOG_KV("file", file);
63+
}
64+
} catch (...) {
65+
MPC_LOG(INFO) << LOG_DESC("[MPCService][removeAllFiles]")
66+
<< LOG_DESC("remove file exception")
67+
<< LOG_KV("file", file);
68+
}
69+
}
70+
}
4771

4872
void MPCService::doRun(Json::Value const& request, Json::Value& response)
4973
{
5074
auto startT = utcSteadyTime();
75+
76+
std::string localPathPrefix;
77+
std::string mpcFileLocalPath;
5178
try
5279
{ // 0 get jobInfo and make command
5380
auto jobInfo = paramsToJobInfo(request);
81+
82+
std::string jobId = jobInfo.jobId;
83+
int participantCount = jobInfo.participantCount;
84+
int selfIndex = jobInfo.selfIndex;
85+
5486
std::string mpcCmd;
5587
makeCommand(mpcCmd, jobInfo);
5688

57-
std::string hdfsPathPrefix =
58-
m_mpcConfig.datasetHDFSPath + PATH_SEPARATOR + jobInfo.jobId + PATH_SEPARATOR;
59-
std::string localPathPrefix =
89+
localPathPrefix =
6090
m_mpcConfig.jobPath + PATH_SEPARATOR + jobInfo.jobId + PATH_SEPARATOR;
6191

62-
// 1 download algorithm file
63-
std::string algorithmFileHdfsPath =
64-
hdfsPathPrefix + jobInfo.jobId + MPC_ALGORITHM_FILE_SUFFIX;
92+
// 1 download mpc algorithm file
93+
std::string mpcFileHdfsPath = jobInfo.mpcFilePath;
6594
std::string mpcRootPath = m_mpcConfig.mpcRootPathNoGateway;
6695
if (jobInfo.mpcNodeUseGateway)
6796
{
6897
mpcRootPath = m_mpcConfig.mpcRootPath;
6998
}
70-
std::string algorithmFileLocalPath =
99+
100+
mpcFileLocalPath =
71101
mpcRootPath + MPC_RELATIVE_PATH + jobInfo.jobId + MPC_ALGORITHM_FILE_SUFFIX;
72-
if (!boost::filesystem::exists(algorithmFileLocalPath))
73-
{
74-
auto lineReader1 =
75-
initialize_lineReader(jobInfo, algorithmFileHdfsPath, DataResourceType::HDFS);
76-
auto lineWriter1 =
77-
initialize_lineWriter(jobInfo, algorithmFileLocalPath, DataResourceType::FILE);
78-
readAndSaveFile(lineReader1, lineWriter1);
79-
}
80102

81-
// 2 download dataset file
82-
std::string datasetFileHdfsPath = hdfsPathPrefix + jobInfo.inputFileName;
83-
std::string datasetFileLocalPath = localPathPrefix + jobInfo.inputFileName;
84-
if (!boost::filesystem::exists(datasetFileLocalPath))
85-
{
86-
auto lineReader2 =
87-
initialize_lineReader(jobInfo, datasetFileHdfsPath, DataResourceType::HDFS);
88-
auto lineWriter2 =
89-
initialize_lineWriter(jobInfo, datasetFileLocalPath, DataResourceType::FILE);
90-
readAndSaveFile(lineReader2, lineWriter2);
91-
}
103+
auto mpcFileReader =
104+
initialize_lineReader(jobInfo, mpcFileHdfsPath, DataResourceType::HDFS);
105+
auto mpcFileWriter =
106+
initialize_lineWriter(jobInfo, mpcFileLocalPath, DataResourceType::FILE);
107+
readAndSaveFile(mpcFileHdfsPath, mpcFileLocalPath, mpcFileReader, mpcFileWriter);
108+
109+
110+
// 2 download mpc prepare file
111+
std::string mpcPrepareFileHdfsPath = jobInfo.inputFilePath;
112+
113+
// std::string inputFilePath =
114+
// m_mpcConfig.jobPath + PATH_SEPARATOR + jobInfo.jobId + PATH_SEPARATOR + MPC_PREPARE_FILE;
115+
116+
std::string mpcPrepareFileLocalPath = localPathPrefix + MPC_PREPARE_FILE + "-P" + std::to_string(selfIndex) + "-0";
117+
auto datasetFileReader =
118+
initialize_lineReader(jobInfo, mpcPrepareFileHdfsPath, DataResourceType::HDFS);
119+
auto datasetFileWriter =
120+
initialize_lineWriter(jobInfo, mpcPrepareFileLocalPath, DataResourceType::FILE);
121+
readAndSaveFile(mpcPrepareFileHdfsPath, mpcPrepareFileLocalPath, datasetFileReader, datasetFileWriter);
92122

93123
// 3 run mpc job
94124
int outExitStatus = MPC_SUCCESS;
95125
std::string outResult;
96126
execCommand(mpcCmd, outExitStatus, outResult);
127+
128+
if (outExitStatus != MPC_SUCCESS)
129+
{
130+
removeAllFiles(std::vector<std::string>{localPathPrefix, mpcFileLocalPath});
131+
MPC_LOG(ERROR) << LOG_DESC("[MPCService][doRun]")
132+
<< "run mpc job failed"
133+
<< LOG_KV("jobId", jobId)
134+
<< LOG_KV("outExitStatus", outExitStatus)
135+
<< LOG_KV("outResult", outResult);
136+
BOOST_THROW_EXCEPTION(RunMpcFailException() << errinfo_comment(outResult));
137+
}
138+
97139
std::string message = "run mpc job successfully";
140+
MPC_LOG(INFO) << LOG_DESC("[MPCService][doRun]") << LOG_KV("jobId", jobId) << LOG_DESC(message);
141+
// MPC_LOG(DEBUG) << LOG_DESC("[MPCService][doRun]") << LOG_KV("jobId", jobId) << LOG_KV("outResult", outResult);
142+
response["code"] = MPC_SUCCESS;
143+
response["message"] = "success";
98144

99145
// 4 upload result file
100-
std::string resultFileHdfsPath = hdfsPathPrefix + jobInfo.outputFileName;
101-
std::string resultFileLocalPath = localPathPrefix + jobInfo.outputFileName;
146+
std::string resultFileHdfsPath = jobInfo.outputFilePath;
147+
std::string resultFileLocalPath = localPathPrefix + MPC_RESULT_FILE;
102148
writeStringToFile(outResult, resultFileLocalPath);
103149

104-
auto lineReader3 =
150+
auto resultFileReader =
105151
initialize_lineReader(jobInfo, resultFileLocalPath, DataResourceType::FILE);
106-
auto lineWriter3 =
152+
auto resultFileWriter =
107153
initialize_lineWriter(jobInfo, resultFileHdfsPath, DataResourceType::HDFS);
108-
readAndSaveFile(lineReader3, lineWriter3);
154+
readAndSaveFile(resultFileLocalPath, resultFileHdfsPath, resultFileReader, resultFileWriter);
109155

110-
if (outExitStatus != MPC_SUCCESS)
111-
{
112-
message = "run mpc job failed";
113-
MPC_LOG(INFO) << LOG_DESC("[MPCService][doRun]") << LOG_DESC(message);
114-
BOOST_THROW_EXCEPTION(RunMpcFailException() << errinfo_comment(message));
115-
}
116-
else
117-
{
118-
MPC_LOG(INFO) << LOG_DESC("[MPCService][doRun]") << LOG_DESC(message);
119-
response["code"] = MPC_SUCCESS;
120-
response["message"] = "success";
121-
}
122-
if (boost::filesystem::exists(localPathPrefix))
123-
{
124-
boost::filesystem::remove_all(localPathPrefix);
125-
}
156+
removeAllFiles(std::vector<std::string>{localPathPrefix, mpcFileLocalPath});
126157
}
127158
catch (const std::exception& e)
128159
{
160+
removeAllFiles(std::vector<std::string>{localPathPrefix, mpcFileLocalPath});
161+
129162
const std::string diagnostic_information = std::string(boost::diagnostic_information(e));
130-
MPC_LOG(INFO) << LOG_DESC("[MPCService][doRun]") << LOG_DESC("run mpc job failed:")
163+
MPC_LOG(INFO) << LOG_DESC("[MPCService][doRun]") << LOG_DESC("run mpc job failed")
131164
<< LOG_DESC(diagnostic_information);
132165
response["code"] = MPC_FAILED;
133166
response["message"] = diagnostic_information;
134167
}
135-
MPC_LOG(INFO) << LOG_DESC("run mpc") << LOG_KV("timecost(ms)", utcSteadyTime() - startT);
168+
169+
MPC_LOG(INFO) << LOG_DESC("run mpc") << LOG_KV("request", request.toStyledString())<< LOG_KV("timecost(ms)", utcSteadyTime() - startT);
136170
}
137171

138172
void MPCService::runMpcRpc(Json::Value const& request, RespFunc func)
@@ -200,7 +234,7 @@ void MPCService::writeStringToFile(const std::string& content, const std::string
200234
file << buffer.str();
201235
}
202236

203-
void MPCService::readAndSaveFile(LineReader::Ptr lineReader, LineWriter::Ptr lineWriter)
237+
void MPCService::readAndSaveFile(const std::string &readerFilePath, const std::string &writerFilePath, LineReader::Ptr lineReader, LineWriter::Ptr lineWriter)
204238
{
205239
uint64_t lineSize = 0;
206240
int64_t readPerBatchLines = m_mpcConfig.readPerBatchLines;
@@ -216,7 +250,10 @@ void MPCService::readAndSaveFile(LineReader::Ptr lineReader, LineWriter::Ptr lin
216250
lineWriter->writeLine(dataBatch, DataSchema::String, "\n");
217251
}
218252
lineWriter->close();
219-
MPC_LOG(INFO) << LOG_DESC("save file ok") << LOG_KV("file lines", lineSize);
253+
MPC_LOG(INFO) << LOG_DESC("save file ok")
254+
<< LOG_KV("readerFilePath", readerFilePath)
255+
<< LOG_KV("writerFilePath", writerFilePath)
256+
<< LOG_KV("file lines", lineSize);
220257
}
221258

222259
LineReader::Ptr MPCService::initialize_lineReader(
@@ -282,8 +319,9 @@ JobInfo MPCService::paramsToJobInfo(const Json::Value& params)
282319
jobInfo.selfIndex = params["selfIndex"].asInt();
283320
jobInfo.isMalicious = params["isMalicious"].asBool();
284321
jobInfo.bitLength = params["bitLength"].asInt();
285-
jobInfo.inputFileName = params["inputFileName"].asString();
286-
jobInfo.outputFileName = params["outputFileName"].asString();
322+
jobInfo.mpcFilePath = params["mpcFilePath"].asString();
323+
jobInfo.inputFilePath = params["inputFilePath"].asString();
324+
jobInfo.outputFilePath = params["outputFilePath"].asString();
287325
jobInfo.gatewayEngineEndpoint = params["gatewayEngineEndpoint"].asString();
288326
return jobInfo;
289327
}
@@ -297,30 +335,48 @@ JobInfo MPCService::paramsToJobInfo(const Json::Value& params)
297335

298336
void MPCService::makeCommand(std::string& cmd, const JobInfo& jobInfo)
299337
{
338+
std::string jobId = jobInfo.jobId;
300339
std::string mpcRootPath = m_mpcConfig.mpcRootPath;
301340
if (jobInfo.mpcNodeUseGateway)
302341
{
303-
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand] use gateway to connect node");
342+
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand] use gateway to connect node")
343+
<< LOG_KV("jobId", jobId);
304344
}
305345
else
306346
{
307347
mpcRootPath = m_mpcConfig.mpcRootPathNoGateway;
308-
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand] direct connect node");
348+
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand] direct connect node")
349+
<< LOG_KV("jobId", jobId);
309350
}
310351
int r = chdir(mpcRootPath.c_str());
311352
if (r == 0)
312353
{
313-
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand] change dir ok");
354+
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand] change dir ok")
355+
<< LOG_KV("jobId", jobId);
356+
}
357+
else
358+
{
359+
MPC_LOG(ERROR) << LOG_DESC("[MPCService][makeCommand] change dir fail")
360+
<< LOG_KV("mpcRootPath", mpcRootPath)
361+
<< LOG_KV("ret", r)
362+
<< LOG_KV("jobId", jobId);
363+
;
314364
}
315365
std::string compileFilePath = mpcRootPath + PATH_SEPARATOR + MPC_ALGORITHM_COMPILER;
316366
int participantCount = jobInfo.participantCount;
367+
int selfIndex = jobInfo.selfIndex;
317368
bool isMalicious = jobInfo.isMalicious;
318369
std::string mpcBinFileName;
319370
std::string compileOption;
320371
getMpcProtocol(participantCount, isMalicious, mpcBinFileName, compileOption);
321372

322373
if (!boost::filesystem::exists(compileFilePath))
323374
{
375+
MPC_LOG(ERROR) << LOG_DESC("[MPCService] compile file not exist")
376+
<< LOG_KV("compileFilePath", compileFilePath)
377+
<< LOG_KV("jobId", jobId);
378+
;
379+
324380
BOOST_THROW_EXCEPTION(MpcCompilerNotExistException()
325381
<< errinfo_comment("compile file not exist:" + compileFilePath));
326382
}
@@ -359,7 +415,7 @@ void MPCService::makeCommand(std::string& cmd, const JobInfo& jobInfo)
359415
{
360416
cmd += "-N " + std::to_string(jobInfo.participantCount) + " ";
361417
}
362-
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand]") << LOG_KV("mpcCmd", cmd);
418+
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand]") << LOG_KV("jobId", jobId) << LOG_KV("mpcCmd", cmd);
363419
}
364420

365421
void MPCService::getMpcProtocol(const int participantCount, const bool isMalicious,

cpp/wedpr-computing/ppc-mpc/src/MPCService.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@ class MPCService
5050
void execCommand(const std::string cmd, int& outExitStatus, std::string& outResult);
5151

5252
void writeStringToFile(const std::string& content, const std::string& filePath);
53-
void readAndSaveFile(ppc::io::LineReader::Ptr lineReader, ppc::io::LineWriter::Ptr lineWriter);
53+
void readAndSaveFile(const std::string &readerFilePath, const std::string &writerFilePath,ppc::io::LineReader::Ptr lineReader, ppc::io::LineWriter::Ptr lineWriter);
5454
ppc::io::LineReader::Ptr initialize_lineReader(const JobInfo& jobInfo,
5555
const std::string& readerFilePath, ppc::protocol::DataResourceType type);
5656
ppc::io::LineWriter::Ptr initialize_lineWriter(const JobInfo& jobInfo,
5757
const std::string& writerFilePath, ppc::protocol::DataResourceType type);
5858

59+
void removeAllFiles(const std::vector<std::string> &files);
60+
5961
private:
6062
ppc::tools::MPCConfig m_mpcConfig;
6163
ppc::tools::StorageConfig m_storageConfig;

cpp/wedpr-computing/ppc-mpc/tests/TestMPCService.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ BOOST_AUTO_TEST_CASE(testMPCService)
8484
jobInfo.selfIndex = 0;
8585
jobInfo.isMalicious = false;
8686
jobInfo.bitLength = 128;
87-
jobInfo.inputFileName = "mpc_prepare.csv";
88-
jobInfo.outputFileName = "mpc_output.txt";
87+
jobInfo.inputFilePath = "mpc_prepare.csv";
88+
jobInfo.outputFilePath = "mpc_output.txt";
8989
jobInfo.gatewayEngineEndpoint = "127.0.0.1:6789";
9090

9191
mpcService->makeCommand(cmd, jobInfo);

0 commit comments

Comments
 (0)