Skip to content

Commit 1f887ea

Browse files
authored
optimize log (#91)
1 parent ee0c5d7 commit 1f887ea

File tree

10 files changed

+648
-556
lines changed

10 files changed

+648
-556
lines changed

cpp/wedpr-computing/ppc-pir/src/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,4 @@ add_library(${PIR_TARGET} ${SOURCES} ${OUT_TARS_H_LIST})
2828
target_include_directories(${PIR_TARGET} PUBLIC
2929
$<BUILD_INTERFACE:${TARS_HEADER_DIR}>)
3030

31-
target_link_libraries(${PIR_TARGET} PUBLIC ${IO_TARGET} ${FRONT_TARGET} ${BCOS_UTILITIES_TARGET} ${TARS_PROTOCOL_TARGET} TBB::tbb TCMalloc)
31+
target_link_libraries(${PIR_TARGET} PUBLIC ${PSI_FRAMEWORK_TARGET} ${IO_TARGET} ${FRONT_TARGET} ${BCOS_UTILITIES_TARGET} ${TARS_PROTOCOL_TARGET} TBB::tbb TCMalloc)

cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/EcdhMultiCache.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ bool CalculatorCache::tryToFinalize()
328328
<< LOG_KV("* intersectionSize", m_intersectionResult.size())
329329
<< LOG_KV("* timecost", (utcSteadyTime() - startT));
330330

331-
releaseDataAfterFinalize();
331+
releaseCipherCache();
332332
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToFinalize: syncIntersections") << printCacheState();
333333
m_cacheState = CacheState::Syncing;
334334
syncIntersections();
@@ -472,8 +472,9 @@ void CalculatorCache::addIntersectionCipher(std::vector<bcos::bytes>&& _cipherDa
472472
{
473473
m_receiveAllIntersection = true;
474474
}
475-
ECDH_MULTI_LOG(INFO) << LOG_DESC("addIntersectionCipher finshed")
475+
ECDH_MULTI_LOG(INFO) << LOG_DESC("addIntersectionCipher success")
476476
<< LOG_KV("timecost", utcSteadyTime() - startT) << LOG_KV("seq", seq)
477+
<< LOG_KV("dataSize", _cipherData.size())
477478
<< LOG_KV("receiveAll", m_receiveAllIntersection)
478479
<< LOG_KV("cipherRefSize", m_cipherRef.size()) << printCacheState();
479480
// release the cipherData

cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/EcdhMultiCache.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ class CalculatorCache : public std::enable_shared_from_this<CalculatorCache>
221221
{}
222222
virtual ~CalculatorCache()
223223
{
224-
releaseDataAfterFinalize();
224+
releaseCipherCache();
225225

226226
m_intersectionResult.clear();
227227
std::vector<bcos::bytes>().swap(m_intersectionResult);
@@ -275,7 +275,7 @@ class CalculatorCache : public std::enable_shared_from_this<CalculatorCache>
275275

276276
void syncIntersections();
277277

278-
void releaseDataAfterFinalize()
278+
void releaseCipherCache()
279279
{
280280
for (auto const& it : m_plainData)
281281
{
@@ -284,7 +284,7 @@ class CalculatorCache : public std::enable_shared_from_this<CalculatorCache>
284284
m_cipherRef.clear();
285285
std::map<bcos::bytes, CipherRefDetail>().swap(m_cipherRef);
286286
MallocExtension::instance()->ReleaseFreeMemory();
287-
ECDH_MULTI_LOG(INFO) << LOG_DESC("releaseDataAfterFinalize")
287+
ECDH_MULTI_LOG(INFO) << LOG_DESC("releaseCipherCache")
288288
<< LOG_KV("taskID", m_taskState->task()->id());
289289
}
290290

cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/core/EcdhMultiPSICalculator.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,12 @@ void EcdhMultiPSICalculator::blindData(std::string _taskID, bcos::bytes _randA)
123123
encryptedData[i] = m_config->eccCrypto()->ecMultiply(point, _randA);
124124
}
125125
});
126-
ECDH_CAL_LOG(INFO) << LOG_DESC("blindData encrypt success")
126+
ECDH_CAL_LOG(INFO) << LOG_DESC("blindData encrypt success") << LOG_KV("seq", seq)
127127
<< LOG_KV("dataSize", encryptedData.size())
128-
<< LOG_KV("task", m_taskState->task()->id()) << LOG_KV("seq", seq)
128+
<< LOG_KV("task", m_taskState->task()->id())
129129
<< LOG_KV("timecost", (utcSteadyTime() - startT));
130130
ECDH_CAL_LOG(INFO) << LOG_DESC("blindData: send cipher to the master")
131-
<< LOG_KV("masterSize", m_masterParties.size())
131+
<< LOG_KV("seq", seq) << LOG_KV("masterSize", m_masterParties.size())
132132
<< LOG_KV("dataSize", encryptedData.size())
133133
<< LOG_KV("task", m_taskState->task()->id());
134134
auto message = m_config->psiMsgFactory()->createPSIMessage(

cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/core/EcdhMultiPSIMaster.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,8 @@ void EcdhMultiPSIMaster::blindData()
147147
}
148148
}
149149
auto startT = utcSteadyTime();
150-
ECDH_MASTER_LOG(INFO) << LOG_DESC("blindData: encrypt data")
150+
ECDH_MASTER_LOG(INFO) << LOG_DESC("blindData: encrypt data") << LOG_KV("seq", seq)
151+
<< LOG_KV("task", m_taskState->task()->id())
151152
<< LOG_KV("dataSize", dataBatch->size());
152153
std::vector<bcos::bytes> cipher(dataBatch->size());
153154
tbb::parallel_for(
@@ -169,7 +170,7 @@ void EcdhMultiPSIMaster::blindData()
169170
<< LOG_KV("timecost", (utcSteadyTime() - startT));
170171

171172
ECDH_MASTER_LOG(INFO) << LOG_DESC("blindData: send encrypted data to all calculator")
172-
<< LOG_KV("task", m_taskID)
173+
<< LOG_KV("seq", seq) << LOG_KV("task", m_taskID)
173174
<< LOG_KV("calculators", m_calculatorParties.size());
174175
auto message = m_config->psiMsgFactory()->createPSIMessage(
175176
uint32_t(EcdhMultiPSIMessageType::SEND_ENCRYPTED_SET_TO_CALCULATOR));

cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/core/EcdhMultiPSIPartner.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,14 @@ void EcdhMultiPSIPartner::onReceiveRandomA(bcos::bytesPointer _randA)
9595
cipherData[i] = m_config->eccCrypto()->ecMultiply(point, *_randA);
9696
}
9797
});
98-
ECDH_PARTNER_LOG(INFO) << LOG_DESC("blindData: encode parterner cipher success")
99-
<< LOG_KV("size", dataBatch->size())
100-
<< LOG_KV("timecost", (utcSteadyTime() - startT))
101-
<< printTaskInfo(m_taskState->task());
98+
ECDH_PARTNER_LOG(INFO)
99+
<< LOG_DESC("blindData: encode parterner cipher success") << LOG_KV("seq", seq)
100+
<< LOG_KV("task", m_taskState->task()->id()) << LOG_KV("size", dataBatch->size())
101+
<< LOG_KV("timecost", (utcSteadyTime() - startT));
102102

103103
ECDH_PARTNER_LOG(INFO)
104-
<< LOG_DESC("blindData: send cipher data to master")
105-
<< LOG_KV("size", dataBatch->size()) << printTaskInfo(m_taskState->task());
104+
<< LOG_DESC("blindData: send cipher data to master") << LOG_KV("seq", seq)
105+
<< LOG_KV("size", dataBatch->size()) << LOG_KV("task", m_taskState->task()->id());
106106
auto message = m_config->psiMsgFactory()->createPSIMessage(
107107
uint32_t(EcdhMultiPSIMessageType::SEND_ENCRYPTED_SET_TO_MASTER_FROM_PARTNER));
108108
message->setData(std::move(cipherData));
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
/**
2+
* Copyright (C) 2022 WeDPR.
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* @file TaskGuarder.cpp
17+
* @author: shawnhe
18+
* @date 2022-01-07
19+
*/
20+
#include "TaskGuarder.h"
21+
22+
using namespace ppc::psi;
23+
using namespace ppc::protocol;
24+
using namespace ppc::io;
25+
26+
27+
bcos::Error::Ptr TaskGuarder::checkTask(const Task::ConstPtr& _task, uint16_t _partiesCount,
28+
bool _enforceSelfInput, bool _enforceSelfOutput, bool _enforcePeerResource,
29+
bool _enforceSelfResource)
30+
{
31+
{
32+
// check task id
33+
bcos::ReadGuard l(x_pendingTasks);
34+
if (m_pendingTasks.contains(_task->id()))
35+
{
36+
return std::make_shared<bcos::Error>(
37+
(int)PSIRetCode::DuplicatedTask, "task already exists");
38+
}
39+
}
40+
41+
// check self party
42+
auto const& selfParty = _task->selfParty();
43+
if (!selfParty)
44+
{
45+
return std::make_shared<bcos::Error>(
46+
(int)PSIRetCode::TaskParamsError, "must specify self party info");
47+
}
48+
if (selfParty->partyIndex() == (uint16_t)protocol::PartyType::Client)
49+
{
50+
_enforceSelfInput = true;
51+
_enforceSelfOutput = true;
52+
}
53+
54+
// check self data resource
55+
if (_enforceSelfResource)
56+
{
57+
auto const& dataResource = selfParty->dataResource();
58+
if (!dataResource)
59+
{
60+
return std::make_shared<bcos::Error>(
61+
(int)PSIRetCode::TaskParamsError, "no data resource specified for self party");
62+
}
63+
auto originData = dataResource->rawData();
64+
if (_enforceSelfInput && originData.empty() && !dataResource->desc())
65+
{
66+
return std::make_shared<bcos::Error>(
67+
(int)PSIRetCode::TaskParamsError, "no input resource specified for self party");
68+
}
69+
if (_enforceSelfOutput && originData.empty() && !dataResource->outputDesc())
70+
{
71+
return std::make_shared<bcos::Error>(
72+
(int)PSIRetCode::TaskParamsError, "no output resource specified for self party");
73+
}
74+
}
75+
76+
// check party index
77+
std::set<uint16_t> indexSet;
78+
auto index = selfParty->partyIndex();
79+
if (_partiesCount > 1 && index >= _partiesCount)
80+
{
81+
return std::make_shared<bcos::Error>(
82+
(int)PSIRetCode::TaskParamsError, "invalid partyIndex: " + std::to_string(index));
83+
}
84+
indexSet.insert(index);
85+
86+
// check the peer size
87+
auto const& peerParties = _task->getAllPeerParties();
88+
if (peerParties.size() != uint(_partiesCount - 1))
89+
{
90+
std::string errorMessage = "expected parties count: " + std::to_string(_partiesCount) +
91+
", current is " + std::to_string(peerParties.size());
92+
return std::make_shared<bcos::Error>((int)PSIRetCode::TaskParamsError, errorMessage);
93+
}
94+
95+
// check the party index of peers
96+
for (auto& it : peerParties)
97+
{
98+
index = it.second->partyIndex();
99+
if (_partiesCount > 1 && index >= _partiesCount)
100+
{
101+
return std::make_shared<bcos::Error>(
102+
(int)PSIRetCode::TaskParamsError, "invalid partyIndex: " + std::to_string(index));
103+
}
104+
if (indexSet.contains(index))
105+
{
106+
return std::make_shared<bcos::Error>(
107+
(int)PSIRetCode::TaskParamsError, "repeated party index: " + std::to_string(index));
108+
}
109+
indexSet.insert(index);
110+
}
111+
112+
// check the data resource of peer
113+
if (!_enforcePeerResource)
114+
{
115+
return nullptr;
116+
}
117+
for (auto& it : peerParties)
118+
{
119+
if (!it.second->dataResource())
120+
{
121+
return std::make_shared<bcos::Error>(
122+
(int)PSIRetCode::NotSpecifyPeerDataResource, "must specify the peer data resource");
123+
}
124+
}
125+
return nullptr;
126+
}
127+
128+
129+
void TaskGuarder::noticePeerToFinish(const Task::ConstPtr& _task)
130+
{
131+
auto const& peerParties = _task->getAllPeerParties();
132+
if (peerParties.empty())
133+
{
134+
return;
135+
}
136+
137+
for (const auto& peer : peerParties)
138+
{
139+
noticePeerToFinish(_task->id(), peer.first);
140+
}
141+
}
142+
143+
void TaskGuarder::noticePeerToFinish(const std::string& _taskID, const std::string& _peer)
144+
{
145+
PSI_LOG(INFO) << LOG_DESC("noticePeerToFinish") << LOG_KV("task", _taskID)
146+
<< LOG_KV("peer", _peer);
147+
try
148+
{
149+
if (_peer.empty())
150+
{
151+
return;
152+
}
153+
154+
auto message = m_config->ppcMsgFactory()->buildPPCMessage(uint8_t(protocol::TaskType::PSI),
155+
uint8_t(m_type), _taskID, std::make_shared<bcos::bytes>());
156+
message->setMessageType(uint8_t(CommonMessageType::ErrorNotification));
157+
m_config->front()->asyncSendMessage(
158+
_peer, message, m_config->networkTimeout(), [](const bcos::Error::Ptr&) {}, nullptr);
159+
}
160+
catch (std::exception& e)
161+
{
162+
PSI_LOG(ERROR) << LOG_DESC("noticePeerToFinish")
163+
<< LOG_KV("exception", boost::diagnostic_information(e));
164+
}
165+
}
166+
167+
void TaskGuarder::checkPeerActivity()
168+
{
169+
bcos::ReadGuard l(x_pendingTasks);
170+
for (auto const& it : m_pendingTasks)
171+
{
172+
if (it.second->onlySelfRun())
173+
{
174+
// no need to check peer
175+
continue;
176+
}
177+
178+
auto task = it.second->task();
179+
auto const& peerParties = task->getAllPeerParties();
180+
if (peerParties.empty())
181+
{
182+
continue;
183+
}
184+
185+
for (const auto& peer : peerParties)
186+
{
187+
auto message =
188+
m_config->ppcMsgFactory()->buildPPCMessage(uint8_t(protocol::TaskType::PSI),
189+
uint8_t(m_type), task->id(), std::make_shared<bcos::bytes>());
190+
message->setMessageType(uint8_t(CommonMessageType::PingPeer));
191+
m_config->front()->asyncSendMessage(
192+
peer.first, message, m_config->networkTimeout(),
193+
[this, task, peerID = peer.first](bcos::Error::Ptr&& _error) {
194+
if (!_error || _error->errorCode() == 0)
195+
{
196+
return;
197+
}
198+
onSelfError(task->id(),
199+
std::make_shared<bcos::Error>(
200+
(int)PSIRetCode::PeerNodeDown, "peer node is down, id: " + peerID),
201+
false);
202+
},
203+
nullptr);
204+
}
205+
}
206+
m_pingTimer->restart();
207+
}
208+
209+
210+
LineReader::Ptr TaskGuarder::loadReader(std::string const& _taskID,
211+
protocol::DataResource::ConstPtr const& _dataResource, DataSchema _dataSchema,
212+
uint32_t _columnSize)
213+
{
214+
if (!_dataResource->rawData().empty())
215+
{
216+
return nullptr;
217+
}
218+
auto reader =
219+
m_config->dataResourceLoader()->loadReader(_dataResource->desc(), _dataSchema, true);
220+
221+
if (reader->columnSize() == 0 ||
222+
(reader->type() == DataResourceType::MySQL && reader->columnSize() != _columnSize))
223+
{
224+
auto errorMsg = "load data for task " + _taskID + "failed, expect " +
225+
std::to_string(_columnSize) + " column, current column size is " +
226+
std::to_string(reader->columnSize());
227+
BOOST_THROW_EXCEPTION(BCOS_ERROR((int)PSIRetCode::LoadDataFailed, errorMsg));
228+
}
229+
return reader;
230+
}
231+
232+
LineWriter::Ptr TaskGuarder::loadWriter(std::string const& _taskID,
233+
DataResource::ConstPtr const& _dataResource, bool _enableOutputExists)
234+
{
235+
if (!_dataResource->rawData().empty())
236+
{
237+
return nullptr;
238+
}
239+
if (!_enableOutputExists)
240+
{
241+
// Note: if the output-resource already exists, will throw exception
242+
m_config->dataResourceLoader()->checkResourceExists(_dataResource->outputDesc());
243+
}
244+
return m_config->dataResourceLoader()->loadWriter(_dataResource->outputDesc(), true);
245+
}
246+
247+
TaskState::Ptr TaskGuarder::findPendingTask(const std::string& _taskID)
248+
{
249+
bcos::ReadGuard l(x_pendingTasks);
250+
auto it = m_pendingTasks.find(_taskID);
251+
if (it == m_pendingTasks.end())
252+
{
253+
return nullptr;
254+
}
255+
return it->second;
256+
}
257+
258+
void TaskGuarder::addPendingTask(TaskState::Ptr _taskState)
259+
{
260+
bcos::WriteGuard l(x_pendingTasks);
261+
auto id = _taskState->task()->id();
262+
m_pendingTasks[id] = std::move(_taskState);
263+
}
264+
265+
void TaskGuarder::removePendingTask(const std::string& _taskID)
266+
{
267+
bcos::WriteGuard l(x_pendingTasks);
268+
auto it = m_pendingTasks.find(_taskID);
269+
if (it != m_pendingTasks.end())
270+
{
271+
m_pendingTasks.erase(it);
272+
}
273+
}

0 commit comments

Comments
 (0)