@@ -419,13 +419,12 @@ void WorkerService::PullKVCache(::google::protobuf::RpcController* controller,
419419
420420void WorkerService::TransferBlocks (
421421 ::google::protobuf::RpcController* controller,
422- const ::xllm:: proto::BlockTransferInfos* req,
423- ::xllm:: proto::TransferStatus* resp,
422+ const proto::BlockTransferInfos* req,
423+ proto::TransferStatus* resp,
424424 ::google::protobuf::Closure* done) {
425425 brpc::ClosureGuard done_guard (done);
426426 std::vector<BlockTransferInfo> block_transfer_info;
427- uint64_t batch_id;
428- proto_to_block_transfer_info (*req, batch_id, block_transfer_info);
427+ uint64_t batch_id = proto_to_block_transfer_info (*req, block_transfer_info);
429428
430429 if (batch_id == 0x0 ) {
431430 resp->set_success_cnt (worker_->transfer_kv_blocks (block_transfer_info));
@@ -435,6 +434,114 @@ void WorkerService::TransferBlocks(
435434 return ;
436435}
437436
437+ class ServerStreamHandler : public brpc ::StreamInputHandler {
438+ private:
439+ std::promise<void > close_promise_;
440+ std::atomic<bool > promise_set_{false };
441+
442+ public:
443+ ~ServerStreamHandler () {
444+ if (!promise_set_.exchange (true )) {
445+ try {
446+ close_promise_.set_value ();
447+ } catch (const std::exception& e) {
448+ LOG (WARNING) << " Exception in destructor: " << e.what ();
449+ }
450+ }
451+ }
452+
453+ std::future<void > get_close_future () { return close_promise_.get_future (); }
454+
455+ int on_received_messages (brpc::StreamId id,
456+ butil::IOBuf* const messages[],
457+ size_t size) override {
458+ LOG (WARNING) << " ServerStreamHandler::on_received_messages not implement." ;
459+ return 0 ;
460+ }
461+
462+ void on_closed (brpc::StreamId id) override {
463+ if (!promise_set_.exchange (true )) {
464+ close_promise_.set_value ();
465+ }
466+ }
467+
468+ void on_idle_timeout (brpc::StreamId id) override {
469+ if (!promise_set_.exchange (true )) {
470+ LOG (WARNING) << " Stream idle timeout: " << id;
471+ close_promise_.set_value ();
472+ }
473+ }
474+ };
475+
476+ void WorkerService::PrefetchFromStorage (
477+ google::protobuf::RpcController* controller,
478+ const proto::BlockTransferInfos* req,
479+ proto::Status* resp,
480+ google::protobuf::Closure* done) {
481+ brpc::ClosureGuard done_guard (done);
482+ brpc::Controller* cntl = static_cast <brpc::Controller*>(controller);
483+
484+ auto stream_handler = std::make_unique<ServerStreamHandler>();
485+ auto stream_id = std::make_unique<brpc::StreamId>();
486+ brpc::StreamOptions stream_options;
487+ stream_options.handler = stream_handler.get ();
488+ if (brpc::StreamAccept (stream_id.get (), *cntl, &stream_options) != 0 ) {
489+ resp->set_ok (false );
490+ LOG (ERROR) << " Failed to accept stream!" ;
491+ return ;
492+ }
493+
494+ std::vector<BlockTransferInfo> block_transfer_info;
495+ proto_to_block_transfer_info (*req, block_transfer_info);
496+
497+ copy_threadpool_.schedule (
498+ [this ,
499+ block_transfer_info = std::move (block_transfer_info),
500+ stream_id = std::move (stream_id),
501+ stream_handler = std::move (stream_handler)]() mutable {
502+ Slice<BlockTransferInfo> transfer_slice{block_transfer_info};
503+ auto close_future = stream_handler->get_close_future ();
504+ bool is_completed = false ;
505+
506+ for (size_t i = 0 ; i < transfer_slice.size ();
507+ i += stream_copy_batch_size_) {
508+ auto current_slice = transfer_slice.slice (
509+ i, std::min (i + stream_copy_batch_size_, transfer_slice.size ()));
510+
511+ auto success_cnt = worker_->prefetch_from_storage (current_slice);
512+
513+ if (success_cnt != current_slice.size () ||
514+ i + stream_copy_batch_size_ >= transfer_slice.size ()) {
515+ is_completed = true ;
516+ }
517+
518+ butil::IOBuf buf;
519+ buf.append (std::to_string (success_cnt));
520+ if (brpc::StreamWrite (*stream_id.get (), buf) != 0 ) {
521+ brpc::StreamClose (*stream_id.get ());
522+ is_completed = false ;
523+ break ;
524+ }
525+
526+ if (is_completed) {
527+ if (success_cnt != 0 ) {
528+ butil::IOBuf buf_end;
529+ buf_end.append (" 0" );
530+ brpc::StreamWrite (*stream_id.get (), buf_end);
531+ }
532+ break ;
533+ }
534+ }
535+ if (is_completed) {
536+ close_future.wait ();
537+ }
538+ brpc::StreamClose (*stream_id.get ());
539+ });
540+
541+ resp->set_ok (true );
542+ return ;
543+ }
544+
438545void WorkerService::GetDeviceInfo (::google::protobuf::RpcController* controller,
439546 const proto::Empty* req,
440547 proto::DeviceInfo* resp,
0 commit comments