|
| 1 | +%% The contents of this file are subject to the Mozilla Public License |
| 2 | +%% Version 1.1 (the "License"); you may not use this file except in |
| 3 | +%% compliance with the License. You may obtain a copy of the License |
| 4 | +%% at http://www.mozilla.org/MPL/ |
| 5 | +%% |
| 6 | +%% Software distributed under the License is distributed on an "AS IS" |
| 7 | +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See |
| 8 | +%% the License for the specific language governing rights and |
| 9 | +%% limitations under the License. |
| 10 | +%% |
| 11 | +%% The Original Code is RabbitMQ. |
| 12 | +%% |
| 13 | +%% The Initial Developer of the Original Code is GoPivotal, Inc. |
| 14 | +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. |
| 15 | +%% |
| 16 | + |
| 17 | +-module(rabbit_basic). |
| 18 | +-include("rabbit.hrl"). |
| 19 | +-include("rabbit_framing.hrl"). |
| 20 | + |
| 21 | +-export([publish/4, publish/5, publish/1, |
| 22 | + message/3, message/4, properties/1, prepend_table_header/3, |
| 23 | + extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4, |
| 24 | + header_routes/1, parse_expiration/1, header/2, header/3]). |
| 25 | +-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]). |
| 26 | + |
| 27 | +%%---------------------------------------------------------------------------- |
| 28 | + |
| 29 | +-type properties_input() :: |
| 30 | + rabbit_framing:amqp_property_record() | [{atom(), any()}]. |
| 31 | +-type publish_result() :: |
| 32 | + {ok, [pid()]} | rabbit_types:error('not_found'). |
| 33 | +-type header() :: any(). |
| 34 | +-type headers() :: rabbit_framing:amqp_table() | 'undefined'. |
| 35 | + |
| 36 | +-type exchange_input() :: rabbit_types:exchange() | rabbit_exchange:name(). |
| 37 | +-type body_input() :: binary() | [binary()]. |
| 38 | + |
| 39 | +-spec publish |
| 40 | + (exchange_input(), rabbit_router:routing_key(), properties_input(), |
| 41 | + body_input()) -> |
| 42 | + publish_result(). |
| 43 | +-spec publish |
| 44 | + (exchange_input(), rabbit_router:routing_key(), boolean(), |
| 45 | + properties_input(), body_input()) -> |
| 46 | + publish_result(). |
| 47 | +-spec publish(rabbit_types:delivery()) -> publish_result(). |
| 48 | +-spec delivery |
| 49 | + (boolean(), boolean(), rabbit_types:message(), undefined | integer()) -> |
| 50 | + rabbit_types:delivery(). |
| 51 | +-spec message |
| 52 | + (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), |
| 53 | + binary()) -> |
| 54 | + rabbit_types:message(). |
| 55 | +-spec message |
| 56 | + (rabbit_exchange:name(), rabbit_router:routing_key(), |
| 57 | + rabbit_types:decoded_content()) -> |
| 58 | + rabbit_types:ok_or_error2(rabbit_types:message(), any()). |
| 59 | +-spec properties |
| 60 | + (properties_input()) -> rabbit_framing:amqp_property_record(). |
| 61 | + |
| 62 | +-spec prepend_table_header |
| 63 | + (binary(), rabbit_framing:amqp_table(), headers()) -> headers(). |
| 64 | + |
| 65 | +-spec header(header(), headers()) -> 'undefined' | any(). |
| 66 | +-spec header(header(), headers(), any()) -> 'undefined' | any(). |
| 67 | + |
| 68 | +-spec extract_headers(rabbit_types:content()) -> headers(). |
| 69 | + |
| 70 | +-spec map_headers |
| 71 | + (fun((headers()) -> headers()), rabbit_types:content()) -> |
| 72 | + rabbit_types:content(). |
| 73 | + |
| 74 | +-spec header_routes(undefined | rabbit_framing:amqp_table()) -> [string()]. |
| 75 | +-spec build_content |
| 76 | + (rabbit_framing:amqp_property_record(), binary() | [binary()]) -> |
| 77 | + rabbit_types:content(). |
| 78 | +-spec from_content |
| 79 | + (rabbit_types:content()) -> |
| 80 | + {rabbit_framing:amqp_property_record(), binary()}. |
| 81 | +-spec parse_expiration |
| 82 | + (rabbit_framing:amqp_property_record()) -> |
| 83 | + rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any()). |
| 84 | + |
| 85 | +%%---------------------------------------------------------------------------- |
| 86 | + |
| 87 | +%% Convenience function, for avoiding round-trips in calls across the |
| 88 | +%% erlang distributed network. |
| 89 | +publish(Exchange, RoutingKeyBin, Properties, Body) -> |
| 90 | + publish(Exchange, RoutingKeyBin, false, Properties, Body). |
| 91 | + |
| 92 | +%% Convenience function, for avoiding round-trips in calls across the |
| 93 | +%% erlang distributed network. |
| 94 | +publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) -> |
| 95 | + Message = message(XName, RKey, properties(Props), Body), |
| 96 | + publish(X, delivery(Mandatory, false, Message, undefined)); |
| 97 | +publish(XName, RKey, Mandatory, Props, Body) -> |
| 98 | + Message = message(XName, RKey, properties(Props), Body), |
| 99 | + publish(delivery(Mandatory, false, Message, undefined)). |
| 100 | + |
| 101 | +publish(Delivery = #delivery{ |
| 102 | + message = #basic_message{exchange_name = XName}}) -> |
| 103 | + case rabbit_exchange:lookup(XName) of |
| 104 | + {ok, X} -> publish(X, Delivery); |
| 105 | + Err -> Err |
| 106 | + end. |
| 107 | + |
| 108 | +publish(X, Delivery) -> |
| 109 | + Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)), |
| 110 | + DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery), |
| 111 | + {ok, DeliveredQPids}. |
| 112 | + |
| 113 | +delivery(Mandatory, Confirm, Message, MsgSeqNo) -> |
| 114 | + #delivery{mandatory = Mandatory, confirm = Confirm, sender = self(), |
| 115 | + message = Message, msg_seq_no = MsgSeqNo, flow = noflow}. |
| 116 | + |
| 117 | +build_content(Properties, BodyBin) when is_binary(BodyBin) -> |
| 118 | + build_content(Properties, [BodyBin]); |
| 119 | + |
| 120 | +build_content(Properties, PFR) -> |
| 121 | + %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 |
| 122 | + {ClassId, _MethodId} = |
| 123 | + rabbit_framing_amqp_0_9_1:method_id('basic.publish'), |
| 124 | + #content{class_id = ClassId, |
| 125 | + properties = Properties, |
| 126 | + properties_bin = none, |
| 127 | + protocol = none, |
| 128 | + payload_fragments_rev = PFR}. |
| 129 | + |
| 130 | +from_content(Content) -> |
| 131 | + #content{class_id = ClassId, |
| 132 | + properties = Props, |
| 133 | + payload_fragments_rev = FragmentsRev} = |
| 134 | + rabbit_binary_parser:ensure_content_decoded(Content), |
| 135 | + %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 |
| 136 | + {ClassId, _MethodId} = |
| 137 | + rabbit_framing_amqp_0_9_1:method_id('basic.publish'), |
| 138 | + {Props, list_to_binary(lists:reverse(FragmentsRev))}. |
| 139 | + |
| 140 | +%% This breaks the spec rule forbidding message modification |
| 141 | +strip_header(#content{properties = #'P_basic'{headers = undefined}} |
| 142 | + = DecodedContent, _Key) -> |
| 143 | + DecodedContent; |
| 144 | +strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} |
| 145 | + = DecodedContent, Key) -> |
| 146 | + case lists:keysearch(Key, 1, Headers) of |
| 147 | + false -> DecodedContent; |
| 148 | + {value, Found} -> Headers0 = lists:delete(Found, Headers), |
| 149 | + rabbit_binary_generator:clear_encoded_content( |
| 150 | + DecodedContent#content{ |
| 151 | + properties = Props#'P_basic'{ |
| 152 | + headers = Headers0}}) |
| 153 | + end. |
| 154 | + |
| 155 | +message(XName, RoutingKey, #content{properties = Props} = DecodedContent) -> |
| 156 | + try |
| 157 | + {ok, #basic_message{ |
| 158 | + exchange_name = XName, |
| 159 | + content = strip_header(DecodedContent, ?DELETED_HEADER), |
| 160 | + id = rabbit_guid:gen(), |
| 161 | + is_persistent = is_message_persistent(DecodedContent), |
| 162 | + routing_keys = [RoutingKey | |
| 163 | + header_routes(Props#'P_basic'.headers)]}} |
| 164 | + catch |
| 165 | + {error, _Reason} = Error -> Error |
| 166 | + end. |
| 167 | + |
| 168 | +message(XName, RoutingKey, RawProperties, Body) -> |
| 169 | + Properties = properties(RawProperties), |
| 170 | + Content = build_content(Properties, Body), |
| 171 | + {ok, Msg} = message(XName, RoutingKey, Content), |
| 172 | + Msg. |
| 173 | + |
| 174 | +properties(P = #'P_basic'{}) -> |
| 175 | + P; |
| 176 | +properties(P) when is_list(P) -> |
| 177 | + %% Yes, this is O(length(P) * record_info(size, 'P_basic') / 2), |
| 178 | + %% i.e. slow. Use the definition of 'P_basic' directly if |
| 179 | + %% possible! |
| 180 | + lists:foldl(fun ({Key, Value}, Acc) -> |
| 181 | + case indexof(record_info(fields, 'P_basic'), Key) of |
| 182 | + 0 -> throw({unknown_basic_property, Key}); |
| 183 | + N -> setelement(N + 1, Acc, Value) |
| 184 | + end |
| 185 | + end, #'P_basic'{}, P). |
| 186 | + |
| 187 | +prepend_table_header(Name, Info, undefined) -> |
| 188 | + prepend_table_header(Name, Info, []); |
| 189 | +prepend_table_header(Name, Info, Headers) -> |
| 190 | + case rabbit_misc:table_lookup(Headers, Name) of |
| 191 | + {array, Existing} -> |
| 192 | + prepend_table(Name, Info, Existing, Headers); |
| 193 | + undefined -> |
| 194 | + prepend_table(Name, Info, [], Headers); |
| 195 | + Other -> |
| 196 | + Headers2 = prepend_table(Name, Info, [], Headers), |
| 197 | + set_invalid_header(Name, Other, Headers2) |
| 198 | + end. |
| 199 | + |
| 200 | +prepend_table(Name, Info, Prior, Headers) -> |
| 201 | + rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]). |
| 202 | + |
| 203 | +set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) -> |
| 204 | + case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of |
| 205 | + undefined -> |
| 206 | + set_invalid([{Name, array, [Value]}], Headers); |
| 207 | + {table, ExistingHdr} -> |
| 208 | + update_invalid(Name, Value, ExistingHdr, Headers); |
| 209 | + Other -> |
| 210 | + %% somehow the x-invalid-headers header is corrupt |
| 211 | + Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}], |
| 212 | + set_invalid_header(Name, Value, set_invalid(Invalid, Headers)) |
| 213 | + end. |
| 214 | + |
| 215 | +set_invalid(NewHdr, Headers) -> |
| 216 | + rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr). |
| 217 | + |
| 218 | +update_invalid(Name, Value, ExistingHdr, Header) -> |
| 219 | + Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of |
| 220 | + undefined -> [Value]; |
| 221 | + {array, Prior} -> [Value | Prior] |
| 222 | + end, |
| 223 | + NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values), |
| 224 | + set_invalid(NewHdr, Header). |
| 225 | + |
| 226 | +header(_Header, undefined) -> |
| 227 | + undefined; |
| 228 | +header(_Header, []) -> |
| 229 | + undefined; |
| 230 | +header(Header, Headers) -> |
| 231 | + header(Header, Headers, undefined). |
| 232 | + |
| 233 | +header(Header, Headers, Default) -> |
| 234 | + case lists:keysearch(Header, 1, Headers) of |
| 235 | + false -> Default; |
| 236 | + {value, Val} -> Val |
| 237 | + end. |
| 238 | + |
| 239 | +extract_headers(Content) -> |
| 240 | + #content{properties = #'P_basic'{headers = Headers}} = |
| 241 | + rabbit_binary_parser:ensure_content_decoded(Content), |
| 242 | + Headers. |
| 243 | + |
| 244 | +extract_timestamp(Content) -> |
| 245 | + #content{properties = #'P_basic'{timestamp = Timestamp}} = |
| 246 | + rabbit_binary_parser:ensure_content_decoded(Content), |
| 247 | + Timestamp. |
| 248 | + |
| 249 | +map_headers(F, Content) -> |
| 250 | + Content1 = rabbit_binary_parser:ensure_content_decoded(Content), |
| 251 | + #content{properties = #'P_basic'{headers = Headers} = Props} = Content1, |
| 252 | + Headers1 = F(Headers), |
| 253 | + rabbit_binary_generator:clear_encoded_content( |
| 254 | + Content1#content{properties = Props#'P_basic'{headers = Headers1}}). |
| 255 | + |
| 256 | +indexof(L, Element) -> indexof(L, Element, 1). |
| 257 | + |
| 258 | +indexof([], _Element, _N) -> 0; |
| 259 | +indexof([Element | _Rest], Element, N) -> N; |
| 260 | +indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). |
| 261 | + |
| 262 | +is_message_persistent(#content{properties = #'P_basic'{ |
| 263 | + delivery_mode = Mode}}) -> |
| 264 | + case Mode of |
| 265 | + 1 -> false; |
| 266 | + 2 -> true; |
| 267 | + undefined -> false; |
| 268 | + Other -> throw({error, {delivery_mode_unknown, Other}}) |
| 269 | + end. |
| 270 | + |
| 271 | +%% Extract CC routes from headers |
| 272 | +header_routes(undefined) -> |
| 273 | + []; |
| 274 | +header_routes(HeadersTable) -> |
| 275 | + lists:append( |
| 276 | + [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of |
| 277 | + {array, Routes} -> [Route || {longstr, Route} <- Routes]; |
| 278 | + undefined -> []; |
| 279 | + {Type, _Val} -> throw({error, {unacceptable_type_in_header, |
| 280 | + binary_to_list(HeaderKey), Type}}) |
| 281 | + end || HeaderKey <- ?ROUTING_HEADERS]). |
| 282 | + |
| 283 | +parse_expiration(#'P_basic'{expiration = undefined}) -> |
| 284 | + {ok, undefined}; |
| 285 | +parse_expiration(#'P_basic'{expiration = Expiration}) -> |
| 286 | + case string:to_integer(binary_to_list(Expiration)) of |
| 287 | + {error, no_integer} = E -> |
| 288 | + E; |
| 289 | + {N, ""} -> |
| 290 | + case rabbit_misc:check_expiry(N) of |
| 291 | + ok -> {ok, N}; |
| 292 | + E = {error, _} -> E |
| 293 | + end; |
| 294 | + {_, S} -> |
| 295 | + {error, {leftover_string, S}} |
| 296 | + end. |
| 297 | + |
| 298 | +maybe_gc_large_msg(Content) -> |
| 299 | + rabbit_writer:maybe_gc_large_msg(Content). |
| 300 | + |
| 301 | +msg_size(Content) -> |
| 302 | + rabbit_writer:msg_size(Content). |
0 commit comments