2424using namespace ppc ::psi;
2525using namespace bcos ;
2626
27- void MasterCache::addCalculatorCipher (std::string _peerId,
28- std::map< uint32_t , bcos::bytes>&& _cipherData , uint32_t seq, uint32_t dataBatchCount)
27+ void MasterCache::addCalculatorCipher (std::string _peerId, std::vector<bcos::bytes>&& _cipherData,
28+ std::vector< long > const & dataIndex , uint32_t seq, uint32_t dataBatchCount)
2929{
3030 auto peerIndex = getPeerIndex (_peerId);
3131 if (peerIndex == -1 )
@@ -39,9 +39,11 @@ void MasterCache::addCalculatorCipher(std::string _peerId,
3939 {
4040 m_calculatorDataBatchCount = dataBatchCount;
4141 }
42+ uint64_t i = 0 ;
4243 for (auto && it : _cipherData)
4344 {
44- updateMasterDataRef (peerIndex, std::move (it.second ), it.first );
45+ updateMasterDataRef (peerIndex, std::move (it), dataIndex[i]);
46+ i++;
4547 }
4648 // try to merge the
4749 if (m_calculatorDataBatchCount > 0 &&
@@ -62,7 +64,7 @@ void MasterCache::addCalculatorCipher(std::string _peerId,
6264 << LOG_KV (" dataBatchCount" , m_calculatorDataBatchCount);
6365 // release the cipherData
6466 _cipherData.clear ();
65- std::map< uint32_t , bcos::bytes>().swap (_cipherData);
67+ std::vector< bcos::bytes>().swap (_cipherData);
6668 MallocExtension::instance ()->ReleaseFreeMemory ();
6769}
6870
@@ -184,8 +186,8 @@ bool MasterCache::tryToIntersection()
184186 }
185187 m_cacheState = CacheState::IntersectionProgressing;
186188
187- ECDH_MULTI_LOG (INFO) << LOG_DESC (" tryToIntersection " ) << printCacheState ()
188- << LOG_KV (" masterData" , m_masterDataRef.size ());
189+ ECDH_MULTI_LOG (INFO) << LOG_DESC (" * tryToIntersection " ) << printCacheState ()
190+ << LOG_KV (" * masterData" , m_masterDataRef.size ());
189191 auto startT = utcSteadyTime ();
190192 // iterator the masterDataRef to obtain intersection
191193 for (auto && it : m_masterDataRef)
@@ -208,30 +210,32 @@ bool MasterCache::tryToIntersection()
208210 }
209211 releaseCache ();
210212 m_cacheState = CacheState::Intersectioned;
211- ECDH_MULTI_LOG (INFO) << LOG_DESC (" tryToIntersection success" ) << printCacheState ()
212- << LOG_KV (" intersectionSize" , m_intersecCipher.size ())
213- << LOG_KV (" timecost" , (utcSteadyTime () - startT));
213+ ECDH_MULTI_LOG (INFO) << LOG_DESC (" * tryToIntersection success" ) << printCacheState ()
214+ << LOG_KV (" * intersectionSize" , m_intersecCipher.size ())
215+ << LOG_KV (" * timecost" , (utcSteadyTime () - startT));
214216 return true ;
215217}
216218
217- std::vector<std::pair<uint64_t , bcos::bytes>> MasterCache::encryptIntersection (
218- bcos::bytes const & randomKey)
219+ PSIMessageInterface::Ptr MasterCache::encryptIntersection (bcos::bytes const & randomKey)
219220{
220- std::vector<std::pair<uint64_t , bcos::bytes>> cipherData (m_intersecCipher.size ());
221+ auto message = m_config->psiMsgFactory ()->createPSIMessage (
222+ uint32_t (EcdhMultiPSIMessageType::SEND_ENCRYPTED_INTERSECTION_SET_TO_CALCULATOR));
223+ message->setFrom (m_taskState->task ()->selfParty ()->id ());
224+ message->resizeData (m_intersecCipher.size ());
221225 tbb::parallel_for (
222226 tbb::blocked_range<size_t >(0U , m_intersecCipher.size ()), [&](auto const & range) {
223227 for (auto i = range.begin (); i < range.end (); i++)
224228 {
225229 auto cipherValue =
226230 m_config->eccCrypto ()->ecMultiply (m_intersecCipher[i], randomKey);
227- cipherData[i] = std::make_pair ( m_intersecCipherIndex[i], cipherValue);
231+ message-> setDataPair (i, m_intersecCipherIndex[i], cipherValue);
228232 }
229233 });
234+ ECDH_MULTI_LOG (INFO) << LOG_DESC (" encryptIntersection" )
235+ << LOG_KV (" cipherCount" , m_intersecCipher.size ()) << printCacheState ();
230236 // Note: release the m_intersecCipher, make share it not been used after released
231237 releaseItersection ();
232- ECDH_MULTI_LOG (INFO) << LOG_DESC (" encryptIntersection" )
233- << LOG_KV (" cipherCount" , cipherData.size ()) << printCacheState ();
234- return cipherData;
238+ return message;
235239}
236240
237241bcos::bytes CalculatorCache::getPlainDataByIndex (uint64_t index)
@@ -257,8 +261,8 @@ bool CalculatorCache::tryToFinalize()
257261 return false ;
258262 }
259263 auto startT = utcSteadyTime ();
260- ECDH_MULTI_LOG (INFO) << LOG_DESC (" tryToFinalize: compute intersection" )
261- << LOG_KV (" cipherRef" , m_cipherRef.size ()) << printCacheState ();
264+ ECDH_MULTI_LOG (INFO) << LOG_DESC (" * tryToFinalize: compute intersection" )
265+ << LOG_KV (" * cipherRef" , m_cipherRef.size ()) << printCacheState ();
262266 m_cacheState = CacheState::Finalizing;
263267 // find the intersection
264268 for (auto const & it : m_cipherRef)
@@ -273,28 +277,28 @@ bool CalculatorCache::tryToFinalize()
273277 }
274278 }
275279 m_cacheState = CacheState::Finalized;
276- ECDH_MULTI_LOG (INFO) << LOG_DESC (" tryToFinalize: compute intersection success" )
277- << printCacheState () << LOG_KV (" cipherRef" , m_cipherRef.size ())
278- << LOG_KV (" intersectionSize" , m_intersectionResult.size ())
279- << LOG_KV (" timecost" , (utcSteadyTime () - startT));
280+ ECDH_MULTI_LOG (INFO) << LOG_DESC (" * tryToFinalize: compute intersection success" )
281+ << printCacheState () << LOG_KV (" * cipherRef" , m_cipherRef.size ())
282+ << LOG_KV (" * intersectionSize" , m_intersectionResult.size ())
283+ << LOG_KV (" * timecost" , (utcSteadyTime () - startT));
280284
281285 releaseDataAfterFinalize ();
282- ECDH_MULTI_LOG (INFO) << LOG_DESC (" tryToFinalize: syncIntersections" ) << printCacheState ();
286+ ECDH_MULTI_LOG (INFO) << LOG_DESC (" * tryToFinalize: syncIntersections" ) << printCacheState ();
283287 m_cacheState = CacheState::Syncing;
284288 syncIntersections ();
285289 m_cacheState = CacheState::Synced;
286290
287291 m_cacheState = CacheState::StoreProgressing;
288292 m_taskState->storePSIResult (m_config->dataResourceLoader (), m_intersectionResult);
289293 m_cacheState = CacheState::Stored;
290- ECDH_MULTI_LOG (INFO) << LOG_DESC (" tryToFinalize: syncIntersections and store success" )
294+ ECDH_MULTI_LOG (INFO) << LOG_DESC (" * tryToFinalize: syncIntersections and store success" )
291295 << printCacheState ();
292296 return true ;
293297}
294298
295299void CalculatorCache::syncIntersections ()
296300{
297- ECDH_MULTI_LOG (INFO) << LOG_DESC (" syncIntersections" ) << printCacheState ();
301+ ECDH_MULTI_LOG (INFO) << LOG_DESC (" *** syncIntersections ** " ) << printCacheState ();
298302 auto peers = m_taskState->task ()->getAllPeerParties ();
299303 auto taskID = m_taskState->task ()->id ();
300304 // notify task result
@@ -398,21 +402,24 @@ bool CalculatorCache::appendMasterCipher(
398402 return m_receiveAllMasterCipher;
399403}
400404
401- void CalculatorCache::setIntersectionCipher (std::map<uint32_t , bcos::bytes>&& _cipherData)
405+ void CalculatorCache::setIntersectionCipher (
406+ std::vector<bcos::bytes>&& _cipherData, std::vector<long > const & dataIndex)
402407{
403408 ECDH_MULTI_LOG (INFO) << LOG_DESC (" setIntersectionCipher" )
404409 << LOG_KV (" dataSize" , _cipherData.size ())
405410 << LOG_KV (" cipherRefSize" , m_cipherRef.size ()) << printCacheState ();
406411 bcos::Guard lock (m_mutex);
412+ uint64_t i = 0 ;
407413 for (auto && it : _cipherData)
408414 {
409- updateCipherRef (std::move (it.second ), it.first );
415+ updateCipherRef (std::move (it), dataIndex[i]);
416+ i++;
410417 }
411418 m_receiveIntersection = true ;
412419 ECDH_MULTI_LOG (INFO) << LOG_DESC (" setIntersectionCipher finshed" )
413420 << LOG_KV (" cipherRefSize" , m_cipherRef.size ()) << printCacheState ();
414421 // release the cipherData
415422 _cipherData.clear ();
416- std::map< uint32_t , bcos::bytes>().swap (_cipherData);
423+ std::vector< bcos::bytes>().swap (_cipherData);
417424 MallocExtension::instance ()->ReleaseFreeMemory ();
418425}
0 commit comments