Skip to content

Commit 114e790

Browse files
authored
fix cm2020 check hostResource (#101)
* fix cm2020 check hostResource * complement log * psi support retry * unregister front taskInfo
1 parent 89b8a6b commit 114e790

40 files changed

+132
-98
lines changed

cpp/ppc-framework/protocol/DataResource.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ class DataResource
119119

120120
inline std::string printDataResourceInfo(DataResource::ConstPtr _dataResource)
121121
{
122+
if (!_dataResource)
123+
{
124+
return "empty";
125+
}
122126
std::ostringstream stringstream;
123127
stringstream << LOG_KV("dataResource", _dataResource->resourceID());
124128
if (_dataResource->desc())

cpp/ppc-framework/protocol/PartyResource.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ using ConstParties = std::vector<PartyResource::ConstPtr>;
6767

6868
inline std::string printPartyInfo(PartyResource::ConstPtr _party)
6969
{
70+
if (!_party)
71+
{
72+
return "empty";
73+
}
7074
std::ostringstream stringstream;
7175
stringstream << LOG_KV("partyId", _party->id()) << LOG_KV("partyIndex", _party->partyIndex())
7276
<< LOG_KV("desc", _party->desc());

cpp/ppc-framework/protocol/Task.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,15 @@ class Task
143143
// decode the task
144144
virtual void decode(std::string_view _taskData) = 0;
145145
virtual std::string encode() const = 0;
146+
147+
virtual bool enableOutputExists() const { return m_enableOutputExists; }
148+
virtual void setEnableOutputExists(bool enableOutputExists)
149+
{
150+
m_enableOutputExists = enableOutputExists;
151+
}
152+
153+
protected:
154+
bool m_enableOutputExists = false;
146155
};
147156

148157
class TaskFactory
@@ -160,9 +169,14 @@ class TaskFactory
160169
inline std::string printTaskInfo(Task::ConstPtr _task)
161170
{
162171
std::ostringstream stringstream;
172+
if (!_task)
173+
{
174+
return "empty";
175+
}
163176
stringstream << LOG_KV("id", _task->id())
164177
<< LOG_KV("type", (ppc::protocol::TaskType)_task->type())
165178
<< LOG_KV("algorithm", (ppc::protocol::TaskAlgorithmType)_task->algorithm())
179+
<< LOG_KV("enableOutputExists", _task->enableOutputExists())
166180
<< LOG_KV("taskPtr", _task);
167181
if (_task->selfParty())
168182
{

cpp/ppc-framework/rpc/RpcStatusInterface.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,5 @@ class RpcStatusInterface
4444
virtual bcos::Error::Ptr insertTask(protocol::Task::Ptr _task) = 0;
4545
virtual bcos::Error::Ptr updateTaskStatus(protocol::TaskResult::Ptr _taskResult) = 0;
4646
virtual protocol::TaskResult::Ptr getTaskStatus(const std::string& _taskID) = 0;
47-
virtual bcos::Error::Ptr deleteGateway(const std::string& _agencyID) = 0;
48-
virtual std::vector<protocol::GatewayInfo> listGateway() = 0;
4947
};
5048
} // namespace ppc::rpc

cpp/wedpr-computing/ppc-pir/src/OtPIRImpl.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,11 @@ void OtPIRImpl::onReceiveMessage(ppc::front::PPCMessageFace::Ptr _msg)
149149
}
150150
}
151151

152-
void OtPIRImpl::onReceivedErrorNotification(const std::string& _taskID)
152+
void OtPIRImpl::onReceivedErrorNotification(ppc::front::PPCMessageFace::Ptr const& _message)
153153
{
154+
PIR_LOG(WARNING) << LOG_DESC("onReceivedErrorNotification") << printPPCMsg(_message);
154155
// finish the task while the peer is failed
155-
auto taskState = findPendingTask(_taskID);
156+
auto taskState = findPendingTask(_message->taskID());
156157
if (taskState)
157158
{
158159
taskState->onPeerNotifyFinish();
@@ -242,7 +243,7 @@ void OtPIRImpl::handleReceivedMessage(const ppc::front::PPCMessageFace::Ptr& _me
242243
{
243244
case int(CommonMessageType::ErrorNotification):
244245
{
245-
pir->onReceivedErrorNotification(_message->taskID());
246+
pir->onReceivedErrorNotification(_message);
246247
break;
247248
}
248249
case int(CommonMessageType::PingPeer):
@@ -444,7 +445,8 @@ void OtPIRImpl::asyncRunTask()
444445
<< LOG_KV("requestAgencyDataset", taskMessage.requestAgencyDataset)
445446
<< LOG_KV("prefixLength", taskMessage.prefixLength)
446447
<< LOG_KV("searchId", taskMessage.searchId);
447-
auto writer = loadWriter(task->id(), dataResource, m_enableOutputExists);
448+
auto writer =
449+
loadWriter(task->id(), dataResource, m_taskState->task()->enableOutputExists());
448450
m_taskState->setWriter(writer);
449451
runSenderGenerateCipher(taskMessage);
450452
}

cpp/wedpr-computing/ppc-pir/src/OtPIRImpl.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class OtPIRImpl : public std::enable_shared_from_this<OtPIRImpl>,
5959
// register to the front to get the message related to ot-pir
6060
void onReceiveMessage(ppc::front::PPCMessageFace::Ptr _message) override;
6161

62-
void onReceivedErrorNotification(const std::string& _taskID) override;
62+
void onReceivedErrorNotification(ppc::front::PPCMessageFace::Ptr const& _message) override;
6363
void onSelfError(
6464
const std::string& _taskID, bcos::Error::Ptr _error, bool _noticePeer) override;
6565

@@ -150,8 +150,6 @@ class OtPIRImpl : public std::enable_shared_from_this<OtPIRImpl>,
150150
m_senders.erase(it);
151151
}
152152
}
153-
// allow the output-path exists, for ut
154-
bool m_enableOutputExists = false;
155153
// 为true时启动时会从配置中加载文件作为匹配源
156154
bool m_enableMemoryFile = false;
157155
ppc::protocol::DataResource m_resource;

cpp/wedpr-computing/ppc-pir/tests/FakeOtPIRFactory.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,7 @@ class FakeOtPIRImpl : public OtPIRImpl
4646
using Ptr = std::shared_ptr<FakeOtPIRImpl>;
4747
FakeOtPIRImpl(OtPIRConfig::Ptr const& _config, unsigned _idleTimeMs = 0)
4848
: OtPIRImpl(_config, _idleTimeMs)
49-
{
50-
m_enableOutputExists = true;
51-
}
49+
{}
5250
~FakeOtPIRImpl() override = default;
5351
};
5452

cpp/wedpr-computing/ppc-pir/tests/TestBaseOT.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ void testOTPIRImplFunc(const std::string& _taskID, const std::string& _params, b
194194
auto senderPIRTask = std::make_shared<JsonTaskImpl>(senderAgencyName);
195195
senderPIRTask->setId(_taskID);
196196
senderPIRTask->setParam(_params);
197+
senderPIRTask->setEnableOutputExists(true);
197198
senderPIRTask->setSelf(_senderParty);
198199
senderPIRTask->addParty(_receiverParty);
199200
senderPIRTask->setSyncResultToPeer(_syncResults);
@@ -203,6 +204,7 @@ void testOTPIRImplFunc(const std::string& _taskID, const std::string& _params, b
203204
auto receiverPIRTask = std::make_shared<JsonTaskImpl>(receiverAgencyName);
204205
receiverPIRTask->setId(_taskID);
205206
receiverPIRTask->setParam(_params);
207+
receiverPIRTask->setEnableOutputExists(true);
206208
receiverPIRTask->setSelf(_receiverParty);
207209
receiverPIRTask->addParty(_senderParty);
208210
receiverPIRTask->setSyncResultToPeer(_syncResults);

cpp/wedpr-computing/ppc-psi/src/cm2020-psi/CM2020PSIImpl.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -152,15 +152,14 @@ void CM2020PSIImpl::asyncRunTask()
152152
{
153153
return;
154154
}
155-
155+
CM2020_PSI_LOG(INFO) << LOG_DESC("noticePeerToFinish") << printTaskInfo(task);
156156
psi->noticePeerToFinish(task);
157157
});
158-
// check the memory
159-
checkHostResource(m_config->minNeededMemoryGB());
160-
addPendingTask(taskState);
161-
162158
try
163159
{
160+
addPendingTask(taskState);
161+
// check the memory
162+
checkHostResource(m_config->minNeededMemoryGB());
164163
// prepare reader and writer
165164
auto dataResource = task->selfParty()->dataResource();
166165
auto reader = loadReader(task->id(), dataResource, DataSchema::Bytes);
@@ -169,7 +168,7 @@ void CM2020PSIImpl::asyncRunTask()
169168
auto role = task->selfParty()->partyIndex();
170169
if (role == uint16_t(PartyType::Client) || task->syncResultToPeer())
171170
{
172-
auto writer = loadWriter(task->id(), dataResource, m_enableOutputExists);
171+
auto writer = loadWriter(task->id(), dataResource, task->enableOutputExists());
173172
taskState->setWriter(writer);
174173
}
175174

@@ -319,10 +318,11 @@ void CM2020PSIImpl::stop()
319318
CM2020_PSI_LOG(INFO) << LOG_DESC("CM2020-PSI stopped");
320319
}
321320

322-
void CM2020PSIImpl::onReceivedErrorNotification(const std::string& _taskID)
321+
void CM2020PSIImpl::onReceivedErrorNotification(ppc::front::PPCMessageFace::Ptr const& _message)
323322
{
323+
CM2020_PSI_LOG(INFO) << LOG_DESC("onReceivedErrorNotification") << printPPCMsg(_message);
324324
// finish the task while the peer is failed
325-
auto taskState = findPendingTask(_taskID);
325+
auto taskState = findPendingTask(_message->taskID());
326326
if (taskState)
327327
{
328328
taskState->onPeerNotifyFinish();
@@ -410,7 +410,7 @@ void CM2020PSIImpl::handleReceivedMessage(const ppc::front::PPCMessageFace::Ptr&
410410
{
411411
case int(CommonMessageType::ErrorNotification):
412412
{
413-
psi->onReceivedErrorNotification(_message->taskID());
413+
psi->onReceivedErrorNotification(_message);
414414
break;
415415
}
416416
case int(CommonMessageType::PingPeer):

cpp/wedpr-computing/ppc-psi/src/cm2020-psi/CM2020PSIImpl.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class CM2020PSIImpl : public bcos::Worker,
6262

6363
void start() override;
6464
void stop() override;
65-
void onReceivedErrorNotification(const std::string& _taskID) override;
65+
void onReceivedErrorNotification(ppc::front::PPCMessageFace::Ptr const& _message) override;
6666
void onSelfError(
6767
const std::string& _taskID, bcos::Error::Ptr _error, bool _noticePeer) override;
6868

@@ -148,10 +148,6 @@ class CM2020PSIImpl : public bcos::Worker,
148148
}
149149
}
150150

151-
protected:
152-
// allow the output-path exists, for ut
153-
bool m_enableOutputExists = false;
154-
155151
private:
156152
void waitSignal()
157153
{

0 commit comments

Comments
 (0)