@@ -156,6 +156,21 @@ static int32_t tagFromRequestId(LlmRequest::RequestIdType requestId)
156156 return ((requestId & 0xFFF ) << 8 ) | (kDATA_TAG & 0xFF );
157157}
158158
159+ namespace fs = std::filesystem;
160+
161+ static fs::path getTransferOutputPath (char const * tag)
162+ {
163+ auto outputPath = common::getEnvKVCacheTransferOutputPath ();
164+ if (!outputPath.empty ())
165+ {
166+ auto rank = mpi::MpiComm::world ().getRank ();
167+ auto path = fs::path (outputPath);
168+ fs::create_directories (path);
169+ return path / (" rank_" + std::to_string (rank) + " _" + tag + " .csv" );
170+ }
171+ return {};
172+ }
173+
159174struct ReceiveCacheResource
160175{
161176 runtime::BufferManager mBufferManager ;
@@ -282,6 +297,17 @@ class CacheSender::Impl
282297 auto it = mRequestToSession .find (requestId);
283298 TLLM_CHECK (it != mRequestToSession .end ());
284299 std::unique_lock<std::mutex> lk (mMtxForMap );
300+ if (!common::getEnvKVCacheTransferOutputPath ().empty ())
301+ {
302+ if (!mMeasuresFile .is_open ())
303+ {
304+ auto outputPath = getTransferOutputPath (" send" );
305+ mMeasuresFile .open (outputPath);
306+ TLLM_CHECK_WITH_INFO (
307+ mMeasuresFile .is_open (), " Failed to open transfer output file: %s" , outputPath.string ().c_str ());
308+ }
309+ it->second .exportMeasure (mMeasuresFile , true );
310+ }
285311 mRequestToSession .erase (it);
286312 }
287313
@@ -331,7 +357,8 @@ class CacheSender::Impl
331357 if (it == mRequestToSession .end ())
332358 {
333359 auto session = TransferSession (std::vector<Connection const *>(peerRelativeRanks.size (), nullptr ),
334- DataContext{tagFromRequestId (requestId)}, mSelfState , info.getTransState (), mBufferManager );
360+ DataContext{tagFromRequestId (requestId)}, mSelfState , info.getTransState (), mBufferManager , nullptr ,
361+ !common::getEnvKVCacheTransferOutputPath ().empty ());
335362 it = mRequestToSession .emplace (requestId, std::move (session)).first ;
336363 }
337364 it->second .setConnection (peerIdx, connection);
@@ -527,6 +554,7 @@ class CacheSender::Impl
527554 std::unique_ptr<BaseCacheFormatter> mFormatter ;
528555 std::mutex mMtxForMap ;
529556 runtime::BufferManager mBufferManager ;
557+ std::ofstream mMeasuresFile ;
530558};
531559
532560class CacheReceiver ::Impl
@@ -587,6 +615,18 @@ class CacheReceiver::Impl
587615 void receiveSync (TransferSession& session)
588616 {
589617 mFormatter ->unformat (session);
618+ if (!common::getEnvKVCacheTransferOutputPath ().empty ())
619+ {
620+ std::unique_lock<std::mutex> lock (mMeasuresFileMutex );
621+ if (!mMeasuresFile .is_open ())
622+ {
623+ auto outputPath = getTransferOutputPath (" recv" );
624+ mMeasuresFile .open (outputPath);
625+ TLLM_CHECK_WITH_INFO (
626+ mMeasuresFile .is_open (), " Failed to open transfer output file: %s" , outputPath.string ().c_str ());
627+ }
628+ session.exportMeasure (mMeasuresFile , false );
629+ }
590630 }
591631
592632 TransferSession sendRequestInfo (LlmRequest const & llmRequest)
@@ -652,7 +692,7 @@ class CacheReceiver::Impl
652692 }
653693 auto const & resource = getReceiveCacheResource (llmRequest);
654694 return TransferSession (std::move (counterPartConnections), DataContext{tagFromRequestId (requestId)}, mSelfState ,
655- contextState, resource->mBufferManager , &llmRequest);
695+ contextState, resource->mBufferManager , &llmRequest, ! common::getEnvKVCacheTransferOutputPath (). empty () );
656696 }
657697
658698 std::unique_ptr<ReceiveCacheResource> const & getReceiveCacheResource (LlmRequest const & llmRequest)
@@ -831,6 +871,8 @@ class CacheReceiver::Impl
831871 std::unordered_map<std::string, std::unique_ptr<ReceiveCacheResource>> mProcessToResources ;
832872 std::mutex mProcessIoResouceMutex ;
833873 runtime::BufferManager mBufferManager ;
874+ std::ofstream mMeasuresFile ;
875+ std::mutex mMeasuresFileMutex ;
834876};
835877
836878void CacheSender::ImplDeleter::operator ()(Impl* ptr)
0 commit comments