1010-include (" rabbit_stomp_frame.hrl" ).
1111-include (" rabbit_stomp_headers.hrl" ).
1212
13- -export ([parse /2 , initial_state /0 ]).
13+ -export ([parse /2 , initial_state /0 , initial_state / 1 ]).
1414-export ([header /2 , header /3 ,
1515 boolean_header /2 , boolean_header /3 ,
1616 integer_header /2 , integer_header /3 ,
1717 binary_header /2 , binary_header /3 ]).
1818-export ([stream_offset_header /1 , stream_filter_header /1 ]).
1919-export ([serialize /1 , serialize /2 ]).
2020
21- initial_state () -> none .
21+ initial_state () -> {none , ? DEFAULT_STOMP_PARSER_CONFIG }.
22+ initial_state (Config ) -> {none , Config }.
2223
2324% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
2425% % STOMP 1.1 frames basic syntax
@@ -73,11 +74,29 @@ initial_state() -> none.
7374-define (COLON_ESC , $c ).
7475-define (CR_ESC , $r ).
7576
77+ -define (COMMAND_TREE ,
78+ #{$S => #{$E => #{$N => #{$D => 'SEND' }},
79+ $U => #{$B => #{$S => #{$C => #{$R => #{$I => #{$B => #{$E => 'SUBSCRIBE' }}}}}}},
80+ $T => #{$O => #{$M => #{$P => 'STOMP' }}}},
81+ $U => #{$N => #{$S => #{$U => #{$B => #{$S => #{$C => #{$R => #{$I => #{$B => #{$E => 'UNSUBSCRIBE' }}}}}}}}}},
82+ $B => #{$E => #{$G => #{$I => #{$N => 'BEGIN' }}}},
83+ $C => #{$O => #{$M => #{$M => #{$I => #{$T => 'COMMIT' }}},
84+ $N => #{$N => #{$E => #{$C => #{$T => {'CONNECT' ,
85+ #{$E => #{$D => 'CONNECTED' }}}}}}}}},
86+ $A => #{$B => #{$O => #{$R => #{$T => 'ABORT' }}},
87+ $C => #{$K => 'ACK' }},
88+ $N => #{$A => #{$C => #{$K => 'NACK' }}},
89+ $D => #{$I => #{$S => #{$C => #{$O => #{$N => #{$N => #{$E => #{$C => #{$T => 'DISCONNECT' }}}}}}}}},
90+ $M => #{$E => #{$S => #{$S => #{$A => #{$G => #{$E => 'MESSAGE' }}}}}},
91+ $R => #{$E => #{$C => #{$E => #{$I => #{$P => #{$T => 'RECEIPT' }}}}}},
92+ $E => #{$R => #{$R => #{$O => #{$R => 'ERROR' }}}}}).
93+
7694% % parser state
77- -record (state , {acc , cmd , hdrs , hdrname }).
95+ -record (state , {acc , cmd , cmd_tree = ? COMMAND_TREE , hdrs , hdrname , hdrl = 0 ,
96+ config }).
7897
7998parse (Content , {resume , Continuation }) -> Continuation (Content );
80- parse (Content , none ) -> parser (Content , noframe , # state {}).
99+ parse (Content , { none , Config } ) -> parser (Content , noframe , # state {config = Config }).
81100
82101more (Continuation ) -> {more , {resume , Continuation }}.
83102
@@ -103,14 +122,40 @@ parser( Rest, headers , State) -> goto(headers, hdrname,
103122parser (<<? COLON , Rest /binary >>, hdrname , State ) -> goto (hdrname , hdrvalue , Rest , State );
104123parser (<<? LF , Rest /binary >>, hdrname , State ) -> goto (hdrname , headers , Rest , State );
105124parser (<<? LF , Rest /binary >>, hdrvalue , State ) -> goto (hdrvalue , headers , Rest , State );
125+ parser (<<Ch :8 , Rest /binary >>, command , # state {cmd_tree = {_ , CmdTree }} = State ) ->
126+ case maps :get (Ch , CmdTree , undefined ) of
127+ undefined -> {error , unknown_command };
128+ NewCmdTree -> parser (Rest , command , State # state {cmd_tree = NewCmdTree })
129+ end ;
130+ parser (<<Ch :8 , Rest /binary >>, command , # state {cmd_tree = #{} = CmdTree } = State ) ->
131+ case maps :get (Ch , CmdTree , undefined ) of
132+ undefined -> {error , unknown_command };
133+ NewCmdTree -> parser (Rest , command , State # state {cmd_tree = NewCmdTree })
134+ end ;
135+ parser (<<_Ch :8 , _Rest /binary >>, command , _ ) ->
136+ {error , unknown_command };
106137% % accumulate
138+ % % parser(<<Ch:8, Rest/binary>>, Term = hdrname , State = #state{config = #stomp_parser_config{max_header_length = MaxHeaderLength}}) ->
139+
140+ % % parser(Rest, Term, accum(Ch, State));
141+ % % parser(<<Ch:8, Rest/binary>>, Term = hdrvalue , State = #state{config = #stomp_parser_config{max_header_length = MaxHeaderLength}}) ->
142+ % % parser(Rest, Term, accum(Ch, State));
107143parser (<<Ch :8 , Rest /binary >>, Term , State ) -> parser (Rest , Term , accum (Ch , State )).
108144
109145% % state transitions
110- goto (noframe , command , Rest , State ) -> parser (Rest , command , State # state {acc = []});
111- goto (command , headers , Rest , State = # state {acc = Acc } ) -> parser (Rest , headers , State # state {cmd = lists :reverse (Acc ), hdrs = []});
112- goto (headers , body , Rest , # state {cmd = Cmd , hdrs = Hdrs }) -> parse_body (Rest , # stomp_frame {command = Cmd , headers = Hdrs });
113- goto (headers , hdrname , Rest , State ) -> parser (Rest , hdrname , State # state {acc = []});
146+ goto (noframe , command , Rest , State ) -> parser (Rest , command , State # state {acc = [], cmd_tree = ? COMMAND_TREE });
147+ goto (command , headers , Rest , State = # state {cmd_tree = Command }) when is_atom (Command ) ->
148+ parser (Rest , headers , State # state {cmd = Command , hdrs = []});
149+ goto (command , headers , Rest , State = # state {cmd_tree = {Command , _ }}) when is_atom (Command ) ->
150+ parser (Rest , headers , State # state {cmd = Command , hdrs = []});
151+ goto (command , headers , _Rest , _State )->
152+ {error , unknown_command };
153+ goto (headers , body , Rest , State ) -> parse_body (Rest , State );
154+ goto (headers , hdrname , Rest , State = # state {hdrs = Headers , config = # stomp_parser_config {max_headers = MaxHeaders }}) ->
155+ case length (Headers ) == MaxHeaders of
156+ true -> {error , {max_headeres , MaxHeaders }};
157+ _ -> parser (Rest , hdrname , State # state {acc = []})
158+ end ;
114159goto (hdrname , hdrvalue , Rest , State = # state {acc = Acc } ) -> parser (Rest , hdrvalue , State # state {acc = [], hdrname = lists :reverse (Acc )});
115160goto (hdrname , headers , _Rest , # state {acc = Acc } ) -> {error , {header_no_value , lists :reverse (Acc )}}; % badly formed header -- fatal error
116161goto (hdrvalue , headers , Rest , State = # state {acc = Acc , hdrs = Headers , hdrname = HdrName }) ->
@@ -140,28 +185,51 @@ insert_header(Headers, Name, Value) ->
140185 false -> [{Name , Value } | Headers ]
141186 end .
142187
143- parse_body (Content , Frame = # stomp_frame {command = Command }) ->
144- case Command of
145- " SEND" -> parse_body (Content , Frame , [], integer_header (Frame , ? HEADER_CONTENT_LENGTH , unknown ));
146- _ -> parse_body (Content , Frame , [], unknown )
188+ parse_body (Content , State ) ->
189+ # state {cmd = Cmd , hdrs = Hdrs , config = # stomp_parser_config {max_body_length = MaxBodyLength }} = State ,
190+ Frame = # stomp_frame {command = Cmd , headers = Hdrs },
191+ case Cmd of
192+ 'SEND' ->
193+ case integer_header (Frame , ? HEADER_CONTENT_LENGTH , unknown ) of
194+ ContentLength when is_integer (ContentLength ) and (ContentLength > MaxBodyLength ) ->
195+ {error , {max_body_length , ContentLength }};
196+ ContentLength when is_integer (ContentLength ) ->
197+ parse_known_body (Content , Frame , [], ContentLength );
198+ _ ->
199+ parse_unknown_body (Content , Frame , [], MaxBodyLength )
200+ end ;
201+ _ ->
202+ parse_unknown_body (Content , Frame , [], MaxBodyLength )
203+ end .
204+
205+ -define (MORE_BODY (Content , Frame , Chunks , Remaining ),
206+ Chunks1 = finalize_chunk (Content , Chunks ),
207+ more (fun (Rest ) -> ? FUNCTION_NAME (Rest , Frame , Chunks1 , Remaining ) end )).
208+
209+ parse_unknown_body (Content , Frame , Chunks , Remaining ) ->
210+ case firstnull (Content ) of
211+ - 1 ->
212+ ChunkSize = byte_size (Content ),
213+ case ChunkSize > Remaining of
214+ true -> {error , {max_body_length , unknown }};
215+ false -> ? MORE_BODY (Content , Frame , Chunks , Remaining - ChunkSize )
216+ end ;
217+ Pos ->
218+ case Pos > Remaining of
219+ true -> {error , {max_body_length , unknown }};
220+ false -> finish_body (Content , Frame , Chunks , Pos )
221+ end
147222 end .
148223
149- parse_body (Content , Frame , Chunks , unknown ) ->
150- parse_body2 (Content , Frame , Chunks , case firstnull (Content ) of
151- - 1 -> {more , unknown };
152- Pos -> {done , Pos }
153- end );
154- parse_body (Content , Frame , Chunks , Remaining ) ->
224+ parse_known_body (Content , Frame , Chunks , Remaining ) ->
155225 Size = byte_size (Content ),
156- parse_body2 (Content , Frame , Chunks , case Remaining >= Size of
157- true -> {more , Remaining - Size };
158- false -> {done , Remaining }
159- end ).
160-
161- parse_body2 (Content , Frame , Chunks , {more , Left }) ->
162- Chunks1 = finalize_chunk (Content , Chunks ),
163- more (fun (Rest ) -> parse_body (Rest , Frame , Chunks1 , Left ) end );
164- parse_body2 (Content , Frame , Chunks , {done , Pos }) ->
226+ case Remaining >= Size of
227+ true ->
228+ ? MORE_BODY (Content , Frame , Chunks , Remaining - Size );
229+ false -> finish_body (Content , Frame , Chunks , Remaining )
230+ end .
231+
232+ finish_body (Content , Frame , Chunks , Pos ) ->
165233 <<Chunk :Pos /binary , 0 , Rest /binary >> = Content ,
166234 Body = finalize_chunk (Chunk , Chunks ),
167235 {ok , Frame # stomp_frame {body_iolist_rev = Body }, Rest }.
@@ -251,7 +319,7 @@ serialize(#stomp_frame{command = Command,
251319 headers = Headers ,
252320 body_iolist_rev = BodyFragments }, false ) ->
253321 Len = iolist_size (BodyFragments ),
254- [Command , ? LF ,
322+ [serialize_command ( Command ) , ? LF ,
255323 lists :map (fun serialize_header /1 ,
256324 lists :keydelete (? HEADER_CONTENT_LENGTH , 1 , Headers )),
257325 if
@@ -263,6 +331,10 @@ serialize(#stomp_frame{command = Command,
263331 _ -> lists :reverse (BodyFragments )
264332 end , 0 ].
265333
334+ serialize_command (Command ) when is_atom (Command ) ->
335+ atom_to_binary (Command , utf8 );
336+ serialize_command (Command ) -> Command .
337+
266338serialize_header ({K , V }) when is_integer (V ) -> hdr (escape (K ), integer_to_list (V ));
267339serialize_header ({K , V }) when is_boolean (V ) -> hdr (escape (K ), boolean_to_list (V ));
268340serialize_header ({K , V }) when is_list (V ) -> hdr (escape (K ), escape (V )).
0 commit comments