diff --git a/components/proto_json/src/proto_json_rpc.erl b/components/proto_json/src/proto_json_rpc.erl index e01a985..9722890 100644 --- a/components/proto_json/src/proto_json_rpc.erl +++ b/components/proto_json/src/proto_json_rpc.erl @@ -22,7 +22,7 @@ -define(SERVER, ?MODULE). -export([start_json_server/0]). --export([send_message/8, +-export([send_message/8, send_message/9, receive_message/3]). -record(st, { @@ -42,6 +42,10 @@ start_json_server() -> rvi_common:start_json_rpc_server(protocol, ?MODULE, proto_json_sup). +send_message(CompSpec, TID, ServiceName, Timeout, ProtoOpts, + DataLinkMod, DataLinkOpts, Parameters) -> + send_message(CompSpec, TID, ServiceName, Timeout, ProtoOpts, + DataLinkMod, DataLinkOpts, [], Parameters). send_message(CompSpec, TID, @@ -50,6 +54,7 @@ send_message(CompSpec, ProtoOpts, DataLinkMod, DataLinkOpts, + Extra, Parameters) -> rvi_common:request(protocol, ?MODULE, send_message, [{ transaction_id, TID }, @@ -58,6 +63,7 @@ send_message(CompSpec, { protocol_opts, ProtoOpts }, { data_link_mod, DataLinkMod }, { data_link_opts, DataLinkOpts }, + { extra, Extra }, { parameters, Parameters }], [ status ], CompSpec). @@ -79,6 +85,7 @@ handle_rpc(<<"send_message">>, Args) -> {ok, ProtoOpts} = rvi_common:get_json_element(["protocol_opts"], Args), {ok, DataLinkMod} = rvi_common:get_json_element(["data_link_mod"], Args), {ok, DataLinkOpts} = rvi_common:get_json_element(["data_link_opts"], Args), + Extra = rvi_common:get_opt_json_element(["extra"], [], Args), {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), [ ok ] = gen_server:call(?SERVER, { rvi, send_message, [TID, @@ -87,6 +94,7 @@ handle_rpc(<<"send_message">>, Args) -> ProtoOpts, DataLinkMod, DataLinkOpts, + Extra, Parameters, LogId]}), {ok, [ {status, rvi_common:json_rpc_status(ok)} ]}; @@ -121,6 +129,7 @@ handle_call({rvi, send_message, ProtoOpts, DataLinkMod, DataLinkOpts, + Extra, Parameters | _LogId]}, _From, St) -> ?debug(" protocol:send(): transaction id: ~p~n", [TID]), ?debug(" protocol:send(): service name: ~p~n", [ServiceName]), @@ -128,10 +137,12 @@ handle_call({rvi, send_message, ?debug(" protocol:send(): opts: ~p~n", [ProtoOpts]), ?debug(" protocol:send(): data_link_mod: ~p~n", [DataLinkMod]), ?debug(" protocol:send(): data_link_opts: ~p~n", [DataLinkOpts]), + ?debug(" protocol:send(): extra: ~p~n", [Extra]), ?debug(" protocol:send(): parameters: ~p~n", [Parameters]), Data = [{ <<"service">>, ServiceName }, { <<"timeout">>, Timeout }, { <<"parameters">>, Parameters } + | Extra ], RviOpts = rvi_common:rvi_options(Parameters), Res = DataLinkMod:send_data( @@ -146,17 +157,19 @@ handle_call(Other, _From, St) -> handle_cast({rvi, receive_message, [Elems, IP, Port | _LogId]} = Msg, St) -> ?debug("~p:handle_cast(~p)", [?MODULE, Msg]), - [ ServiceName, Timeout, Parameters ] = - opts([<<"service">>, <<"timeout">>, <<"parameters">>], + [ ServiceName, Timeout, Src, Parameters ] = + opts([<<"service">>, <<"timeout">>, <<"src">>, <<"parameters">>], Elems, undefined), ?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]), ?debug(" protocol:rcv(): timeout: ~p~n", [Timeout]), + ?debug(" protocol:rcv(): src: ~p~n", [Src]), ?debug(" protocol:rcv(): remote IP/Port: ~p~n", [{IP, Port}]), service_edge_rpc:handle_remote_message(St#st.cs, {IP, Port}, ServiceName, Timeout, + [{<<"src">>, Src}], Parameters), {noreply, St}; diff --git a/components/proto_msgpack/src/proto_msgpack_rpc.erl b/components/proto_msgpack/src/proto_msgpack_rpc.erl index 7035353..7fdc210 100644 --- a/components/proto_msgpack/src/proto_msgpack_rpc.erl +++ b/components/proto_msgpack/src/proto_msgpack_rpc.erl @@ -22,7 +22,7 @@ -define(SERVER, ?MODULE). -export([start_json_server/0]). --export([send_message/8, +-export([send_message/8, send_message/9, receive_message/3]). -record(st, { @@ -44,7 +44,10 @@ init([]) -> start_json_server() -> rvi_common:start_json_rpc_server(protocol, ?MODULE, proto_msgpack_sup). - +send_message(CompSpec, TID, ServiceName, Timeout, ProtoOpts, + DataLinkMod, DataLinkOpts, Parameters) -> + send_message(CompSpec, TID, ServiceName, Timeout, ProtoOpts, + DataLinkMod, DataLinkOpts, [], Parameters). send_message(CompSpec, TID, @@ -53,6 +56,7 @@ send_message(CompSpec, ProtoOpts, DataLinkMod, DataLinkOpts, + Extra, Parameters) -> rvi_common:request(protocol, ?MODULE, send_message, [{ transaction_id, TID }, @@ -61,6 +65,7 @@ send_message(CompSpec, { protocol_opts, ProtoOpts }, { data_link_mod, DataLinkMod }, { data_link_opts, DataLinkOpts }, + { extra, Extra }, { parameters, Parameters }], [ status ], CompSpec). @@ -82,6 +87,7 @@ handle_rpc(<<"send_message">>, Args) -> {ok, ProtoOpts} = rvi_common:get_json_element(["protocol_opts"], Args), {ok, DataLinkMod} = rvi_common:get_json_element(["data_link_mod"], Args), {ok, DataLinkOpts} = rvi_common:get_json_element(["data_link_opts"], Args), + Extra = rvi_common:get_opt_json_element(["extra"], [], Args), {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), [ ok ] = gen_server:call(?SERVER, { rvi, send_message, [TID, @@ -90,6 +96,7 @@ handle_rpc(<<"send_message">>, Args) -> ProtoOpts, DataLinkMod, DataLinkOpts, + Extra, Parameters, LogId]}), {ok, [ {status, rvi_common:json_rpc_status(ok)} ]}; @@ -123,18 +130,21 @@ handle_call({rvi, send_message, ProtoOpts, DataLinkMod, DataLinkOpts, + Extra, Parameters - | LogId]}, _From, St) -> + | _LogId]}, _From, St) -> ?debug(" protocol:send(): transaction id: ~p~n", [TID]), ?debug(" protocol:send(): service name: ~p~n", [ServiceName]), ?debug(" protocol:send(): timeout: ~p~n", [Timeout]), ?debug(" protocol:send(): opts: ~p~n", [ProtoOpts]), ?debug(" protocol:send(): data_link_mod: ~p~n", [DataLinkMod]), ?debug(" protocol:send(): data_link_opts: ~p~n", [DataLinkOpts]), + ?debug(" protocol:send(): extra: ~p~n", [Extra]), ?debug(" protocol:send(): parameters: ~p~n", [Parameters]), Data = [ { <<"service">>, ServiceName }, { <<"timeout">>, Timeout }, - { <<"parameters">>, Parameters } ], + { <<"parameters">>, Parameters } + | Extra ], RviOpts = rvi_common:rvi_options(Parameters), Res = DataLinkMod:send_data( St#st.cs, ?MODULE, ServiceName, RviOpts ++ DataLinkOpts, Data), @@ -146,11 +156,11 @@ handle_call(Other, _From, St) -> %% Convert list-based data to binary. -handle_cast({rvi, receive_message, [Elems, IP, Port | LogId]} = Msg, St) -> +handle_cast({rvi, receive_message, [Elems, IP, Port | _LogId]} = Msg, St) -> ?debug("~p:handle_cast(~p)", [?MODULE, Msg]), - [ ServiceName, Timeout, Parameters ] = - opts([<<"service">>, <<"timeout">>, <<"parameters">>], + [ ServiceName, Timeout, Src, Parameters ] = + opts([<<"service">>, <<"timeout">>, <<"src">>, <<"parameters">>], Elems, undefined), ?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]), @@ -161,6 +171,7 @@ handle_cast({rvi, receive_message, [Elems, IP, Port | LogId]} = Msg, St) -> {IP, Port}, ServiceName, Timeout, + [{<<"src">>, Src}], Parameters), {noreply, St}; diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl index 7f5a0a9..55684ab 100644 --- a/components/schedule/src/schedule_rpc.erl +++ b/components/schedule/src/schedule_rpc.erl @@ -13,7 +13,7 @@ -include_lib("rvi_common/include/rvi_common.hrl"). %% API -export([start_link/0]). --export([schedule_message/4]). +-export([schedule_message/4, schedule_message/5]). %% Invoked by service discovery %% FIXME: Should be rvi_service_discovery behavior @@ -64,6 +64,7 @@ routes, %% Routes retrieved for this timeout_tref, %% Reference to erlang timer associated with this message. log_id, + extra = [], parameters }). @@ -120,20 +121,24 @@ init([]) -> start_json_server() -> rvi_common:start_json_rpc_server(schedule, ?MODULE, schedule_sup). +schedule_message(CompSpec, SvcName, Timeout, Parameters) -> + schedule_message(CompSpec, SvcName, Timeout, [], Parameters). + schedule_message(CompSpec, SvcName, Timeout, + Extra, Parameters) -> rvi_common:request(schedule, ?MODULE, schedule_message, [{ service, SvcName }, { timeout, Timeout }, + { extra, Extra }, { parameters, Parameters }], [status, transaction_id], CompSpec). - service_available(CompSpec, SvcName, DataLinkModule) -> rvi_common:notification(schedule, ?MODULE, @@ -156,6 +161,7 @@ handle_rpc(<<"schedule_message">>, Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), + Extra = rvi_common:get_opt_json_element(["extra"], [], Args), LogId = rvi_common:get_json_log_id(Args), ?debug("schedule_rpc:schedule_request(): service: ~p", [ SvcName]), @@ -165,6 +171,7 @@ handle_rpc(<<"schedule_message">>, Args) -> [ok, TransID] = gen_server:call(?SERVER, { rvi, schedule_message, [ SvcName, Timeout, + Extra, Parameters, LogId ]}), @@ -208,6 +215,7 @@ handle_notification(Other, _Args) -> handle_call( { rvi, schedule_message, [SvcName, Timeout, + Extra, Parameters | LogId] }, _From, St) -> ?debug("sched:sched_msg(): service: ~p", [SvcName]), @@ -222,6 +230,7 @@ handle_call( { rvi, schedule_message, Msg = #message{transaction_id = TransID, service = SvcName, timeout = Timeout, + extra = Extra, parameters = Parameters, log_id = LogId}, {_, NSt2 }= queue_message(Msg, @@ -495,6 +504,7 @@ send_message(local, _, _, _, Msg, St) -> service_edge_rpc:handle_remote_message(St#st.cs, Msg#message.service, Msg#message.timeout, + Msg#message.extra, Msg#message.parameters), {ok, St}; @@ -514,6 +524,7 @@ send_message(DataLinkMod, DataLinkOpts, ProtoOpts, DataLinkMod, DataLinkOpts, + Msg#message.extra, Msg#message.parameters) of %% Success @@ -755,7 +766,7 @@ create_transaction_id(St) -> %% Calculate a relative timeout based on the Msec UnixTime TS we are %% provided with. calc_relative_tout(UnixTimeMS) -> - { Mega, Sec, Micro } = now(), + { Mega, Sec, Micro } = os:timestamp(), Now = Mega * 1000000000 + Sec * 1000 + trunc(Micro / 1000) , ?debug("sched:calc_relative_tout(): TimeoutUnixMS(~p) - Now(~p) = ~p", [ UnixTimeMS, Now, UnixTimeMS - Now ]), diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index b3fa760..7cbd44b 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -24,7 +24,7 @@ terminate/2, code_change/3]). --export([handle_remote_message/5, +-export([handle_remote_message/6, handle_local_timeout/3]). -export([start_json_server/0, @@ -199,8 +199,8 @@ service_unavailable(CompSpec, SvcName, DataLinkModule) -> [{ service, SvcName }, { data_link_module, DataLinkModule }], CompSpec). -handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params) -> - ?event({handle_remote_message, [Conn, SvcName, Timeout, Params]}), +handle_remote_message(CompSpec, Conn, SvcName, Timeout, Extra, Params) -> + ?event({handle_remote_message, [Conn, SvcName, Timeout, Extra, Params]}), {IP, Port} = Conn, rvi_common:notification(service_edge, ?MODULE, handle_remote_message, @@ -208,6 +208,7 @@ handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params) -> { port, Port }, { service, SvcName }, { timeout, Timeout }, + { extra, Extra }, { parameters, Params }], CompSpec). @@ -258,18 +259,21 @@ handle_ws_json_rpc(WSock, <<"message">>, Params, _Arg ) -> { ok, Timeout } = rvi_common:get_json_element(["timeout"], Params), { ok, Parameters } = rvi_common:get_json_element(["parameters"], Params), SvcName = iolist_to_binary(SvcName0), + Pfx = rvi_common:get_local_service_prefix(), + Extra = [{<<"src">>, Pfx}], ?event({message, ws, [SvcName, Timeout, Parameters]}), ?debug("WS Parameters: ~p", [Parameters]), %% Parameters = parse_ws_params(Parameters0), LogId = log_id_json_tail(Params ++ Parameters), ?debug("service_edge_rpc:handle_websocket(~p) params!: ~p", [ WSock, Params ]), ?debug("service_edge_rpc:handle_websocket(~p) service: ~p", [ WSock, SvcName ]), + ?debug("service_edge_rpc:handle_websocket(~p) extra: ~p", [ WSock, Extra ]), ?debug("service_edge_rpc:handle_websocket(~p) parameters: ~p", [ WSock, Parameters ]), case gen_server:call( ?SERVER, {rvi, handle_local_message, - [ SvcName, Timeout, Parameters | LogId ]}) of + [ SvcName, Timeout, Extra, Parameters | LogId ]}) of [not_found] -> {ok, [{status, rvi_common:json(not_found)}]}; [Res, TID] -> @@ -364,9 +368,12 @@ handle_rpc(<<"message">>, Args) -> {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), ?event({message, json_rpc, [SvcName, Timeout, Parameters]}), + Pfx = rvi_common:local_service_prefix(), + Extra = [{<<"src">>, Pfx}], LogId = log_id_json_tail(Args ++ Parameters), - [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message, - [ SvcName, Timeout, Parameters | LogId]}), + [ Res, TID ] = gen_server:call( + ?SERVER, { rvi, handle_local_message, + [ SvcName, Timeout, Extra, Parameters | LogId]}), {ok, [ { status, rvi_common:json_rpc_status(Res) }, { transaction_id, TID }, { method, <<"message">>} @@ -423,6 +430,7 @@ handle_notification(<<"handle_remote_message">>, Args) -> { ok, Port } = rvi_common:get_json_element(["port"], Args), { ok, SvcName } = rvi_common:get_json_element(["service"], Args), { ok, Timeout } = rvi_common:get_json_element(["timeout"], Args), + { ok, Extra } = rvi_common:get_json_element(["extra"], Args), { ok, Parameters } = rvi_common:get_json_element(["parameters"], Args), gen_server:cast(?SERVER, { rvi, handle_remote_message, [ @@ -430,6 +438,7 @@ handle_notification(<<"handle_remote_message">>, Args) -> Port, SvcName, Timeout, + Extra, Parameters ]}), @@ -510,10 +519,11 @@ handle_call({rvi, get_available_services, []}, _From, St) -> %% 13:48:12.943 [debug] service_edge_rpc:local_msg: parameters: [{struct,[{"a","b"}]}] handle_call({ rvi, handle_local_message, - [SvcName, TimeoutArg, Parameters | Tail] = Args }, From, + [SvcName, TimeoutArg, Extra, Parameters | Tail] = Args }, From, #st{pending = Pend} = St) -> ?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]), ?debug("service_edge_rpc:local_msg: timeout: ~p", [TimeoutArg]), + ?debug("service_edge_rpc:local_msg: extra: ~p", [Extra]), ?debug("service_edge_rpc:local_msg: parameters: ~p", [Parameters]), CS = start_log(Tail, "local_message: ~s", [SvcName], St#st.cs), %% @@ -555,12 +565,13 @@ handle_cast({rvi, handle_remote_message, Port, SvcName, Timeout, + Extra, Parameters ] }, #st{cs = CS} = St) -> ?event({handle_remote_message, [IP, Port, SvcName, Timeout]}, St), spawn(fun() -> handle_remote_message_( - IP, Port, SvcName, Timeout, Parameters, CS) + IP, Port, SvcName, Timeout, Extra, Parameters, CS) end), {noreply, St}; @@ -601,11 +612,12 @@ terminate(_Reason, _St) -> code_change(_OldVsn, St, _Extra) -> {ok, St}. -handle_remote_message_(IP, Port, SvcName, Timeout, Parameters, CS) -> +handle_remote_message_(IP, Port, SvcName, Timeout, Extra, Parameters, CS) -> ?debug("service_edge:remote_msg(): remote_ip: ~p", [IP]), ?debug("service_edge:remote_msg(): remote_port: ~p", [Port]), ?debug("service_edge:remote_msg(): service_name: ~p", [SvcName]), ?debug("service_edge:remote_msg(): timeout: ~p", [Timeout]), + ?debug("service_edge:remote_msg(): extra: ~p", [Extra]), ?debug("service_edge:remote_msg(): parameters: ~p", [Parameters]), %% Check if this is a local message. @@ -619,10 +631,11 @@ handle_remote_message_(IP, Port, SvcName, Timeout, Parameters, CS) -> {remote_port, Port}, {service_name, SvcName}, {timeout, Timeout}, + {extra, Extra}, {parameters, Parameters1}]) of [ ok ] -> forward_message_to_local_service( - URL, SvcName, Parameters, CS); + URL, SvcName, Extra, Parameters, CS); [ _Other ] -> ?warning("service_entry:remote_msg(): " "Failed to authenticate ~p (~p)", @@ -633,13 +646,15 @@ handle_remote_message_(IP, Port, SvcName, Timeout, Parameters, CS) -> [SvcName]) end. -handle_local_message_([SvcName, TimeoutArg, Parameters | _] = Args, CS) -> +handle_local_message_([SvcName, TimeoutArg, Extra, Parameters | _] = Args, CS) -> ?debug("CS = ~p", [lager:pr(CS, rvi_common)]), case authorize_rpc:authorize_local_message( CS, SvcName, [{service_name, SvcName}, {timeout, TimeoutArg}, %% {parameters, Parameters}, + {extra, Extra}, {parameters, Parameters} + | Extra ]) of [ok] -> do_handle_local_message_(Args, CS); @@ -647,7 +662,7 @@ handle_local_message_([SvcName, TimeoutArg, Parameters | _] = Args, CS) -> [not_found] end. -do_handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS) -> +do_handle_local_message_([SvcName, TimeoutArg, Extra, Parameters | _Tail], CS) -> %% %% Slick but ugly. %% If the timeout is more than 24 hrs old when parsed as unix time, @@ -672,6 +687,8 @@ do_handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS) -> %% LookupRes = ets:lookup(?SERVICE_TABLE, SvcName), ?debug("Service LookupRes = ~p", [LookupRes]), + Pfx = rvi_common:local_service_prefix(), + Extra = [{<<"src">>, Pfx}], case LookupRes of [ #service_entry { url = URL } = E ] -> %% SvcName is local. Forward message ?debug("service_edge_rpc:local_msg(): Service is local. Forwarding."), @@ -679,6 +696,7 @@ do_handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS) -> ?event({matching_service_entry, E}), Res = forward_message_to_local_service(URL, SvcName, + Extra, Parameters, CS), Res; @@ -690,6 +708,7 @@ do_handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS) -> [ _, TID ] = schedule_rpc:schedule_message(CS, SvcName, Timeout, + Extra, Parameters), [ok, TID ] end. @@ -733,12 +752,12 @@ dispatch_to_local_service("ws:" ++ WSPidStr, services_unavailable, dispatch_to_local_service("ws:" ++ WSPidStr, message, [{ <<"service_name">>, SvcName}, - { <<"parameters">>, Args}]) -> + { <<"parameters">>, Args} | Extra]) -> ?info("service_edge:dispatch_to_local_service(message/alt, websock): ~p", [Args]), wse_server:send(list_to_pid(WSPidStr), json_rpc_notification(<<"message">>, [{<<"service_name">>, SvcName}, - {<<"parameters">>, Args}])), + {<<"parameters">>, Args} | Extra])), %% No response expected. ?debug("service_edge:dispatch_to_local_service(message, websock): Done"), ok; @@ -761,7 +780,7 @@ dispatch_to_local_service(URL, Command, Args) -> %% Forward a message to a specific locally connected service. %% Called by forward_message_to_local_service/2. %% -forward_message_to_local_service(URL,SvcName, Parameters, CompSpec) -> +forward_message_to_local_service(URL, SvcName, Extra, Parameters, CompSpec) -> ?debug("service_edge:forward_to_local(): URL: ~p", [URL]), ?debug("service_edge:forward_to_local(): Parameters: ~p", [Parameters]), @@ -781,10 +800,11 @@ forward_message_to_local_service(URL,SvcName, Parameters, CompSpec) -> try log_outcome( rvi_common:get_request_result( - dispatch_to_local_service(URL, - message, - [{<<"service_name">>, LocalSvcName }, - {<<"parameters">>, Parameters }])), + dispatch_to_local_service( + URL, + message, + [{<<"service_name">>, LocalSvcName }, + {<<"parameters">>, Parameters } | Extra])), SvcName, CompSpec) catch Tag:Err -> diff --git a/test/rvi_core_SUITE.erl b/test/rvi_core_SUITE.erl index d4a8237..5ee142d 100644 --- a/test/rvi_core_SUITE.erl +++ b/test/rvi_core_SUITE.erl @@ -543,7 +543,8 @@ handle_body(Socket, _Request, Body, _St) -> {<<"parameters">>, [ {<<"data">>, Data}, {<<"sendto">>, SendTo}, - {<<"rvi.max_msg_size">>, _}]} + {<<"rvi.max_msg_size">>, _}]}, + {<<"src">>, _} ]}] -> binary_to_existing_atom(SendTo, latin1) ! {message, [{service_name, SvcName},