1111#include "edf_channel_inspect.h"
1212#include "edf_channel_recv.h"
1313
14+ #include "../config/edf_config.h"
1415#include "../core/simd.h"
1516#include "../erts/dist.h"
17+ #include "../uterm/uterm.h"
1618
1719ERL_NIF_TERM
1820erldist_filter_nif_channel_open_5 (ErlNifEnv * env , int argc , const ERL_NIF_TERM argv [])
@@ -212,33 +214,73 @@ erldist_filter_nif_channel_list_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM a
212214 return list ;
213215}
214216
217+ static int channel_recv_2_fast (ErlNifEnv * env , edf_channel_t * channel , ERL_NIF_TERM input_term , ErlNifBinary * input_binary ,
218+ ERL_NIF_TERM * out_termp , ERL_NIF_TERM * err_termp );
219+
215220ERL_NIF_TERM
216221erldist_filter_nif_channel_recv_2 (ErlNifEnv * env , int argc , const ERL_NIF_TERM argv [])
217222{
218- ERL_NIF_TERM out_term ;
223+ ERL_NIF_TERM out_term = THE_NON_VALUE ;
224+ ERL_NIF_TERM err_term = THE_NON_VALUE ;
219225 ERL_NIF_TERM trap_term ;
220226 edf_channel_resource_t * resource = NULL ;
221227 edf_channel_t * channel = NULL ;
222228 int flags = (EDF_CHANNEL_RESOURCE_FLAG_OWNER_REQUIRED | EDF_CHANNEL_RESOURCE_FLAG_WRITE_LOCK );
229+ bool eligible_for_fast_recv = true;
230+ unsigned int iovec_length ;
231+ ERL_NIF_TERM iovec_tail = THE_NON_VALUE ;
232+ ERL_NIF_TERM input_term = THE_NON_VALUE ;
233+ ErlNifBinary input_binary ;
223234
224235 if (argc != 2 ) {
225236 return EXCP_BADARG (env , "argc must be 2" );
226237 }
227238
228- if (!edf_channel_resource_acquire (env , argv [0 ], & resource , & channel , & out_term , flags )) {
229- return out_term ;
239+ if (!edf_channel_resource_acquire (env , argv [0 ], & resource , & channel , & err_term , flags )) {
240+ return err_term ;
241+ }
242+
243+ if (ioq_size (& channel -> rx .ioq ) > 0 || edf_config_is_untrusted_enabled ()) {
244+ eligible_for_fast_recv = false;
245+ }
246+
247+ if (eligible_for_fast_recv && enif_is_list (env , argv [1 ]) && enif_get_list_length (env , argv [1 ], & iovec_length ) &&
248+ iovec_length == 1 && enif_get_list_cell (env , argv [1 ], & input_term , & iovec_tail ) && enif_is_binary (env , input_term ) &&
249+ enif_is_empty_list (env , iovec_tail )) {
250+ eligible_for_fast_recv = true;
251+ } else {
252+ input_term = argv [1 ];
253+ eligible_for_fast_recv = (eligible_for_fast_recv && enif_is_binary (env , argv [1 ])) ? true : false;
254+ }
255+
256+ if (eligible_for_fast_recv && enif_is_binary (env , input_term ) && enif_inspect_binary (env , input_term , & input_binary )) {
257+ int retval = channel_recv_2_fast (env , channel , input_term , & input_binary , & out_term , & err_term );
258+ if (retval == 0 ) {
259+ (void )edf_channel_resource_release (& resource , & channel , flags );
260+ return err_term ;
261+ } else if (retval == 1 ) {
262+ (void )edf_channel_resource_release (& resource , & channel , flags );
263+ return out_term ;
264+ } else {
265+ eligible_for_fast_recv = false;
266+ }
230267 }
231268
232- if (!ioq_inspect_iovec_and_enqv (env , ~(0ULL ), argv [1 ], & out_term , & channel -> rx .ioq )) {
269+ if (enif_is_binary (env , input_term )) {
270+ input_term = enif_make_list1 (env , input_term );
271+ }
272+
273+ if (!ioq_inspect_iovec_and_enqv (env , ~(0ULL ), input_term , & out_term , & channel -> rx .ioq )) {
233274 (void )edf_channel_resource_release (& resource , & channel , flags );
234- return EXCP_BADARG (env , "Iodata is invalid" );
275+ return EXCP_BADARG (env , "Input is invalid" );
235276 }
236277
237278 if (!enif_is_empty_list (env , out_term )) {
238279 (void )edf_channel_resource_release (& resource , & channel , flags );
239280 return EXCP_BADARG (env , "Call to enif_ioq_enqv() failed: expected tail to be an empty list" );
240281 }
241282
283+ CHANNEL_RX_STATS_COUNT (channel , slowpath , 1 );
242284 trap_term = edf_channel_recv_trap_open (env , resource , channel , NULL );
243285 if (enif_is_exception (env , trap_term )) {
244286 (void )edf_channel_resource_release (& resource , & channel , flags );
@@ -248,6 +290,203 @@ erldist_filter_nif_channel_recv_2(ErlNifEnv *env, int argc, const ERL_NIF_TERM a
248290 return edf_trap_schedule_from_term (env , trap_term );
249291}
250292
293+ int
294+ channel_recv_2_fast (ErlNifEnv * env , edf_channel_t * channel , ERL_NIF_TERM input_term , ErlNifBinary * input_binary ,
295+ ERL_NIF_TERM * out_termp , ERL_NIF_TERM * err_termp )
296+ {
297+ uint8_t dist_header_type ;
298+ uint64_t fragment_id ;
299+ const uint8_t * p = NULL ;
300+ const uint8_t * pend = NULL ;
301+ size_t packet_length = 0 ;
302+
303+ if (input_binary -> size == 0 ) {
304+ * out_termp = enif_make_list (env , 0 );
305+ return 1 ;
306+ }
307+
308+ p = (void * )input_binary -> data ;
309+ pend = p + input_binary -> size ;
310+
311+ if (channel -> rx .packet_size != 0 ) {
312+ if ((pend - p ) < channel -> rx .packet_size ) {
313+ goto try_decode_slow ;
314+ }
315+ switch (channel -> rx .packet_size ) {
316+ case 1 :
317+ packet_length = (size_t )(* p ++ );
318+ break ;
319+ case 2 :
320+ packet_length = (size_t )(be16toh (* ((uint16_t * )(void * )(p ))));
321+ p += 2 ;
322+ break ;
323+ case 4 :
324+ packet_length = (size_t )(be32toh (* ((uint32_t * )(void * )(p ))));
325+ p += 4 ;
326+ break ;
327+ case 8 :
328+ packet_length = (size_t )(be64toh (* ((uint64_t * )(void * )(p ))));
329+ p += 8 ;
330+ break ;
331+ default :
332+ goto try_decode_slow ;
333+ }
334+ if ((pend - p ) != packet_length ) {
335+ goto try_decode_slow ;
336+ }
337+ } else {
338+ packet_length = input_binary -> size ;
339+ }
340+
341+ if ((channel -> dflags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_FRAGMENTS )) == 0 ) {
342+ // If none of these distribution flags are set, then we fallback to "pass-through" mode.
343+ // We expect a PASS_THROUGH byte to be present.
344+ if (* (p ++ ) != PASS_THROUGH ) {
345+ * err_termp = EXCP_ERROR_F (env , "Dist Frame in pass-through mode must have an 8-bit type field equal to %u (was %u)\n" ,
346+ PASS_THROUGH , * (p - 1 ));
347+ return 0 ;
348+ }
349+ dist_header_type = PASS_THROUGH ;
350+ goto try_decode_slow ;
351+ } else {
352+ // Otherwise, we expect a VERSION_MAGIC byte to be present.
353+ if (* (p ++ ) != VERSION_MAGIC ) {
354+ * err_termp = EXCP_ERROR_F (env , "Dist Frame in pass-through mode must have an 8-bit type field equal to %u (was %u)\n" ,
355+ PASS_THROUGH , * (p - 1 ));
356+ return 0 ;
357+ }
358+ if (* p == DIST_HEADER ) {
359+ p ++ ;
360+ if ((channel -> dflags & (DFLAG_DIST_HDR_ATOM_CACHE )) == 0 ) {
361+ * err_termp = EXCP_ERROR (env , "Dist Frame tagged with 'DIST_HEADER' not support by current "
362+ "distribution flags (missing: DFLAG_DIST_HDR_ATOM_CACHE).\n" );
363+ return 0 ;
364+ }
365+ dist_header_type = DIST_HEADER ;
366+ goto try_decode_fast ;
367+ } else if (* p == DIST_FRAG_HEADER ) {
368+ p ++ ;
369+ if ((channel -> dflags & (DFLAG_FRAGMENTS )) == 0 ) {
370+ * err_termp = EXCP_ERROR (env , "Dist Frame tagged with 'DIST_FRAG_HEADER' not support by current "
371+ "distribution flags (missing: DFLAG_FRAGMENTS).\n" );
372+ return 0 ;
373+ }
374+ if ((pend - p ) < 16 ) {
375+ * err_termp = EXCP_ERROR_F (
376+ env , "Dist Frame tagged with 'DIST_FRAG_HEADER' must be at least 16-bytes in size (was %u-bytes).\n" ,
377+ (pend - p ));
378+ return 0 ;
379+ }
380+ // skip sequence_id
381+ p += 8 ;
382+ fragment_id = be64toh (* ((uint64_t * )(void * )(p )));
383+ p += 8 ;
384+ dist_header_type = DIST_FRAG_HEADER ;
385+ if (fragment_id != 1 ) {
386+ goto try_decode_slow ;
387+ }
388+ goto try_decode_fast ;
389+ } else if (* p == DIST_FRAG_CONT ) {
390+ dist_header_type = DIST_FRAG_CONT ;
391+ goto try_decode_slow ;
392+ } else {
393+ * err_termp = EXCP_ERROR_F (env , "Dist Frame tagged with unrecognized tag '%u' is not supported.\n" , * p );
394+ return 0 ;
395+ }
396+ }
397+ try_decode_fast : {
398+ vec_t vec [1 ];
399+ vec_t control_vec [1 ];
400+ vec_reader_t vr [1 ];
401+ edf_atom_translation_table_t attab [1 ];
402+ int external_flags = 0 ;
403+ slice_t headers [1 ];
404+ slice_t control [1 ];
405+ slice_t payload [1 ];
406+ vterm_env_t vtenv [1 ];
407+ udist_t up [1 ];
408+ (void )edf_atom_translation_table_init (attab );
409+ (void )vec_init_free (vec );
410+ (void )vec_create_from_slice (vec , p , pend );
411+ (void )vec_reader_create (vr , vec , 0 );
412+ if (!etf_fast_decode_dist_header (env , channel , attab , vr , & external_flags , headers , err_termp )) {
413+ return 0 ;
414+ }
415+ control -> head = vec_reader_raw_bytes (vr );
416+ if (!etf_fast_skip_terms (env , false, vr , 1 , err_termp )) {
417+ return 0 ;
418+ }
419+ control -> tail = vec_reader_raw_bytes (vr );
420+ vtenv -> nif_env = env ;
421+ vtenv -> attab = attab ;
422+ (void )vec_init_free (control_vec );
423+ (void )vec_create_from_slice (control_vec , control -> head , control -> tail );
424+ (void )udist_init (up );
425+ if (!etf_decode_udist_control (env , vtenv , false, true, control_vec , up , err_termp )) {
426+ return 0 ;
427+ }
428+ if (udist_control_is_send_to_name (up ) &&
429+ (up -> control .data .send_to == ATOM (net_kernel ) || up -> control .data .send_to == ATOM (rex ))) {
430+ goto try_decode_slow ;
431+ }
432+ if (up -> info .payload && udist_control_is_send (up )) {
433+ payload -> head = vec_reader_raw_bytes (vr );
434+ if (!etf_fast_skip_terms (env , false, vr , 1 , err_termp )) {
435+ return 0 ;
436+ }
437+ payload -> tail = vec_reader_raw_bytes (vr );
438+ if (!udist_classify (env , vtenv , up , false, false, payload , err_termp )) {
439+ return 0 ;
440+ }
441+ if (up -> flags == UDIST_CLASSIFY_FLAG_EMIT ) {
442+ if (packet_length < input_binary -> size ) {
443+ input_term = enif_make_sub_binary (env , input_term , channel -> rx .packet_size , packet_length );
444+ }
445+ * out_termp = enif_make_list1 (env , enif_make_tuple2 (env , ATOM (emit ), input_term ));
446+ {
447+ edf_channel_stats_dop_t * statsdop = NULL ;
448+ edf_world_slot_t * slot = NULL ;
449+ CHANNEL_RX_STATS_COUNT (channel , fastpath , 1 );
450+ CHANNEL_RX_STATS_COUNT (channel , packet_count , 1 );
451+ CHANNEL_RX_STATS_COUNT (channel , emit_count , 1 );
452+ switch (dist_header_type ) {
453+ case PASS_THROUGH :
454+ CHANNEL_RX_STATS_COUNT (channel , dist_pass_through_count , 1 );
455+ break ;
456+ case DIST_HEADER :
457+ CHANNEL_RX_STATS_COUNT (channel , dist_header_count , 1 );
458+ break ;
459+ case DIST_FRAG_HEADER :
460+ CHANNEL_RX_STATS_COUNT (channel , dist_frag_header_count , 1 );
461+ break ;
462+ case DIST_FRAG_CONT :
463+ CHANNEL_RX_STATS_COUNT (channel , dist_frag_cont_count , 1 );
464+ break ;
465+ default :
466+ break ;
467+ }
468+ statsdop = NULL ;
469+ if (udist_get_channel_stats_dop (up , & channel -> rx .stats , & statsdop ) && statsdop != NULL ) {
470+ statsdop -> seen += 1 ;
471+ statsdop -> emit += 1 ;
472+ }
473+ statsdop = NULL ;
474+ slot = edf_world_get ();
475+ if (udist_get_channel_stats_dop (up , & slot -> stats .channel .rx_stats , & statsdop ) && statsdop != NULL ) {
476+ statsdop -> seen += 1 ;
477+ statsdop -> emit += 1 ;
478+ }
479+ };
480+ return 1 ;
481+ }
482+ }
483+ goto try_decode_slow ;
484+ }
485+ try_decode_slow : {
486+ return -1 ;
487+ }
488+ }
489+
251490ERL_NIF_TERM
252491erldist_filter_nif_channel_set_controlling_process_2 (ErlNifEnv * env , int argc , const ERL_NIF_TERM argv [])
253492{
0 commit comments