@@ -7,20 +7,19 @@ use std::{
77} ;
88use tokio:: {
99 runtime:: { Handle , Runtime } ,
10- sync:: RwLock ,
10+ sync:: { RwLock , Semaphore } ,
1111 task:: JoinSet ,
1212} ;
1313
1414use crate :: {
1515 config:: ServerOpts ,
1616 dns:: { DnsRequest , DnsResponse , SerialMessage } ,
1717 dns_conf:: RuntimeConfig ,
18- dns_error:: LookupError ,
1918 dns_mw:: { DnsMiddlewareBuilder , DnsMiddlewareHandler } ,
2019 dns_mw_cache:: DnsCache ,
2120 log,
2221 server:: { DnsHandle , IncomingDnsRequest , ServerHandle } ,
23- third_ext:: { FutureJoinAllExt as _, FutureTimeoutExt } ,
22+ third_ext:: FutureJoinAllExt as _,
2423} ;
2524
2625pub struct App {
@@ -75,18 +74,12 @@ impl App {
7574
7675 cfg. summary ( ) ;
7776
78- let runtime = {
79- use tokio:: runtime;
80- let mut builder = runtime:: Builder :: new_multi_thread ( ) ;
81- builder. enable_all ( ) ;
82- if let Some ( num_workers) = cfg. num_workers ( ) {
83- builder. worker_threads ( num_workers) ;
84- }
85- builder
86- . thread_name ( "smartdns-runtime" )
87- . build ( )
88- . expect ( "failed to initialize Tokio Runtime" )
89- } ;
77+ let runtime = tokio:: runtime:: Builder :: new_multi_thread ( )
78+ . worker_threads ( cfg. num_workers ( ) )
79+ . enable_all ( )
80+ . thread_name ( "smartdns-runtime" )
81+ . build ( )
82+ . expect ( "failed to initialize Tokio Runtime" ) ;
9083
9184 let handler = DnsMiddlewareBuilder :: new ( ) . build ( cfg. clone ( ) ) ;
9285
@@ -230,13 +223,19 @@ pub fn bootstrap(conf: Option<PathBuf>) {
230223
231224 const MAX_IDLE : Duration = Duration :: from_secs ( 30 * 60 ) ; // 30 min
232225
226+ let background_concurrency = Arc :: new ( Semaphore :: new ( 1 ) ) ;
227+
233228 while let Some ( ( message, server_opts, sender) ) = incoming_request. recv ( ) . await {
234229 let handler = app. mw_handler . read ( ) . await . clone ( ) ;
235230
236231 if server_opts. is_background {
237232 if Instant :: now ( ) - last_activity < MAX_IDLE {
233+ let background_concurrency = background_concurrency. clone ( ) ;
238234 inner_join_set. spawn ( async move {
239- let _ = sender. send ( process ( handler, message, server_opts) . await ) ;
235+ if let Ok ( permit) = background_concurrency. acquire_owned ( ) . await {
236+ let _ = sender. send ( process ( handler, message, server_opts) . await ) ;
237+ drop ( permit) ;
238+ }
240239 } ) ;
241240 }
242241 } else {
@@ -388,27 +387,7 @@ async fn process(
388387 response_header. set_authoritative ( false ) ;
389388
390389 let response = {
391- let res =
392- handler
393- . search ( & request, & server_opts)
394- . timeout ( Duration :: from_secs (
395- if server_opts. is_background { 60 } else { 5 } ,
396- ) )
397- . await
398- . unwrap_or_else ( |_| {
399- let query = request. query ( ) . original ( ) . to_owned ( ) ;
400- log:: warn!(
401- "Query {} {} {} timeout." ,
402- query. name( ) ,
403- query. query_type( ) ,
404- if server_opts. is_background {
405- "in background"
406- } else {
407- ""
408- }
409- ) ;
410- Err ( LookupError :: no_records_found ( query, 10 ) )
411- } ) ;
390+ let res = handler. search ( & request, & server_opts) . await ;
412391 match res {
413392 Ok ( lookup) => lookup,
414393 Err ( e) => {
0 commit comments