@@ -655,9 +655,10 @@ bool price::update(
655
655
if ( PC_UNLIKELY ( !get_is_ready_publish () ) ) {
656
656
return false ;
657
657
}
658
+ preq_->set_price ( price, conf, st, is_agg );
658
659
manager *mgr = get_manager ();
659
660
const uint64_t slot = mgr->get_slot ();
660
- preq_->set_price ( price, conf, st, slot, is_agg );
661
+ preq_->set_slot ( slot );
661
662
preq_->set_block_hash ( mgr->get_recent_block_hash () );
662
663
if ( mgr->get_do_tx () )
663
664
mgr->submit ( preq_ );
@@ -688,6 +689,101 @@ bool price::update(
688
689
return true ;
689
690
}
690
691
692
+ void price::update_no_send (
693
+ const int64_t price, const uint64_t conf
694
+ , const symbol_status st, const bool is_agg
695
+ )
696
+ {
697
+ preq_->set_price ( price, conf, st, is_agg );
698
+ }
699
+
700
+ bool price::send ( price *prices[], const unsigned n )
701
+ {
702
+ static std::vector< rpc::upd_price * > upds_;
703
+
704
+ upds_.clear ();
705
+
706
+ manager *mgr1 = nullptr ;
707
+
708
+ for ( unsigned i = 0 , j = 0 ; i < n; ++i ) {
709
+ price *const p = prices[ i ];
710
+ if ( PC_UNLIKELY ( ! p->init_ && ! p->init_publish () ) ) {
711
+ continue ;
712
+ }
713
+ if ( PC_UNLIKELY ( ! p->has_publisher () ) ) {
714
+ continue ;
715
+ }
716
+ if ( PC_UNLIKELY ( ! p->get_is_ready_publish () ) ) {
717
+ continue ;
718
+ }
719
+ manager *const mgr = p->get_manager ();
720
+ if ( ! mgr1 ) {
721
+ mgr1 = mgr;
722
+ }
723
+ else if ( mgr != mgr1 ) {
724
+ PC_LOG_ERR ( " unexpected manager" ).end ();
725
+ continue ;
726
+ }
727
+ const uint64_t slot = mgr->get_slot ();
728
+ p->preq_ ->set_slot ( slot );
729
+ p->preq_ ->set_block_hash ( mgr->get_recent_block_hash () );
730
+ upds_.emplace_back ( p->preq_ );
731
+
732
+ if (
733
+ upds_.size () >= rpc::upd_price::MAX_UPDATES
734
+ || ( upds_.size () && ( i + 1 ) == n )
735
+ ) {
736
+ if ( mgr->get_do_tx () ) {
737
+ net_wtr msg;
738
+ if ( rpc::upd_price::build ( msg, &upds_[ 0 ], upds_.size () ) ) {
739
+ mgr->submit ( msg );
740
+ }
741
+ else {
742
+ PC_LOG_ERR ( " failed to build msg" );
743
+ }
744
+ }
745
+ else {
746
+ p->get_rpc_client ()->send ( &upds_[ 0 ], upds_.size () );
747
+ for ( unsigned k = j; k <= i; ++k ) {
748
+ price *const p1 = prices[ k ];
749
+ p1->tvec_ .emplace_back (
750
+ std::string ( 100 , ' \0 ' ), p1->preq_ ->get_sent_time ()
751
+ );
752
+ p1->preq_ ->get_signature ()->enc_base58 ( p1->tvec_ .back ().first );
753
+ PC_LOG_DBG ( " sent price update transaction" )
754
+ .add ( " price_account" , *p1->get_account () )
755
+ .add ( " product_account" , *p1->prod_ ->get_account () )
756
+ .add ( " symbol" , p1->get_symbol () )
757
+ .add ( " price_type" , price_type_to_str ( p1->get_price_type () ) )
758
+ .add ( " sig" , p1->tvec_ .back ().first )
759
+ .add ( " pub_slot" , slot )
760
+ .end ();
761
+ if ( PC_UNLIKELY ( p1->tvec_ .size () >= 100 ) ) {
762
+ PC_LOG_WRN ( " too many unacked price update transactions" )
763
+ .add ( " price_account" , *p1->get_account () )
764
+ .add ( " product_account" , *p1->prod_ ->get_account () )
765
+ .add ( " symbol" , p1->get_symbol () )
766
+ .add ( " price_type" , price_type_to_str ( p1->get_price_type () ) )
767
+ .add ( " num_txid" , p1->tvec_ .size () )
768
+ .end ();
769
+ p1->tvec_ .erase ( p1->tvec_ .begin (), p1->tvec_ .begin () + 50 );
770
+ }
771
+ }
772
+ }
773
+
774
+ for ( unsigned k = j; k <= i; ++k ) {
775
+ price *const p1 = prices[ k ];
776
+ p1->inc_sent ();
777
+ }
778
+
779
+ j = i;
780
+ upds_.clear ();
781
+ }
782
+ }
783
+
784
+ return true ;
785
+ }
786
+
691
787
void price::submit ()
692
788
{
693
789
if ( st_ == e_subscribe ) {
0 commit comments