Skip to content

Commit 0bbb2ec

Browse files
committed
Support OTP 26, add routers/handlers, and prepare for version 1.0.0 release
1 parent 8d8b347 commit 0bbb2ec

File tree

69 files changed

+6765
-733
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+6765
-733
lines changed

.formatter.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@
66

77
# Used by "mix format"
88
[
9-
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
9+
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}", "apps/erldist_filter/{config,lib,test}/**/*.{ex,exs}"],
1010
line_length: 132
1111
]

CHANGELOG.md

Lines changed: 0 additions & 5 deletions
This file was deleted.

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
apps/erldist_filter/CHANGELOG.md

apps/erldist_filter/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Changelog
2+
3+
## 1.0.0 (2023-09-07)
4+
5+
* Initial release.

apps/erldist_filter/c_src/nif/action.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ action_into_term_emit(ErlNifEnv *env, action_t *action, ERL_NIF_TERM *termp)
7777
action_term = vec_into_binary_term(env, &action->data.emit.vec);
7878
action_term = enif_make_tuple2(env, ATOM(emit), action_term);
7979
*termp = action_term;
80-
(void)action_init_free(action);
80+
(void)action_destroy(action);
8181
return 1;
8282
}
8383

apps/erldist_filter/c_src/nif/channel/edf_channel.c

Lines changed: 10 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
ErlNifResourceType *edf_channel_resource_type = NULL;
2020
static edf_channel_resource_table_t edf_channel_resource_table_internal = {._link = {.next = NULL, .prev = NULL}};
2121
edf_channel_resource_table_t *edf_channel_resource_table = &edf_channel_resource_table_internal;
22-
static edf_channel_index_table_t edf_channel_index_table_internal = {._link = {.next = NULL, .prev = NULL}};
23-
edf_channel_index_table_t *edf_channel_index_table = &edf_channel_index_table_internal;
2422

2523
/* Static Declarations */
2624

@@ -30,8 +28,6 @@ static void edf_channel_resource_type_down(ErlNifEnv *env, void *obj, ErlNifPid
3028
static int edf_channel_create(ErlNifEnv *env, size_t packet_size, ERL_NIF_TERM sysname, uint32_t creation, uint32_t connection_id,
3129
uint64_t dflags, edf_channel_resource_t *resource, edf_channel_t **channelp,
3230
ERL_NIF_TERM *error_term);
33-
static void edf_channel_stats_init_empty(edf_channel_stats_t *stats);
34-
static void edf_channel_stats_dop_init_empty(edf_channel_stats_dop_t *dop);
3531

3632
/* Function Definitions */
3733

@@ -59,15 +55,6 @@ edf_channel_load(ErlNifEnv *env)
5955
(void)linklist_init_anchor(&edf_channel_resource_table->_link);
6056
}
6157

62-
if (!linklist_is_linked(&edf_channel_index_table->_link)) {
63-
retval = enif_tsd_key_create("erldist_filter.channel_index_table_tsd_key", &edf_channel_index_table->key);
64-
if (retval != 0) {
65-
return retval;
66-
}
67-
(void)core_rwlock_create(&edf_channel_index_table->rwlock, "erldist_filter.channel_index_table_rwlock");
68-
(void)linklist_init_anchor(&edf_channel_index_table->_link);
69-
}
70-
7158
return retval;
7259
}
7360

@@ -114,7 +101,7 @@ edf_channel_resource_open(ErlNifEnv *env, size_t packet_size, ERL_NIF_TERM sysna
114101
// Don't release the resource here, so a reference is kept on the root table.
115102
// (void)enif_release_resource((void *)resource);
116103

117-
(void)atomic_fetch_add(&edf_world->channels_created, 1);
104+
WORLD_STATS_COUNT(channel, create, 1);
118105

119106
if (resourcep != NULL) {
120107
*resourcep = resource;
@@ -198,6 +185,8 @@ edf_channel_create(ErlNifEnv *env, size_t packet_size, ERL_NIF_TERM sysname, uin
198185
channel->creation = creation;
199186
channel->connection_id = connection_id;
200187
channel->dflags = dflags;
188+
channel->rx.router_name = edf_channel_router_name(env, channel->sysname);
189+
channel->rx.sort = 0;
201190
channel->rx.packet_size = packet_size;
202191
channel->rx.state = EDF_CHANNEL_RX_STATE_PACKET_HEADER;
203192
(void)ioq_init_free(&channel->rx.ioq);
@@ -287,93 +276,16 @@ edf_channel_destroy(ErlNifEnv *env, edf_channel_resource_t *resource, edf_channe
287276
}
288277
resource->inner = NULL;
289278
(void)enif_free((void *)channel);
290-
(void)atomic_fetch_add(&edf_world->channels_destroyed, 1);
279+
WORLD_STATS_COUNT(channel, destroy, 1);
291280
return;
292281
}
293282

294-
edf_channel_index_slot_t *
295-
edf_channel_index_get_slow(void)
296-
{
297-
edf_channel_index_slot_t *slot = NULL;
298-
slot = (void *)enif_tsd_get(edf_channel_index_table->key);
299-
if (slot == NULL) {
300-
slot = enif_alloc(sizeof(edf_channel_index_slot_t));
301-
if (slot == NULL) {
302-
unreachable();
303-
(void)perror("Too many processors or threads on this machine: OOM, unable to allocate edf_channel_index_slot_t!");
304-
abort();
305-
return NULL;
306-
}
307-
slot->_link.prev = NULL;
308-
slot->_link.next = NULL;
309-
(void)edf_channel_stats_init_empty(&slot->rx_stats);
310-
(void)core_rwlock_write_lock(&edf_channel_index_table->rwlock);
311-
(void)linklist_insert(&edf_channel_index_table->_link, &slot->_link);
312-
(void)core_rwlock_write_unlock(&edf_channel_index_table->rwlock);
313-
(void)enif_tsd_set(edf_channel_index_table->key, (void *)slot);
314-
return slot;
315-
}
316-
return slot;
317-
}
318-
319-
void
320-
edf_channel_stats_init_empty(edf_channel_stats_t *stats)
283+
ERL_NIF_TERM
284+
edf_channel_router_name(ErlNifEnv *env, ERL_NIF_TERM sysname)
321285
{
322-
stats->packet_count = 0;
323-
stats->emit_count = 0;
324-
stats->drop_count = 0;
325-
stats->dist_header_count = 0;
326-
stats->dist_frag_header_count = 0;
327-
stats->dist_frag_cont_count = 0;
328-
stats->dist_pass_through_count = 0;
329-
stats->atom_cache_read_count = 0;
330-
stats->atom_cache_write_count = 0;
331-
stats->atom_cache_overwrite_count = 0;
332-
stats->rewrite_fragment_header_count = 0;
333-
stats->rollback_atom_cache_count = 0;
334-
stats->compact_external_count = 0;
335-
stats->compact_fragment_count = 0;
336-
stats->control_has_export_ext = 0;
337-
stats->control_has_new_fun_ext = 0;
338-
stats->payload_has_export_ext = 0;
339-
stats->payload_has_new_fun_ext = 0;
340-
(void)edf_channel_stats_dop_init_empty(&stats->dop_link);
341-
(void)edf_channel_stats_dop_init_empty(&stats->dop_send);
342-
(void)edf_channel_stats_dop_init_empty(&stats->dop_exit);
343-
(void)edf_channel_stats_dop_init_empty(&stats->dop_unlink);
344-
(void)edf_channel_stats_dop_init_empty(&stats->dop_reg_send);
345-
(void)edf_channel_stats_dop_init_empty(&stats->dop_group_leader);
346-
(void)edf_channel_stats_dop_init_empty(&stats->dop_exit2);
347-
(void)edf_channel_stats_dop_init_empty(&stats->dop_send_tt);
348-
(void)edf_channel_stats_dop_init_empty(&stats->dop_exit_tt);
349-
(void)edf_channel_stats_dop_init_empty(&stats->dop_reg_send_tt);
350-
(void)edf_channel_stats_dop_init_empty(&stats->dop_exit2_tt);
351-
(void)edf_channel_stats_dop_init_empty(&stats->dop_monitor_p);
352-
(void)edf_channel_stats_dop_init_empty(&stats->dop_demonitor_p);
353-
(void)edf_channel_stats_dop_init_empty(&stats->dop_monitor_p_exit);
354-
(void)edf_channel_stats_dop_init_empty(&stats->dop_send_sender);
355-
(void)edf_channel_stats_dop_init_empty(&stats->dop_send_sender_tt);
356-
(void)edf_channel_stats_dop_init_empty(&stats->dop_payload_exit);
357-
(void)edf_channel_stats_dop_init_empty(&stats->dop_payload_exit_tt);
358-
(void)edf_channel_stats_dop_init_empty(&stats->dop_payload_exit2);
359-
(void)edf_channel_stats_dop_init_empty(&stats->dop_payload_exit2_tt);
360-
(void)edf_channel_stats_dop_init_empty(&stats->dop_payload_monitor_p_exit);
361-
(void)edf_channel_stats_dop_init_empty(&stats->dop_spawn_request);
362-
(void)edf_channel_stats_dop_init_empty(&stats->dop_spawn_request_tt);
363-
(void)edf_channel_stats_dop_init_empty(&stats->dop_spawn_reply);
364-
(void)edf_channel_stats_dop_init_empty(&stats->dop_spawn_reply_tt);
365-
(void)edf_channel_stats_dop_init_empty(&stats->dop_alias_send);
366-
(void)edf_channel_stats_dop_init_empty(&stats->dop_alias_send_tt);
367-
(void)edf_channel_stats_dop_init_empty(&stats->dop_unlink_id);
368-
(void)edf_channel_stats_dop_init_empty(&stats->dop_unlink_id_ack);
369-
return;
370-
}
286+
ErlNifUInt64 router_number;
371287

372-
inline void
373-
edf_channel_stats_dop_init_empty(edf_channel_stats_dop_t *dop)
374-
{
375-
dop->seen = 0;
376-
dop->emit = 0;
377-
dop->drop = 0;
378-
return;
288+
router_number = enif_hash(ERL_NIF_INTERNAL_HASH, sysname, 0);
289+
router_number %= erldist_filter_router_count;
290+
return erldist_filter_router_names[router_number];
379291
}

apps/erldist_filter/c_src/nif/channel/edf_channel.h

Lines changed: 8 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -24,93 +24,24 @@ extern "C" {
2424
#include "edf_atom_cache.h"
2525
#include "edf_external.h"
2626
#include "edf_external_sequence.h"
27+
#include "edf_channel_stats.h"
28+
#include "../world/edf_world.h"
2729

2830
/* Macro Definitions */
2931

30-
#define CHANNEL_RX_STATS_COUNT(channel, field, inc) \
32+
#define CHANNEL_RX_STATS_COUNT(channelp, field, inc) \
3133
do { \
32-
(channel)->rx.stats.field += inc; \
33-
(edf_channel_index_get())->rx_stats.field += inc; \
34+
(channelp)->rx.stats.field += inc; \
35+
(edf_world_get())->stats.channel.rx_stats.field += inc; \
3436
} while (0)
3537

3638
/* Type Definitions */
3739

3840
typedef struct edf_channel_s edf_channel_t;
39-
typedef struct edf_channel_stats_s edf_channel_stats_t;
40-
typedef struct edf_channel_stats_dop_s edf_channel_stats_dop_t;
41-
typedef struct edf_channel_index_table_s edf_channel_index_table_t;
42-
typedef struct edf_channel_index_slot_s edf_channel_index_slot_t;
4341
typedef enum edf_channel_rx_state_t edf_channel_rx_state_t;
4442
typedef struct edf_channel_resource_s edf_channel_resource_t;
4543
typedef struct edf_channel_resource_table_s edf_channel_resource_table_t;
4644

47-
struct edf_channel_stats_dop_s {
48-
uint64_t seen;
49-
uint64_t emit;
50-
uint64_t drop;
51-
};
52-
53-
struct edf_channel_stats_s {
54-
uint64_t packet_count;
55-
uint64_t emit_count;
56-
uint64_t drop_count;
57-
uint64_t dist_header_count;
58-
uint64_t dist_frag_header_count;
59-
uint64_t dist_frag_cont_count;
60-
uint64_t dist_pass_through_count;
61-
uint64_t atom_cache_read_count;
62-
uint64_t atom_cache_write_count;
63-
uint64_t atom_cache_overwrite_count;
64-
uint64_t rewrite_fragment_header_count;
65-
uint64_t rollback_atom_cache_count;
66-
uint64_t compact_external_count;
67-
uint64_t compact_fragment_count;
68-
uint64_t control_has_export_ext;
69-
uint64_t control_has_new_fun_ext;
70-
uint64_t payload_has_export_ext;
71-
uint64_t payload_has_new_fun_ext;
72-
edf_channel_stats_dop_t dop_link;
73-
edf_channel_stats_dop_t dop_send;
74-
edf_channel_stats_dop_t dop_exit;
75-
edf_channel_stats_dop_t dop_unlink;
76-
edf_channel_stats_dop_t dop_reg_send;
77-
edf_channel_stats_dop_t dop_group_leader;
78-
edf_channel_stats_dop_t dop_exit2;
79-
edf_channel_stats_dop_t dop_send_tt;
80-
edf_channel_stats_dop_t dop_exit_tt;
81-
edf_channel_stats_dop_t dop_reg_send_tt;
82-
edf_channel_stats_dop_t dop_exit2_tt;
83-
edf_channel_stats_dop_t dop_monitor_p;
84-
edf_channel_stats_dop_t dop_demonitor_p;
85-
edf_channel_stats_dop_t dop_monitor_p_exit;
86-
edf_channel_stats_dop_t dop_send_sender;
87-
edf_channel_stats_dop_t dop_send_sender_tt;
88-
edf_channel_stats_dop_t dop_payload_exit;
89-
edf_channel_stats_dop_t dop_payload_exit_tt;
90-
edf_channel_stats_dop_t dop_payload_exit2;
91-
edf_channel_stats_dop_t dop_payload_exit2_tt;
92-
edf_channel_stats_dop_t dop_payload_monitor_p_exit;
93-
edf_channel_stats_dop_t dop_spawn_request;
94-
edf_channel_stats_dop_t dop_spawn_request_tt;
95-
edf_channel_stats_dop_t dop_spawn_reply;
96-
edf_channel_stats_dop_t dop_spawn_reply_tt;
97-
edf_channel_stats_dop_t dop_alias_send;
98-
edf_channel_stats_dop_t dop_alias_send_tt;
99-
edf_channel_stats_dop_t dop_unlink_id;
100-
edf_channel_stats_dop_t dop_unlink_id_ack;
101-
};
102-
103-
struct edf_channel_index_slot_s {
104-
linklist_t _link;
105-
edf_channel_stats_t rx_stats;
106-
};
107-
108-
struct edf_channel_index_table_s {
109-
linklist_t _link;
110-
ErlNifTSDKey key;
111-
core_rwlock_t rwlock;
112-
};
113-
11445
enum edf_channel_rx_state_t {
11546
EDF_CHANNEL_RX_STATE_PACKET_HEADER = 0,
11647
EDF_CHANNEL_RX_STATE_PACKET_DATA,
@@ -135,6 +66,8 @@ struct edf_channel_s {
13566
uint32_t connection_id;
13667
uint64_t dflags;
13768
struct {
69+
ERL_NIF_TERM router_name;
70+
uint64_t sort;
13871
size_t packet_size;
13972
edf_channel_rx_state_t state;
14073
ioq_t ioq;
@@ -160,7 +93,6 @@ struct edf_channel_resource_table_s {
16093

16194
extern ErlNifResourceType *edf_channel_resource_type;
16295
extern edf_channel_resource_table_t *edf_channel_resource_table;
163-
extern edf_channel_index_table_t *edf_channel_index_table;
16496

16597
/* Function Declarations */
16698

@@ -170,9 +102,8 @@ extern ERL_NIF_TERM edf_channel_resource_open(ErlNifEnv *env, size_t packet_size
170102
uint32_t connection_id, uint64_t dflags, edf_channel_resource_t **resourcep,
171103
edf_channel_t **channelp);
172104
extern void edf_channel_destroy(ErlNifEnv *env, edf_channel_resource_t *resource, edf_channel_t *channel);
173-
static edf_channel_index_slot_t *edf_channel_index_get(void);
174-
extern edf_channel_index_slot_t *edf_channel_index_get_slow(void);
175105
static int edf_channel_is_tracing_enabled(const edf_channel_t *channel);
106+
extern ERL_NIF_TERM edf_channel_router_name(ErlNifEnv *env, ERL_NIF_TERM sysname);
176107
static int edf_channel_tracing_send(edf_channel_t *channel, ErlNifEnv *caller_env, ErlNifEnv *msg_env, ERL_NIF_TERM msg);
177108

178109
#ifdef ERLDIST_FILTER_NIF_INTERNAL_API
@@ -187,16 +118,6 @@ static void edf_channel_resource_release(edf_channel_resource_t **resourcep, edf
187118

188119
/* Inline Function Definitions */
189120

190-
inline edf_channel_index_slot_t *
191-
edf_channel_index_get(void)
192-
{
193-
edf_channel_index_slot_t *slot = (void *)enif_tsd_get(edf_channel_index_table->key);
194-
if (slot == NULL) {
195-
return edf_channel_index_get_slow();
196-
}
197-
return slot;
198-
}
199-
200121
inline int
201122
edf_channel_is_tracing_enabled(const edf_channel_t *channel)
202123
{

apps/erldist_filter/c_src/nif/channel/edf_channel_impl.c

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -92,56 +92,6 @@ erldist_filter_nif_channel_close_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM
9292
return out_term;
9393
}
9494

95-
ERL_NIF_TERM
96-
erldist_filter_nif_channel_index_get_0(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
97-
{
98-
#define RET_MAP_SIZE (2)
99-
100-
ERL_NIF_TERM out_term;
101-
ERL_NIF_TERM keys[RET_MAP_SIZE];
102-
ERL_NIF_TERM vals[RET_MAP_SIZE];
103-
size_t k = 0;
104-
size_t v = 0;
105-
ErlNifUInt64 slots;
106-
edf_channel_stats_t acc[1];
107-
edf_channel_index_slot_t *root = NULL;
108-
edf_channel_index_slot_t *slot = NULL;
109-
110-
if (argc != 0) {
111-
return EXCP_BADARG(env, "argc must be 0");
112-
}
113-
114-
(void)memset((void *)acc, 0, sizeof(edf_channel_stats_t));
115-
116-
slots = 0;
117-
(void)core_rwlock_read_lock(&edf_channel_index_table->rwlock);
118-
root = (void *)&edf_channel_index_table->_link;
119-
slot = (void *)root->_link.next;
120-
while (slot != root) {
121-
(void)core_simd_add_vec_u64((uint64_t *)acc, (const uint64_t *)&slot->rx_stats,
122-
(sizeof(edf_channel_stats_t) / sizeof(uint64_t)));
123-
slots += 1;
124-
slot = (void *)slot->_link.next;
125-
}
126-
(void)core_rwlock_read_unlock(&edf_channel_index_table->rwlock);
127-
keys[k++] = ATOM(rx_stats);
128-
if (!edf_channel_inspect_stats(env, acc, &out_term)) {
129-
return out_term;
130-
}
131-
vals[v++] = out_term;
132-
133-
keys[k++] = ATOM(slots);
134-
vals[v++] = enif_make_uint64(env, slots);
135-
136-
if (!enif_make_map_from_arrays(env, keys, vals, RET_MAP_SIZE, &out_term)) {
137-
return EXCP_BADARG(env, "Call to enif_make_map_from_arrays() failed: duplicate keys detected");
138-
}
139-
140-
return out_term;
141-
142-
#undef RET_MAP_SIZE
143-
}
144-
14595
ERL_NIF_TERM
14696
erldist_filter_nif_channel_inspect_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
14797
{

apps/erldist_filter/c_src/nif/channel/edf_channel_impl.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ extern "C" {
2525

2626
extern ERL_NIF_TERM erldist_filter_nif_channel_open_5(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
2727
extern ERL_NIF_TERM erldist_filter_nif_channel_close_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
28-
extern ERL_NIF_TERM erldist_filter_nif_channel_index_get_0(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
2928
extern ERL_NIF_TERM erldist_filter_nif_channel_inspect_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
3029
extern ERL_NIF_TERM erldist_filter_nif_channel_list_0(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
3130
extern ERL_NIF_TERM erldist_filter_nif_channel_list_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

0 commit comments

Comments
 (0)