1818
1919//! Chain api required for the transaction pool.
2020
21- use crate :: LOG_TARGET ;
22- use codec:: Encode ;
23- use futures:: {
24- channel:: { mpsc, oneshot} ,
25- future:: { ready, Future , FutureExt , Ready } ,
26- lock:: Mutex ,
27- SinkExt , StreamExt ,
21+ use crate :: {
22+ common:: { sliding_stat:: DurationSlidingStats , STAT_SLIDING_WINDOW } ,
23+ graph:: ValidateTransactionPriority ,
24+ insert_and_log_throttled, LOG_TARGET , LOG_TARGET_STAT ,
2825} ;
29- use std :: { marker :: PhantomData , pin :: Pin , sync :: Arc } ;
30-
26+ use codec :: Encode ;
27+ use futures :: future :: { ready , Future , FutureExt , Ready } ;
3128use prometheus_endpoint:: Registry as PrometheusRegistry ;
3229use sc_client_api:: { blockchain:: HeaderBackend , BlockBackend } ;
3330use sp_api:: { ApiExt , ProvideRuntimeApi } ;
@@ -39,38 +36,85 @@ use sp_runtime::{
3936 transaction_validity:: { TransactionSource , TransactionValidity } ,
4037} ;
4138use sp_transaction_pool:: runtime_api:: TaggedTransactionQueue ;
39+ use std:: {
40+ marker:: PhantomData ,
41+ pin:: Pin ,
42+ sync:: Arc ,
43+ time:: { Duration , Instant } ,
44+ } ;
45+ use tokio:: sync:: { mpsc, oneshot, Mutex } ;
4246
4347use super :: {
4448 error:: { self , Error } ,
4549 metrics:: { ApiMetrics , ApiMetricsExt } ,
4650} ;
4751use crate :: graph;
48- use tracing:: { trace, warn} ;
52+ use tracing:: { trace, warn, Level } ;
4953
5054/// The transaction pool logic for full client.
5155pub struct FullChainApi < Client , Block > {
5256 client : Arc < Client > ,
5357 _marker : PhantomData < Block > ,
5458 metrics : Option < Arc < ApiMetrics > > ,
55- validation_pool : mpsc:: Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
59+ validation_pool_normal : mpsc:: Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
60+ validation_pool_maintained : mpsc:: Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
61+ validate_transaction_normal_stats : DurationSlidingStats ,
62+ validate_transaction_maintained_stats : DurationSlidingStats ,
5663}
5764
5865/// Spawn a validation task that will be used by the transaction pool to validate transactions.
5966fn spawn_validation_pool_task (
6067 name : & ' static str ,
61- receiver : Arc < Mutex < mpsc:: Receiver < Pin < Box < dyn Future < Output = ( ) > + Send > > > > > ,
68+ receiver_normal : Arc < Mutex < mpsc:: Receiver < Pin < Box < dyn Future < Output = ( ) > + Send > > > > > ,
69+ receiver_maintained : Arc < Mutex < mpsc:: Receiver < Pin < Box < dyn Future < Output = ( ) > + Send > > > > > ,
6270 spawner : & impl SpawnEssentialNamed ,
71+ stats : DurationSlidingStats ,
72+ blocking_stats : DurationSlidingStats ,
6373) {
6474 spawner. spawn_essential_blocking (
6575 name,
6676 Some ( "transaction-pool" ) ,
6777 async move {
6878 loop {
69- let task = receiver. lock ( ) . await . next ( ) . await ;
70- match task {
71- None => return ,
72- Some ( task) => task. await ,
73- }
79+ let start = Instant :: now ( ) ;
80+
81+ let task = {
82+ let receiver_maintained = receiver_maintained. clone ( ) ;
83+ let receiver_normal = receiver_normal. clone ( ) ;
84+ tokio:: select! {
85+ Some ( task) = async {
86+ receiver_maintained. lock( ) . await . recv( ) . await
87+ } => { task }
88+ Some ( task) = async {
89+ receiver_normal. lock( ) . await . recv( ) . await
90+ } => { task }
91+ else => {
92+ return
93+ }
94+ }
95+ } ;
96+
97+ let blocking_duration = {
98+ let start = Instant :: now ( ) ;
99+ task. await ;
100+ start. elapsed ( )
101+ } ;
102+
103+ insert_and_log_throttled ! (
104+ Level :: DEBUG ,
105+ target: LOG_TARGET_STAT ,
106+ prefix: format!( "validate_transaction_inner_stats" ) ,
107+ stats,
108+ start. elapsed( ) . into( )
109+ ) ;
110+ insert_and_log_throttled ! (
111+ Level :: DEBUG ,
112+ target: LOG_TARGET_STAT ,
113+ prefix: format!( "validate_transaction_blocking_stats" ) ,
114+ blocking_stats,
115+ blocking_duration. into( )
116+ ) ;
117+ trace ! ( target: LOG_TARGET , duration=?start. elapsed( ) , "spawn_validation_pool_task" ) ;
74118 }
75119 }
76120 . boxed ( ) ,
@@ -84,6 +128,9 @@ impl<Client, Block> FullChainApi<Client, Block> {
84128 prometheus : Option < & PrometheusRegistry > ,
85129 spawner : & impl SpawnEssentialNamed ,
86130 ) -> Self {
131+ let stats = DurationSlidingStats :: new ( Duration :: from_secs ( STAT_SLIDING_WINDOW ) ) ;
132+ let blocking_stats = DurationSlidingStats :: new ( Duration :: from_secs ( STAT_SLIDING_WINDOW ) ) ;
133+
87134 let metrics = prometheus. map ( ApiMetrics :: register) . and_then ( |r| match r {
88135 Err ( error) => {
89136 warn ! (
@@ -96,13 +143,41 @@ impl<Client, Block> FullChainApi<Client, Block> {
96143 Ok ( api) => Some ( Arc :: new ( api) ) ,
97144 } ) ;
98145
99- let ( sender, receiver) = mpsc:: channel ( 0 ) ;
146+ let ( sender, receiver) = mpsc:: channel ( 1 ) ;
147+ let ( sender_maintained, receiver_maintained) = mpsc:: channel ( 1 ) ;
100148
101149 let receiver = Arc :: new ( Mutex :: new ( receiver) ) ;
102- spawn_validation_pool_task ( "transaction-pool-task-0" , receiver. clone ( ) , spawner) ;
103- spawn_validation_pool_task ( "transaction-pool-task-1" , receiver, spawner) ;
104-
105- FullChainApi { client, validation_pool : sender, _marker : Default :: default ( ) , metrics }
150+ let receiver_maintained = Arc :: new ( Mutex :: new ( receiver_maintained) ) ;
151+ spawn_validation_pool_task (
152+ "transaction-pool-task-0" ,
153+ receiver. clone ( ) ,
154+ receiver_maintained. clone ( ) ,
155+ spawner,
156+ stats. clone ( ) ,
157+ blocking_stats. clone ( ) ,
158+ ) ;
159+ spawn_validation_pool_task (
160+ "transaction-pool-task-1" ,
161+ receiver,
162+ receiver_maintained,
163+ spawner,
164+ stats. clone ( ) ,
165+ blocking_stats. clone ( ) ,
166+ ) ;
167+
168+ FullChainApi {
169+ client,
170+ validation_pool_normal : sender,
171+ validation_pool_maintained : sender_maintained,
172+ _marker : Default :: default ( ) ,
173+ metrics,
174+ validate_transaction_normal_stats : DurationSlidingStats :: new ( Duration :: from_secs (
175+ STAT_SLIDING_WINDOW ,
176+ ) ) ,
177+ validate_transaction_maintained_stats : DurationSlidingStats :: new ( Duration :: from_secs (
178+ STAT_SLIDING_WINDOW ,
179+ ) ) ,
180+ }
106181 }
107182}
108183
@@ -132,10 +207,25 @@ where
132207 at : <Self :: Block as BlockT >:: Hash ,
133208 source : TransactionSource ,
134209 uxt : graph:: ExtrinsicFor < Self > ,
210+ validation_priority : ValidateTransactionPriority ,
135211 ) -> Self :: ValidationFuture {
212+ let start = Instant :: now ( ) ;
136213 let ( tx, rx) = oneshot:: channel ( ) ;
137214 let client = self . client . clone ( ) ;
138- let mut validation_pool = self . validation_pool . clone ( ) ;
215+ let ( stats, validation_pool, prefix) =
216+ if validation_priority == ValidateTransactionPriority :: Maintained {
217+ (
218+ self . validate_transaction_maintained_stats . clone ( ) ,
219+ self . validation_pool_maintained . clone ( ) ,
220+ "validate_transaction_maintained_stats" ,
221+ )
222+ } else {
223+ (
224+ self . validate_transaction_normal_stats . clone ( ) ,
225+ self . validation_pool_normal . clone ( ) ,
226+ "validate_transaction_stats" ,
227+ )
228+ } ;
139229 let metrics = self . metrics . clone ( ) ;
140230
141231 async move {
@@ -155,10 +245,20 @@ where
155245 . map_err ( |e| Error :: RuntimeApi ( format ! ( "Validation pool down: {:?}" , e) ) ) ?;
156246 }
157247
158- match rx. await {
248+ let validity = match rx. await {
159249 Ok ( r) => r,
160250 Err ( _) => Err ( Error :: RuntimeApi ( "Validation was canceled" . into ( ) ) ) ,
161- }
251+ } ;
252+
253+ insert_and_log_throttled ! (
254+ Level :: DEBUG ,
255+ target: LOG_TARGET_STAT ,
256+ prefix: prefix,
257+ stats,
258+ start. elapsed( ) . into( )
259+ ) ;
260+
261+ validity
162262 }
163263 . boxed ( )
164264 }
0 commit comments