Skip to content

Commit 6453f5c

Browse files
committed
update generated sdk
1 parent fbdba98 commit 6453f5c

File tree

26 files changed

+1221
-956
lines changed

26 files changed

+1221
-956
lines changed
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*
16-
* @file ServiceType.h
16+
* @file Constant.h
1717
* @author: yujiechen
1818
* @date 2024-11-07
1919
*/
@@ -26,4 +26,5 @@ namespace ppc::protocol
2626
{
2727
const static std::string PSI_SERVICE_TYPE = "PSI";
2828
const static std::string MPC_SERVICE_TYPE = "MPC";
29+
const static size_t LARGE_MSG_THRESHOLD = 30 * 1024 * 1024;
2930
} // namespace ppc::protocol

cpp/ppc-framework/protocol/GrpcConfig.h

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -89,29 +89,45 @@ class GrpcConfig
8989
m_compressAlgorithm = compressAlgorithm;
9090
}
9191

92+
uint64_t maxMsgSize() const { return m_maxMsgSize; }
93+
void setMaxMsgSize(uint64_t maxMsgSize)
94+
{
95+
if (maxMsgSize > c_maxMsgSize)
96+
{
97+
BOOST_THROW_EXCEPTION(WeDPRException() << bcos::errinfo_comment(
98+
"The maxMsgSize limit is " + std::to_string(c_maxMsgSize)));
99+
}
100+
m_maxMsgSize = maxMsgSize;
101+
}
102+
92103
protected:
93104
bool m_enableHealthCheck = true;
94105
std::string m_loadBalancePolicy = "round_robin";
95106
bool m_enableDnslookup = false;
107+
96108
// Note: grpc use int to set the maxMsgSize
97109
uint64_t const c_maxMsgSize = INT_MAX;
98110

99111
// the max send message size in bytes
100112
uint64_t m_maxSendMessageSize = c_maxMsgSize;
101113
// the max received message size in bytes
102114
uint64_t m_maxReceivedMessageSize = c_maxMsgSize;
115+
// the max msg size
116+
uint64_t m_maxMsgSize = c_maxMsgSize;
103117
int m_compressAlgorithm = 0;
104118
};
105119

106-
class GrpcServerConfig : public GrpcConfig
120+
class GrpcServerConfig
107121
{
108122
public:
109123
using Ptr = std::shared_ptr<GrpcServerConfig>;
110-
GrpcServerConfig() = default;
111-
GrpcServerConfig(EndPoint endPoint, bool enableHealthCheck)
112-
: m_endPoint(std::move(endPoint)), m_enableHealthCheck(enableHealthCheck)
113-
{}
114-
~GrpcServerConfig() override = default;
124+
GrpcServerConfig() { m_grpcConfig = std::make_shared<GrpcConfig>(); }
125+
GrpcServerConfig(EndPoint endPoint, bool enableHealthCheck) : GrpcServerConfig()
126+
{
127+
m_endPoint = std::move(endPoint);
128+
m_enableHealthCheck = enableHealthCheck;
129+
}
130+
virtual ~GrpcServerConfig() = default;
115131

116132
std::string listenEndPoint() const { return m_endPoint.listenEndPoint(); }
117133

@@ -122,21 +138,13 @@ class GrpcServerConfig : public GrpcConfig
122138
EndPoint& mutableEndPoint() { return m_endPoint; }
123139
bool enableHealthCheck() const { return m_enableHealthCheck; }
124140

125-
uint64_t maxMsgSize() const { return m_maxMsgSize; }
126-
void setMaxMsgSize(uint64_t maxMsgSize)
127-
{
128-
if (maxMsgSize > c_maxMsgSize)
129-
{
130-
BOOST_THROW_EXCEPTION(WeDPRException() << bcos::errinfo_comment(
131-
"The maxMsgSize limit is " + std::to_string(c_maxMsgSize)));
132-
}
133-
m_maxMsgSize = maxMsgSize;
134-
}
141+
GrpcConfig::Ptr const& grpcConfig() const { return m_grpcConfig; }
135142

136143
protected:
137144
ppc::protocol::EndPoint m_endPoint;
138145
bool m_enableHealthCheck = true;
139-
uint64_t m_maxMsgSize = c_maxMsgSize;
146+
// the grpc config
147+
GrpcConfig::Ptr m_grpcConfig;
140148
};
141149

142150

cpp/ppc-framework/protocol/Message.h

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,6 @@ class MessageHeader
187187
class Message
188188
{
189189
public:
190-
const static size_t LARGER_MSG_THRESHOLD = 30 * 1024 * 1024;
191-
192190
using Ptr = std::shared_ptr<Message>;
193191
Message() = default;
194192
virtual ~Message() {}
@@ -207,7 +205,14 @@ class Message
207205

208206
bool isRespPacket() const { return m_header->isRespPacket(); }
209207
void setRespPacket() { m_header->setRespPacket(); }
210-
void setPayload(std::shared_ptr<bcos::bytes> _payload) { m_payload = std::move(_payload); }
208+
void setPayload(std::shared_ptr<bcos::bytes> _payload)
209+
{
210+
m_payload = std::move(_payload);
211+
if (m_payload)
212+
{
213+
m_payloadLen = m_payload->size();
214+
}
215+
}
211216
// for swig wrapper
212217
OutputBuffer payloadBuffer() const
213218
{
@@ -218,9 +223,18 @@ class Message
218223
return OutputBuffer{(unsigned char*)m_payload->data(), m_payload->size()};
219224
}
220225

221-
void setFrontMessage(MessagePayload::Ptr frontMessage)
226+
void setFrontMessage(MessagePayload::Ptr frontMessage, bool releasePayload = false)
222227
{
223228
m_frontMessage = std::move(frontMessage);
229+
if (!releasePayload)
230+
{
231+
return;
232+
}
233+
if (m_payload)
234+
{
235+
m_payload->clear();
236+
bcos::bytes().swap(*m_payload);
237+
}
224238
}
225239

226240
MessagePayload::Ptr const& frontMessage() const { return m_frontMessage; }
@@ -229,10 +243,7 @@ class Message
229243
virtual bool encode(bcos::bytes& _buffer) = 0;
230244
virtual int64_t decode(bcos::bytesConstRef _buffer) = 0;
231245

232-
virtual uint32_t length() const
233-
{
234-
return m_header->length() + (m_payload ? m_payload->size() : 0);
235-
}
246+
virtual uint32_t length() const { return m_header->length() + m_payloadLen; }
236247

237248
virtual std::shared_ptr<bcos::bytes> payload() const { return m_payload; }
238249

@@ -253,6 +264,9 @@ class Message
253264
MessageHeader::Ptr m_header;
254265
// Note: allocate here in case of wsService nullptr access caused coredump
255266
std::shared_ptr<bcos::bytes> m_payload = std::make_shared<bcos::bytes>();
267+
uint64_t m_payloadLen = 0;
268+
;
269+
256270
MessagePayload::Ptr m_frontMessage = nullptr;
257271
};
258272

cpp/wedpr-computing/ppc-psi/src/PSIConfig.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
#include "ppc-framework/Helper.h"
2424
#include "ppc-framework/front/FrontInterface.h"
2525
#include "ppc-framework/io/DataResourceLoader.h"
26+
#include "ppc-framework/protocol/Constant.h"
2627
#include "ppc-framework/protocol/Protocol.h"
2728
#include "psi-framework/interfaces/PSIMessageInterface.h"
29+
#include <gperftools/malloc_extension.h>
2830
#include <future>
2931
#include <utility>
3032

@@ -95,6 +97,14 @@ class PSIConfig
9597
}
9698
},
9799
_responseCallback);
100+
// release the large buffer if no-need to use
101+
if (ppcMsg->data() && ppcMsg->data()->size() > ppc::protocol::LARGE_MSG_THRESHOLD)
102+
{
103+
PSI_LOG(INFO) << LOG_DESC("sendMsg: Release large buffer since the message")
104+
<< LOG_KV("size", ppcMsg->data()->size());
105+
ppcMsg->releasePayload();
106+
MallocExtension::instance()->ReleaseFreeMemory();
107+
}
98108
}
99109

100110
void asyncSendResponse(bcos::bytes const& fromNode, std::string const& _taskID,

cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/EcdhMultiCache.cpp

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,8 @@ bool MasterCache::tryToIntersection()
184184
}
185185
m_cacheState = CacheState::IntersectionProgressing;
186186

187-
ECDH_MULTI_LOG(INFO) << LOG_DESC("tryToIntersection ") << printCacheState()
188-
<< LOG_KV("masterData", m_masterDataRef.size());
187+
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToIntersection ") << printCacheState()
188+
<< LOG_KV("* masterData", m_masterDataRef.size());
189189
auto startT = utcSteadyTime();
190190
// iterator the masterDataRef to obtain intersection
191191
for (auto&& it : m_masterDataRef)
@@ -208,9 +208,9 @@ bool MasterCache::tryToIntersection()
208208
}
209209
releaseCache();
210210
m_cacheState = CacheState::Intersectioned;
211-
ECDH_MULTI_LOG(INFO) << LOG_DESC("tryToIntersection success") << printCacheState()
212-
<< LOG_KV("intersectionSize", m_intersecCipher.size())
213-
<< LOG_KV("timecost", (utcSteadyTime() - startT));
211+
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToIntersection success") << printCacheState()
212+
<< LOG_KV("* intersectionSize", m_intersecCipher.size())
213+
<< LOG_KV("* timecost", (utcSteadyTime() - startT));
214214
return true;
215215
}
216216

@@ -257,8 +257,8 @@ bool CalculatorCache::tryToFinalize()
257257
return false;
258258
}
259259
auto startT = utcSteadyTime();
260-
ECDH_MULTI_LOG(INFO) << LOG_DESC("tryToFinalize: compute intersection")
261-
<< LOG_KV("cipherRef", m_cipherRef.size()) << printCacheState();
260+
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToFinalize: compute intersection")
261+
<< LOG_KV("* cipherRef", m_cipherRef.size()) << printCacheState();
262262
m_cacheState = CacheState::Finalizing;
263263
// find the intersection
264264
for (auto const& it : m_cipherRef)
@@ -273,28 +273,28 @@ bool CalculatorCache::tryToFinalize()
273273
}
274274
}
275275
m_cacheState = CacheState::Finalized;
276-
ECDH_MULTI_LOG(INFO) << LOG_DESC("tryToFinalize: compute intersection success")
277-
<< printCacheState() << LOG_KV("cipherRef", m_cipherRef.size())
278-
<< LOG_KV("intersectionSize", m_intersectionResult.size())
279-
<< LOG_KV("timecost", (utcSteadyTime() - startT));
276+
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToFinalize: compute intersection success")
277+
<< printCacheState() << LOG_KV("* cipherRef", m_cipherRef.size())
278+
<< LOG_KV("* intersectionSize", m_intersectionResult.size())
279+
<< LOG_KV("* timecost", (utcSteadyTime() - startT));
280280

281281
releaseDataAfterFinalize();
282-
ECDH_MULTI_LOG(INFO) << LOG_DESC("tryToFinalize: syncIntersections") << printCacheState();
282+
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToFinalize: syncIntersections") << printCacheState();
283283
m_cacheState = CacheState::Syncing;
284284
syncIntersections();
285285
m_cacheState = CacheState::Synced;
286286

287287
m_cacheState = CacheState::StoreProgressing;
288288
m_taskState->storePSIResult(m_config->dataResourceLoader(), m_intersectionResult);
289289
m_cacheState = CacheState::Stored;
290-
ECDH_MULTI_LOG(INFO) << LOG_DESC("tryToFinalize: syncIntersections and store success")
290+
ECDH_MULTI_LOG(INFO) << LOG_DESC("* tryToFinalize: syncIntersections and store success")
291291
<< printCacheState();
292292
return true;
293293
}
294294

295295
void CalculatorCache::syncIntersections()
296296
{
297-
ECDH_MULTI_LOG(INFO) << LOG_DESC("syncIntersections") << printCacheState();
297+
ECDH_MULTI_LOG(INFO) << LOG_DESC("*** syncIntersections **") << printCacheState();
298298
auto peers = m_taskState->task()->getAllPeerParties();
299299
auto taskID = m_taskState->task()->id();
300300
// notify task result

cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/EcdhMultiPSIImpl.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "EcdhMultiPSIImpl.h"
22
#include "Common.h"
3+
#include "ppc-framework/protocol/Constant.h"
34
#include <gperftools/malloc_extension.h>
45

56
using namespace ppc::psi;
@@ -314,10 +315,10 @@ void EcdhMultiPSIImpl::executeWorker()
314315
psiMsg->setUUID(pop_msg->uuid());
315316
ECDH_MULTI_LOG(TRACE) << LOG_DESC("onReceiveMessage") << printPSIMessage(psiMsg)
316317
<< LOG_KV("uuid", psiMsg->uuid());
317-
// release the larger payload immediately
318-
if (payLoad->size() >= ppc::protocol::Message::LARGER_MSG_THRESHOLD)
318+
// release the large payload immediately
319+
if (payLoad && payLoad->size() >= ppc::protocol::LARGE_MSG_THRESHOLD)
319320
{
320-
ECDH_MULTI_LOG(INFO) << LOG_DESC("Release larger message payload")
321+
ECDH_MULTI_LOG(INFO) << LOG_DESC("Release large message payload")
321322
<< LOG_KV("size", payLoad->size());
322323
pop_msg->releasePayload();
323324
MallocExtension::instance()->ReleaseFreeMemory();

cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/core/EcdhMultiPSIMaster.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,16 @@ void EcdhMultiPSIMaster::encAndSendIntersectionData()
9595
uint32_t(EcdhMultiPSIMessageType::SEND_ENCRYPTED_INTERSECTION_SET_TO_CALCULATOR));
9696
message->constructDataMap(encryptedData);
9797
message->setFrom(m_taskState->task()->selfParty()->id());
98+
ECDH_MASTER_LOG(INFO) << LOG_DESC("send intersection cipher to calculator")
99+
<< LOG_KV("taskID", m_taskState->task()->id())
100+
<< LOG_KV("intersectionSize", encryptedData.size());
101+
// release the encryptedData since the encryptedData maybe large in some cases
102+
encryptedData.clear();
103+
std::vector<std::pair<uint64_t, bcos::bytes>>().swap(encryptedData);
104+
MallocExtension::instance()->ReleaseFreeMemory();
105+
98106
for (auto& calcultor : m_calculatorParties)
99107
{
100-
ECDH_MASTER_LOG(INFO) << LOG_DESC("send intersection cipher to calculator")
101-
<< LOG_KV("taskID", m_taskState->task()->id())
102-
<< LOG_KV("intersectionSize", encryptedData.size())
103-
<< LOG_KV("target", calcultor.first);
104108
m_config->generateAndSendPPCMessage(calcultor.first, m_taskID, message,
105109
[self = weak_from_this()](bcos::Error::Ptr&& _error) {
106110
if (!_error)
@@ -193,6 +197,7 @@ void EcdhMultiPSIMaster::blindData()
193197
dataBatch->release();
194198
ECDH_MASTER_LOG(INFO) << LOG_DESC("blindData: encrypt data success")
195199
<< LOG_KV("dataSize", cipher.size()) << LOG_KV("task", m_taskID)
200+
<< LOG_KV("seq", seq)
196201
<< LOG_KV("timecost", (utcSteadyTime() - startT));
197202

198203
ECDH_MASTER_LOG(INFO) << LOG_DESC("blindData: send encrypted data to all calculator")

cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/core/EcdhMultiPSIPartner.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,9 @@ void EcdhMultiPSIPartner::onReceiveRandomA(bcos::bytesPointer _randA)
7979
m_taskState->setFinished(true);
8080
}
8181
}
82-
ECDH_PARTNER_LOG(INFO)
83-
<< LOG_DESC("blindData: encode parterner cipher")
84-
<< LOG_KV("size", dataBatch->size()) << printTaskInfo(m_taskState->task());
82+
ECDH_PARTNER_LOG(INFO) << LOG_DESC("blindData: encode parterner cipher")
83+
<< LOG_KV("size", dataBatch->size()) << LOG_KV("seq", seq)
84+
<< LOG_KV("task", m_taskState->task()->id());
8585
auto startT = utcSteadyTime();
8686
std::vector<bcos::bytes> cipherData(dataBatch->size());
8787
tbb::parallel_for(

cpp/wedpr-computing/ppc-psi/src/psi-framework/PSIFramework.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
*/
2020
#include "PSIFramework.h"
2121
#include "../Common.h"
22+
#include "ppc-framework/protocol/Constant.h"
2223
#include "ppc-framework/protocol/GlobalConfig.h"
2324

2425
using namespace ppc::task;
@@ -154,6 +155,14 @@ void PSIFramework::onReceiveMessage(PPCMessageFace::Ptr _msg)
154155
m_msgQueue->push(psiMsg);
155156
PSI_FRAMEWORK_LOG(TRACE) << LOG_DESC("onReceiveMessage") << printPSIMessage(psiMsg)
156157
<< LOG_KV("uuid", _msg->uuid());
158+
// release the large payload immediately
159+
if (payLoad && payLoad->size() >= ppc::protocol::LARGE_MSG_THRESHOLD)
160+
{
161+
PSI_FRAMEWORK_LOG(INFO)
162+
<< LOG_DESC("Release large message payload") << LOG_KV("size", payLoad->size());
163+
_msg->releasePayload();
164+
MallocExtension::instance()->ReleaseFreeMemory();
165+
}
157166
// notify to handle the message
158167
m_signalled.notify_all();
159168
}

cpp/wedpr-computing/ppc-psi/src/psi-framework/TaskState.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class TaskState : public std::enable_shared_from_this<TaskState>
5050

5151
virtual ~TaskState()
5252
{
53-
PSI_LOG(INFO) << LOG_DESC("TaskState Destructor") << LOG_KV("task", m_task->id());
53+
PSI_LOG(INFO) << LOG_DESC("** TaskState Destructor") << LOG_KV("task", m_task->id());
5454
}
5555

5656
ppc::task::TaskResponseCallback const& callback() { return m_callback; }
@@ -368,8 +368,9 @@ class TaskState : public std::enable_shared_from_this<TaskState>
368368
m_writer->writeLine(dataBatch, ppc::io::DataSchema::Bytes);
369369
m_writer->flush();
370370
PSI_LOG(INFO) << LOG_DESC("**** storePSIResult success ****")
371-
<< LOG_KV("*task", m_task->id()) << LOG_KV("*IntersectionCount", _data.size())
372-
<< LOG_KV("TaskTimecost", taskPendingTime());
371+
<< LOG_KV("* task", m_task->id())
372+
<< LOG_KV("* IntersectionCount", _data.size())
373+
<< LOG_KV("* TaskTimecost", taskPendingTime());
373374
}
374375

375376
std::function<void()> takeFinalizeHandler() { return std::move(m_finalizeHandler); }

0 commit comments

Comments
 (0)