Skip to content

Commit 8c45405

Browse files
committed
optimize memory use
1 parent acb76e9 commit 8c45405

File tree

18 files changed

+162
-65
lines changed

18 files changed

+162
-65
lines changed

cpp/ppc-framework/io/DataBatch.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#pragma once
2121
#include "../Common.h"
2222
#include <bcos-utilities/Common.h>
23+
#include <gperftools/malloc_extension.h>
2324
#include <boost/lexical_cast.hpp>
2425
#include <algorithm>
2526
#include <memory>
@@ -207,7 +208,10 @@ class DataBatch
207208
{
208209
return;
209210
}
211+
m_data->clear();
210212
m_data.reset();
213+
// free after release
214+
MallocExtension::instance()->ReleaseFreeMemory();
211215
}
212216

213217
private:

cpp/ppc-framework/protocol/Message.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ class MessageHeader
187187
class Message
188188
{
189189
public:
190+
const static size_t LARGER_MSG_THRESHOLD = 30 * 1024 * 1024;
191+
190192
using Ptr = std::shared_ptr<Message>;
191193
Message() = default;
192194
virtual ~Message() {}
@@ -234,6 +236,19 @@ class Message
234236

235237
virtual std::shared_ptr<bcos::bytes> payload() const { return m_payload; }
236238

239+
void releasePayload()
240+
{
241+
if (m_payload)
242+
{
243+
m_payload->clear();
244+
bcos::bytes().swap(*m_payload);
245+
}
246+
if (m_frontMessage)
247+
{
248+
m_frontMessage->releasePayload();
249+
}
250+
}
251+
237252
protected:
238253
MessageHeader::Ptr m_header;
239254
// Note: allocate here in case of wsService nullptr access caused coredump

cpp/ppc-framework/protocol/MessagePayload.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ class MessagePayload
7777

7878
virtual bool isRespPacket() { return m_ext &= (uint16_t)FrontMsgExtFlag::Response; }
7979

80+
void releasePayload()
81+
{
82+
m_data.clear();
83+
bcos::bytes().swap(m_data);
84+
}
85+
8086
protected:
8187
// the front payload version, used to support compatibility
8288
// Note: must init here to 0, otherwise, it will be unexpected value in some other platform

cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/EcdhMultiCache.cpp

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ using namespace bcos;
2727
void MasterCache::addCalculatorCipher(std::string _peerId,
2828
std::map<uint32_t, bcos::bytes>&& _cipherData, uint32_t seq, uint32_t dataBatchCount)
2929
{
30+
auto peerIndex = getPeerIndex(_peerId);
31+
if (peerIndex == -1)
32+
{
33+
ECDH_MULTI_LOG(WARNING) << LOG_DESC("Invalid calculator") << LOG_KV("peer", _peerId);
34+
return;
35+
}
3036
bcos::Guard l(m_mutex);
3137
m_calculatorCipherSeqs.insert(seq);
3238
if (dataBatchCount)
@@ -35,7 +41,7 @@ void MasterCache::addCalculatorCipher(std::string _peerId,
3541
}
3642
for (auto&& it : _cipherData)
3743
{
38-
updateMasterDataRef(_peerId, std::move(it.second), it.first);
44+
updateMasterDataRef(peerIndex, std::move(it.second), it.first);
3945
}
4046
// try to merge the
4147
if (m_calculatorDataBatchCount > 0 &&
@@ -46,18 +52,22 @@ void MasterCache::addCalculatorCipher(std::string _peerId,
4652
<< LOG_KV("masterData", m_masterDataRef.size()) << printCacheState();
4753
m_finishedPartners.insert(_peerId);
4854
// try to merge
49-
mergeMasterCipher(_peerId);
55+
mergeMasterCipher(_peerId, peerIndex);
5056
}
5157
ECDH_MULTI_LOG(INFO) << LOG_DESC(
5258
"addCalculatorCipher: master receive cipher data from calculator")
5359
<< LOG_KV("calculator", _peerId) << printCacheState()
5460
<< LOG_KV("receivedSize", _cipherData.size())
5561
<< LOG_KV("masterData", m_masterDataRef.size())
5662
<< LOG_KV("dataBatchCount", m_calculatorDataBatchCount);
63+
// release the cipherData
64+
_cipherData.clear();
65+
std::map<uint32_t, bcos::bytes>().swap(_cipherData);
66+
MallocExtension::instance()->ReleaseFreeMemory();
5767
}
5868

5969
void MasterCache::updateMasterDataRef(
60-
std::string const& _peerId, bcos::bytes&& data, int32_t dataIndex)
70+
unsigned short _peerIndex, bcos::bytes&& data, int32_t dataIndex)
6171
{
6272
// not merged case
6373
if (!m_peerMerged)
@@ -66,21 +76,21 @@ void MasterCache::updateMasterDataRef(
6676
if (!m_masterDataRef.count(data))
6777
{
6878
MasterCipherRef ref;
69-
ref.refInfo.insert(_peerId);
79+
ref.refInfo.insert(_peerIndex);
7080
ref.updateDataIndex(dataIndex);
7181
m_masterDataRef.insert(std::make_pair(std::move(data), ref));
7282
return;
7383
}
7484
// existed data case
75-
m_masterDataRef[data].refInfo.insert(_peerId);
85+
m_masterDataRef[data].refInfo.insert(_peerIndex);
7686
m_masterDataRef[data].updateDataIndex(dataIndex);
7787
return;
7888
}
7989

80-
// merged case, only record the intersection case
90+
// merged case, only record the intersection case, increase the refCount
8191
if (m_masterDataRef.count(data))
8292
{
83-
m_masterDataRef[data].refInfo.insert(_peerId);
93+
m_masterDataRef[data].refCount += 1;
8494
m_masterDataRef[data].updateDataIndex(dataIndex);
8595
}
8696
}
@@ -89,18 +99,27 @@ void MasterCache::updateMasterDataRef(
8999
void MasterCache::addPartnerCipher(std::string _peerId, std::vector<bcos::bytes>&& _cipherData,
90100
uint32_t seq, uint32_t parternerDataCount)
91101
{
102+
auto peerIndex = getPeerIndex(_peerId);
103+
if (peerIndex == -1)
104+
{
105+
ECDH_MULTI_LOG(WARNING) << LOG_DESC("Invalid peerId") << LOG_KV("peer", _peerId);
106+
return;
107+
}
92108
bcos::Guard lock(m_mutex);
93109
// record the data-ref-count
94110
for (auto&& data : _cipherData)
95111
{
96-
updateMasterDataRef(_peerId, std::move(data), -1);
112+
updateMasterDataRef(peerIndex, std::move(data), -1);
97113
}
98114
m_partnerCipherSeqs[_peerId].insert(seq);
99115
ECDH_MULTI_LOG(INFO) << LOG_DESC("addPartnerCipher") << LOG_KV("partner", _peerId)
100116
<< LOG_KV("seqSize", m_partnerCipherSeqs.at(_peerId).size())
101117
<< LOG_KV("cipherDataSize", _cipherData.size())
102118
<< LOG_KV("masterDataSize", m_masterDataRef.size())
103119
<< LOG_KV("parternerDataCount", parternerDataCount) << printCacheState();
120+
_cipherData.clear();
121+
std::vector<bcos::bytes>().swap(_cipherData);
122+
MallocExtension::instance()->ReleaseFreeMemory();
104123
if (parternerDataCount > 0)
105124
{
106125
m_parternerDataCount.insert(std::make_pair(_peerId, parternerDataCount));
@@ -114,11 +133,11 @@ void MasterCache::addPartnerCipher(std::string _peerId, std::vector<bcos::bytes>
114133
{
115134
m_finishedPartners.insert(_peerId);
116135
// merge when find the send-finished peer
117-
mergeMasterCipher(_peerId);
136+
mergeMasterCipher(_peerId, peerIndex);
118137
}
119138
}
120139

121-
void MasterCache::mergeMasterCipher(std::string const& peer)
140+
void MasterCache::mergeMasterCipher(std::string const& peerId, unsigned short peerIndex)
122141
{
123142
if (m_peerMerged)
124143
{
@@ -131,22 +150,28 @@ void MasterCache::mergeMasterCipher(std::string const& peer)
131150
}
132151
ECDH_MULTI_LOG(INFO) << LOG_DESC("Receive whole data from peer, mergeMasterCipher")
133152
<< LOG_KV("distinct-masterDataSize-before-merge", m_masterDataRef.size())
134-
<< LOG_KV("finishedPeer", peer) << LOG_KV("partnerCount", m_peerCount);
153+
<< LOG_KV("finishedPeer", peerId) << LOG_KV("partnerCount", m_peerCount);
135154
auto startT = utcSteadyTime();
136155
for (auto it = m_masterDataRef.begin(); it != m_masterDataRef.end();)
137156
{
138157
// not has intersect-element with the finished peer
139-
if (!it->second.refInfo.count(peer))
158+
if (!it->second.refInfo.count(peerIndex))
140159
{
141160
it = m_masterDataRef.erase(it);
142161
continue;
143162
}
163+
// set the refCount
164+
it->second.refCount = it->second.refInfo.size();
165+
// release the refInfo
166+
std::set<unsigned short>().swap(it->second.refInfo);
144167
it++;
145168
}
146169
m_peerMerged = true;
170+
// release the free memory after merged
171+
MallocExtension::instance()->ReleaseFreeMemory();
147172
ECDH_MULTI_LOG(INFO) << LOG_DESC("mergeMasterCipher finished")
148173
<< LOG_KV("distinct-masterDataSize-after-merge", m_masterDataRef.size())
149-
<< LOG_KV("finishedPeer", peer)
174+
<< LOG_KV("finishedPeer", peerId) << LOG_KV("peerIndex", peerIndex)
150175
<< LOG_KV("timecost", (utcSteadyTime() - startT));
151176
}
152177

@@ -169,7 +194,7 @@ bool MasterCache::tryToIntersection()
169194
{
170195
continue;
171196
}
172-
if (m_masterDataRef.at(it.first).refInfo.size() != m_peerCount)
197+
if (m_masterDataRef.at(it.first).refCount != m_peerCount)
173198
{
174199
continue;
175200
}
@@ -366,7 +391,10 @@ bool CalculatorCache::appendMasterCipher(
366391
}
367392
ECDH_MULTI_LOG(INFO) << LOG_DESC("appendMasterCipher") << LOG_KV("dataSize", _cipherData.size())
368393
<< LOG_KV("cipherRefSize", m_cipherRef.size()) << printCacheState();
369-
394+
// release the cipherData
395+
_cipherData.clear();
396+
std::vector<bcos::bytes>().swap(_cipherData);
397+
MallocExtension::instance()->ReleaseFreeMemory();
370398
return m_receiveAllMasterCipher;
371399
}
372400

@@ -383,4 +411,8 @@ void CalculatorCache::setIntersectionCipher(std::map<uint32_t, bcos::bytes>&& _c
383411
m_receiveIntersection = true;
384412
ECDH_MULTI_LOG(INFO) << LOG_DESC("setIntersectionCipher finshed")
385413
<< LOG_KV("cipherRefSize", m_cipherRef.size()) << printCacheState();
414+
// release the cipherData
415+
_cipherData.clear();
416+
std::map<uint32_t, bcos::bytes>().swap(_cipherData);
417+
MallocExtension::instance()->ReleaseFreeMemory();
386418
}

cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/EcdhMultiCache.h

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ namespace ppc::psi
3030
{
3131
struct MasterCipherRef
3232
{
33-
std::set<std::string> refInfo;
33+
std::set<unsigned short> refInfo;
34+
unsigned short refCount = 0;
3435
int32_t dataIndex = -1;
3536

3637
void updateDataIndex(int32_t index)
@@ -51,7 +52,13 @@ class MasterCache
5152
: m_taskState(taskState),
5253
m_config(config),
5354
m_peerCount(m_taskState->task()->getAllPeerParties().size())
54-
{}
55+
{
56+
for (auto const& it : m_taskState->task()->getAllPeerParties())
57+
{
58+
m_peers.emplace_back(it.first);
59+
}
60+
}
61+
5562
virtual ~MasterCache()
5663
{
5764
releaseItersection();
@@ -126,13 +133,27 @@ class MasterCache
126133
<< LOG_KV("taskID", m_taskState->task()->id());
127134
}
128135

129-
void mergeMasterCipher(std::string const& peer);
130-
void updateMasterDataRef(std::string const& _peerId, bcos::bytes&& data, int32_t dataIndex);
136+
void mergeMasterCipher(std::string const& peerId, unsigned short peerIndex);
137+
void updateMasterDataRef(unsigned short _peerIndex, bcos::bytes&& data, int32_t dataIndex);
138+
139+
signed short getPeerIndex(std::string const& peer)
140+
{
141+
for (unsigned short i = 0; i < m_peers.size(); i++)
142+
{
143+
if (m_peers[i] == peer)
144+
{
145+
return i;
146+
}
147+
}
148+
return -1;
149+
}
131150

132151
private:
133152
TaskState::Ptr m_taskState;
134153
EcdhMultiPSIConfig::Ptr m_config;
135154
unsigned short m_peerCount;
155+
std::vector<std::string> m_peers;
156+
136157
CacheState m_cacheState = CacheState::Evaluating;
137158

138159
// the intersection cipher data of the master

cpp/wedpr-computing/ppc-psi/src/ecdh-multi-psi/EcdhMultiPSIImpl.cpp

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "EcdhMultiPSIImpl.h"
22
#include "Common.h"
3+
#include <gperftools/malloc_extension.h>
34

45
using namespace ppc::psi;
56
using namespace ppc::protocol;
@@ -313,6 +314,14 @@ void EcdhMultiPSIImpl::executeWorker()
313314
psiMsg->setUUID(pop_msg->uuid());
314315
ECDH_MULTI_LOG(TRACE) << LOG_DESC("onReceiveMessage") << printPSIMessage(psiMsg)
315316
<< LOG_KV("uuid", psiMsg->uuid());
317+
// release the larger payload immediately
318+
if (payLoad.size() >= ppc::protocol::Message::LARGER_MSG_THRESHOLD)
319+
{
320+
ECDH_MULTI_LOG(INFO) << LOG_DESC("Release larger message payload")
321+
<< LOG_KV("size", payLoad.size());
322+
pop_msg->releasePayload();
323+
MallocExtension::instance()->ReleaseFreeMemory();
324+
}
316325
handlerPSIReceiveMessage(psiMsg);
317326
return;
318327
}
@@ -321,8 +330,6 @@ void EcdhMultiPSIImpl::executeWorker()
321330

322331
void EcdhMultiPSIImpl::onReceiveRandomA(PSIMessageInterface::Ptr _msg)
323332
{
324-
ECDH_MULTI_LOG(INFO) << LOG_DESC("onReceiveRandomA") << printPSIMessage(_msg);
325-
auto startT = utcSteadyTime();
326333
auto partner = findPartner(_msg->taskID());
327334
if (partner)
328335
{
@@ -337,8 +344,6 @@ void EcdhMultiPSIImpl::onReceiveRandomA(PSIMessageInterface::Ptr _msg)
337344

338345
void EcdhMultiPSIImpl::onReceiveCalCipher(PSIMessageInterface::Ptr _msg)
339346
{
340-
ECDH_MULTI_LOG(INFO) << LOG_DESC("onReceiveCalCipher") << printPSIMessage(_msg);
341-
auto startT = utcSteadyTime();
342347
auto master = findMaster(_msg->taskID());
343348
if (master)
344349
{
@@ -348,8 +353,6 @@ void EcdhMultiPSIImpl::onReceiveCalCipher(PSIMessageInterface::Ptr _msg)
348353

349354
void EcdhMultiPSIImpl::onReceiveCipherFromPartner(PSIMessageInterface::Ptr _msg)
350355
{
351-
ECDH_MULTI_LOG(INFO) << LOG_DESC("onReceiveCipherFromPartner") << printPSIMessage(_msg);
352-
auto startT = utcSteadyTime();
353356
auto master = findMaster(_msg->taskID());
354357
if (master)
355358
{
@@ -359,8 +362,6 @@ void EcdhMultiPSIImpl::onReceiveCipherFromPartner(PSIMessageInterface::Ptr _msg)
359362

360363
void EcdhMultiPSIImpl::onReceiveIntersecCipher(PSIMessageInterface::Ptr _msg)
361364
{
362-
ECDH_MULTI_LOG(INFO) << LOG_DESC("onReceiveIntersecCipher") << printPSIMessage(_msg);
363-
auto startT = utcSteadyTime();
364365
auto calculator = findCalculator(_msg->taskID());
365366
if (calculator)
366367
{
@@ -370,8 +371,6 @@ void EcdhMultiPSIImpl::onReceiveIntersecCipher(PSIMessageInterface::Ptr _msg)
370371

371372
void EcdhMultiPSIImpl::onReceiveMasterCipher(PSIMessageInterface::Ptr _msg)
372373
{
373-
ECDH_MULTI_LOG(INFO) << LOG_DESC("onReceiveMasterCipher") << printPSIMessage(_msg);
374-
auto startT = utcSteadyTime();
375374
auto calculator = findCalculator(_msg->taskID());
376375
if (calculator)
377376
{

0 commit comments

Comments
 (0)