@@ -228,25 +228,23 @@ void DhtMemberImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::dht_findVa
228228 td::Promise<td::BufferSlice> promise) {
229229 find_value_queries_++;
230230 auto it = values_.find (DhtKeyId{query.key_ });
231- if (it != values_.end () && it->second .expired ()) {
232- values_.erase (it);
233- it = values_.end ();
234- }
235- if (it != values_.end ()) {
231+ if (it != values_.end () && !it->second .expired ()) {
236232 promise.set_value (create_serialize_tl_object<ton_api::dht_valueFound>(it->second .tl ()));
237233 return ;
238234 }
239235
240236 auto k = static_cast <td::uint32>(query.k_ );
241- if (k > max_k ()) {
242- k = max_k ();
243- }
237+ k = std::min (k, max_k ());
244238 auto R = get_nearest_nodes (DhtKeyId{query.key_ }, k);
245239
246240 promise.set_value (create_serialize_tl_object<ton_api::dht_valueNotFound>(R.tl ()));
247241}
248242
249243td::Status DhtMemberImpl::store_in (DhtValue value) {
244+ if (value.ttl () > (td::uint32)td::Clocks::system () + 3600 + 60 ) {
245+ // clients typically set ttl = 1 hour
246+ return td::Status::Error (" ttl is too big" );
247+ }
250248 if (value.expired ()) {
251249 VLOG (DHT_INFO) << this << " : dropping expired value: " << value.key_id () << " expire_at = " << value.ttl ();
252250 return td::Status::OK ();
@@ -259,10 +257,12 @@ td::Status DhtMemberImpl::store_in(DhtValue value) {
259257 if (dist < k_ + 10 ) {
260258 auto it = values_.find (key_id);
261259 if (it != values_.end ()) {
260+ CHECK (values_ttl_order_.erase ({it->second .ttl (), it->first }));
262261 it->second .update (std::move (value));
263262 } else {
264- values_.emplace (key_id, std::move (value));
263+ it = values_.emplace (key_id, std::move (value)). first ;
265264 }
265+ CHECK (values_ttl_order_.insert ({it->second .ttl (), it->first }).second );
266266 } else {
267267 VLOG (DHT_INFO) << this << " : dropping too remote value: " << value.key_id () << " distance = " << dist;
268268 }
@@ -324,33 +324,32 @@ void DhtMemberImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::dht_regist
324324 TRY_RESULT_PROMISE (promise, encryptor, pub.create_encryptor ());
325325 TRY_STATUS_PROMISE (promise, encryptor->check_signature (to_sign, query.signature_ ));
326326 DhtKeyId key_id = get_reverse_connection_key (client_id).compute_key_id ();
327- reverse_connections_[client_id] = ReverseConnection{src, key_id, td::Timestamp::at_unix (std::min (ttl, now + 300 ))};
327+ auto it = reverse_connections_.find (client_id);
328+ if (it != reverse_connections_.end ()) {
329+ CHECK (reverse_connections_ttl_order_.erase ({it->second .ttl_ , client_id}));
330+ }
331+ auto &entry = reverse_connections_[client_id] =
332+ ReverseConnection{src, key_id, td::Timestamp::at_unix (std::min (ttl, now + 300 ))};
333+ CHECK (reverse_connections_ttl_order_.insert ({entry.ttl_ , client_id}).second );
328334 promise.set_value (create_serialize_tl_object<ton_api::dht_stored>());
329335}
330336
331337void DhtMemberImpl::process_query (adnl::AdnlNodeIdShort src, ton_api::dht_requestReversePing &query,
332338 td::Promise<td::BufferSlice> promise) {
333339 adnl::AdnlNodeIdShort client{query.client_ };
334340 auto it = reverse_connections_.find (client);
335- if (it != reverse_connections_.end ()) {
336- if (it->second .ttl_ .is_in_past ()) {
337- reverse_connections_.erase (it);
338- } else {
339- PublicKey pub{query.target_ ->id_ };
340- TRY_RESULT_PROMISE (promise, encryptor, pub.create_encryptor ());
341- TRY_STATUS_PROMISE (promise,
342- encryptor->check_signature (serialize_tl_object (query.target_ , true ), query.signature_ ));
343- td::actor::send_closure (adnl_, &adnl::Adnl::send_message, id_, it->second .dht_node_ ,
344- create_serialize_tl_object<ton_api::dht_requestReversePingCont>(
345- std::move (query.target_ ), std::move (query.signature_ ), query.client_ ));
346- promise.set_result (create_serialize_tl_object<ton_api::dht_reversePingOk>());
347- return ;
348- }
341+ if (it != reverse_connections_.end () && !it->second .ttl_ .is_in_past ()) {
342+ PublicKey pub{query.target_ ->id_ };
343+ TRY_RESULT_PROMISE (promise, encryptor, pub.create_encryptor ());
344+ TRY_STATUS_PROMISE (promise, encryptor->check_signature (serialize_tl_object (query.target_ , true ), query.signature_ ));
345+ td::actor::send_closure (adnl_, &adnl::Adnl::send_message, id_, it->second .dht_node_ ,
346+ create_serialize_tl_object<ton_api::dht_requestReversePingCont>(
347+ std::move (query.target_ ), std::move (query.signature_ ), query.client_ ));
348+ promise.set_result (create_serialize_tl_object<ton_api::dht_reversePingOk>());
349+ return ;
349350 }
350351 auto k = static_cast <td::uint32>(query.k_ );
351- if (k > max_k ()) {
352- k = max_k ();
353- }
352+ k = std::min (k, max_k ());
354353 auto R = get_nearest_nodes (get_reverse_connection_key (client).compute_key_id (), k);
355354 promise.set_value (create_serialize_tl_object<ton_api::dht_clientNotFound>(R.tl ()));
356355}
@@ -434,7 +433,7 @@ void DhtMemberImpl::receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice d
434433 auto S = [&]() -> td::Status {
435434 auto f = F.move_as_ok ();
436435 adnl::AdnlNodeIdShort client{f->client_ };
437- if (!our_reverse_connections_.count (client)) {
436+ if (!our_reverse_connections_.contains (client)) {
438437 return td::Status::Error (PSTRING () << " : unknown id for reverse ping: " << client);
439438 }
440439 TRY_RESULT_PREFIX (node, adnl::AdnlNode::create (f->target_ ), " failed to parse node: " );
@@ -501,16 +500,17 @@ void DhtMemberImpl::register_reverse_connection(adnl::AdnlNodeIdFull client, td:
501500 td::actor::send_closure (keyring_, &keyring::Keyring::sign_message, client_short.pubkey_hash (),
502501 register_reverse_connection_to_sign (client_short, id_, ttl),
503502 [=, print_id = print_id (), list = get_nearest_nodes (key_id, k_ * 2 ), SelfId = actor_id (this ),
504- promise = std::move (promise)](td::Result<td::BufferSlice> R) mutable {
503+ promise = std::move (promise), id = id_, k = k_, a = a_, network_id = network_id_,
504+ client_only = client_only_, adnl = adnl_](td::Result<td::BufferSlice> R) mutable {
505505 TRY_RESULT_PROMISE_PREFIX (promise, signature, std::move (R), " Failed to sign: " );
506506 td::actor::send_closure (SelfId, &DhtMemberImpl::get_self_node,
507507 [=, list = std::move (list), signature = std::move (signature),
508508 promise = std::move (promise)](td::Result<DhtNode> R) mutable {
509509 R.ensure ();
510510 td::actor::create_actor<DhtQueryRegisterReverseConnection>(
511511 " RegisterReverseQuery" , key_id, std::move (client), ttl,
512- std::move (signature), print_id, id_ , std::move (list), k_, a_ ,
513- network_id_ , R.move_as_ok (), client_only_ , SelfId, adnl_ ,
512+ std::move (signature), print_id, id , std::move (list), k, a ,
513+ network_id , R.move_as_ok (), client_only , SelfId, adnl ,
514514 std::move (promise))
515515 .release ();
516516 });
@@ -534,25 +534,22 @@ void DhtMemberImpl::request_reverse_ping(adnl::AdnlNode target, adnl::AdnlNodeId
534534void DhtMemberImpl::request_reverse_ping_cont (adnl::AdnlNode target, td::BufferSlice signature,
535535 adnl::AdnlNodeIdShort client, td::Promise<td::Unit> promise) {
536536 auto it = reverse_connections_.find (client);
537- if (it != reverse_connections_.end ()) {
538- if (it->second .ttl_ .is_in_past ()) {
539- reverse_connections_.erase (it);
540- } else {
541- td::actor::send_closure (adnl_, &adnl::Adnl::send_message, id_, it->second .dht_node_ ,
542- create_serialize_tl_object<ton_api::dht_requestReversePingCont>(
543- target.tl (), std::move (signature), client.bits256_value ()));
544- promise.set_result (td::Unit ());
545- return ;
546- }
537+ if (it != reverse_connections_.end () && !it->second .ttl_ .is_in_past ()) {
538+ td::actor::send_closure (adnl_, &adnl::Adnl::send_message, id_, it->second .dht_node_ ,
539+ create_serialize_tl_object<ton_api::dht_requestReversePingCont>(
540+ target.tl (), std::move (signature), client.bits256_value ()));
541+ promise.set_result (td::Unit ());
542+ return ;
547543 }
548544 auto key_id = get_reverse_connection_key (client).compute_key_id ();
549545 get_self_node ([=, target = std::move (target), signature = std::move (signature), promise = std::move (promise),
550546 SelfId = actor_id (this ), print_id = print_id (), list = get_nearest_nodes (key_id, k_ * 2 ),
551- client_only = client_only_](td::Result<DhtNode> R) mutable {
547+ client_only = client_only_, id = id_, k = k_, a = a_, adnl = adnl_,
548+ network_id = network_id_](td::Result<DhtNode> R) mutable {
552549 R.ensure ();
553550 td::actor::create_actor<DhtQueryRequestReversePing>(
554- " RequestReversePing" , client, std::move (target), std::move (signature), print_id, id_ , std::move (list), k_, a_ ,
555- network_id_ , R.move_as_ok (), client_only, SelfId, adnl_ , std::move (promise))
551+ " RequestReversePing" , client, std::move (target), std::move (signature), print_id, id , std::move (list), k, a ,
552+ network_id , R.move_as_ok (), client_only, SelfId, adnl , std::move (promise))
556553 .release ();
557554 });
558555}
@@ -561,32 +558,36 @@ void DhtMemberImpl::check() {
561558 VLOG (DHT_INFO) << this << " : ping=" << ping_queries_ << " fnode=" << find_node_queries_
562559 << " fvalue=" << find_value_queries_ << " store=" << store_queries_
563560 << " addrlist=" << get_addr_list_queries_;
561+ VLOG (DHT_INFO) << this << " : values=" << values_.size () << " our_values=" << our_values_.size ();
562+ VLOG (DHT_INFO) << this << " : reverse_conns=" << reverse_connections_.size ()
563+ << " our_reverse_conns=" << our_reverse_connections_.size ();
564564 for (auto &bucket : buckets_) {
565565 bucket.check (client_only_, adnl_, actor_id (this ), id_);
566566 }
567567 if (next_save_to_db_at_.is_in_past ()) {
568568 save_to_db ();
569569 }
570570
571- if (values_.size () > 0 ) {
571+ for (auto it = values_ttl_order_.begin ();
572+ it != values_ttl_order_.end () && (it->first < td::Clocks::system () || values_.size () > MAX_VALUES);) {
573+ CHECK (values_.erase (it->second ));
574+ it = values_ttl_order_.erase (it);
575+ }
576+ if (!values_.empty ()) {
572577 auto it = values_.lower_bound (last_check_key_);
573578 if (it != values_.end () && it->first == last_check_key_) {
574- it++ ;
579+ ++it ;
575580 }
576581 if (it == values_.end ()) {
577582 it = values_.begin ();
578583 }
579584
580585 td::uint32 cnt = 0 ;
581586 auto s = last_check_key_;
582- while (values_.size () > 0 && cnt < 1 && it->first != s) {
587+ while (! values_.empty () && cnt < 1 && it->first != s) {
583588 last_check_key_ = it->first ;
584589 cnt++;
585- if (it->second .expired ()) {
586- it = values_.erase (it);
587-
588- // do not republish soon-to-be-expired values
589- } else if (it->second .ttl () > td::Clocks::system () + 60 ) {
590+ if (it->second .ttl () > td::Clocks::system () + 60 ) {
590591 auto dist = distance (it->first , k_ + 10 );
591592
592593 if (dist == 0 ) {
@@ -598,38 +599,35 @@ void DhtMemberImpl::check() {
598599 });
599600 send_store (it->second .clone (), std::move (P));
600601 }
601- it++ ;
602+ ++it ;
602603 } else if (dist >= k_ + 10 ) {
604+ CHECK (values_ttl_order_.erase ({it->second .ttl (), it->first }));
603605 it = values_.erase (it);
604606 } else {
605- it++ ;
607+ ++it ;
606608 }
607609 } else {
608- it++ ;
610+ ++it ;
609611 }
610- if (values_.size () == 0 ) {
612+ if (values_.empty () ) {
611613 break ;
612614 }
613615 if (it == values_.end ()) {
614616 it = values_.begin ();
615617 }
616618 }
617619 }
618- if (reverse_connections_.size () > 0 ) {
619- auto it = reverse_connections_.upper_bound (last_check_reverse_conn_);
620- if (it == reverse_connections_.end ()) {
621- it = reverse_connections_.begin ();
622- }
623- last_check_reverse_conn_ = it->first ;
624- if (it->second .ttl_ .is_in_past ()) {
625- reverse_connections_.erase (it);
626- }
620+ for (auto it = reverse_connections_ttl_order_.begin ();
621+ it != reverse_connections_ttl_order_.end () &&
622+ (it->first .is_in_past () || reverse_connections_.size () > MAX_REVERSE_CONNECTIONS);) {
623+ CHECK (reverse_connections_.erase (it->second ));
624+ it = reverse_connections_ttl_order_.erase (it);
627625 }
628626
629627 if (republish_att_.is_in_past ()) {
630628 auto it = our_values_.lower_bound (last_republish_key_);
631629 if (it != our_values_.end () && it->first == last_republish_key_) {
632- it++ ;
630+ ++it ;
633631 }
634632 if (it == our_values_.end ()) {
635633 it = our_values_.begin ();
@@ -705,8 +703,8 @@ void DhtMemberImpl::send_store(DhtValue value, td::Promise<td::Unit> promise) {
705703}
706704
707705void DhtMemberImpl::get_self_node (td::Promise<DhtNode> promise) {
708- auto P = td::PromiseCreator::lambda ([promise = std::move (promise), print_id = print_id (), id = id_ ,
709- keyring = keyring_, client_only = client_only_,
706+ auto P = td::PromiseCreator::lambda ([promise = std::move (promise), id = id_, keyring = keyring_ ,
707+ client_only = client_only_,
710708 network_id = network_id_](td::Result<adnl::AdnlNode> R) mutable {
711709 R.ensure ();
712710 auto node = R.move_as_ok ();
0 commit comments