Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/grpc_client.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
[{description,"gRPC client in Erlang"},
{vsn,"0.1.0"},
{modules,[]},
{applications, [grpc_lib,http2_client]},
{registered, []},
{env, []},
{applications,[]}]}.
6 changes: 2 additions & 4 deletions src/grpc_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,14 @@ new_stream(Connection, Service, Rpc, DecoderModule, Options) ->

-spec send(Stream::client_stream(), Msg::map()) -> ok.
%% @doc Send a message from the client to the server.
send(Stream, Msg) when is_pid(Stream),
is_map(Msg) ->
send(Stream, Msg) when is_pid(Stream) ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please update the function spec here?

grpc_client_stream:send(Stream, Msg).

-spec send_last(Stream::client_stream(), Msg::map()) -> ok.
%% @doc Send a message to server and mark it as the last message
%% on the stream. For simple RPC and client-streaming RPCs that
%% will trigger the response from the server.
send_last(Stream, Msg) when is_pid(Stream),
is_map(Msg) ->
send_last(Stream, Msg) when is_pid(Stream) ->
grpc_client_stream:send_last(Stream, Msg).

-spec rcv(Stream::client_stream()) -> rcv_response().
Expand Down
38 changes: 29 additions & 9 deletions src/grpc_client_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ new_stream(Connection, Service, Rpc, Encoder, Options) ->
Compression = proplists:get_value(compression, Options, none),
Metadata = proplists:get_value(metadata, Options, #{}),
TransportOptions = proplists:get_value(http2_options, Options, []),
RecordsEncoder = proplists:get_value(msgs_as_records, Options, []),
ClientPid = proplists:get_value(async_notification, Options),
{ok, StreamId} = grpc_client_connection:new_stream(Connection, TransportOptions),
Package = Encoder:get_package_name(),
RpcDef = Encoder:find_rpc_def(Service, Rpc),
Expand All @@ -238,8 +240,10 @@ new_stream(Connection, Service, Rpc, Encoder, Options) ->
rpc => Rpc,
queue => queue:new(),
response_pending => false,
async_notification => ClientPid,
state => idle,
encoder => Encoder,
records_encoder => RecordsEncoder,
connection => Connection,
headers_sent => false,
metadata => Metadata,
Expand Down Expand Up @@ -314,6 +318,9 @@ add_metadata(Headers, Metadata) ->
lists:keystore(K, 1, Acc, {K,V})
end, Headers, maps:to_list(Metadata)).

info_response(Response, #{async_notification := Client} = Stream) when is_pid(Client) ->
Client ! {notification,Response},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering whether it would be better to have the notification message as something like {grpc_notification, Response}. Using the atom notification seems too generic.

{noreply, Stream};
info_response(Response, #{response_pending := true,
client := Client} = Stream) ->
gen_server:reply(Client, Response),
Expand All @@ -325,17 +332,24 @@ info_response(Response, #{queue := Queue} = Stream) ->
%% TODO: fix the error handling, currently it is very hard to understand the
%% error that results from a bad message (Map).
encode(#{encoder := Encoder,
input := MsgType,
compression := CompressionMethod}, Map) ->
%% RequestData = Encoder:encode_msg(Map, MsgType),
try Encoder:encode_msg(Map, MsgType) of
RequestData ->
records_encoder := RecordsEncoder,
input := MsgType,
compression := CompressionMethod}, Msg) ->
try
begin
RequestData = case is_map(Msg) of
true ->
Encoder:encode_msg(Msg, MsgType);
false when is_tuple(Msg) ->
RecordsEncoder:encode_msg(Msg)
end,
maybe_compress(RequestData, CompressionMethod)
end
catch
error:function_clause ->
throw({error, {failed_to_encode, MsgType, Map}});
throw({error, {failed_to_encode, MsgType, Msg}});
Error:Reason ->
throw({error, {Error, Reason}})
throw({error, {Error, Reason}})
end.

maybe_compress(Encoded, none) ->
Expand All @@ -351,12 +365,18 @@ maybe_compress(_Encoded, Other) ->
decode(Encoded, Binary,
#{response_encoding := Method,
encoder := Encoder,
records_encoder := RecordsEncoder,
output := MsgType}) ->
Message = case Encoded of
Message = case Encoded of
1 -> decompress(Binary, Method);
0 -> Binary
end,
Encoder:decode_msg(Message, MsgType).
case RecordsEncoder == [] of
true ->
Encoder:decode_msg(Message, MsgType);
_ ->
RecordsEncoder:decode_msg(Message, MsgType)
end.

decompress(Compressed, <<"gzip">>) ->
zlib:gunzip(Compressed);
Expand Down