Skip to content

Commit 20a5f22

Browse files
committed
Distribution: add support for rpc from other nodes
Signed-off-by: Paul Guyot <pguyot@kallisys.net>
1 parent fdd51a0 commit 20a5f22

File tree

9 files changed

+271
-30
lines changed

9 files changed

+271
-30
lines changed

libs/estdlib/src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ set(ERLANG_MODULES
3131
crypto
3232
dist_util
3333
erl_epmd
34+
erpc
3435
erts_debug
3536
ets
3637
gen_event

libs/estdlib/src/erpc.erl

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
%
2+
% This file is part of AtomVM.
3+
%
4+
% Copyright 2025 Paul Guyot <pguyot@kallisys.net>
5+
%
6+
% Licensed under the Apache License, Version 2.0 (the "License");
7+
% you may not use this file except in compliance with the License.
8+
% You may obtain a copy of the License at
9+
%
10+
% http://www.apache.org/licenses/LICENSE-2.0
11+
%
12+
% Unless required by applicable law or agreed to in writing, software
13+
% distributed under the License is distributed on an "AS IS" BASIS,
14+
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
% See the License for the specific language governing permissions and
16+
% limitations under the License.
17+
%
18+
% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
19+
%
20+
21+
%%-----------------------------------------------------------------------------
22+
%% @doc An implementation of the Erlang/OTP erpc interface.
23+
%%
24+
%% This module implements a strict subset of the Erlang/OTP erpc
25+
%% interface.
26+
%% @end
27+
%%-----------------------------------------------------------------------------
28+
-module(erpc).
29+
30+
% api
31+
-export([
32+
execute_call/4
33+
]).
34+
35+
%%-----------------------------------------------------------------------------
36+
%% @param Reference reference of the request, passed in exit tuple
37+
%% @param Module module to call
38+
%% @param Func function to call
39+
%% @param Args argument of the call
40+
%% @doc Execute a call locally, exiting with the result.
41+
%% This function is called from rpc on other nodes using spawn_request BIF.
42+
%% @end
43+
%%-----------------------------------------------------------------------------
44+
-spec execute_call(Reference :: reference(), Module :: module(), Func :: atom(), Args :: [any()]) ->
45+
no_return().
46+
execute_call(Reference, Module, Func, Args) ->
47+
Reply =
48+
try
49+
Result = apply(Module, Func, Args),
50+
{Reference, return, Result}
51+
catch
52+
throw:Reason ->
53+
{Reference, throw, Reason};
54+
exit:Reason ->
55+
{Reference, exit, Reason};
56+
error:Reason:Stack ->
57+
{Reference, error, Reason, Stack}
58+
end,
59+
exit(Reply).

src/libAtomVM/defaultatoms.def

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,3 +178,11 @@ X(INET_ATOM, "\x4", "inet")
178178
X(TIMEOUT_ATOM, "\x7", "timeout")
179179

180180
X(DIST_DATA_ATOM, "\x9", "dist_data")
181+
182+
X(REQUEST_ATOM, "\x7", "request")
183+
X(REPLY_TAG_ATOM, "\x9", "reply_tag")
184+
X(SPAWN_REPLY_ATOM, "\xB", "spawn_reply")
185+
X(REPLY_ATOM, "\x5", "reply")
186+
X(YES_ATOM, "\x3", "yes")
187+
X(NO_ATOM, "\x2", "no")
188+
X(ERROR_ONLY_ATOM, "\xA", "error_only")

src/libAtomVM/dist_nifs.c

Lines changed: 102 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ enum
7070
OPERATION_ALIAS_SEND_TT = 34,
7171
};
7272

73+
enum
74+
{
75+
SPAWN_REPLY_FLAGS_LINK_CREATED = 1,
76+
SPAWN_REPLY_FLAGS_MONITOR_CREATED = 2,
77+
};
78+
7379
struct DistributionPacket
7480
{
7581
struct ListHead head;
@@ -339,6 +345,37 @@ static term nif_erlang_dist_ctrl_get_data(Context *ctx, int argc, term argv[])
339345
return result;
340346
}
341347

348+
term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx)
349+
{
350+
int target_process_id = 0;
351+
if (term_is_local_pid(target_proc)) {
352+
target_process_id = term_to_local_process_id(target_proc);
353+
} else if (term_is_atom(target_proc)) {
354+
target_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
355+
} else {
356+
RAISE_ERROR(BADARG_ATOM);
357+
}
358+
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
359+
monitor->target_proc = target_proc;
360+
monitor->pid_number = term_get_external_pid_process_id(from_pid);
361+
monitor->pid_serial = term_get_external_pid_serial(from_pid);
362+
monitor->ref_len = term_get_external_reference_len(monitor_ref);
363+
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
364+
if (target_process_id) {
365+
synclist_append(&conn_obj->remote_monitors, &monitor->head);
366+
ErlNifPid target_process_pid = target_process_id;
367+
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
368+
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
369+
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
370+
free(monitor);
371+
}
372+
} else {
373+
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
374+
free(monitor);
375+
}
376+
return OK_ATOM;
377+
}
378+
342379
static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
343380
{
344381
UNUSED(argc);
@@ -390,32 +427,7 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
390427
term from_pid = term_get_tuple_element(control, 1);
391428
term target_proc = term_get_tuple_element(control, 2);
392429
term monitor_ref = term_get_tuple_element(control, 3);
393-
int target_process_id = 0;
394-
if (term_is_local_pid(target_proc)) {
395-
target_process_id = term_to_local_process_id(target_proc);
396-
} else if (term_is_atom(target_proc)) {
397-
target_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
398-
} else {
399-
RAISE_ERROR(BADARG_ATOM);
400-
}
401-
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
402-
monitor->target_proc = target_proc;
403-
monitor->pid_number = term_get_external_pid_process_id(from_pid);
404-
monitor->pid_serial = term_get_external_pid_serial(from_pid);
405-
monitor->ref_len = term_get_external_reference_len(monitor_ref);
406-
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
407-
if (target_process_id) {
408-
synclist_append(&conn_obj->remote_monitors, &monitor->head);
409-
ErlNifPid target_process_pid = target_process_id;
410-
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
411-
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
412-
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
413-
free(monitor);
414-
}
415-
} else {
416-
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
417-
free(monitor);
418-
}
430+
dist_monitor(conn_obj, from_pid, target_proc, monitor_ref, ctx);
419431

420432
break;
421433
}
@@ -442,6 +454,53 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
442454
synclist_unlock(&conn_obj->remote_monitors);
443455
break;
444456
}
457+
case OPERATION_SPAWN_REQUEST: {
458+
if (UNLIKELY(arity != 6)) {
459+
RAISE_ERROR(BADARG_ATOM);
460+
}
461+
term roots[4];
462+
roots[0] = argv[0];
463+
roots[1] = argv[1];
464+
roots[2] = control;
465+
roots[3] = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 3, roots);
466+
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(4)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
467+
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
468+
}
469+
control = roots[2];
470+
term arglist = roots[3];
471+
term mfa = term_get_tuple_element(control, 4);
472+
if (UNLIKELY(!term_is_tuple(mfa) || term_get_tuple_arity(mfa) != 3)) {
473+
RAISE_ERROR(BADARG_ATOM);
474+
}
475+
if (UNLIKELY(!term_is_list(arglist))) {
476+
RAISE_ERROR(BADARG_ATOM);
477+
}
478+
term reqid = term_get_tuple_element(control, 1);
479+
term from = term_get_tuple_element(control, 2);
480+
if (UNLIKELY(!term_is_pid(from))) {
481+
RAISE_ERROR(BADARG_ATOM);
482+
}
483+
// term groupleader = term_get_tuple_element(control, 3);
484+
term options = term_get_tuple_element(control, 5);
485+
486+
term request_tuple = term_alloc_tuple(4, &ctx->heap);
487+
term_put_tuple_element(request_tuple, 0, roots[0]);
488+
term_put_tuple_element(request_tuple, 1, reqid);
489+
term_put_tuple_element(request_tuple, 2, from);
490+
term_put_tuple_element(request_tuple, 3, options);
491+
term request_opt = term_alloc_tuple(2, &ctx->heap);
492+
term_put_tuple_element(request_opt, 0, REQUEST_ATOM);
493+
term_put_tuple_element(request_opt, 1, request_tuple);
494+
term spawn_opts = term_list_prepend(request_opt, term_nil(), &ctx->heap);
495+
496+
// reuse roots for args
497+
roots[0] = term_get_tuple_element(mfa, 0);
498+
roots[1] = term_get_tuple_element(mfa, 1);
499+
roots[2] = arglist;
500+
roots[3] = spawn_opts;
501+
nif_erlang_spawn_opt(ctx, 4, roots);
502+
break;
503+
}
445504
default:
446505
printf("Unknown distribution protocol operation id %d\n", (int) term_to_int(operation));
447506
RAISE_ERROR(BADARG_ATOM);
@@ -467,6 +526,23 @@ void dist_send_message(term external_pid, term payload, Context *ctx)
467526
synclist_unlock(&ctx->global->dist_connections);
468527
}
469528

529+
void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global)
530+
{
531+
int flags = (link ? SPAWN_REPLY_FLAGS_LINK_CREATED : 0)
532+
| (monitor ? SPAWN_REPLY_FLAGS_MONITOR_CREATED : 0);
533+
// allocate tuple
534+
BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(5), heap)
535+
term control_message = term_alloc_tuple(5, &heap);
536+
term_put_tuple_element(control_message, 0, term_from_int(OPERATION_SPAWN_REPLY));
537+
term_put_tuple_element(control_message, 1, req_id);
538+
term_put_tuple_element(control_message, 2, to_pid);
539+
term_put_tuple_element(control_message, 3, term_from_int(flags));
540+
term_put_tuple_element(control_message, 4, result);
541+
542+
dist_enqueue_message(control_message, term_invalid_term(), connection, global);
543+
END_WITH_STACK_HEAP(heap, global)
544+
}
545+
470546
const struct Nif setnode_3_nif = {
471547
.base.type = NIFFunctionType,
472548
.nif_ptr = nif_erlang_setnode_3

src/libAtomVM/dist_nifs.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,35 @@ extern const struct Nif dist_ctrl_get_data_notification_nif;
4141
extern const struct Nif dist_ctrl_get_data_nif;
4242
extern const struct Nif dist_ctrl_put_data_nif;
4343

44+
struct DistConnection;
45+
4446
void dist_send_message(term external_pid, term payload, Context *ctx);
4547

48+
/**
49+
* @doc Setup a monitor on a local process for a distributed process.
50+
* @end
51+
* @param conn_obj object of the connection
52+
* @param from_pid remote pid setting up the monitor
53+
* @param target_proc atom (for registered process) or pid of the local
54+
* process to monitor
55+
* @param monitor_ref reference used for monitor
56+
* @param ctx context for memory allocation
57+
*/
58+
term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx);
59+
60+
/**
61+
* @doc Send a spawn reply signal to a node
62+
* @end
63+
* @param conn_obj object of the connection
64+
* @param req_id reference identifying the request
65+
* @param to_pid (remote) process id identifying the caller
66+
* @param link if a link was created
67+
* @param monitor if a monitor was created
68+
* @param result pid of the spawned process or atom for an error
69+
* @param ctx context for memory allocation
70+
*/
71+
void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global);
72+
4673
#ifdef __cplusplus
4774
}
4875
#endif

src/libAtomVM/nifs.c

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "defaultatoms.h"
4242
#include "dictionary.h"
4343
#include "dist_nifs.h"
44+
#include "erl_nif_priv.h"
4445
#include "ets.h"
4546
#include "externalterm.h"
4647
#include "globalcontext.h"
@@ -134,7 +135,7 @@ static term nif_erlang_register_2(Context *ctx, int argc, term argv[]);
134135
static term nif_erlang_unregister_1(Context *ctx, int argc, term argv[]);
135136
static term nif_erlang_send_2(Context *ctx, int argc, term argv[]);
136137
static term nif_erlang_setelement_3(Context *ctx, int argc, term argv[]);
137-
static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]);
138+
// static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]);
138139
static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[]);
139140
static term nif_erlang_whereis_1(Context *ctx, int argc, term argv[]);
140141
static term nif_erlang_system_time_1(Context *ctx, int argc, term argv[]);
@@ -1222,6 +1223,7 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
12221223
term link_term = interop_proplist_get_value(opts_term, LINK_ATOM);
12231224
term monitor_term = interop_proplist_get_value(opts_term, MONITOR_ATOM);
12241225
term heap_growth_strategy = interop_proplist_get_value_default(opts_term, ATOMVM_HEAP_GROWTH_ATOM, BOUNDED_FREE_ATOM);
1226+
term request_term = interop_proplist_get_value(opts_term, REQUEST_ATOM);
12251227

12261228
if (min_heap_size_term != term_nil()) {
12271229
if (UNLIKELY(!term_is_integer(min_heap_size_term))) {
@@ -1303,6 +1305,33 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
13031305
term_put_tuple_element(pid_ref_tuple, 1, ref);
13041306

13051307
return pid_ref_tuple;
1308+
} else if (UNLIKELY(request_term != term_nil())) {
1309+
// Handling of spawn_request
1310+
// spawn_request requires that the reply is enqueued before
1311+
// any message from the spawned process
1312+
1313+
term dhandle = term_get_tuple_element(request_term, 0);
1314+
term request_ref = term_get_tuple_element(request_term, 1);
1315+
term request_from = term_get_tuple_element(request_term, 2);
1316+
term request_opts = term_get_tuple_element(request_term, 3);
1317+
monitor_term = interop_proplist_get_value(request_opts, MONITOR_ATOM);
1318+
// link_term = interop_proplist_get_value(request_opts, LINK_ATOM);
1319+
1320+
void *rsrc_obj_ptr;
1321+
if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), dhandle, ctx->global->dist_connection_resource_type, &rsrc_obj_ptr))) {
1322+
RAISE_ERROR(BADARG_ATOM);
1323+
}
1324+
struct DistConnection *conn_obj = (struct DistConnection *) rsrc_obj_ptr;
1325+
1326+
dist_spawn_reply(request_ref, request_from, false, monitor_term != term_nil(), new_pid, conn_obj, ctx->global);
1327+
1328+
// Also setup monitor, if any.
1329+
if (monitor_term != term_nil()) {
1330+
dist_monitor(conn_obj, request_from, new_pid, request_ref, ctx);
1331+
}
1332+
1333+
scheduler_init_ready(new_ctx);
1334+
return new_pid;
13061335
} else {
13071336
scheduler_init_ready(new_ctx);
13081337
return new_pid;
@@ -1360,7 +1389,7 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[])
13601389
return do_spawn(ctx, new_ctx, opts_term);
13611390
}
13621391

1363-
static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
1392+
term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
13641393
{
13651394
UNUSED(argc);
13661395

src/libAtomVM/nifs.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ extern "C" {
4848

4949
const struct Nif *nifs_get(AtomString module, AtomString function, int arity);
5050

51+
// spawn opt is used by distribution nifs
52+
term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]);
5153
#ifdef __cplusplus
5254
}
5355
#endif

src/libAtomVM/otp_net.c

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,18 @@ static term nif_net_gethostname(Context *ctx, int argc, term argv[])
344344
}
345345
return make_error_tuple(posix_errno_to_term(errno, ctx->global), ctx);
346346
}
347-
348-
size_t len = strlen(buf);
347+
// Truncate name to first dot
348+
char *end_str = buf;
349+
while (1) {
350+
char c = *end_str++;
351+
if (c == 0) {
352+
break;
353+
}
354+
if (c == '.') {
355+
break;
356+
}
357+
}
358+
size_t len = end_str - buf;
349359
if (UNLIKELY(memory_ensure_free_opt(ctx, TUPLE_SIZE(2) + LIST_SIZE(len, 1), MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
350360
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
351361
}

0 commit comments

Comments
 (0)