2424-define (OTHER_METRICS , [recv_cnt , send_cnt , send_pend , garbage_collection , state ,
2525 timeout ]).
2626
27- -record (reader_state , {socket , conn_name , parse_state , processor_state , state ,
28- conserve_resources , recv_outstanding , stats_timer ,
29- parent , connection , heartbeat_sup , heartbeat ,
30- timeout_sec % % heartbeat timeout value used, 0 means
31- % % heartbeats are disabled
32- }).
27+ -record (reader_state , {
28+ socket ,
29+ conn_name ,
30+ parse_state ,
31+ processor_state ,
32+ state ,
33+ conserve_resources ,
34+ recv_outstanding ,
35+ max_frame_size ,
36+ current_frame_size ,
37+ stats_timer ,
38+ parent ,
39+ connection ,
40+ heartbeat_sup , heartbeat ,
41+ % % heartbeat timeout value used, 0 means
42+ % % heartbeats are disabled
43+ timeout_sec
44+ }).
3345
3446% %----------------------------------------------------------------------------
3547
@@ -69,6 +81,7 @@ init([SupHelperPid, Ref, Configuration]) ->
6981 _ = register_resource_alarm (),
7082
7183 LoginTimeout = application :get_env (rabbitmq_stomp , login_timeout , 10_000 ),
84+ MaxFrameSize = application :get_env (rabbitmq_stomp , max_frame_size , ? DEFAULT_MAX_FRAME_SIZE ),
7285 erlang :send_after (LoginTimeout , self (), login_timeout ),
7386
7487 gen_server2 :enter_loop (? MODULE , [],
@@ -80,6 +93,8 @@ init([SupHelperPid, Ref, Configuration]) ->
8093 processor_state = ProcState ,
8194 heartbeat_sup = SupHelperPid ,
8295 heartbeat = {none , none },
96+ max_frame_size = MaxFrameSize ,
97+ current_frame_size = 0 ,
8398 state = running ,
8499 conserve_resources = false ,
85100 recv_outstanding = false })), # reader_state .stats_timer ),
@@ -222,23 +237,41 @@ process_received_bytes([], State) ->
222237 {ok , State };
223238process_received_bytes (Bytes ,
224239 State = # reader_state {
225- processor_state = ProcState ,
226- parse_state = ParseState }) ->
240+ max_frame_size = MaxFrameSize ,
241+ current_frame_size = FrameLength ,
242+ processor_state = ProcState ,
243+ parse_state = ParseState }) ->
227244 case rabbit_stomp_frame :parse (Bytes , ParseState ) of
228245 {more , ParseState1 } ->
229- {ok , State # reader_state {parse_state = ParseState1 }};
246+ FrameLength1 = FrameLength + byte_size (Bytes ),
247+ case FrameLength1 > MaxFrameSize of
248+ true ->
249+ log_reason ({network_error , {frame_too_big , {FrameLength1 , MaxFrameSize }}}, State ),
250+ {stop , normal , State };
251+ false ->
252+ {ok , State # reader_state {parse_state = ParseState1 ,
253+ current_frame_size = FrameLength1 }}
254+ end ;
230255 {ok , Frame , Rest } ->
231- case rabbit_stomp_processor :process_frame (Frame , ProcState ) of
232- {ok , NewProcState , Conn } ->
233- PS = rabbit_stomp_frame :initial_state (),
234- NextState = maybe_block (State , Frame ),
235- process_received_bytes (Rest , NextState # reader_state {
236- processor_state = NewProcState ,
237- parse_state = PS ,
238- connection = Conn });
239- {stop , Reason , NewProcState } ->
240- {stop , Reason ,
241- processor_state (NewProcState , State )}
256+ FrameLength1 = FrameLength + byte_size (Bytes ) - byte_size (Rest ),
257+ case FrameLength1 > MaxFrameSize of
258+ true ->
259+ log_reason ({network_error , {frame_too_big , {FrameLength1 , MaxFrameSize }}}, State ),
260+ {stop , normal , State };
261+ false ->
262+ case rabbit_stomp_processor :process_frame (Frame , ProcState ) of
263+ {ok , NewProcState , Conn } ->
264+ PS = rabbit_stomp_frame :initial_state (),
265+ NextState = maybe_block (State , Frame ),
266+ process_received_bytes (Rest , NextState # reader_state {
267+ current_frame_size = 0 ,
268+ processor_state = NewProcState ,
269+ parse_state = PS ,
270+ connection = Conn });
271+ {stop , Reason , NewProcState } ->
272+ {stop , Reason ,
273+ processor_state (NewProcState , State )}
274+ end
242275 end ;
243276 {error , Reason } ->
244277 % % The parser couldn't parse data. We log the reason right
0 commit comments