7070 OPERATION_ALIAS_SEND_TT = 34 ,
7171};
7272
73+ enum
74+ {
75+ SPAWN_REPLY_FLAGS_LINK_CREATED = 1 ,
76+ SPAWN_REPLY_FLAGS_MONITOR_CREATED = 2 ,
77+ };
78+
7379struct DistributionPacket
7480{
7581 struct ListHead head ;
@@ -318,6 +324,37 @@ static term nif_erlang_dist_ctrl_get_data(Context *ctx, int argc, term argv[])
318324 return result ;
319325}
320326
327+ term dist_monitor (struct DistConnection * conn_obj , term from_pid , term target_proc , term monitor_ref , Context * ctx )
328+ {
329+ int target_process_id = 0 ;
330+ if (term_is_local_pid (target_proc )) {
331+ target_process_id = term_to_local_process_id (target_proc );
332+ } else if (term_is_atom (target_proc )) {
333+ target_process_id = globalcontext_get_registered_process (ctx -> global , term_to_atom_index (target_proc ));
334+ } else {
335+ RAISE_ERROR (BADARG_ATOM );
336+ }
337+ struct RemoteMonitor * monitor = malloc (sizeof (struct RemoteMonitor ));
338+ monitor -> target_proc = target_proc ;
339+ monitor -> pid_number = term_get_external_pid_process_id (from_pid );
340+ monitor -> pid_serial = term_get_external_pid_serial (from_pid );
341+ monitor -> ref_len = term_get_external_reference_len (monitor_ref );
342+ memcpy (monitor -> ref_words , term_get_external_reference_words (monitor_ref ), sizeof (uint32_t ) * monitor -> ref_len );
343+ if (target_process_id ) {
344+ synclist_append (& conn_obj -> remote_monitors , & monitor -> head );
345+ ErlNifPid target_process_pid = target_process_id ;
346+ if (UNLIKELY (enif_monitor_process (erl_nif_env_from_context (ctx ), conn_obj , & target_process_pid , & monitor -> process_monitor ) != 0 )) {
347+ synclist_remove (& conn_obj -> remote_monitors , & monitor -> head );
348+ dist_enqueue_monitor_exit_message (monitor , NOPROC_ATOM , conn_obj , ctx -> global );
349+ free (monitor );
350+ }
351+ } else {
352+ dist_enqueue_monitor_exit_message (monitor , NOPROC_ATOM , conn_obj , ctx -> global );
353+ free (monitor );
354+ }
355+ return OK_ATOM ;
356+ }
357+
321358static term nif_erlang_dist_ctrl_put_data (Context * ctx , int argc , term argv [])
322359{
323360 UNUSED (argc );
@@ -369,32 +406,7 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
369406 term from_pid = term_get_tuple_element (control , 1 );
370407 term target_proc = term_get_tuple_element (control , 2 );
371408 term monitor_ref = term_get_tuple_element (control , 3 );
372- int target_process_id = 0 ;
373- if (term_is_local_pid (target_proc )) {
374- target_process_id = term_to_local_process_id (target_proc );
375- } else if (term_is_atom (target_proc )) {
376- target_process_id = globalcontext_get_registered_process (ctx -> global , term_to_atom_index (target_proc ));
377- } else {
378- RAISE_ERROR (BADARG_ATOM );
379- }
380- struct RemoteMonitor * monitor = malloc (sizeof (struct RemoteMonitor ));
381- monitor -> target_proc = target_proc ;
382- monitor -> pid_number = term_get_external_pid_process_id (from_pid );
383- monitor -> pid_serial = term_get_external_pid_serial (from_pid );
384- monitor -> ref_len = term_get_external_reference_len (monitor_ref );
385- memcpy (monitor -> ref_words , term_get_external_reference_words (monitor_ref ), sizeof (uint32_t ) * monitor -> ref_len );
386- if (target_process_id ) {
387- synclist_append (& conn_obj -> remote_monitors , & monitor -> head );
388- ErlNifPid target_process_pid = target_process_id ;
389- if (UNLIKELY (enif_monitor_process (erl_nif_env_from_context (ctx ), conn_obj , & target_process_pid , & monitor -> process_monitor ) != 0 )) {
390- synclist_remove (& conn_obj -> remote_monitors , & monitor -> head );
391- dist_enqueue_monitor_exit_message (monitor , NOPROC_ATOM , conn_obj , ctx -> global );
392- free (monitor );
393- }
394- } else {
395- dist_enqueue_monitor_exit_message (monitor , NOPROC_ATOM , conn_obj , ctx -> global );
396- free (monitor );
397- }
409+ dist_monitor (conn_obj , from_pid , target_proc , monitor_ref , ctx );
398410
399411 break ;
400412 }
@@ -421,6 +433,53 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
421433 synclist_unlock (& conn_obj -> remote_monitors );
422434 break ;
423435 }
436+ case OPERATION_SPAWN_REQUEST : {
437+ if (UNLIKELY (arity != 6 )) {
438+ RAISE_ERROR (BADARG_ATOM );
439+ }
440+ term roots [4 ];
441+ roots [0 ] = argv [0 ];
442+ roots [1 ] = argv [1 ];
443+ roots [2 ] = control ;
444+ roots [3 ] = externalterm_to_term_with_roots (data + 1 + bytes_read , binary_len - 1 - bytes_read , ctx , ExternalTermCopy , & bytes_read , 3 , roots );
445+ if (UNLIKELY (memory_ensure_free_with_roots (ctx , LIST_SIZE (1 , TUPLE_SIZE (2 ) + TUPLE_SIZE (4 )), 4 , roots , MEMORY_CAN_SHRINK ) != MEMORY_GC_OK )) {
446+ RAISE_ERROR (OUT_OF_MEMORY_ATOM );
447+ }
448+ control = roots [2 ];
449+ term arglist = roots [3 ];
450+ term mfa = term_get_tuple_element (control , 4 );
451+ if (UNLIKELY (!term_is_tuple (mfa ) || term_get_tuple_arity (mfa ) != 3 )) {
452+ RAISE_ERROR (BADARG_ATOM );
453+ }
454+ if (UNLIKELY (!term_is_list (arglist ))) {
455+ RAISE_ERROR (BADARG_ATOM );
456+ }
457+ term reqid = term_get_tuple_element (control , 1 );
458+ term from = term_get_tuple_element (control , 2 );
459+ if (UNLIKELY (!term_is_pid (from ))) {
460+ RAISE_ERROR (BADARG_ATOM );
461+ }
462+ // term groupleader = term_get_tuple_element(control, 3);
463+ term options = term_get_tuple_element (control , 5 );
464+
465+ term request_tuple = term_alloc_tuple (4 , & ctx -> heap );
466+ term_put_tuple_element (request_tuple , 0 , roots [0 ]);
467+ term_put_tuple_element (request_tuple , 1 , reqid );
468+ term_put_tuple_element (request_tuple , 2 , from );
469+ term_put_tuple_element (request_tuple , 3 , options );
470+ term request_opt = term_alloc_tuple (2 , & ctx -> heap );
471+ term_put_tuple_element (request_opt , 0 , REQUEST_ATOM );
472+ term_put_tuple_element (request_opt , 1 , request_tuple );
473+ term spawn_opts = term_list_prepend (request_opt , term_nil (), & ctx -> heap );
474+
475+ // reuse roots for args
476+ roots [0 ] = term_get_tuple_element (mfa , 0 );
477+ roots [1 ] = term_get_tuple_element (mfa , 1 );
478+ roots [2 ] = arglist ;
479+ roots [3 ] = spawn_opts ;
480+ nif_erlang_spawn_opt (ctx , 4 , roots );
481+ break ;
482+ }
424483 default :
425484 printf ("Unknown distribution protocol operation id %d\n" , (int ) term_to_int (operation ));
426485 RAISE_ERROR (BADARG_ATOM );
@@ -446,6 +505,23 @@ void dist_send_message(term external_pid, term payload, Context *ctx)
446505 synclist_unlock (& ctx -> global -> dist_connections );
447506}
448507
508+ void dist_spawn_reply (term req_id , term to_pid , bool link , bool monitor , term result , struct DistConnection * connection , GlobalContext * global )
509+ {
510+ int flags = (link ? SPAWN_REPLY_FLAGS_LINK_CREATED : 0 )
511+ | (monitor ? SPAWN_REPLY_FLAGS_MONITOR_CREATED : 0 );
512+ // allocate tuple
513+ BEGIN_WITH_STACK_HEAP (TUPLE_SIZE (5 ), heap )
514+ term control_message = term_alloc_tuple (5 , & heap );
515+ term_put_tuple_element (control_message , 0 , term_from_int (OPERATION_SPAWN_REPLY ));
516+ term_put_tuple_element (control_message , 1 , req_id );
517+ term_put_tuple_element (control_message , 2 , to_pid );
518+ term_put_tuple_element (control_message , 3 , term_from_int (flags ));
519+ term_put_tuple_element (control_message , 4 , result );
520+
521+ dist_enqueue_message (control_message , term_invalid_term (), connection , global );
522+ END_WITH_STACK_HEAP (heap , global )
523+ }
524+
449525const struct Nif setnode_3_nif = {
450526 .base .type = NIFFunctionType ,
451527 .nif_ptr = nif_erlang_setnode_3
0 commit comments