Skip to content

Commit 10ad8ba

Browse files
committed
Distribution: add support for rpc from other nodes
Signed-off-by: Paul Guyot <pguyot@kallisys.net>
1 parent 5072552 commit 10ad8ba

File tree

7 files changed

+185
-28
lines changed

7 files changed

+185
-28
lines changed

libs/estdlib/src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ set(ERLANG_MODULES
3131
crypto
3232
dist_util
3333
erl_epmd
34+
erpc
3435
erts_debug
3536
ets
3637
gen_event

src/libAtomVM/defaultatoms.def

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,3 +178,11 @@ X(INET_ATOM, "\x4", "inet")
178178
X(TIMEOUT_ATOM, "\x7", "timeout")
179179

180180
X(DIST_DATA_ATOM, "\x9", "dist_data")
181+
182+
X(REQUEST_ATOM, "\x7", "request")
183+
X(REPLY_TAG_ATOM, "\x9", "reply_tag")
184+
X(SPAWN_REPLY_ATOM, "\xB", "spawn_reply")
185+
X(REPLY_ATOM, "\x5", "reply")
186+
X(YES_ATOM, "\x3", "yes")
187+
X(NO_ATOM, "\x2", "no")
188+
X(ERROR_ONLY_ATOM, "\xA", "error_only")

src/libAtomVM/dist_nifs.c

Lines changed: 101 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ enum
7070
OPERATION_ALIAS_SEND_TT = 34,
7171
};
7272

73+
enum {
74+
SPAWN_REPLY_FLAGS_LINK_CREATED = 1,
75+
SPAWN_REPLY_FLAGS_MONITOR_CREATED = 2,
76+
};
77+
7378
struct DistributionPacket
7479
{
7580
struct ListHead head;
@@ -318,6 +323,37 @@ static term nif_erlang_dist_ctrl_get_data(Context *ctx, int argc, term argv[])
318323
return result;
319324
}
320325

326+
term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx)
327+
{
328+
int target_process_id = 0;
329+
if (term_is_local_pid(target_proc)) {
330+
target_process_id = term_to_local_process_id(target_proc);
331+
} else if (term_is_atom(target_proc)) {
332+
target_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
333+
} else {
334+
RAISE_ERROR(BADARG_ATOM);
335+
}
336+
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
337+
monitor->target_proc = target_proc;
338+
monitor->pid_number = term_get_external_pid_process_id(from_pid);
339+
monitor->pid_serial = term_get_external_pid_serial(from_pid);
340+
monitor->ref_len = term_get_external_reference_len(monitor_ref);
341+
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
342+
if (target_process_id) {
343+
synclist_append(&conn_obj->remote_monitors, &monitor->head);
344+
ErlNifPid target_process_pid = target_process_id;
345+
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
346+
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
347+
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
348+
free(monitor);
349+
}
350+
} else {
351+
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
352+
free(monitor);
353+
}
354+
return OK_ATOM;
355+
}
356+
321357
static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
322358
{
323359
UNUSED(argc);
@@ -369,32 +405,7 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
369405
term from_pid = term_get_tuple_element(control, 1);
370406
term target_proc = term_get_tuple_element(control, 2);
371407
term monitor_ref = term_get_tuple_element(control, 3);
372-
int target_process_id = 0;
373-
if (term_is_local_pid(target_proc)) {
374-
target_process_id = term_to_local_process_id(target_proc);
375-
} else if (term_is_atom(target_proc)) {
376-
target_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
377-
} else {
378-
RAISE_ERROR(BADARG_ATOM);
379-
}
380-
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
381-
monitor->target_proc = target_proc;
382-
monitor->pid_number = term_get_external_pid_process_id(from_pid);
383-
monitor->pid_serial = term_get_external_pid_serial(from_pid);
384-
monitor->ref_len = term_get_external_reference_len(monitor_ref);
385-
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
386-
if (target_process_id) {
387-
synclist_append(&conn_obj->remote_monitors, &monitor->head);
388-
ErlNifPid target_process_pid = target_process_id;
389-
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
390-
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
391-
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
392-
free(monitor);
393-
}
394-
} else {
395-
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
396-
free(monitor);
397-
}
408+
dist_monitor(conn_obj, from_pid, target_proc, monitor_ref, ctx);
398409

399410
break;
400411
}
@@ -421,6 +432,52 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
421432
synclist_unlock(&conn_obj->remote_monitors);
422433
break;
423434
}
435+
case OPERATION_SPAWN_REQUEST: {
436+
if (UNLIKELY(arity != 6)) {
437+
RAISE_ERROR(BADARG_ATOM);
438+
}
439+
term roots[4];
440+
roots[0] = argv[0];
441+
roots[1] = argv[1];
442+
roots[2] = control;
443+
term arglist = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 3, roots);
444+
roots[3] = arglist;
445+
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(4)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
446+
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
447+
}
448+
term mfa = term_get_tuple_element(control, 4);
449+
if (UNLIKELY(!term_is_tuple(mfa) || term_get_tuple_arity(mfa) != 3)) {
450+
RAISE_ERROR(BADARG_ATOM);
451+
}
452+
if (UNLIKELY(!term_is_list(arglist))) {
453+
RAISE_ERROR(BADARG_ATOM);
454+
}
455+
term reqid = term_get_tuple_element(control, 1);
456+
term from = term_get_tuple_element(control, 2);
457+
if (UNLIKELY(!term_is_pid(from))) {
458+
RAISE_ERROR(BADARG_ATOM);
459+
}
460+
// term groupleader = term_get_tuple_element(control, 3);
461+
term options = term_get_tuple_element(control, 5);
462+
463+
term request_tuple = term_alloc_tuple(4, &ctx->heap);
464+
term_put_tuple_element(request_tuple, 0, argv[0]);
465+
term_put_tuple_element(request_tuple, 1, reqid);
466+
term_put_tuple_element(request_tuple, 2, from);
467+
term_put_tuple_element(request_tuple, 3, options);
468+
term request_opt = term_alloc_tuple(2, &ctx->heap);
469+
term_put_tuple_element(request_opt, 0, REQUEST_ATOM);
470+
term_put_tuple_element(request_opt, 1, request_tuple);
471+
term spawn_opts = term_list_prepend(request_opt, term_nil(), &ctx->heap);
472+
473+
// reuse roots for args
474+
roots[0] = term_get_tuple_element(mfa, 0);
475+
roots[1] = term_get_tuple_element(mfa, 1);
476+
roots[2] = arglist;
477+
roots[3] = spawn_opts;
478+
nif_erlang_spawn_opt(ctx, 4, roots);
479+
break;
480+
}
424481
default:
425482
printf("Unknown distribution protocol operation id %d\n", (int) term_to_int(operation));
426483
RAISE_ERROR(BADARG_ATOM);
@@ -446,6 +503,24 @@ void dist_send_message(term external_pid, term payload, Context *ctx)
446503
synclist_unlock(&ctx->global->dist_connections);
447504
}
448505

506+
void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global)
507+
{
508+
int flags =
509+
(link ? SPAWN_REPLY_FLAGS_LINK_CREATED : 0)
510+
| (monitor ? SPAWN_REPLY_FLAGS_MONITOR_CREATED : 0);
511+
// allocate tuple
512+
BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(5), heap)
513+
term control_message = term_alloc_tuple(5, &heap);
514+
term_put_tuple_element(control_message, 0, term_from_int(OPERATION_SPAWN_REPLY));
515+
term_put_tuple_element(control_message, 1, req_id);
516+
term_put_tuple_element(control_message, 2, to_pid);
517+
term_put_tuple_element(control_message, 3, term_from_int(flags));
518+
term_put_tuple_element(control_message, 4, result);
519+
520+
dist_enqueue_message(control_message, term_invalid_term(), connection, global);
521+
END_WITH_STACK_HEAP(heap, global)
522+
}
523+
449524
const struct Nif setnode_3_nif = {
450525
.base.type = NIFFunctionType,
451526
.nif_ptr = nif_erlang_setnode_3

src/libAtomVM/dist_nifs.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,35 @@ extern const struct Nif dist_ctrl_get_data_notification_nif;
4141
extern const struct Nif dist_ctrl_get_data_nif;
4242
extern const struct Nif dist_ctrl_put_data_nif;
4343

44+
struct DistConnection;
45+
4446
void dist_send_message(term external_pid, term payload, Context *ctx);
4547

48+
/**
49+
* @doc Setup a monitor on a local process for a distributed process.
50+
* @end
51+
* @param conn_obj object of the connection
52+
* @param from_pid remote pid setting up the monitor
53+
* @param target_proc atom (for registered process) or pid of the local
54+
* process to monitor
55+
* @param monitor_ref reference used for monitor
56+
* @param ctx context for memory allocation
57+
*/
58+
term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx);
59+
60+
/**
61+
* @doc Send a spawn reply signal to a node
62+
* @end
63+
* @param conn_obj object of the connection
64+
* @param req_id reference identifying the request
65+
* @param to_pid (remote) process id identifying the caller
66+
* @param link if a link was created
67+
* @param monitor if a monitor was created
68+
* @param result pid of the spawned process or atom for an error
69+
* @param ctx context for memory allocation
70+
*/
71+
void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global);
72+
4673
#ifdef __cplusplus
4774
}
4875
#endif

src/libAtomVM/nifs.c

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "defaultatoms.h"
4242
#include "dictionary.h"
4343
#include "dist_nifs.h"
44+
#include "erl_nif_priv.h"
4445
#include "ets.h"
4546
#include "externalterm.h"
4647
#include "globalcontext.h"
@@ -134,7 +135,7 @@ static term nif_erlang_register_2(Context *ctx, int argc, term argv[]);
134135
static term nif_erlang_unregister_1(Context *ctx, int argc, term argv[]);
135136
static term nif_erlang_send_2(Context *ctx, int argc, term argv[]);
136137
static term nif_erlang_setelement_3(Context *ctx, int argc, term argv[]);
137-
static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]);
138+
// static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]);
138139
static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[]);
139140
static term nif_erlang_whereis_1(Context *ctx, int argc, term argv[]);
140141
static term nif_erlang_system_time_1(Context *ctx, int argc, term argv[]);
@@ -1222,6 +1223,7 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
12221223
term link_term = interop_proplist_get_value(opts_term, LINK_ATOM);
12231224
term monitor_term = interop_proplist_get_value(opts_term, MONITOR_ATOM);
12241225
term heap_growth_strategy = interop_proplist_get_value_default(opts_term, ATOMVM_HEAP_GROWTH_ATOM, BOUNDED_FREE_ATOM);
1226+
term request_term = interop_proplist_get_value(opts_term, REQUEST_ATOM);
12251227

12261228
if (min_heap_size_term != term_nil()) {
12271229
if (UNLIKELY(!term_is_integer(min_heap_size_term))) {
@@ -1303,6 +1305,34 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
13031305
term_put_tuple_element(pid_ref_tuple, 1, ref);
13041306

13051307
return pid_ref_tuple;
1308+
} else if (UNLIKELY(request_term != term_nil())) {
1309+
// Handling of spawn_request
1310+
// spawn_request requires that the reply is enqueued before
1311+
// any message from the spawned process
1312+
1313+
term dhandle = term_get_tuple_element(request_term, 0);
1314+
term request_ref = term_get_tuple_element(request_term, 1);
1315+
term request_from = term_get_tuple_element(request_term, 2);
1316+
term request_opts = term_get_tuple_element(request_term, 3);
1317+
monitor_term = interop_proplist_get_value(request_opts, MONITOR_ATOM);
1318+
// link_term = interop_proplist_get_value(request_opts, LINK_ATOM);
1319+
1320+
void *rsrc_obj_ptr;
1321+
if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), dhandle, ctx->global->dist_connection_resource_type, &rsrc_obj_ptr))) {
1322+
RAISE_ERROR(BADARG_ATOM);
1323+
}
1324+
struct DistConnection *conn_obj = (struct DistConnection *) rsrc_obj_ptr;
1325+
1326+
dist_spawn_reply(request_ref, request_from, false, monitor_term != term_nil(), new_pid, conn_obj, ctx->global);
1327+
1328+
// Also setup monitor, if any.
1329+
if (monitor_term != term_nil()) {
1330+
printf("setting up the monitor\n");
1331+
dist_monitor(conn_obj, request_from, new_pid, request_ref, ctx);
1332+
}
1333+
1334+
scheduler_init_ready(new_ctx);
1335+
return new_pid;
13061336
} else {
13071337
scheduler_init_ready(new_ctx);
13081338
return new_pid;
@@ -1360,7 +1390,7 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[])
13601390
return do_spawn(ctx, new_ctx, opts_term);
13611391
}
13621392

1363-
static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
1393+
term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
13641394
{
13651395
UNUSED(argc);
13661396

src/libAtomVM/nifs.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ extern "C" {
4848

4949
const struct Nif *nifs_get(AtomString module, AtomString function, int arity);
5050

51+
// spawn opt is used by distribution nifs
52+
term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]);
5153
#ifdef __cplusplus
5254
}
5355
#endif

tests/libs/estdlib/test_net_kernel.erl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ test() ->
3333
ok = setup(Platform),
3434
ok = test_ping_from_beam(Platform),
3535
ok = test_fail_with_wrong_cookie(Platform),
36+
ok = test_rpc_from_beam(Platform),
3637
ok;
3738
false ->
3839
io:format("~s: skipped\n", [?MODULE]),
@@ -92,6 +93,19 @@ test_fail_with_wrong_cookie(Platform) ->
9293
net_kernel:stop(),
9394
ok.
9495

96+
test_rpc_from_beam(Platform) ->
97+
{ok, _NetKernelPid} = net_kernel:start(atomvm, #{name_domain => shortnames}),
98+
Node = node(),
99+
erlang:set_cookie('AtomVM'),
100+
Result = execute_command(
101+
Platform,
102+
"erl -sname otp -setcookie AtomVM -eval \"R = rpc:call('" ++ atom_to_list(Node) ++
103+
"', erlang, system_info, [machine]), erlang:display(R).\" -s init stop -noshell"
104+
),
105+
true = Result =:= lists:flatten(io_lib:format("~p\r\n", [Platform])),
106+
net_kernel:stop(),
107+
ok.
108+
95109
% On AtomVM, we need to start kernel.
96110
setup("BEAM") ->
97111
ok;

0 commit comments

Comments
 (0)