@@ -7,15 +7,13 @@ local plugin_discovery = require("plugin_discovery")
77local logger = require (" logger" ):named (" relay" )
88
99local function run ()
10- -- Load configuration
1110 local config = consts .get_config ()
1211
1312 logger :info (" relay central hub starting" , {
1413 max_connections_per_user = config .max_connections_per_user ,
1514 inactivity_timeout = config .user_hub_inactivity_timeout
1615 })
1716
18- -- Validate required configuration
1917 if not config .user_security_scope or config .user_security_scope == " " then
2018 error (" RELAY_USER_SECURITY_SCOPE environment variable is required" )
2119 end
@@ -24,36 +22,27 @@ local function run()
2422 error (" RELAY_HOST environment variable is required" )
2523 end
2624
27- -- Discover plugins once at startup
2825 local plugins , plugin_err = plugin_discovery .get_plugins ()
2926 if plugin_err then
3027 error (" Failed to discover plugins: " .. plugin_err )
3128 end
3229
3330 logger :info (" plugin discovery complete" , { plugin_count = table_length (plugins ) })
3431
35- -- Initialize state
3632 local state = {
3733 config = config ,
3834 plugins = plugins ,
39- user_hubs = {}, -- user_id -> { hub_pid, last_activity, client_count }
35+ user_hubs = {},
4036 total_hubs = 0
4137 }
4238
43- -- Register this process
4439 process .registry .register (consts .CENTRAL_HUB_REGISTRY_NAME )
45-
46- -- Set trap_links to handle user hub failures gracefully
4740 process .set_options ({ trap_links = true })
4841
49- -- Set up GC ticker
5042 local gc_ticker = time .ticker (config .gc_check_interval )
51-
52- -- Set up channels
5343 local inbox = process .inbox ()
5444 local events = process .events ()
5545
56- -- Main loop
5746 while true do
5847 local result = channel .select ({
5948 inbox :case_receive (),
@@ -73,14 +62,12 @@ local function run()
7362 if topic == consts .WS_TOPICS .JOIN then
7463 handle_client_connection (state , payload .client_pid , payload .metadata )
7564 elseif topic == consts .WS_TOPICS .LEAVE then
76- -- Log user leave in central for tracking
7765 if payload .metadata and payload .metadata .user_id then
7866 logger :info (" user leaving central hub" , { user_id = payload .metadata .user_id })
7967 end
8068 elseif topic == consts .HUB_TOPICS .ACTIVITY_UPDATE then
8169 handle_activity_update (state , payload )
8270 else
83- -- Broadcast unknown messages to all user hubs
8471 for user_id , hub_info in pairs (state .user_hubs ) do
8572 if hub_info .hub_pid then
8673 process .send (hub_info .hub_pid , topic , payload )
@@ -90,19 +77,21 @@ local function run()
9077
9178 elseif result .channel == events then
9279 local event = result .value
80+ if event .kind == process .event .CANCEL then
81+ logger :info (" received cancel signal" )
82+ break
83+ end
9384 handle_process_event (state , event )
9485
9586 elseif result .channel == gc_ticker :channel () then
9687 check_inactive_hubs (state )
9788 end
9889 end
9990
100- -- Cleanup on exit
10191 gc_ticker :stop ()
10292
10393 logger :info (" shutting down relay central hub" , { active_hubs = state .total_hubs })
10494
105- -- Cancel all user hubs
10695 for user_id , hub_info in pairs (state .user_hubs ) do
10796 if hub_info .hub_pid then
10897 process .cancel (hub_info .hub_pid , consts .CANCEL_TIMEOUT )
@@ -118,7 +107,6 @@ function table_length(t)
118107 return count
119108end
120109
121- -- Handle client connection request
122110function handle_client_connection (state , client_pid , metadata )
123111 local user_id = metadata and metadata .user_id
124112 if not user_id then
@@ -129,7 +117,6 @@ function handle_client_connection(state, client_pid, metadata)
129117 return
130118 end
131119
132- -- Check connection limit
133120 if state .user_hubs [user_id ] and
134121 state .user_hubs [user_id ].client_count >= state .config .max_connections_per_user then
135122 logger :warn (" connection limit exceeded" , { user_id = user_id , limit = state .config .max_connections_per_user })
@@ -141,7 +128,6 @@ function handle_client_connection(state, client_pid, metadata)
141128 return
142129 end
143130
144- -- Get or create user hub
145131 local user_hub_pid = get_or_create_user_hub (state , user_id , metadata )
146132 if not user_hub_pid then
147133 logger :error (" user hub creation failed" , { user_id = user_id })
@@ -152,37 +138,30 @@ function handle_client_connection(state, client_pid, metadata)
152138 return
153139 end
154140
155- -- Send control message to redirect client
156141 process .send (client_pid , consts .WS_TOPICS .CONTROL , {
157142 target_pid = user_hub_pid ,
158143 metadata = metadata ,
159144 plugins = state .plugins
160145 })
161146
162- -- Update activity
163147 if state .user_hubs [user_id ] then
164148 state .user_hubs [user_id ].last_activity = time .now ()
165149 end
166150end
167151
168- -- Get or create user hub for user
169152function get_or_create_user_hub (state , user_id , metadata )
170- -- Check if hub already exists
171153 if state .user_hubs [user_id ] and state .user_hubs [user_id ].hub_pid then
172154 return state .user_hubs [user_id ].hub_pid
173155 end
174156
175- -- Create user actor
176157 local user_metadata = metadata .user_metadata or {}
177158 local user_actor = security .new_actor (user_id , user_metadata )
178159
179- -- Get user scope
180160 local user_scope , scope_err = security .named_scope (state .config .user_security_scope )
181161 if scope_err then
182162 error (" Failed to get user security scope: " .. scope_err )
183163 end
184164
185- -- Spawn user hub process with linking and monitoring
186165 local hub_pid , err = process .with_context ({})
187166 :with_scope (user_scope )
188167 :with_actor (user_actor )
@@ -203,7 +182,6 @@ function get_or_create_user_hub(state, user_id, metadata)
203182 return nil
204183 end
205184
206- -- Store hub info
207185 state .user_hubs [user_id ] = {
208186 hub_pid = hub_pid ,
209187 created_at = time .now (),
@@ -218,7 +196,6 @@ function get_or_create_user_hub(state, user_id, metadata)
218196 return hub_pid
219197end
220198
221- -- Handle activity updates from user hubs
222199function handle_activity_update (state , payload )
223200 local user_id = payload .user_id
224201 if user_id and state .user_hubs [user_id ] then
@@ -232,15 +209,13 @@ function handle_activity_update(state, payload)
232209 end
233210end
234211
235- -- Handle process events
236212function handle_process_event (state , event )
237213 if event .kind ~= process .event .EXIT and event .kind ~= process .event .LINK_DOWN then
238214 return
239215 end
240216
241217 local from_pid = event .from
242218
243- -- Find and remove terminated hub
244219 for user_id , hub_info in pairs (state .user_hubs ) do
245220 if hub_info .hub_pid == from_pid then
246221 state .user_hubs [user_id ] = nil
@@ -256,18 +231,15 @@ function handle_process_event(state, event)
256231 end
257232end
258233
259- -- Check for inactive user hubs
260234function check_inactive_hubs (state )
261235 local now = time .now ()
262236 local inactivity_duration , _ = time .parse_duration (state .config .user_hub_inactivity_timeout )
263237
264238 for user_id , hub_info in pairs (state .user_hubs ) do
265- -- Skip if hub has clients or is being terminated
266239 if hub_info .client_count > 0 or hub_info .terminating then
267240 goto continue
268241 end
269242
270- -- Check inactivity
271243 local last_activity = hub_info .last_activity or hub_info .created_at
272244 local time_since_activity = now :sub (last_activity )
273245
@@ -284,4 +256,4 @@ function check_inactive_hubs(state)
284256 end
285257end
286258
287- return { run = run }
259+ return { run = run }
0 commit comments