7070 OPERATION_ALIAS_SEND_TT = 34 ,
7171};
7272
73+ enum
74+ {
75+ SPAWN_REPLY_FLAGS_LINK_CREATED = 1 ,
76+ SPAWN_REPLY_FLAGS_MONITOR_CREATED = 2 ,
77+ };
78+
7379struct DistributionPacket
7480{
7581 struct ListHead head ;
@@ -129,7 +135,7 @@ static void dist_connection_dtor(ErlNifEnv *caller_env, void *obj)
129135
130136static void dist_enqueue_message (term control_message , term payload , struct DistConnection * connection , GlobalContext * global )
131137{
132- size_t control_message_size = 0 ; // some compilers including esp-idf 5.0.7 is not smart enough
138+ size_t control_message_size = 0 ; // some compilers including esp-idf 5.0.7 are not smart enough
133139 enum ExternalTermResult serialize_result = externalterm_compute_external_size (control_message , & control_message_size , global );
134140 if (LIKELY (serialize_result == EXTERNAL_TERM_OK )) {
135141 size_t payload_size = 0 ;
@@ -195,10 +201,7 @@ static void dist_connection_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pi
195201
196202 struct DistConnection * conn_obj = (struct DistConnection * ) obj ;
197203
198- if (UNLIKELY (enif_compare_monitors (& conn_obj -> connection_process_monitor , mon ) == 0 )) {
199- struct RefcBinary * rsrc_refc = refc_binary_from_data (obj );
200- refc_binary_decrement_refcount (rsrc_refc , caller_env -> global );
201- } else {
204+ if (enif_compare_monitors (& conn_obj -> connection_process_monitor , mon ) != 0 ) {
202205 struct ListHead * remote_monitors = synclist_wrlock (& conn_obj -> remote_monitors );
203206 struct ListHead * item ;
204207 LIST_FOR_EACH (item , remote_monitors ) {
@@ -280,10 +283,6 @@ static term nif_erlang_setnode_3(Context *ctx, int argc, term argv[])
280283 list_prepend (dist_connections , & conn_obj -> head );
281284 synclist_unlock (& ctx -> global -> dist_connections );
282285
283- // Increment reference count as the resource should be alive until controller process dies
284- struct RefcBinary * rsrc_refc = refc_binary_from_data (conn_obj );
285- refc_binary_increment_refcount (rsrc_refc );
286-
287286 if (UNLIKELY (memory_ensure_free_opt (ctx , TERM_BOXED_RESOURCE_SIZE , MEMORY_CAN_SHRINK ) != MEMORY_GC_OK )) {
288287 RAISE_ERROR (OUT_OF_MEMORY_ATOM );
289288 }
@@ -339,6 +338,38 @@ static term nif_erlang_dist_ctrl_get_data(Context *ctx, int argc, term argv[])
339338 return result ;
340339}
341340
341+ term dist_monitor (struct DistConnection * conn_obj , term from_pid , term target_proc , term monitor_ref , Context * ctx )
342+ {
343+ if (term_is_atom (target_proc )) {
344+ target_proc = globalcontext_get_registered_process (ctx -> global , term_to_atom_index (target_proc ));
345+ }
346+ int target_process_id = 0 ;
347+ if (term_is_local_pid (target_proc )) {
348+ target_process_id = term_to_local_process_id (target_proc );
349+ } else {
350+ RAISE_ERROR (BADARG_ATOM );
351+ }
352+ struct RemoteMonitor * monitor = malloc (sizeof (struct RemoteMonitor ));
353+ monitor -> target_proc = target_proc ;
354+ monitor -> pid_number = term_get_external_pid_process_id (from_pid );
355+ monitor -> pid_serial = term_get_external_pid_serial (from_pid );
356+ monitor -> ref_len = term_get_external_reference_len (monitor_ref );
357+ memcpy (monitor -> ref_words , term_get_external_reference_words (monitor_ref ), sizeof (uint32_t ) * monitor -> ref_len );
358+ if (target_process_id ) {
359+ synclist_append (& conn_obj -> remote_monitors , & monitor -> head );
360+ ErlNifPid target_process_pid = target_process_id ;
361+ if (UNLIKELY (enif_monitor_process (erl_nif_env_from_context (ctx ), conn_obj , & target_process_pid , & monitor -> process_monitor ) != 0 )) {
362+ synclist_remove (& conn_obj -> remote_monitors , & monitor -> head );
363+ dist_enqueue_monitor_exit_message (monitor , NOPROC_ATOM , conn_obj , ctx -> global );
364+ free (monitor );
365+ }
366+ } else {
367+ dist_enqueue_monitor_exit_message (monitor , NOPROC_ATOM , conn_obj , ctx -> global );
368+ free (monitor );
369+ }
370+ return OK_ATOM ;
371+ }
372+
342373static term nif_erlang_dist_ctrl_put_data (Context * ctx , int argc , term argv [])
343374{
344375 UNUSED (argc );
@@ -390,32 +421,8 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
390421 term from_pid = term_get_tuple_element (control , 1 );
391422 term target_proc = term_get_tuple_element (control , 2 );
392423 term monitor_ref = term_get_tuple_element (control , 3 );
393- if (term_is_atom (target_proc )) {
394- target_proc = globalcontext_get_registered_process (ctx -> global , term_to_atom_index (target_proc ));
395- }
396- int target_process_id = 0 ;
397- if (term_is_local_pid (target_proc )) {
398- target_process_id = term_to_local_process_id (target_proc );
399- } else {
400- RAISE_ERROR (BADARG_ATOM );
401- }
402- struct RemoteMonitor * monitor = malloc (sizeof (struct RemoteMonitor ));
403- monitor -> target_proc = target_proc ;
404- monitor -> pid_number = term_get_external_pid_process_id (from_pid );
405- monitor -> pid_serial = term_get_external_pid_serial (from_pid );
406- monitor -> ref_len = term_get_external_reference_len (monitor_ref );
407- memcpy (monitor -> ref_words , term_get_external_reference_words (monitor_ref ), sizeof (uint32_t ) * monitor -> ref_len );
408- if (target_process_id ) {
409- synclist_append (& conn_obj -> remote_monitors , & monitor -> head );
410- ErlNifPid target_process_pid = target_process_id ;
411- if (UNLIKELY (enif_monitor_process (erl_nif_env_from_context (ctx ), conn_obj , & target_process_pid , & monitor -> process_monitor ) != 0 )) {
412- synclist_remove (& conn_obj -> remote_monitors , & monitor -> head );
413- dist_enqueue_monitor_exit_message (monitor , NOPROC_ATOM , conn_obj , ctx -> global );
414- free (monitor );
415- }
416- } else {
417- dist_enqueue_monitor_exit_message (monitor , NOPROC_ATOM , conn_obj , ctx -> global );
418- free (monitor );
424+ if (UNLIKELY (term_is_invalid_term (dist_monitor (conn_obj , from_pid , target_proc , monitor_ref , ctx )))) {
425+ return term_invalid_term ();
419426 }
420427
421428 break ;
@@ -443,6 +450,53 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
443450 synclist_unlock (& conn_obj -> remote_monitors );
444451 break ;
445452 }
453+ case OPERATION_SPAWN_REQUEST : {
454+ if (UNLIKELY (arity != 6 )) {
455+ RAISE_ERROR (BADARG_ATOM );
456+ }
457+ term roots [4 ];
458+ roots [0 ] = argv [0 ];
459+ roots [1 ] = argv [1 ];
460+ roots [2 ] = control ;
461+ roots [3 ] = externalterm_to_term_with_roots (data + 1 + bytes_read , binary_len - 1 - bytes_read , ctx , ExternalTermCopy , & bytes_read , 3 , roots );
462+ if (UNLIKELY (memory_ensure_free_with_roots (ctx , LIST_SIZE (1 , TUPLE_SIZE (2 ) + TUPLE_SIZE (4 )), 4 , roots , MEMORY_CAN_SHRINK ) != MEMORY_GC_OK )) {
463+ RAISE_ERROR (OUT_OF_MEMORY_ATOM );
464+ }
465+ control = roots [2 ];
466+ term arglist = roots [3 ];
467+ term mfa = term_get_tuple_element (control , 4 );
468+ if (UNLIKELY (!term_is_tuple (mfa ) || term_get_tuple_arity (mfa ) != 3 )) {
469+ RAISE_ERROR (BADARG_ATOM );
470+ }
471+ if (UNLIKELY (!term_is_list (arglist ))) {
472+ RAISE_ERROR (BADARG_ATOM );
473+ }
474+ term reqid = term_get_tuple_element (control , 1 );
475+ term from = term_get_tuple_element (control , 2 );
476+ if (UNLIKELY (!term_is_pid (from ))) {
477+ RAISE_ERROR (BADARG_ATOM );
478+ }
479+ // term groupleader = term_get_tuple_element(control, 3);
480+ term options = term_get_tuple_element (control , 5 );
481+
482+ term request_tuple = term_alloc_tuple (4 , & ctx -> heap );
483+ term_put_tuple_element (request_tuple , 0 , roots [0 ]);
484+ term_put_tuple_element (request_tuple , 1 , reqid );
485+ term_put_tuple_element (request_tuple , 2 , from );
486+ term_put_tuple_element (request_tuple , 3 , options );
487+ term request_opt = term_alloc_tuple (2 , & ctx -> heap );
488+ term_put_tuple_element (request_opt , 0 , REQUEST_ATOM );
489+ term_put_tuple_element (request_opt , 1 , request_tuple );
490+ term spawn_opts = term_list_prepend (request_opt , term_nil (), & ctx -> heap );
491+
492+ // reuse roots for args
493+ roots [0 ] = term_get_tuple_element (mfa , 0 );
494+ roots [1 ] = term_get_tuple_element (mfa , 1 );
495+ roots [2 ] = arglist ;
496+ roots [3 ] = spawn_opts ;
497+ nif_erlang_spawn_opt (ctx , 4 , roots );
498+ break ;
499+ }
446500 default :
447501 printf ("Unknown distribution protocol operation id %d\n" , (int ) term_to_int (operation ));
448502 RAISE_ERROR (BADARG_ATOM );
@@ -468,6 +522,23 @@ void dist_send_message(term external_pid, term payload, Context *ctx)
468522 synclist_unlock (& ctx -> global -> dist_connections );
469523}
470524
525+ void dist_spawn_reply (term req_id , term to_pid , bool link , bool monitor , term result , struct DistConnection * connection , GlobalContext * global )
526+ {
527+ int flags = (link ? SPAWN_REPLY_FLAGS_LINK_CREATED : 0 )
528+ | (monitor ? SPAWN_REPLY_FLAGS_MONITOR_CREATED : 0 );
529+ // allocate tuple
530+ BEGIN_WITH_STACK_HEAP (TUPLE_SIZE (5 ), heap )
531+ term control_message = term_alloc_tuple (5 , & heap );
532+ term_put_tuple_element (control_message , 0 , term_from_int (OPERATION_SPAWN_REPLY ));
533+ term_put_tuple_element (control_message , 1 , req_id );
534+ term_put_tuple_element (control_message , 2 , to_pid );
535+ term_put_tuple_element (control_message , 3 , term_from_int (flags ));
536+ term_put_tuple_element (control_message , 4 , result );
537+
538+ dist_enqueue_message (control_message , term_invalid_term (), connection , global );
539+ END_WITH_STACK_HEAP (heap , global )
540+ }
541+
471542const struct Nif setnode_3_nif = {
472543 .base .type = NIFFunctionType ,
473544 .nif_ptr = nif_erlang_setnode_3
0 commit comments