diff --git a/doc/userguide/upgrade.rst b/doc/userguide/upgrade.rst index 436151fe6aa4..2e915dd9762d 100644 --- a/doc/userguide/upgrade.rst +++ b/doc/userguide/upgrade.rst @@ -77,6 +77,11 @@ Other Changes really enforced and there will be no hassh computation even if rules try to use it. +- In DPDK capture mode, management threads need to be explicitly defined and + they cannot overlap with worker CPU affinity. Previously, Suricata only + warned if overlap was present. +- DPDK EAL lcore arguments cannot be used. Suricata now manages the + lcore assignments itself based on the CPU affinity configuration. Upgrading to 8.0.1 ------------------ diff --git a/src/Makefile.am b/src/Makefile.am index 052d0370fb24..d4d8ba77525c 100755 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -503,6 +503,7 @@ noinst_HEADERS = \ util-dpdk-ixgbe.h \ util-dpdk-mlx5.h \ util-dpdk-rss.h \ + util-dpdk-threading.h \ util-dpdk.h \ util-ebpf.h \ util-enum.h \ @@ -1082,6 +1083,7 @@ libsuricata_c_a_SOURCES = \ util-dpdk-ixgbe.c \ util-dpdk-mlx5.c \ util-dpdk-rss.c \ + util-dpdk-threading.c \ util-dpdk.c \ util-ebpf.c \ util-enum.c \ diff --git a/src/runmode-dpdk.c b/src/runmode-dpdk.c index ef0c38679817..1e5c1639ab23 100644 --- a/src/runmode-dpdk.c +++ b/src/runmode-dpdk.c @@ -74,8 +74,15 @@ static void ArgumentsInit(struct Arguments *args, uint16_t capacity); static void ArgumentsCleanup(struct Arguments *args); static void ArgumentsAdd(struct Arguments *args, char *value); static void ArgumentsAddOptionAndArgument(struct Arguments *args, const char *opt, const char *arg); +static void ArgumentsAddLcoreArguments(struct Arguments *args); +static void ArgumentsEnsureEalOptionAllowed(const char *opt); static void InitEal(void); +static uint16_t CountDigits(uint32_t value); +static char *ConfigLcoreArgValGet(void); +static int ConfigLcoreWorkersSet(uint32_t *cpus, size_t cap); +static uint32_t ConfigLcoreMainGet(void); + static void ConfigSetIface(DPDKIfaceConfig *iconf, const char *entry_str); static int ConfigSetThreads(DPDKIfaceConfig *iconf, const char *entry_str); static int ConfigSetRxQueues(DPDKIfaceConfig *iconf, uint16_t nb_queues, uint16_t max_queues); @@ -180,6 +187,21 @@ static uint64_t GreatestPowOf2UpTo(uint64_t num) return num; } +/** + * \brief Counts number of decimal digits in a given value + * \param value Value to count digits for + * \return Number of decimal digits in value (e.g. 123 -> 3, 0 -> 1) + */ +static uint16_t CountDigits(uint32_t value) +{ + uint16_t digits = 1; + while (value >= 10U) { + value /= 10U; + digits++; + } + return digits; +} + static char *AllocArgument(size_t arg_len) { SCEnter(); @@ -286,6 +308,181 @@ static void ArgumentsAddOptionAndArgument(struct Arguments *args, const char *op SCReturn; } +static void ConfigLcoreManagementCpuSetGet(cpu_set_t *management_set) +{ + if (management_set == NULL) { + FatalError("Invalid buffer passed for management CPU set"); + } + + CPU_ZERO(management_set); + + ThreadsAffinityType *taf = GetAffinityTypeForNameAndIface("management-cpu-set", NULL); + if (taf == NULL || CPU_COUNT(&taf->cpu_set) == 0) { + FatalError("Unable to obtain CPU affinity for \"management-cpu-set\""); + } + + CPU_OR(management_set, management_set, &taf->cpu_set); +} + +static uint32_t ConfigLcoreMainGet(void) +{ + cpu_set_t management_set; + ConfigLcoreManagementCpuSetGet(&management_set); + + const uint32_t max_cpu = MIN((uint32_t)RTE_MAX_LCORE, (uint32_t)CPU_SETSIZE); + for (uint32_t cpu = 0; cpu < max_cpu; cpu++) { + if (CPU_ISSET(cpu, &management_set)) { + return cpu; + } + } + + FatalError("Unable to obtain CPU affinity for \"management-cpu-set\""); +} + +static void ConfigLcoreWorkersCpuSetMerge(cpu_set_t *worker_set) +{ + if (worker_set == NULL) { + FatalError("Invalid buffer passed for worker CPU set"); + } + + CPU_ZERO(worker_set); + + bool fallback_to_worker_affinity = false; + LiveDevice *ldev = NULL, *ndev = NULL; + while (LiveDeviceForEach(&ldev, &ndev)) { + ThreadsAffinityType *taf = GetAffinityTypeForNameAndIface("worker-cpu-set", ldev->dev); + if (taf == NULL) { + fallback_to_worker_affinity = true; + continue; + } + + CPU_OR(worker_set, worker_set, &taf->cpu_set); + } + + if (fallback_to_worker_affinity) { + ThreadsAffinityType *taf = GetAffinityTypeForNameAndIface("worker-cpu-set", NULL); + if (taf == NULL) { + FatalError("Unable to obtain CPU affinity for \"worker-cpu-set\""); + } + + CPU_OR(worker_set, worker_set, &taf->cpu_set); + } +} + +static int ConfigLcoreWorkersSet(uint32_t *cpus, size_t cap) +{ + SCEnter(); + if (cpus == NULL || cap == 0) { + FatalError("Invalid buffer passed for worker CPU set"); + } + + cpu_set_t worker_set; + ConfigLcoreWorkersCpuSetMerge(&worker_set); + + size_t wrk_cpus = CPU_COUNT(&worker_set); + if (wrk_cpus > cap) { + FatalError( + "Too many worker CPU cores configured (%zu), max supported is %zu", wrk_cpus, cap); + } + + size_t count = 0; + const uint32_t max_cpu = MIN((uint32_t)RTE_MAX_LCORE, (uint32_t)CPU_SETSIZE); + for (uint32_t cpu = 0; cpu < max_cpu; cpu++) { + if (!CPU_ISSET(cpu, &worker_set)) { + continue; + } + cpus[count++] = cpu; + } + + if (count == 0) { + FatalError("No worker CPUs detected"); + } + + SCReturnInt((int)count); +} + +static char *ConfigLcoreArgValGet(void) +{ + SCEnter(); + uint32_t lcore_list[RTE_MAX_LCORE + 1]; + size_t lcore_cnt = ConfigLcoreWorkersSet(lcore_list, ARRAY_SIZE(lcore_list) - 1); + uint32_t main_lcore = ConfigLcoreMainGet(); + lcore_list[lcore_cnt++] = main_lcore; + + size_t required_len = 0; + for (size_t i = 0; i < lcore_cnt; i++) { + required_len += CountDigits(lcore_list[i]); + } + required_len += (lcore_cnt - 1); // commas + + char *lcore_arg = AllocArgument(required_len); + size_t offset = 0; + for (size_t i = 0; i < lcore_cnt; i++) { + size_t remaining = required_len + 1 - offset; + int ret = snprintf(&lcore_arg[offset], remaining, "%u", lcore_list[i]); + if (ret < 0 || (size_t)ret >= remaining) { + FatalError("Conversion of CPU affinity to lcore argument failed"); + } + offset += (size_t)ret; + if (i + 1 < lcore_cnt) { + lcore_arg[offset++] = ','; + } + } + + lcore_arg[offset] = '\0'; + + SCReturnCharPtr(lcore_arg); +} + +static void ArgumentsAddLcoreArguments(struct Arguments *args) +{ + SCEnter(); + ArgumentsAdd(args, AllocAndSetArgument("-l")); + char *lcore_arg = ConfigLcoreArgValGet(); + ArgumentsAdd(args, lcore_arg); + + ArgumentsAdd(args, AllocAndSetArgument("--main-lcore")); + uint32_t main_lcore = ConfigLcoreMainGet(); + uint16_t digits = CountDigits(main_lcore); + char *main_lcore_arg = AllocArgument(digits); + int ret = snprintf(main_lcore_arg, digits + 1, "%u", main_lcore); + if (ret < 0 || ret > digits) { + FatalError("Conversion of management affinity to main lcore argument failed"); + } + ArgumentsAdd(args, main_lcore_arg); + + SCReturn; +} + +static void ArgumentsEnsureEalOptionAllowed(const char *opt) +{ + if (opt == NULL) { + return; + } + + const char *sanitized = opt; + while (*sanitized == '-') { + sanitized++; + } + if (*sanitized == '\0') { + return; + } + + static const char *const forbidden_opts[] = { + "l", + "lcores", + "c", + "main-lcore", + "master-lcore", + }; + + for (size_t i = 0; i < ARRAY_SIZE(forbidden_opts); i++) { + if (strcasecmp(sanitized, forbidden_opts[i]) == 0) { + FatalError("DPDK EAL option \"%s\" conflicts with Suricata CPU affinity settings", opt); + } + } +} + static void InitEal(void) { SCEnter(); @@ -301,8 +498,10 @@ static void InitEal(void) ArgumentsInit(&args, EAL_ARGS); ArgumentsAdd(&args, AllocAndSetArgument("suricata")); + ArgumentsAddLcoreArguments(&args); TAILQ_FOREACH (param, &eal_params->head, next) { + ArgumentsEnsureEalOptionAllowed(param->name); if (SCConfNodeIsSequence(param)) { const char *key = param->name; SCConfNode *val; @@ -414,8 +613,9 @@ static int ConfigSetThreads(DPDKIfaceConfig *iconf, const char *entry_str) SCLogError("No worker CPU cores with configured affinity were configured"); SCReturnInt(-EINVAL); } else if (UtilAffinityCpusOverlap(wtaf, mtaf) != 0) { - SCLogWarning("Worker threads should not overlap with management threads in the CPU core " - "affinity configuration"); + SCLogError("Worker threads cannot overlap with management threads in the CPU core " + "affinity configuration"); + SCReturnInt(-EINVAL); } const char *active_runmode = RunmodeGetActive(); diff --git a/src/source-dpdk.c b/src/source-dpdk.c index 3fd7bf7e07e0..c9c1d1843e46 100644 --- a/src/source-dpdk.c +++ b/src/source-dpdk.c @@ -42,6 +42,7 @@ #include "tmqh-packetpool.h" #include "util-privs.h" #include "util-device-private.h" +#include "util-dpdk-threading.h" #include "action-globals.h" #ifndef HAVE_DPDK @@ -259,6 +260,8 @@ void TmModuleReceiveDPDKRegister(void) tmm_modules[TMM_RECEIVEDPDK].PktAcqBreakLoop = NULL; tmm_modules[TMM_RECEIVEDPDK].ThreadExitPrintStats = ReceiveDPDKThreadExitStats; tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = ReceiveDPDKThreadDeinit; + tmm_modules[TMM_RECEIVEDPDK].ThreadSpawn = DpdkThreadSpawn; + tmm_modules[TMM_RECEIVEDPDK].ThreadJoin = DpdkThreadJoin; tmm_modules[TMM_RECEIVEDPDK].cap_flags = SC_CAP_NET_RAW; tmm_modules[TMM_RECEIVEDPDK].flags = TM_FLAG_RECEIVE_TM; } diff --git a/src/threadvars.h b/src/threadvars.h index b6d3aac9be43..ca53669b9781 100644 --- a/src/threadvars.h +++ b/src/threadvars.h @@ -57,11 +57,13 @@ struct TmSlot_; /** \brief Per thread variable structure */ typedef struct ThreadVars_ { - pthread_t t; + uint64_t thread_id; /** function pointer to the function that runs the packet pipeline for * this thread. It is passed directly to pthread_create(), hence the * void pointers in and out. */ void *(*tm_func)(void *); + void (*tm_spawn)(struct ThreadVars_ *); + void (*tm_join)(struct ThreadVars_ *); char name[16]; char *printable_name; diff --git a/src/tm-modules.h b/src/tm-modules.h index a557280d89f2..f94f4de9563b 100644 --- a/src/tm-modules.h +++ b/src/tm-modules.h @@ -47,6 +47,10 @@ typedef void (*ThreadExitPrintStatsFunc)(ThreadVars *, void *); typedef struct TmModule_ { const char *name; + /** thread management, if unspecified, falls back to pthreads */ + void (*ThreadSpawn)(ThreadVars *); + void (*ThreadJoin)(ThreadVars *); + /** thread handling */ TmEcode (*ThreadInit)(ThreadVars *, const void *, void **); void (*ThreadExitPrintStats)(ThreadVars *, void *); diff --git a/src/tm-threads.c b/src/tm-threads.c index 5d3d577b19e4..933d7343b411 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -320,7 +320,6 @@ static void *TmThreadsSlotPktAcqLoop(void *td) " tmqh_out=%p", s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out); TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit(NULL); return NULL; } @@ -349,11 +348,9 @@ static void *TmThreadsSlotPktAcqLoop(void *td) } SCLogDebug("%s ending", tv->name); - pthread_exit((void *) 0); return NULL; error: - pthread_exit(NULL); return NULL; } @@ -428,7 +425,6 @@ static void *TmThreadsSlotVar(void *td) /* check if we are setup properly */ if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit(NULL); return NULL; } @@ -454,7 +450,6 @@ static void *TmThreadsSlotVar(void *td) tv->flow_queue = FlowQueueNew(); if (tv->flow_queue == NULL) { TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit(NULL); return NULL; } /* setup a queue */ @@ -469,7 +464,6 @@ static void *TmThreadsSlotVar(void *td) tv->flow_queue = FlowQueueNew(); if (tv->flow_queue == NULL) { TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit(NULL); return NULL; } } @@ -526,12 +520,10 @@ static void *TmThreadsSlotVar(void *td) } StatsSyncCounters(&tv->stats); - pthread_exit(NULL); return NULL; error: tv->stream_pq = NULL; - pthread_exit(NULL); return NULL; } @@ -558,7 +550,6 @@ static void *TmThreadsManagement(void *td) r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data); if (r != TM_ECODE_OK) { TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit(NULL); return NULL; } (void)SC_ATOMIC_SET(s->slot_data, slot_data); @@ -589,13 +580,11 @@ static void *TmThreadsManagement(void *td) r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data)); if (r != TM_ECODE_OK) { TmThreadsSetFlag(tv, THV_CLOSED); - pthread_exit(NULL); return NULL; } } TmThreadsSetFlag(tv, THV_CLOSED); - pthread_exit((void *) 0); return NULL; } @@ -870,8 +859,11 @@ TmEcode TmThreadSetupOptions(ThreadVars *tv) } #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun - if (tv->thread_setup_flags & THREAD_SET_PRIORITY) + if (tv->thread_setup_flags & THREAD_SET_PRIORITY) { TmThreadSetPrio(tv); + SCLogPerf("Setting prio %d for thread \"%s\", thread id %lu", tv->thread_priority, tv->name, + SCGetThreadIdLong()); + } if (tv->thread_setup_flags & THREAD_SET_AFFTYPE) { ThreadsAffinityType *taf = &thread_affinity[tv->cpu_affinity]; bool use_iface_affinity = RunmodeIsAutofp() && tv->cpu_affinity == RECEIVE_CPU_SET && @@ -920,6 +912,58 @@ TmEcode TmThreadSetupOptions(ThreadVars *tv) return TM_ECODE_OK; } +static void TmThreadSpawnPthread(ThreadVars *tv) +{ + pthread_attr_t attr; + + /* Initialize and set thread detached attribute */ + pthread_attr_init(&attr); + + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + /* Adjust thread stack size if configured */ + if (threading_set_stack_size) { + SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size); + if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) { + FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes", + threading_set_stack_size); + } + } + + pthread_t *t = (pthread_t *)&tv->thread_id; + int rc = pthread_create(t, &attr, tv->tm_func, (void *)tv); + if (rc) { + FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc, + strerror(errno)); + } + +#if DEBUG && HAVE_PTHREAD_GETATTR_NP + if (threading_set_stack_size) { + if (pthread_getattr_np(*t, &attr) == 0) { + size_t stack_size; + void *stack_addr; + pthread_attr_getstack(&attr, &stack_addr, &stack_size); + SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size); + } else { + SCLogDebug("Unable to retrieve current stack-size for display; return code from " + "pthread_getattr_np() is %" PRId32, + rc); + } + } +#endif + + pthread_attr_destroy(&attr); +} + +static void TmThreadJoinPthread(ThreadVars *tv) +{ + /* Join the thread and flag as dead, unless the thread ID is 0 as + * its not a thread created by Suricata. */ + if (tv->thread_id != 0) { + pthread_join((pthread_t)tv->thread_id, NULL); + } +} + /** * \brief Creates and returns the TV instance for a new thread. * @@ -954,6 +998,9 @@ ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *i StatsThreadInit(&tv->stats); strlcpy(tv->name, name, sizeof(tv->name)); + /* default spawn and join functions */ + tv->tm_spawn = TmThreadSpawnPthread; + tv->tm_join = TmThreadJoinPthread; /* default state for every newly created thread */ TmThreadsSetFlag(tv, THV_PAUSE); @@ -1294,11 +1341,8 @@ static int TmThreadKillThread(ThreadVars *tv) } } - /* Join the thread and flag as dead, unless the thread ID is 0 as - * its not a thread created by Suricata. */ - if (tv->t) { - pthread_join(tv->t, NULL); - SCLogDebug("thread %s stopped", tv->name); + if (tv->tm_join != NULL) { + tv->tm_join(tv); } TmThreadsSetFlag(tv, THV_DEAD); return 1; @@ -1701,45 +1745,14 @@ void TmThreadClearThreadsFamily(int family) */ TmEcode TmThreadSpawn(ThreadVars *tv) { - pthread_attr_t attr; if (tv->tm_func == NULL) { FatalError("No thread function set"); } - /* Initialize and set thread detached attribute */ - pthread_attr_init(&attr); - - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - - /* Adjust thread stack size if configured */ - if (threading_set_stack_size) { - SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size); - if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) { - FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes", - threading_set_stack_size); - } + if (tv->tm_spawn == NULL) { + FatalError("No thread spawn function set"); } - - int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv); - if (rc) { - FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc, - strerror(errno)); - } - -#if DEBUG && HAVE_PTHREAD_GETATTR_NP - if (threading_set_stack_size) { - if (pthread_getattr_np(tv->t, &attr) == 0) { - size_t stack_size; - void *stack_addr; - pthread_attr_getstack(&attr, &stack_addr, &stack_size); - SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size); - } else { - SCLogDebug("Unable to retrieve current stack-size for display; return code from " - "pthread_getattr_np() is %" PRId32, - rc); - } - } -#endif + tv->tm_spawn(tv); TmThreadWaitForFlag(tv, THV_INIT_DONE | THV_RUNNING_DONE); diff --git a/src/util-dpdk-threading.c b/src/util-dpdk-threading.c new file mode 100644 index 000000000000..7c3fa3658ee8 --- /dev/null +++ b/src/util-dpdk-threading.c @@ -0,0 +1,123 @@ +/* Copyright (C) 2026 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Lukas Sismis + * + * DPDK threading utilities + */ + +#include "suricata-common.h" +#include "threadvars.h" +#include "tm-threads.h" +#include "util-affinity.h" +#include "util-dpdk-threading.h" +#include "util-debug.h" +#include "runmodes.h" +#include "util-dpdk-common.h" + +#ifdef HAVE_DPDK + +static bool stacksize_warn_once = false; + +/** + * \brief Wrapper function to convert ThreadVars thread function signature + * from void* (*)(void*) to int (*)(void*) for DPDK EAL threads. + */ +static int DpdkEalThreadWrapper(void *arg) +{ + ThreadVars *tv = (ThreadVars *)arg; + tv->tm_func(tv); + return 0; +} +#endif /* HAVE_DPDK */ + +void DpdkThreadSpawn(ThreadVars *tv) +{ +#ifdef HAVE_DPDK + if (threading_set_stack_size && SCConfGetNode("dpdk.eal-params.huge-worker-stack") == NULL) { + if (!stacksize_warn_once) { + stacksize_warn_once = true; + SCLogWarning("DPDK worker threads do not support Suricata-configured stack size. " + "Use additional DPDK EAL argument huge-worker-stack:[size in kB without a " + "unit] " + "to also set stack size for DPDK worker threads."); + } + } + if (!(tv->thread_setup_flags & THREAD_SET_AFFTYPE)) { + FatalError("%s: DPDK requires set threading affinity setting", tv->iface_name); + } + ThreadsAffinityType *taf = &thread_affinity[tv->cpu_affinity]; + if (!RunmodeIsWorkers() || !(tv->cpu_affinity == WORKER_CPU_SET)) { + FatalError("%s: DPDK EAL threads can only initialize worker threads", tv->iface_name); + } + + ThreadsAffinityType *if_taf = FindAffinityByInterface(taf, tv->iface_name); + if (if_taf) { + taf = if_taf; + } + + if (UtilAffinityGetAffinedCPUNum(taf) == 0) { + if (!taf->nocpu_warned) { + SCLogWarning("No CPU affinity set for %s", AffinityGetYamlPath(taf)); + taf->nocpu_warned = true; + } + } + + if (taf->mode_flag != EXCLUSIVE_AFFINITY) { + FatalError("%s: DPDK requires exclusive affinity setting", tv->iface_name); + } + + /* If CPU is in a set overwrite the default thread prio */ + if (CPU_ISSET(tv->thread_id, &taf->lowprio_cpu)) { + tv->thread_priority = PRIO_LOW; + } else if (CPU_ISSET(tv->thread_id, &taf->medprio_cpu)) { + tv->thread_priority = PRIO_MEDIUM; + } else if (CPU_ISSET(tv->thread_id, &taf->hiprio_cpu)) { + tv->thread_priority = PRIO_HIGH; + } else { + tv->thread_priority = taf->prio; + } + tv->thread_setup_flags = + THREAD_SET_PRIORITY; // affinity is handled, prio handles the thread itself + + tv->thread_id = AffinityGetNextCPU(tv, taf); + + SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core " + "%" PRIu64 ", thread id %lu", + tv->thread_priority, tv->name, tv->thread_id, SCGetThreadIdLong()); + + int ret = rte_eal_remote_launch(DpdkEalThreadWrapper, (void *)tv, (unsigned)tv->thread_id); + if (ret != 0) { + FatalError("Unable to create DPDK EAL thread %s with rte_eal_remote_launch(): retval %d", + tv->name, ret); + } +#endif /* HAVE_DPDK */ +} + +void DpdkThreadJoin(ThreadVars *tv) +{ +#ifdef HAVE_DPDK + int ret = rte_eal_wait_lcore((unsigned)tv->thread_id); + if (ret < 0) { + SCLogError("%s: error waiting for DPDK lcore %" PRIu64 " (%s) to finish (%s)", + tv->iface_name, tv->thread_id, tv->name, rte_strerror(-ret)); + } +#endif /* HAVE_DPDK */ +} diff --git a/src/util-dpdk-threading.h b/src/util-dpdk-threading.h new file mode 100644 index 000000000000..b49be10ce62b --- /dev/null +++ b/src/util-dpdk-threading.h @@ -0,0 +1,34 @@ +/* Copyright (C) 2024 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Lukas Sismis + * + * DPDK threading utilities + */ + +#ifndef SURICATA_UTIL_DPDK_THREADING_H +#define SURICATA_UTIL_DPDK_THREADING_H + +#include "threadvars.h" + +void DpdkThreadSpawn(ThreadVars *tv); +void DpdkThreadJoin(ThreadVars *tv); + +#endif /* SURICATA_UTIL_DPDK_THREADING_H */ diff --git a/src/util-runmodes.c b/src/util-runmodes.c index 4336687b06fd..a975e92bfd58 100644 --- a/src/util-runmodes.c +++ b/src/util-runmodes.c @@ -126,6 +126,12 @@ int RunModeSetLiveCaptureAutoFp(ConfigIfaceParserFunc ConfigParser, if (tm_module == NULL) { FatalError("TmModuleGetByName failed for %s", recv_mod_name); } + if (tm_module->ThreadSpawn != NULL && tm_module->ThreadSpawn != tv_receive->tm_spawn) { + tv_receive->tm_spawn = tm_module->ThreadSpawn; + } + if (tm_module->ThreadJoin != NULL && tm_module->ThreadJoin != tv_receive->tm_join) { + tv_receive->tm_join = tm_module->ThreadJoin; + } TmSlotSetFuncAppend(tv_receive, tm_module, aconf); tm_module = TmModuleGetByName(decode_mod_name); @@ -187,6 +193,13 @@ int RunModeSetLiveCaptureAutoFp(ConfigIfaceParserFunc ConfigParser, if (tm_module == NULL) { FatalError("TmModuleGetByName failed for %s", recv_mod_name); } + if (tm_module->ThreadSpawn != NULL && + tm_module->ThreadSpawn != tv_receive->tm_spawn) { + tv_receive->tm_spawn = tm_module->ThreadSpawn; + } + if (tm_module->ThreadJoin != NULL && tm_module->ThreadJoin != tv_receive->tm_join) { + tv_receive->tm_join = tm_module->ThreadJoin; + } TmSlotSetFuncAppend(tv_receive, tm_module, aconf); tm_module = TmModuleGetByName(decode_mod_name); @@ -298,6 +311,12 @@ static int RunModeSetLiveCaptureWorkersForDevice(ConfigIfaceThreadsCountFunc Mod if (tm_module == NULL) { FatalError("TmModuleGetByName failed for %s", recv_mod_name); } + if (tm_module->ThreadSpawn != NULL && tm_module->ThreadSpawn != tv->tm_spawn) { + tv->tm_spawn = tm_module->ThreadSpawn; + } + if (tm_module->ThreadJoin != NULL && tm_module->ThreadJoin != tv->tm_join) { + tv->tm_join = tm_module->ThreadJoin; + } TmSlotSetFuncAppend(tv, tm_module, aconf); tm_module = TmModuleGetByName(decode_mod_name); @@ -418,6 +437,12 @@ int RunModeSetIPSAutoFp(ConfigIPSParserFunc ConfigParser, if (tm_module == NULL) { FatalError("TmModuleGetByName failed for %s", recv_mod_name); } + if (tm_module->ThreadSpawn != NULL && tm_module->ThreadSpawn != tv_receive->tm_spawn) { + tv_receive->tm_spawn = tm_module->ThreadSpawn; + } + if (tm_module->ThreadJoin != NULL && tm_module->ThreadJoin != tv_receive->tm_join) { + tv_receive->tm_join = tm_module->ThreadJoin; + } TmSlotSetFuncAppend(tv_receive, tm_module, (void *) ConfigParser(i)); tm_module = TmModuleGetByName(decode_mod_name); @@ -530,6 +555,12 @@ int RunModeSetIPSWorker(ConfigIPSParserFunc ConfigParser, if (tm_module == NULL) { FatalError("TmModuleGetByName failed for %s", recv_mod_name); } + if (tm_module->ThreadSpawn != NULL && tm_module->ThreadSpawn != tv->tm_spawn) { + tv->tm_spawn = tm_module->ThreadSpawn; + } + if (tm_module->ThreadJoin != NULL && tm_module->ThreadJoin != tv->tm_join) { + tv->tm_join = tm_module->ThreadJoin; + } TmSlotSetFuncAppend(tv, tm_module, (void *) ConfigParser(i)); tm_module = TmModuleGetByName(decode_mod_name);