33use crate :: agents:: {
44 AgentRegistry , CrawlTask , InlineString , DEFAULT_AGENT_CAPACITY , DEFAULT_QUEUE_DEPTH ,
55} ;
6+ #[ cfg( feature = "multi_thread" ) ]
7+ use crate :: controls:: PartitionStrategyArg ;
68use crate :: frontier:: { Frontier , FrontierError , DEFAULT_FRONTIER_QUEUE , DEFAULT_FRONTIER_SEEN } ;
79use crate :: html:: { stream_links, HtmlStreamError } ;
810use crate :: { Cli , CrawlControls } ;
@@ -15,6 +17,11 @@ use std::sync::Arc;
1517#[ cfg( feature = "multi_thread" ) ]
1618use std:: thread;
1719use std:: time:: { Duration , Instant } ;
20+ #[ cfg( feature = "multi_thread" ) ]
21+ use std:: {
22+ collections:: hash_map:: DefaultHasher ,
23+ hash:: { Hash , Hasher } ,
24+ } ;
1825use tokio:: runtime:: Builder ;
1926#[ cfg( feature = "multi_thread" ) ]
2027use tokio:: sync:: mpsc;
@@ -26,6 +33,8 @@ const USER_AGENT: &str = "fastcrawl-example/0.1 (+https://github.com/aaronlifton
2633const DRAIN_POLL_INTERVAL : Duration = Duration :: from_millis ( 100 ) ;
2734#[ cfg( feature = "multi_thread" ) ]
2835const LINK_BATCH_CHANNEL_CAPACITY : usize = DEFAULT_AGENT_CAPACITY * 4 ;
36+ #[ cfg( feature = "multi_thread" ) ]
37+ const SHARD_STACK_SIZE : usize = 8 * 1024 * 1024 ;
2938
3039/// Predicate used to accept or reject discovered URLs.
3140pub type UrlFilter = Arc < dyn Fn ( & Url ) -> bool + Send + Sync > ;
@@ -53,40 +62,102 @@ impl SharedRun {
5362}
5463
5564#[ cfg( feature = "multi_thread" ) ]
56- #[ derive( Clone , Copy ) ]
65+ #[ derive( Clone ) ]
5766struct ShardPartition {
5867 index : usize ,
59- total : usize ,
68+ strategy : Arc < ShardStrategy > ,
6069}
6170
6271#[ cfg( feature = "multi_thread" ) ]
6372impl ShardPartition {
64- fn new ( index : usize , total : usize ) -> Self {
65- Self { index, total }
73+ fn new ( index : usize , strategy : Arc < ShardStrategy > ) -> Self {
74+ Self { index, strategy }
6675 }
6776
68- fn owner_for ( url : & Url , total : usize ) -> usize {
69- use std:: collections:: hash_map:: DefaultHasher ;
70- use std:: hash:: { Hash , Hasher } ;
71- let mut hasher = DefaultHasher :: new ( ) ;
72- url. as_str ( ) . hash ( & mut hasher) ;
73- ( hasher. finish ( ) as usize ) % total. max ( 1 )
77+ fn shard_for_url ( & self , url : & Url ) -> usize {
78+ self . strategy . owner_for ( url)
79+ }
80+
81+ fn shard_for_str ( & self , value : & str ) -> usize {
82+ self . strategy . owner_for_str ( value)
83+ }
84+
85+ fn index ( & self ) -> usize {
86+ self . index
87+ }
88+ }
89+
90+ #[ cfg( feature = "multi_thread" ) ]
91+ #[ derive( Clone ) ]
92+ struct ShardStrategy {
93+ total : usize ,
94+ kind : PartitionKind ,
95+ }
96+
97+ #[ cfg( feature = "multi_thread" ) ]
98+ impl ShardStrategy {
99+ fn new ( total : usize , kind : PartitionKind ) -> Self {
100+ Self {
101+ total : total. max ( 1 ) ,
102+ kind,
103+ }
104+ }
105+
106+ fn owner_for ( & self , url : & Url ) -> usize {
107+ match self . kind {
108+ PartitionKind :: Hash => Self :: hash ( url. as_str ( ) , self . total ) ,
109+ PartitionKind :: WikiPrefix => self
110+ . wiki_bucket ( url)
111+ . unwrap_or_else ( || Self :: hash ( url. as_str ( ) , self . total ) ) ,
112+ }
113+ }
114+
115+ fn owner_for_str ( & self , value : & str ) -> usize {
116+ Url :: parse ( value)
117+ . ok ( )
118+ . map ( |url| self . owner_for ( & url) )
119+ . unwrap_or_else ( || Self :: hash ( value, self . total ) )
74120 }
75121
76- fn owner_for_str ( input : & str , total : usize ) -> usize {
77- use std:: collections:: hash_map:: DefaultHasher ;
78- use std:: hash:: { Hash , Hasher } ;
122+ fn wiki_bucket ( & self , url : & Url ) -> Option < usize > {
123+ if !url
124+ . domain ( )
125+ . map ( |d| d. contains ( "wikipedia.org" ) )
126+ . unwrap_or ( false )
127+ {
128+ return None ;
129+ }
130+ let slug = url. path ( ) . strip_prefix ( "/wiki/" ) ?;
131+ let first = slug. chars ( ) . next ( ) ?;
132+ if first. is_ascii_alphabetic ( ) {
133+ let idx = ( first. to_ascii_uppercase ( ) as u8 - b'A' ) as usize ;
134+ Some ( idx % self . total )
135+ } else {
136+ None
137+ }
138+ }
139+
140+ fn hash ( value : & str , total : usize ) -> usize {
79141 let mut hasher = DefaultHasher :: new ( ) ;
80- input . hash ( & mut hasher) ;
142+ value . hash ( & mut hasher) ;
81143 ( hasher. finish ( ) as usize ) % total. max ( 1 )
82144 }
145+ }
83146
84- fn total ( & self ) -> usize {
85- self . total
86- }
147+ #[ cfg( feature = "multi_thread" ) ]
148+ #[ derive( Clone , Copy ) ]
149+ enum PartitionKind {
150+ Hash ,
151+ WikiPrefix ,
152+ }
87153
88- fn index ( & self ) -> usize {
89- self . index
154+ #[ cfg( feature = "multi_thread" ) ]
155+ impl From < PartitionStrategyArg > for PartitionKind {
156+ fn from ( value : PartitionStrategyArg ) -> Self {
157+ match value {
158+ PartitionStrategyArg :: Hash => PartitionKind :: Hash ,
159+ PartitionStrategyArg :: WikiPrefix => PartitionKind :: WikiPrefix ,
160+ }
90161 }
91162}
92163
@@ -169,12 +240,13 @@ async fn run_single_thread(cli: Cli, seeds: &[&str], filter: UrlFilter) -> Resul
169240
170241#[ cfg( feature = "multi_thread" ) ]
171242fn run_multi_thread ( cli : Cli , seeds : & [ & str ] , filter : UrlFilter ) -> Result < ( ) , DynError > {
172- const SHARD_STACK_SIZE : usize = 8 * 1024 * 1024 ;
173243 let shard_count = thread:: available_parallelism ( )
174244 . map ( |n| n. get ( ) )
175245 . unwrap_or ( 4 )
176246 . max ( 1 ) ;
177247 let shared = SharedRun :: new ( & cli) ;
248+ let partition_kind = PartitionKind :: from ( cli. partition_strategy ( ) ) ;
249+ let strategy = Arc :: new ( ShardStrategy :: new ( shard_count, partition_kind) ) ;
178250 let seeds_owned: Vec < String > = seeds. iter ( ) . map ( |s| s. to_string ( ) ) . collect ( ) ;
179251 let start = Instant :: now ( ) ;
180252
@@ -193,6 +265,7 @@ fn run_multi_thread(cli: Cli, seeds: &[&str], filter: UrlFilter) -> Result<(), D
193265 let filter_clone = Arc :: clone ( & filter) ;
194266 let senders_clone = Arc :: clone ( & senders) ;
195267 let seeds_clone = seeds_owned. clone ( ) ;
268+ let partition = ShardPartition :: new ( index, Arc :: clone ( & strategy) ) ;
196269 let builder = thread:: Builder :: new ( )
197270 . name ( format ! ( "fastcrawl-shard-{index}" ) )
198271 . stack_size ( SHARD_STACK_SIZE ) ;
@@ -203,7 +276,7 @@ fn run_multi_thread(cli: Cli, seeds: &[&str], filter: UrlFilter) -> Result<(), D
203276 seeds_clone,
204277 filter_clone,
205278 shared_clone,
206- ShardPartition :: new ( index , shard_count ) ,
279+ partition ,
207280 senders_clone,
208281 receiver,
209282 ) ) )
@@ -231,7 +304,7 @@ async fn run_shard_streaming(
231304 inbox : mpsc:: Receiver < LinkBatch > ,
232305) -> Result < ( ) , DynError > {
233306 let state = AppState :: new_with_shared ( filter, & shared) ?;
234- seed_partitioned_frontier ( state. frontier . as_ref ( ) , & seeds, partition) . await ;
307+ seed_partitioned_frontier ( state. frontier . as_ref ( ) , & seeds, & partition) . await ;
235308 let start = Instant :: now ( ) ;
236309
237310 let dispatcher = {
@@ -646,7 +719,7 @@ async fn route_discovered_links<const QUEUE: usize, const SEEN: usize>(
646719 let mut local_links = Vec :: new ( ) ;
647720 let mut remote: HashMap < usize , Vec < Url > > = HashMap :: new ( ) ;
648721 for link in discovered_links {
649- let owner = ShardPartition :: owner_for ( & link , router. partition . total ( ) ) ;
722+ let owner = router. partition . shard_for_url ( & link ) ;
650723 if owner == router. partition . index ( ) {
651724 local_links. push ( link) ;
652725 } else {
@@ -655,6 +728,7 @@ async fn route_discovered_links<const QUEUE: usize, const SEEN: usize>(
655728 }
656729
657730 if !local_links. is_empty ( ) {
731+ metrics. record_local_shard_links ( local_links. len ( ) ) ;
658732 enqueue_discovered_links (
659733 local_links,
660734 parent_depth,
@@ -669,6 +743,7 @@ async fn route_discovered_links<const QUEUE: usize, const SEEN: usize>(
669743
670744 for ( shard, links) in remote {
671745 if let Some ( sender) = router. remotes . get ( shard) {
746+ let batch_len = links. len ( ) ;
672747 if sender
673748 . send ( LinkBatch {
674749 depth : parent_depth,
@@ -681,6 +756,8 @@ async fn route_discovered_links<const QUEUE: usize, const SEEN: usize>(
681756 "shard {}: remote shard {shard} channel closed" ,
682757 router. partition. index( )
683758 ) ;
759+ } else {
760+ metrics. record_remote_shard_links ( batch_len) ;
684761 }
685762 }
686763 }
@@ -763,10 +840,10 @@ async fn seed_frontier(
763840async fn seed_partitioned_frontier (
764841 frontier : & Frontier < DEFAULT_FRONTIER_QUEUE , DEFAULT_FRONTIER_SEEN > ,
765842 seeds : & [ String ] ,
766- partition : ShardPartition ,
843+ partition : & ShardPartition ,
767844) {
768845 for seed in seeds {
769- let owner = ShardPartition :: owner_for_str ( seed , partition. total ( ) ) ;
846+ let owner = partition. shard_for_str ( seed ) ;
770847 if owner != partition. index ( ) {
771848 continue ;
772849 }
@@ -805,6 +882,12 @@ struct Metrics {
805882 frontier_rejections : AtomicUsize ,
806883 http_errors : AtomicUsize ,
807884 parse_errors : AtomicUsize ,
885+ #[ cfg( feature = "multi_thread" ) ]
886+ local_shard_links : AtomicUsize ,
887+ #[ cfg( feature = "multi_thread" ) ]
888+ remote_shard_links : AtomicUsize ,
889+ #[ cfg( feature = "multi_thread" ) ]
890+ remote_batches : AtomicUsize ,
808891}
809892
810893impl Metrics {
@@ -841,6 +924,17 @@ impl Metrics {
841924 }
842925 }
843926
927+ #[ cfg( feature = "multi_thread" ) ]
928+ fn record_local_shard_links ( & self , count : usize ) {
929+ self . local_shard_links . fetch_add ( count, Ordering :: Relaxed ) ;
930+ }
931+
932+ #[ cfg( feature = "multi_thread" ) ]
933+ fn record_remote_shard_links ( & self , count : usize ) {
934+ self . remote_shard_links . fetch_add ( count, Ordering :: Relaxed ) ;
935+ self . remote_batches . fetch_add ( 1 , Ordering :: Relaxed ) ;
936+ }
937+
844938 fn report ( & self , elapsed : Duration ) {
845939 let secs = elapsed. as_secs_f32 ( ) . max ( f32:: EPSILON ) ;
846940 let fetched = self . pages_fetched . load ( Ordering :: Relaxed ) ;
@@ -868,6 +962,18 @@ impl Metrics {
868962 "url parse errors: {}" ,
869963 self . parse_errors. load( Ordering :: Relaxed )
870964 ) ;
965+ #[ cfg( feature = "multi_thread" ) ]
966+ {
967+ println ! (
968+ "local shard enqueues: {}" ,
969+ self . local_shard_links. load( Ordering :: Relaxed )
970+ ) ;
971+ println ! (
972+ "remote shard links: {} (batches {})" ,
973+ self . remote_shard_links. load( Ordering :: Relaxed ) ,
974+ self . remote_batches. load( Ordering :: Relaxed )
975+ ) ;
976+ }
871977 }
872978}
873979
0 commit comments