1
1
#include " manager.hpp"
2
2
#include " log.hpp"
3
3
4
+ #include < algorithm>
5
+
4
6
using namespace pc ;
5
7
6
8
#define PC_TPU_PROXY_PORT 8898
@@ -10,6 +12,8 @@ using namespace pc;
10
12
#define PC_PUB_INTERVAL (227L *PC_NSECS_IN_MSEC)
11
13
#define PC_RPC_HOST " localhost"
12
14
#define PC_MAX_BATCH 8
15
+ // Flush partial batches if not completed within 400 ms.
16
+ #define PC_FLUSH_INTERVAL (400L *PC_NSECS_IN_MSEC)
13
17
14
18
// /////////////////////////////////////////////////////////////////////////
15
19
// manager_sub
@@ -437,6 +441,34 @@ void manager::reset_status( int status )
437
441
status_ &= ~status;
438
442
}
439
443
444
+ void manager::send_pending_ups ()
445
+ {
446
+ uint32_t n_to_send = 0 ;
447
+
448
+ // the batch will be sent if its size is greater than max batch size
449
+ // or time since the previously sent batch is greater than PC_FLUSH_INTERVAL
450
+ // the buffer is being updated by user class un user::parse_upd_price
451
+ int64_t curr_ts = get_now ();
452
+ if (curr_ts - last_upd_ts_> PC_FLUSH_INTERVAL) {
453
+ n_to_send = pending_upds_.size ();
454
+ } else if (pending_upds_.size () >= get_max_batch_size ()) {
455
+ n_to_send = get_max_batch_size ();
456
+ }
457
+
458
+ if (n_to_send == 0 ) {
459
+ return ;
460
+ }
461
+
462
+ // send batch of price updates to solana
463
+ price::send ( pending_upds_.data (), n_to_send);
464
+
465
+ // remove the sent elements from the vector
466
+ pending_upds_.erase (pending_upds_.begin (), pending_upds_.begin () + n_to_send);
467
+
468
+ // record the current time
469
+ last_upd_ts_= curr_ts;
470
+ }
471
+
440
472
void manager::poll ( bool do_wait )
441
473
{
442
474
// poll for any socket events
@@ -506,10 +538,7 @@ void manager::poll( bool do_wait )
506
538
// request product quotes from pythd's clients while connected
507
539
poll_schedule ();
508
540
509
- // Flush any pending complete batches of price updates by submitting solana TXs.
510
- for ( user *uptr = olist_.first (); uptr; uptr = uptr->get_next () ) {
511
- uptr->send_pending_upds ();
512
- }
541
+ send_pending_ups ();
513
542
} else {
514
543
reconnect_rpc ();
515
544
}
@@ -927,6 +956,13 @@ price *manager::get_price( const pub_key& acc )
927
956
return it ? dynamic_cast <price*>( amap_.obj ( it ) ) : nullptr ;
928
957
}
929
958
959
+ void manager::add_dirty_price (price* sptr)
960
+ {
961
+ if ( std::find (pending_upds_.begin (), pending_upds_.end (), sptr) == pending_upds_.end () ) {
962
+ pending_upds_.emplace_back ( sptr );
963
+ }
964
+ }
965
+
930
966
unsigned manager::get_num_product () const
931
967
{
932
968
return svec_.size ();
0 commit comments