Skip to content

Commit d71b56e

Browse files
committed
Distribution: add support for rpc from other nodes
Signed-off-by: Paul Guyot <pguyot@kallisys.net>
1 parent 5072552 commit d71b56e

File tree

9 files changed

+269
-30
lines changed

9 files changed

+269
-30
lines changed

libs/estdlib/src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ set(ERLANG_MODULES
3131
crypto
3232
dist_util
3333
erl_epmd
34+
erpc
3435
erts_debug
3536
ets
3637
gen_event

libs/estdlib/src/erpc.erl

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
%
2+
% This file is part of AtomVM.
3+
%
4+
% Copyright 2025 Paul Guyot <pguyot@kallisys.net>
5+
%
6+
% Licensed under the Apache License, Version 2.0 (the "License");
7+
% you may not use this file except in compliance with the License.
8+
% You may obtain a copy of the License at
9+
%
10+
% http://www.apache.org/licenses/LICENSE-2.0
11+
%
12+
% Unless required by applicable law or agreed to in writing, software
13+
% distributed under the License is distributed on an "AS IS" BASIS,
14+
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
% See the License for the specific language governing permissions and
16+
% limitations under the License.
17+
%
18+
% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
19+
%
20+
21+
%%-----------------------------------------------------------------------------
22+
%% @doc An implementation of the Erlang/OTP erpc interface.
23+
%%
24+
%% This module implements a strict subset of the Erlang/OTP erpc
25+
%% interface.
26+
%% @end
27+
%%-----------------------------------------------------------------------------
28+
-module(erpc).
29+
30+
% api
31+
-export([
32+
execute_call/4
33+
]).
34+
35+
%%-----------------------------------------------------------------------------
36+
%% @param Reference reference of the request, passed in exit tuple
37+
%% @param Module module to call
38+
%% @param Func function to call
39+
%% @param Args argument of the call
40+
%% @doc Execute a call locally, exiting with the result.
41+
%% This function is called from rpc on other nodes using spawn_request BIF.
42+
%% @end
43+
%%-----------------------------------------------------------------------------
44+
-spec execute_call(Reference :: reference(), Module :: module(), Func :: atom(), Args :: [any()]) ->
45+
no_return().
46+
execute_call(Reference, Module, Func, Args) ->
47+
Reply =
48+
try
49+
Result = apply(Module, Func, Args),
50+
{Reference, return, Result}
51+
catch
52+
throw:Reason ->
53+
{Reference, throw, Reason};
54+
exit:Reason ->
55+
{Reference, exit, Reason};
56+
error:Reason:Stack ->
57+
{Reference, error, Reason, Stack}
58+
end,
59+
exit(Reply).

src/libAtomVM/defaultatoms.def

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,3 +178,11 @@ X(INET_ATOM, "\x4", "inet")
178178
X(TIMEOUT_ATOM, "\x7", "timeout")
179179

180180
X(DIST_DATA_ATOM, "\x9", "dist_data")
181+
182+
X(REQUEST_ATOM, "\x7", "request")
183+
X(REPLY_TAG_ATOM, "\x9", "reply_tag")
184+
X(SPAWN_REPLY_ATOM, "\xB", "spawn_reply")
185+
X(REPLY_ATOM, "\x5", "reply")
186+
X(YES_ATOM, "\x3", "yes")
187+
X(NO_ATOM, "\x2", "no")
188+
X(ERROR_ONLY_ATOM, "\xA", "error_only")

src/libAtomVM/dist_nifs.c

Lines changed: 101 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ enum
7070
OPERATION_ALIAS_SEND_TT = 34,
7171
};
7272

73+
enum
74+
{
75+
SPAWN_REPLY_FLAGS_LINK_CREATED = 1,
76+
SPAWN_REPLY_FLAGS_MONITOR_CREATED = 2,
77+
};
78+
7379
struct DistributionPacket
7480
{
7581
struct ListHead head;
@@ -318,6 +324,37 @@ static term nif_erlang_dist_ctrl_get_data(Context *ctx, int argc, term argv[])
318324
return result;
319325
}
320326

327+
term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx)
328+
{
329+
int target_process_id = 0;
330+
if (term_is_local_pid(target_proc)) {
331+
target_process_id = term_to_local_process_id(target_proc);
332+
} else if (term_is_atom(target_proc)) {
333+
target_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
334+
} else {
335+
RAISE_ERROR(BADARG_ATOM);
336+
}
337+
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
338+
monitor->target_proc = target_proc;
339+
monitor->pid_number = term_get_external_pid_process_id(from_pid);
340+
monitor->pid_serial = term_get_external_pid_serial(from_pid);
341+
monitor->ref_len = term_get_external_reference_len(monitor_ref);
342+
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
343+
if (target_process_id) {
344+
synclist_append(&conn_obj->remote_monitors, &monitor->head);
345+
ErlNifPid target_process_pid = target_process_id;
346+
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
347+
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
348+
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
349+
free(monitor);
350+
}
351+
} else {
352+
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
353+
free(monitor);
354+
}
355+
return OK_ATOM;
356+
}
357+
321358
static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
322359
{
323360
UNUSED(argc);
@@ -369,32 +406,7 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
369406
term from_pid = term_get_tuple_element(control, 1);
370407
term target_proc = term_get_tuple_element(control, 2);
371408
term monitor_ref = term_get_tuple_element(control, 3);
372-
int target_process_id = 0;
373-
if (term_is_local_pid(target_proc)) {
374-
target_process_id = term_to_local_process_id(target_proc);
375-
} else if (term_is_atom(target_proc)) {
376-
target_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
377-
} else {
378-
RAISE_ERROR(BADARG_ATOM);
379-
}
380-
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
381-
monitor->target_proc = target_proc;
382-
monitor->pid_number = term_get_external_pid_process_id(from_pid);
383-
monitor->pid_serial = term_get_external_pid_serial(from_pid);
384-
monitor->ref_len = term_get_external_reference_len(monitor_ref);
385-
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
386-
if (target_process_id) {
387-
synclist_append(&conn_obj->remote_monitors, &monitor->head);
388-
ErlNifPid target_process_pid = target_process_id;
389-
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
390-
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
391-
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
392-
free(monitor);
393-
}
394-
} else {
395-
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
396-
free(monitor);
397-
}
409+
dist_monitor(conn_obj, from_pid, target_proc, monitor_ref, ctx);
398410

399411
break;
400412
}
@@ -421,6 +433,52 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
421433
synclist_unlock(&conn_obj->remote_monitors);
422434
break;
423435
}
436+
case OPERATION_SPAWN_REQUEST: {
437+
if (UNLIKELY(arity != 6)) {
438+
RAISE_ERROR(BADARG_ATOM);
439+
}
440+
term roots[4];
441+
roots[0] = argv[0];
442+
roots[1] = argv[1];
443+
roots[2] = control;
444+
term arglist = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 3, roots);
445+
roots[3] = arglist;
446+
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(4)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
447+
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
448+
}
449+
term mfa = term_get_tuple_element(control, 4);
450+
if (UNLIKELY(!term_is_tuple(mfa) || term_get_tuple_arity(mfa) != 3)) {
451+
RAISE_ERROR(BADARG_ATOM);
452+
}
453+
if (UNLIKELY(!term_is_list(arglist))) {
454+
RAISE_ERROR(BADARG_ATOM);
455+
}
456+
term reqid = term_get_tuple_element(control, 1);
457+
term from = term_get_tuple_element(control, 2);
458+
if (UNLIKELY(!term_is_pid(from))) {
459+
RAISE_ERROR(BADARG_ATOM);
460+
}
461+
// term groupleader = term_get_tuple_element(control, 3);
462+
term options = term_get_tuple_element(control, 5);
463+
464+
term request_tuple = term_alloc_tuple(4, &ctx->heap);
465+
term_put_tuple_element(request_tuple, 0, argv[0]);
466+
term_put_tuple_element(request_tuple, 1, reqid);
467+
term_put_tuple_element(request_tuple, 2, from);
468+
term_put_tuple_element(request_tuple, 3, options);
469+
term request_opt = term_alloc_tuple(2, &ctx->heap);
470+
term_put_tuple_element(request_opt, 0, REQUEST_ATOM);
471+
term_put_tuple_element(request_opt, 1, request_tuple);
472+
term spawn_opts = term_list_prepend(request_opt, term_nil(), &ctx->heap);
473+
474+
// reuse roots for args
475+
roots[0] = term_get_tuple_element(mfa, 0);
476+
roots[1] = term_get_tuple_element(mfa, 1);
477+
roots[2] = arglist;
478+
roots[3] = spawn_opts;
479+
nif_erlang_spawn_opt(ctx, 4, roots);
480+
break;
481+
}
424482
default:
425483
printf("Unknown distribution protocol operation id %d\n", (int) term_to_int(operation));
426484
RAISE_ERROR(BADARG_ATOM);
@@ -446,6 +504,23 @@ void dist_send_message(term external_pid, term payload, Context *ctx)
446504
synclist_unlock(&ctx->global->dist_connections);
447505
}
448506

507+
void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global)
508+
{
509+
int flags = (link ? SPAWN_REPLY_FLAGS_LINK_CREATED : 0)
510+
| (monitor ? SPAWN_REPLY_FLAGS_MONITOR_CREATED : 0);
511+
// allocate tuple
512+
BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(5), heap)
513+
term control_message = term_alloc_tuple(5, &heap);
514+
term_put_tuple_element(control_message, 0, term_from_int(OPERATION_SPAWN_REPLY));
515+
term_put_tuple_element(control_message, 1, req_id);
516+
term_put_tuple_element(control_message, 2, to_pid);
517+
term_put_tuple_element(control_message, 3, term_from_int(flags));
518+
term_put_tuple_element(control_message, 4, result);
519+
520+
dist_enqueue_message(control_message, term_invalid_term(), connection, global);
521+
END_WITH_STACK_HEAP(heap, global)
522+
}
523+
449524
const struct Nif setnode_3_nif = {
450525
.base.type = NIFFunctionType,
451526
.nif_ptr = nif_erlang_setnode_3

src/libAtomVM/dist_nifs.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,35 @@ extern const struct Nif dist_ctrl_get_data_notification_nif;
4141
extern const struct Nif dist_ctrl_get_data_nif;
4242
extern const struct Nif dist_ctrl_put_data_nif;
4343

44+
struct DistConnection;
45+
4446
void dist_send_message(term external_pid, term payload, Context *ctx);
4547

48+
/**
49+
* @doc Setup a monitor on a local process for a distributed process.
50+
* @end
51+
* @param conn_obj object of the connection
52+
* @param from_pid remote pid setting up the monitor
53+
* @param target_proc atom (for registered process) or pid of the local
54+
* process to monitor
55+
* @param monitor_ref reference used for monitor
56+
* @param ctx context for memory allocation
57+
*/
58+
term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx);
59+
60+
/**
61+
* @doc Send a spawn reply signal to a node
62+
* @end
63+
* @param conn_obj object of the connection
64+
* @param req_id reference identifying the request
65+
* @param to_pid (remote) process id identifying the caller
66+
* @param link if a link was created
67+
* @param monitor if a monitor was created
68+
* @param result pid of the spawned process or atom for an error
69+
* @param ctx context for memory allocation
70+
*/
71+
void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global);
72+
4673
#ifdef __cplusplus
4774
}
4875
#endif

src/libAtomVM/nifs.c

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "defaultatoms.h"
4242
#include "dictionary.h"
4343
#include "dist_nifs.h"
44+
#include "erl_nif_priv.h"
4445
#include "ets.h"
4546
#include "externalterm.h"
4647
#include "globalcontext.h"
@@ -134,7 +135,7 @@ static term nif_erlang_register_2(Context *ctx, int argc, term argv[]);
134135
static term nif_erlang_unregister_1(Context *ctx, int argc, term argv[]);
135136
static term nif_erlang_send_2(Context *ctx, int argc, term argv[]);
136137
static term nif_erlang_setelement_3(Context *ctx, int argc, term argv[]);
137-
static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]);
138+
// static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]);
138139
static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[]);
139140
static term nif_erlang_whereis_1(Context *ctx, int argc, term argv[]);
140141
static term nif_erlang_system_time_1(Context *ctx, int argc, term argv[]);
@@ -1222,6 +1223,7 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
12221223
term link_term = interop_proplist_get_value(opts_term, LINK_ATOM);
12231224
term monitor_term = interop_proplist_get_value(opts_term, MONITOR_ATOM);
12241225
term heap_growth_strategy = interop_proplist_get_value_default(opts_term, ATOMVM_HEAP_GROWTH_ATOM, BOUNDED_FREE_ATOM);
1226+
term request_term = interop_proplist_get_value(opts_term, REQUEST_ATOM);
12251227

12261228
if (min_heap_size_term != term_nil()) {
12271229
if (UNLIKELY(!term_is_integer(min_heap_size_term))) {
@@ -1303,6 +1305,33 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
13031305
term_put_tuple_element(pid_ref_tuple, 1, ref);
13041306

13051307
return pid_ref_tuple;
1308+
} else if (UNLIKELY(request_term != term_nil())) {
1309+
// Handling of spawn_request
1310+
// spawn_request requires that the reply is enqueued before
1311+
// any message from the spawned process
1312+
1313+
term dhandle = term_get_tuple_element(request_term, 0);
1314+
term request_ref = term_get_tuple_element(request_term, 1);
1315+
term request_from = term_get_tuple_element(request_term, 2);
1316+
term request_opts = term_get_tuple_element(request_term, 3);
1317+
monitor_term = interop_proplist_get_value(request_opts, MONITOR_ATOM);
1318+
// link_term = interop_proplist_get_value(request_opts, LINK_ATOM);
1319+
1320+
void *rsrc_obj_ptr;
1321+
if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), dhandle, ctx->global->dist_connection_resource_type, &rsrc_obj_ptr))) {
1322+
RAISE_ERROR(BADARG_ATOM);
1323+
}
1324+
struct DistConnection *conn_obj = (struct DistConnection *) rsrc_obj_ptr;
1325+
1326+
dist_spawn_reply(request_ref, request_from, false, monitor_term != term_nil(), new_pid, conn_obj, ctx->global);
1327+
1328+
// Also setup monitor, if any.
1329+
if (monitor_term != term_nil()) {
1330+
dist_monitor(conn_obj, request_from, new_pid, request_ref, ctx);
1331+
}
1332+
1333+
scheduler_init_ready(new_ctx);
1334+
return new_pid;
13061335
} else {
13071336
scheduler_init_ready(new_ctx);
13081337
return new_pid;
@@ -1360,7 +1389,7 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[])
13601389
return do_spawn(ctx, new_ctx, opts_term);
13611390
}
13621391

1363-
static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
1392+
term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
13641393
{
13651394
UNUSED(argc);
13661395

src/libAtomVM/nifs.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ extern "C" {
4848

4949
const struct Nif *nifs_get(AtomString module, AtomString function, int arity);
5050

51+
// spawn opt is used by distribution nifs
52+
term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]);
5153
#ifdef __cplusplus
5254
}
5355
#endif

src/libAtomVM/otp_net.c

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,18 @@ static term nif_net_gethostname(Context *ctx, int argc, term argv[])
344344
}
345345
return make_error_tuple(posix_errno_to_term(errno, ctx->global), ctx);
346346
}
347-
348-
size_t len = strlen(buf);
347+
// Truncate name to first dot
348+
char *end_str = buf;
349+
while (1) {
350+
char c = *end_str++;
351+
if (c == 0) {
352+
break;
353+
}
354+
if (c == '.') {
355+
break;
356+
}
357+
}
358+
size_t len = end_str - buf;
349359
if (UNLIKELY(memory_ensure_free_opt(ctx, TUPLE_SIZE(2) + LIST_SIZE(len, 1), MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
350360
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
351361
}

0 commit comments

Comments
 (0)