Skip to content

Add a coordinator threads framework#5749

Open
ndptech wants to merge 11 commits intoFreeRADIUS:masterfrom
ndptech:v4_coordinator_threads
Open

Add a coordinator threads framework#5749
ndptech wants to merge 11 commits intoFreeRADIUS:masterfrom
ndptech:v4_coordinator_threads

Conversation

@ndptech
Copy link
Member

@ndptech ndptech commented Feb 20, 2026

Add the ability for modules to register coordinators and the associated callbacks to run when messages are passed between workers and coordinators.

In addition to raw data messages, coord_pair adds the ability to use pair lists as the data sent, encoded with the internal encoder.

@arr2036
Copy link
Member

arr2036 commented Feb 23, 2026

@claude review for all issue types, thinking extra hard about concurrency issues

@arr2036
Copy link
Member

arr2036 commented Feb 24, 2026

...and the action is broken. That's great.

@FreeRADIUS FreeRADIUS deleted a comment from claude bot Feb 24, 2026
@arr2036
Copy link
Member

arr2036 commented Feb 26, 2026

Manually triggered review

  Bugs — High Severity
                                                                                                                                                              
  1. sizeof(fr_coord_t) instead of sizeof(fr_coord_data_t) in fr_message_set_create — coord.c (diff ~1129)

  coord->ms[i] = fr_message_set_create(coord, FR_CONTROL_MAX_MESSAGES, sizeof(fr_coord_t),
                                       coord_reg->outbound_rb_size, true);

  The message_size parameter is documented as "the size of each message, INCLUDING fr_message_t" (message.c:121). The existing codebase consistently passes
  the concrete message struct: sizeof(fr_channel_data_t) in both worker.c:309 and network.c:1430-1431.

  Here, the messages are fr_coord_data_t (embeds fr_message_t m + uint32_t coord_cb_id), but the code passes sizeof(fr_coord_t) — the entire coordinator
  struct with pointers to event lists, rbtrees, heaps, slab allocators, etc. message_size is capped at 1024 (message.c:148), so if sizeof(fr_coord_t) > 1024,
  fr_message_set_create returns NULL and the coordinator fails to start. If it's under 1024, it massively over-allocates each message slot. The
  worker->coordinator direction at diff ~1497 correctly uses sizeof(fr_coord_data_t).

  Fix: sizeof(fr_coord_t) -> sizeof(fr_coord_data_t)

  2. Off-by-one in callback bounds check — coord.c (diff ~807)

  if (cd->coord_cb_id > coord->num_callbacks) {

  Should be >=. If coord_cb_id == num_callbacks, this indexes one past the end of the array. The worker-side check at diff ~834 correctly uses >=:

  if (cd->coord_cb_id >= cw->num_callbacks) {

  3. coord_pair_reg->cb_id is never set — coord_pair.c / coord_pair.h

  The fr_coord_pair_reg_t struct has a cb_id field (diff ~1742), used in fr_coord_to_worker_reply_send (diff ~1867) and fr_worker_to_coord_pair_send (diff
  ~1893):

  ret = fr_coord_to_worker_send(packet_ctx->coord, worker_id, coord_pair_reg->cb_id, &dbuff);

  But fr_coord_pair_register never sets coord_pair_reg->cb_id — it's zero-initialized via the compound literal. The FR_COORD_PAIR_CB_CTX_SET macro (diff
  ~1977) takes _id but only sets uctx, not cb_id. So coord_pair_reg->cb_id is always 0, meaning pair-list replies will always be dispatched to callback ID 0
  on the receiver side regardless of intent.

  Fix: Add coord_pair_reg->cb_id = reg_ctx->cb_id; in fr_coord_pair_register.

  4. Missing mmap() error check — semaphore.c:33-38

  sem = mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0);

  if (sem_init(sem, 0, SEMAPHORE_LOCKED) != 0) {

  mmap() returns MAP_FAILED ((void *)-1) on failure, but the return is never checked. If mmap fails, sem_init is called on (void *)-1 — SIGSEGV.

  ---
  Bugs — Medium Severity

  5. fr_control_open frees caller's object on error — control.c (PR diff ~141-144)

  int fr_control_open(fr_control_t *c) {
      if (pipe(c->pipe) < 0) {
          talloc_free(c);     // <-- Frees the caller's object!

  This was carried over from when the code was inside fr_control_create() (a constructor, where freeing on failure is idiomatic). Now that fr_control_open is
  a separate post-creation call, the caller still holds a pointer and all call sites do goto fail -> talloc_free(parent) which would cascade-free c again. The
   talloc_free(c) calls in fr_control_open should be removed — the caller owns cleanup.

  Same issue on the fr_event_fd_insert failure path at diff ~156.

  6. fr_message_alloc return value unchecked — coord.c (diff ~1535-1537, ~1569-1571)

  fr_coord_data_t *cd = NULL;
  cd = (fr_coord_data_t *)fr_message_alloc(coord->ms[worker_id], (fr_message_t *)cd, fr_dbuff_used(dbuff));
  memcpy(cd->m.data, fr_dbuff_buff(dbuff), fr_dbuff_used(dbuff));  // NULL deref if alloc failed

  If the message ring buffer is exhausted, fr_message_alloc returns NULL. The subsequent memcpy segfaults. Both fr_coord_to_worker_send and
  fr_worker_to_coord_send have this.

  7. fr_atomic_queue_push return value unchecked — coord.c (diff ~1539, ~1573)

  fr_atomic_queue_push(coord->worker_data_aq[worker_id], cd);
  // ... then sends control signal anyway

  fr_atomic_queue_push returns false when the queue is full. If it fails, the data message is lost but the control signal is still sent. The receiver gets the
   pipe notification, calls fr_atomic_queue_pop, gets nothing, and silently discards. The sender returns success. Silent data loss in both
  fr_coord_to_worker_send and fr_worker_to_coord_send.

  8. fr_control_callback_add return unchecked in fr_coord_attach — coord.c (diff ~1506-1507)

  fr_control_callback_add(&cw->control, FR_CONTROL_ID_COORD_WORKER_ACK, cw, coordinate_worker_ack);
  fr_control_callback_add(&cw->control, FR_CONTROL_ID_COORD_CALLBACK, cw, coord_worker_recv_message);

  Neither call checks the return value. If either fails, the worker has a broken control plane but proceeds to attach to the coordinator.

  9. fr_control_open return unchecked in fr_coord_attach — coord.c (diff ~1509)

  fr_control_open(cw->control);  // No error check

  If fr_control_open fails, the worker proceeds with a non-functional control plane.

  ---
  Concurrency — Medium Severity

  10. Multiple workers writing to same coordinator pipe — coord.c (diff ~1573-1576)

  Workers call fr_control_message_send(cw->coord->control, cw->rb, ...) from different threads. Each worker has its own cw->rb ring buffer, and the atomic
  queue is MPMC-safe. However, fr_control_message_send at control.c:354 writes to c->pipe[1]:

  if (write(c->pipe[1], ".", 1) >= 0) return 0;

  Multiple workers write to the same cw->coord->control->pipe[1] concurrently. POSIX guarantees atomicity for write() of <= PIPE_BUF bytes (1 byte here), so
  this is technically safe. But fr_control_message_send has a documented contract at control.c:329: "This function is called ONLY from the originating
  thread." This is a novel usage that violates the documented assumption.

  Deserves at minimum a comment on cw->coord->control documenting that it's intentionally shared across worker threads, and explaining why it's safe (1-byte
  writes, per-worker ring buffers, MPMC atomic queue).

  ---
  Bugs — Low Severity

  11. UNUSED annotation on parameter that IS used — coord_pair.c (diff ~1810)

  void coord_recv_pair_data(fr_coord_t *coord, uint32_t worker_id, fr_dbuff_t *dbuff,
                            UNUSED fr_time_t now, void *uctx)
  {
      ...
      coord_request_bootstrap(coord, worker_id, dbuff, now, coord_pair_reg);
                                                       ^^^

  UNUSED is __attribute__((unused)) so this compiles fine, but it's misleading. Remove the UNUSED.

  12. Comma operator instead of semicolon — coord.c (diff ~1517)

  fr_control_message_send(cw->coord->control, cw->rb, FR_CONTROL_ID_COORD_WORKER_ATTACH,
                          msg, sizeof(fr_coord_worker_msg_t)),
  talloc_free(msg);

  The comma silently discards the return value. Should be a semicolon, and ideally the return value checked.

  13. Typo — coord.c (diff ~805)

  "Recevied" -> "Received"

  ---
  Design / Fragility — Low Severity

  14. pshared=0 vs stated inter-process goal — semaphore.c:39

  sem = mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0);
  sem_init(sem, 0, SEMAPHORE_LOCKED);  // pshared=0

  The commit message says "In preparation for using separate processes in place of threads", but pshared=0 only supports thread sharing per POSIX. For
  inter-process sharing, pshared must be non-zero. Works now (threads only) but won't work for the stated future goal.

  15. Any worker detach unconditionally sets coord->exiting = true — coord.c (diff ~886)

  When any worker detaches, exiting is set and the coordinator loop will exit once num_workers == 0. No mechanism for graceful worker replacement or partial
  detach.

  16. Stack-allocated search key with garbage fields — coord.c (diff ~1487)

  cw->coord = fr_rb_find(&coords, &(fr_coord_t){ .coord_reg = coord_reg });

  Creates a stack-allocated fr_coord_t with only coord_reg initialized. coord_cmp only accesses coord_reg, so this works. But if coord_cmp ever changes to
  compare additional fields, this reads uninitialized memory. Consider using a designated initializer that zeros the struct: &(fr_coord_t){ 0, .coord_reg =
  coord_reg } or a comment.

  ---
  Style — Low Severity

  17. Indentation error — control.c (diff ~211)

          memset(...);

                 ctrl->num_callbacks = id + 1;  // over-indented

  18. Brace style inconsistency — control.c (diff ~141)

  int fr_control_open(fr_control_t *c) {

  Rest of codebase uses next-line braces for function definitions.

  ---
  Summary

  ┌─────┬──────────────────────────────────────────┬─────────────┬──────────┐
  │  #  │                  Issue                   │    Type     │ Severity │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 1   │ sizeof(fr_coord_t) in message_set_create │ Bug         │ High     │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 2   │ Off-by-one > vs >= in callback check     │ Bug         │ High     │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 3   │ coord_pair_reg->cb_id never set          │ Bug         │ High     │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 4   │ Missing mmap() error check               │ Bug         │ High     │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 5   │ fr_control_open frees caller's object    │ Bug         │ Medium   │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 6   │ fr_message_alloc return unchecked        │ Bug         │ Medium   │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 7   │ fr_atomic_queue_push return unchecked    │ Concurrency │ Medium   │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 8   │ fr_control_callback_add return unchecked │ Bug         │ Medium   │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 9   │ fr_control_open return unchecked         │ Bug         │ Medium   │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 10  │ Multi-thread pipe writes undocumented    │ Concurrency │ Medium   │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 11  │ UNUSED on used parameter                 │ Bug         │ Low      │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 12  │ Comma instead of semicolon               │ Bug         │ Low      │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 13  │ Typo "Recevied"                          │ Style       │ Low      │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 14  │ pshared=0 vs inter-process intent        │ Design      │ Low      │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 15  │ Detach unconditionally sets exiting      │ Design      │ Low      │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 16  │ Stack search key with garbage fields     │ Fragility   │ Low      │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 17  │ Over-indentation                         │ Style       │ Low      │
  ├─────┼──────────────────────────────────────────┼─────────────┼──────────┤
  │ 18  │ Brace style inconsistency                │ Style       │ Low      │
  └─────┴──────────────────────────────────────────┴─────────────┴──────────┘

@ndptech ndptech force-pushed the v4_coordinator_threads branch 4 times, most recently from 517dcf4 to a54820e Compare February 27, 2026 09:46
@ndptech ndptech force-pushed the v4_coordinator_threads branch 3 times, most recently from 320100b to 069a454 Compare March 11, 2026 16:16
@ndptech ndptech force-pushed the v4_coordinator_threads branch from 069a454 to 6f5a9de Compare March 12, 2026 09:10
ndptech added 5 commits March 12, 2026 09:27
Allows callbacks to register an instance create function to create instance data and a set of callbacks to run as part of the event loop.
On the coordinator side the pair list is then converted to a request and run through an interpreter.
Control planes can be multi-producer, single consumer - so mulitple
threads can send / push a message.
@ndptech ndptech force-pushed the v4_coordinator_threads branch from 6f5a9de to 6cb05dd Compare March 12, 2026 09:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants