7070 OPERATION_ALIAS_SEND_TT = 34 ,
7171};
7272
73+ enum {
74+ SPAWN_REPLY_FLAGS_LINK_CREATED = 1 ,
75+ SPAWN_REPLY_FLAGS_MONITOR_CREATED = 2 ,
76+ };
77+
7378struct DistributionPacket
7479{
7580 struct ListHead head ;
@@ -318,6 +323,37 @@ static term nif_erlang_dist_ctrl_get_data(Context *ctx, int argc, term argv[])
318323 return result ;
319324}
320325
326+ term dist_monitor (struct DistConnection * conn_obj , term from_pid , term target_proc , term monitor_ref , Context * ctx )
327+ {
328+ int target_process_id = 0 ;
329+ if (term_is_local_pid (target_proc )) {
330+ target_process_id = term_to_local_process_id (target_proc );
331+ } else if (term_is_atom (target_proc )) {
332+ target_process_id = globalcontext_get_registered_process (ctx -> global , term_to_atom_index (target_proc ));
333+ } else {
334+ RAISE_ERROR (BADARG_ATOM );
335+ }
336+ struct RemoteMonitor * monitor = malloc (sizeof (struct RemoteMonitor ));
337+ monitor -> target_proc = target_proc ;
338+ monitor -> pid_number = term_get_external_pid_process_id (from_pid );
339+ monitor -> pid_serial = term_get_external_pid_serial (from_pid );
340+ monitor -> ref_len = term_get_external_reference_len (monitor_ref );
341+ memcpy (monitor -> ref_words , term_get_external_reference_words (monitor_ref ), sizeof (uint32_t ) * monitor -> ref_len );
342+ if (target_process_id ) {
343+ synclist_append (& conn_obj -> remote_monitors , & monitor -> head );
344+ ErlNifPid target_process_pid = target_process_id ;
345+ if (UNLIKELY (enif_monitor_process (erl_nif_env_from_context (ctx ), conn_obj , & target_process_pid , & monitor -> process_monitor ) != 0 )) {
346+ synclist_remove (& conn_obj -> remote_monitors , & monitor -> head );
347+ dist_enqueue_monitor_exit_message (monitor , NOPROC_ATOM , conn_obj , ctx -> global );
348+ free (monitor );
349+ }
350+ } else {
351+ dist_enqueue_monitor_exit_message (monitor , NOPROC_ATOM , conn_obj , ctx -> global );
352+ free (monitor );
353+ }
354+ return OK_ATOM ;
355+ }
356+
321357static term nif_erlang_dist_ctrl_put_data (Context * ctx , int argc , term argv [])
322358{
323359 UNUSED (argc );
@@ -369,32 +405,7 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
369405 term from_pid = term_get_tuple_element (control , 1 );
370406 term target_proc = term_get_tuple_element (control , 2 );
371407 term monitor_ref = term_get_tuple_element (control , 3 );
372- int target_process_id = 0 ;
373- if (term_is_local_pid (target_proc )) {
374- target_process_id = term_to_local_process_id (target_proc );
375- } else if (term_is_atom (target_proc )) {
376- target_process_id = globalcontext_get_registered_process (ctx -> global , term_to_atom_index (target_proc ));
377- } else {
378- RAISE_ERROR (BADARG_ATOM );
379- }
380- struct RemoteMonitor * monitor = malloc (sizeof (struct RemoteMonitor ));
381- monitor -> target_proc = target_proc ;
382- monitor -> pid_number = term_get_external_pid_process_id (from_pid );
383- monitor -> pid_serial = term_get_external_pid_serial (from_pid );
384- monitor -> ref_len = term_get_external_reference_len (monitor_ref );
385- memcpy (monitor -> ref_words , term_get_external_reference_words (monitor_ref ), sizeof (uint32_t ) * monitor -> ref_len );
386- if (target_process_id ) {
387- synclist_append (& conn_obj -> remote_monitors , & monitor -> head );
388- ErlNifPid target_process_pid = target_process_id ;
389- if (UNLIKELY (enif_monitor_process (erl_nif_env_from_context (ctx ), conn_obj , & target_process_pid , & monitor -> process_monitor ) != 0 )) {
390- synclist_remove (& conn_obj -> remote_monitors , & monitor -> head );
391- dist_enqueue_monitor_exit_message (monitor , NOPROC_ATOM , conn_obj , ctx -> global );
392- free (monitor );
393- }
394- } else {
395- dist_enqueue_monitor_exit_message (monitor , NOPROC_ATOM , conn_obj , ctx -> global );
396- free (monitor );
397- }
408+ dist_monitor (conn_obj , from_pid , target_proc , monitor_ref , ctx );
398409
399410 break ;
400411 }
@@ -421,6 +432,52 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
421432 synclist_unlock (& conn_obj -> remote_monitors );
422433 break ;
423434 }
435+ case OPERATION_SPAWN_REQUEST : {
436+ if (UNLIKELY (arity != 6 )) {
437+ RAISE_ERROR (BADARG_ATOM );
438+ }
439+ term roots [4 ];
440+ roots [0 ] = argv [0 ];
441+ roots [1 ] = argv [1 ];
442+ roots [2 ] = control ;
443+ term arglist = externalterm_to_term_with_roots (data + 1 + bytes_read , binary_len - 1 - bytes_read , ctx , ExternalTermCopy , & bytes_read , 3 , roots );
444+ roots [3 ] = arglist ;
445+ if (UNLIKELY (memory_ensure_free_with_roots (ctx , LIST_SIZE (1 , TUPLE_SIZE (2 ) + TUPLE_SIZE (4 )), 4 , roots , MEMORY_CAN_SHRINK ) != MEMORY_GC_OK )) {
446+ RAISE_ERROR (OUT_OF_MEMORY_ATOM );
447+ }
448+ term mfa = term_get_tuple_element (control , 4 );
449+ if (UNLIKELY (!term_is_tuple (mfa ) || term_get_tuple_arity (mfa ) != 3 )) {
450+ RAISE_ERROR (BADARG_ATOM );
451+ }
452+ if (UNLIKELY (!term_is_list (arglist ))) {
453+ RAISE_ERROR (BADARG_ATOM );
454+ }
455+ term reqid = term_get_tuple_element (control , 1 );
456+ term from = term_get_tuple_element (control , 2 );
457+ if (UNLIKELY (!term_is_pid (from ))) {
458+ RAISE_ERROR (BADARG_ATOM );
459+ }
460+ // term groupleader = term_get_tuple_element(control, 3);
461+ term options = term_get_tuple_element (control , 5 );
462+
463+ term request_tuple = term_alloc_tuple (4 , & ctx -> heap );
464+ term_put_tuple_element (request_tuple , 0 , argv [0 ]);
465+ term_put_tuple_element (request_tuple , 1 , reqid );
466+ term_put_tuple_element (request_tuple , 2 , from );
467+ term_put_tuple_element (request_tuple , 3 , options );
468+ term request_opt = term_alloc_tuple (2 , & ctx -> heap );
469+ term_put_tuple_element (request_opt , 0 , REQUEST_ATOM );
470+ term_put_tuple_element (request_opt , 1 , request_tuple );
471+ term spawn_opts = term_list_prepend (request_opt , term_nil (), & ctx -> heap );
472+
473+ // reuse roots for args
474+ roots [0 ] = term_get_tuple_element (mfa , 0 );
475+ roots [1 ] = term_get_tuple_element (mfa , 1 );
476+ roots [2 ] = arglist ;
477+ roots [3 ] = spawn_opts ;
478+ nif_erlang_spawn_opt (ctx , 4 , roots );
479+ break ;
480+ }
424481 default :
425482 printf ("Unknown distribution protocol operation id %d\n" , (int ) term_to_int (operation ));
426483 RAISE_ERROR (BADARG_ATOM );
@@ -446,6 +503,24 @@ void dist_send_message(term external_pid, term payload, Context *ctx)
446503 synclist_unlock (& ctx -> global -> dist_connections );
447504}
448505
506+ void dist_spawn_reply (term req_id , term to_pid , bool link , bool monitor , term result , struct DistConnection * connection , GlobalContext * global )
507+ {
508+ int flags =
509+ (link ? SPAWN_REPLY_FLAGS_LINK_CREATED : 0 )
510+ | (monitor ? SPAWN_REPLY_FLAGS_MONITOR_CREATED : 0 );
511+ // allocate tuple
512+ BEGIN_WITH_STACK_HEAP (TUPLE_SIZE (5 ), heap )
513+ term control_message = term_alloc_tuple (5 , & heap );
514+ term_put_tuple_element (control_message , 0 , term_from_int (OPERATION_SPAWN_REPLY ));
515+ term_put_tuple_element (control_message , 1 , req_id );
516+ term_put_tuple_element (control_message , 2 , to_pid );
517+ term_put_tuple_element (control_message , 3 , term_from_int (flags ));
518+ term_put_tuple_element (control_message , 4 , result );
519+
520+ dist_enqueue_message (control_message , term_invalid_term (), connection , global );
521+ END_WITH_STACK_HEAP (heap , global )
522+ }
523+
449524const struct Nif setnode_3_nif = {
450525 .base .type = NIFFunctionType ,
451526 .nif_ptr = nif_erlang_setnode_3
0 commit comments