@@ -4,39 +4,118 @@ static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
44
55use veloflux:: server;
66
7- #[ tokio:: main]
8- async fn main ( ) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
7+ #[ derive( Debug , Clone ) ]
8+ struct WorkerCliArgs {
9+ instance_id : String ,
10+ config_path : String ,
11+ startup_gate_dir : Option < String > ,
12+ startup_gate_timeout_ms : Option < u64 > ,
13+ }
14+
15+ impl WorkerCliArgs {
16+ fn parse ( args : Vec < String > ) -> Result < Self , Box < dyn std:: error:: Error + Send + Sync > > {
17+ let mut instance_id = None ;
18+ let mut config_path = None ;
19+ let mut startup_gate_dir = None ;
20+ let mut startup_gate_timeout_ms = None ;
21+ let mut it = args. into_iter ( ) . peekable ( ) ;
22+ while let Some ( arg) = it. next ( ) {
23+ match arg. as_str ( ) {
24+ "--flow-instance-id" => {
25+ instance_id = it. next ( ) ;
26+ }
27+ "--config" => {
28+ config_path = it. next ( ) ;
29+ }
30+ "--startup-gate-dir" => {
31+ startup_gate_dir = it. next ( ) ;
32+ }
33+ "--startup-gate-timeout-ms" => {
34+ if let Some ( raw) = it. next ( ) {
35+ let parsed = raw
36+ . parse :: < u64 > ( )
37+ . map_err ( |_| format ! ( "invalid --startup-gate-timeout-ms: {raw}" ) ) ?;
38+ startup_gate_timeout_ms = Some ( parsed) ;
39+ }
40+ }
41+ _ => { }
42+ }
43+ }
44+
45+ let instance_id = instance_id. ok_or ( "--flow-instance-id is required in --worker mode" ) ?;
46+ let config_path = config_path. ok_or ( "--config is required in --worker mode" ) ?;
47+ Ok ( Self {
48+ instance_id,
49+ config_path,
50+ startup_gate_dir,
51+ startup_gate_timeout_ms,
52+ } )
53+ }
54+ }
55+
56+ fn main ( ) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
957 let args = std:: env:: args ( ) . skip ( 1 ) . collect :: < Vec < _ > > ( ) ;
1058 if args. iter ( ) . any ( |arg| arg == "--worker" ) {
11- return run_worker ( args) . await ;
59+ let worker_args = WorkerCliArgs :: parse ( args) ?;
60+ if let Some ( run_dir) = worker_args. startup_gate_dir . as_deref ( ) {
61+ let timeout_ms = worker_args
62+ . startup_gate_timeout_ms
63+ . unwrap_or ( veloflux:: startup_gate:: DEFAULT_STARTUP_GATE_TIMEOUT_MS ) ;
64+ veloflux:: startup_gate:: wait_until_ready (
65+ run_dir,
66+ & worker_args. instance_id ,
67+ timeout_ms,
68+ ) ?;
69+ }
70+ let rt = tokio:: runtime:: Builder :: new_multi_thread ( )
71+ . enable_all ( )
72+ . build ( ) ?;
73+ return rt. block_on ( run_worker ( worker_args) ) ;
1274 }
1375
1476 let result = veloflux:: bootstrap:: default_init ( ) ?;
1577 // Keep logging guard alive for the duration of the application
1678 let _logging_guard = result. logging_guard ;
1779
18- let ctx = server:: init ( result. options , result. instance ) . await ?;
19- server:: start ( ctx) . await
20- }
21-
22- async fn run_worker ( args : Vec < String > ) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
23- let mut instance_id = None ;
24- let mut config_path = None ;
25- let mut it = args. into_iter ( ) . peekable ( ) ;
26- while let Some ( arg) = it. next ( ) {
27- match arg. as_str ( ) {
28- "--flow-instance-id" => {
29- instance_id = it. next ( ) ;
80+ if let Some ( path) = result. options . default_cgroup_path . as_deref ( ) {
81+ match veloflux:: cgroup:: join_current_process ( path) {
82+ Ok ( ( ) ) => {
83+ tracing:: info!(
84+ flow_instance_id = "default" ,
85+ pid = std:: process:: id( ) ,
86+ cgroup_path = %path,
87+ reason = "manager process joined target cgroup (pre-runtime single-thread stage)" ,
88+ "flow instance bound to cgroup"
89+ ) ;
3090 }
31- "--config" => {
32- config_path = it. next ( ) ;
91+ Err ( err) => {
92+ tracing:: error!(
93+ flow_instance_id = "default" ,
94+ pid = std:: process:: id( ) ,
95+ cgroup_path = %path,
96+ reason = %err,
97+ cgroup_snapshot = %veloflux:: cgroup:: debug_snapshot( path) ,
98+ "failed to bind flow instance to cgroup"
99+ ) ;
100+ return Err ( err) ;
33101 }
34- _ => { }
35102 }
36103 }
37104
38- let instance_id = instance_id. ok_or ( "--flow-instance-id is required in --worker mode" ) ?;
39- let config_path = config_path. ok_or ( "--config is required in --worker mode" ) ?;
105+ let rt = tokio:: runtime:: Builder :: new_multi_thread ( )
106+ . enable_all ( )
107+ . build ( ) ?;
108+ rt. block_on ( async move {
109+ let ctx = server:: init ( result. options , result. instance ) . await ?;
110+ server:: start ( ctx) . await
111+ } )
112+ }
113+
114+ async fn run_worker (
115+ worker_args : WorkerCliArgs ,
116+ ) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
117+ let instance_id = worker_args. instance_id ;
118+ let config_path = worker_args. config_path ;
40119
41120 flow:: init_process_once ( ) ;
42121 flow:: metrics:: set_flow_instance_id ( & instance_id) ;
0 commit comments