diff --git a/CMakeLists.txt b/CMakeLists.txt index 2e297630..3e0f2cfc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -51,6 +51,7 @@ option(ENABLE_MODULES "Allow loading third-party modules at runtime" ON) option(REQUIRE_MODULES "Require support for loading third-party modules at runtime" OFF) option(ENABLE_UNDETERMINISTIC_TESTS "Run undeterministic tests (e.g. queueing) when running tests" ON) option(ENABLE_WERROR "Make all compiler warnings fatal" OFF) +option(ENABLE_PKT_FANOUT "Enable action selector packet fanout support" OFF) option(ENABLE_WP4_16_STACKS "Implement stacks strictly as per the P4_16 specification" ON) # Set compiler flags @@ -144,6 +145,10 @@ if(ENABLE_LOGGING_MACROS) add_definitions(-DLOG_DEBUG_ON -DLOG_TRACE_ON) endif() +if(ENABLE_PKT_FANOUT) + add_definitions(-DPKT_FANOUT_ON) +endif() + if(ENABLE_WP4_16_STACKS) add_definitions(-DWP4_16_STACKS) endif() diff --git a/configure.ac b/configure.ac index 18db03a3..34fdce55 100755 --- a/configure.ac +++ b/configure.ac @@ -63,6 +63,15 @@ AS_IF([test "x$enable_debugger" = "xyes"], [ ]) ]) +# Action selector packet fanout (CMake equivalent: ENABLE_PKT_FANOUT) +pkt_fanout_enabled=no +AC_ARG_ENABLE([pkt_fanout], + AS_HELP_STRING([--enable-pkt-fanout], [Enable packet fanout support])) +AS_IF([test "x$enable_pkt_fanout" = "xyes"], [ + pkt_fanout_enabled=yes + AC_DEFINE([PKT_FANOUT_ON], [], [Enable packet fanout support]) +]) + logging_macros_enabled=no AC_ARG_ENABLE([logging_macros], AS_HELP_STRING([--disable-logging-macros], diff --git a/include/bm/bm_sim/P4Objects.h b/include/bm/bm_sim/P4Objects.h index b2720fe2..f4fe34f7 100644 --- a/include/bm/bm_sim/P4Objects.h +++ b/include/bm/bm_sim/P4Objects.h @@ -545,6 +545,8 @@ class P4Objects { void enable_arith(header_id_t header_id, int field_offset); void enable_arith(header_id_t header_id); + bool is_selector_fanout( + const Json::Value &cfg_selector) const; std::unique_ptr process_cfg_selector( const Json::Value &cfg_selector) const; }; diff --git 
a/include/bm/bm_sim/action_profile.h b/include/bm/bm_sim/action_profile.h index 46fa34c2..286ecd0d 100644 --- a/include/bm/bm_sim/action_profile.h +++ b/include/bm/bm_sim/action_profile.h @@ -165,6 +165,12 @@ class ActionProfile : public NamedP4Object { void serialize(std::ostream *out) const; void deserialize(std::istream *in, const P4Objects &objs); + void set_selector_fanout(); + + std::vector get_all_mbrs_from_grp(const grp_hdl_t &grp) const; + std::vector + get_entries_with_mbrs(const std::vector &mbrs) const; + private: using ReadLock = boost::shared_lock; using WriteLock = boost::unique_lock; @@ -291,6 +297,10 @@ class ActionProfile : public NamedP4Object { bool group_is_empty(grp_hdl_t grp) const; + bool is_selector_fanout_enabled() const { + return selector_fanout_enabled; + } + const ActionEntry &lookup(const Packet &pkt, const IndirectIndex &index) const; @@ -307,6 +317,7 @@ class ActionProfile : public NamedP4Object { std::shared_ptr grp_selector_{nullptr}; GroupSelectionIface *grp_selector{&grp_mgr}; std::unique_ptr hash{nullptr}; + bool selector_fanout_enabled{false}; }; } // namespace bm diff --git a/include/bm/bm_sim/event_logger.h b/include/bm/bm_sim/event_logger.h index 10622d19..a1503f4d 100644 --- a/include/bm/bm_sim/event_logger.h +++ b/include/bm/bm_sim/event_logger.h @@ -99,7 +99,8 @@ class EventLogger { void action_execute(const Packet &packet, const ActionFn &action_fn, const ActionData &action_data); - + void fanout_gen(const Packet &packet, uint64_t table_id, + uint64_t parent_pkt_copy_id); void config_change(); static EventLogger *get() { diff --git a/include/bm/bm_sim/fanout_pkt_mgr.h b/include/bm/bm_sim/fanout_pkt_mgr.h new file mode 100644 index 00000000..b119400f --- /dev/null +++ b/include/bm/bm_sim/fanout_pkt_mgr.h @@ -0,0 +1,112 @@ +/* Copyright 2025-present Contributors to the P4 Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef BM_BM_SIM_FANOUT_PKT_MGR_H_ +#define BM_BM_SIM_FANOUT_PKT_MGR_H_ + +#include +#include +#include +#include "logger.h" +#include "packet.h" +#include "match_tables.h" +#include "action_profile.h" + +namespace bm { +class MatchTableIndirect; +using bm::ActionProfile; +using EntryVec = const std::vector; +using SelectorIface = ActionProfile::GroupSelectionIface; + +struct FanoutCtx { + bool hit{false}; + const Packet * cur_pkt{nullptr}; + ActionProfile *action_profile{nullptr}; + MatchTableIndirect *cur_table{nullptr}; + std::function buffer_push_fn; + + explicit FanoutCtx( + const std::function &buffer_push_fn) + : buffer_push_fn(buffer_push_fn) { } +}; + +class FanoutPktSelection: public SelectorIface{ + public: + using grp_hdl_t = ActionProfile::grp_hdl_t; + using mbr_hdl_t = ActionProfile::mbr_hdl_t; + using hash_t = ActionProfile::hash_t; + using MatchErrorCode = bm::MatchErrorCode; + + FanoutPktSelection() = default; + + // callbacks after member op, not actual member/group ops + void add_member_to_group(grp_hdl_t grp, mbr_hdl_t mbr) override; + + void remove_member_from_group(grp_hdl_t grp, mbr_hdl_t mbr) override; + + mbr_hdl_t get_from_hash(grp_hdl_t grp, hash_t h) const override; + + void reset() override {} + + private: + std::unordered_map> groups; +}; + +class FanoutPktMgr { + public: + FanoutPktMgr(const FanoutPktMgr&) = delete; + FanoutPktMgr& operator=(const FanoutPktMgr&) = delete; + static FanoutPktMgr& instance() { + static FanoutPktMgr instance_; + return instance_; + } + + FanoutCtx& get_fanout_ctx(); + void 
set_ctx(MatchTableIndirect *table, const Packet &pkt, + ActionProfile *action_profile, bool hit); + void reset_ctx(); + void replicate_for_entries(const std::vector &entries); + + // PI overwrite selector specified during P4Object init, + // so we need to set the selector in switch start_and_return_ + void set_grp_selector() { + for (const auto &ap : act_profs) { + ap->set_group_selector(grp_selector); + } + } + inline void register_thread(std::thread::id thread_id, + const std::function &buffer_push_fn) { + BMLOG_DEBUG("Registering thread {}", thread_id); + fanout_ctx_map.emplace(thread_id, FanoutCtx(buffer_push_fn)); + } + + // TODO(Hao): deduplicate packets fanout, optional +#ifdef BM_PKT_FANOUT_ON + static constexpr bool pkt_fanout_on = true; +#else + static constexpr bool pkt_fanout_on = false; +#endif + std::mutex fanout_pkt_mutex; + std::vector act_profs; + + private: + FanoutPktMgr() = default; + std::unordered_map fanout_ctx_map; + std::shared_ptr + grp_selector{std::make_shared()}; +}; + +} // namespace bm + +#endif // BM_BM_SIM_FANOUT_PKT_MGR_H_ diff --git a/include/bm/bm_sim/logger.h b/include/bm/bm_sim/logger.h index 831d1002..04e2eaa4 100644 --- a/include/bm/bm_sim/logger.h +++ b/include/bm/bm_sim/logger.h @@ -155,4 +155,10 @@ class Logger { bm::Logger::get()->error("[{}] [cxt {}] " s, (pkt).get_unique_id(), \ (pkt).get_context(), ##__VA_ARGS__) +#define BMLOG_WARN(...) bm::Logger::get()->warn(__VA_ARGS__) + +#define BMLOG_WARN_PKT(pkt, s, ...) 
\ + bm::Logger::get()->warn("[{}] [cxt {}] " s, (pkt).get_unique_id(), \ + (pkt).get_context(), ##__VA_ARGS__) + #endif // BM_BM_SIM_LOGGER_H_ diff --git a/include/bm/bm_sim/match_tables.h b/include/bm/bm_sim/match_tables.h index 6348c117..cd699fb5 100644 --- a/include/bm/bm_sim/match_tables.h +++ b/include/bm/bm_sim/match_tables.h @@ -39,6 +39,7 @@ #include "lookup_structures.h" #include "action_entry.h" #include "action_profile.h" +#include "fanout_pkt_mgr.h" namespace bm { @@ -385,6 +386,7 @@ class MatchTable : public MatchTableAbstract { class MatchTableIndirect : public MatchTableAbstract { public: + friend class FanoutPktMgr; using mbr_hdl_t = ActionProfile::mbr_hdl_t; using IndirectIndex = ActionProfile::IndirectIndex; diff --git a/include/bm/bm_sim/packet.h b/include/bm/bm_sim/packet.h index 0da8c214..39fe35e1 100644 --- a/include/bm/bm_sim/packet.h +++ b/include/bm/bm_sim/packet.h @@ -32,6 +32,7 @@ #include #include // for std::min #include +#include #include @@ -40,6 +41,7 @@ #include "parser_error.h" #include "phv_source.h" #include "phv_forward.h" +#include "control_flow.h" namespace bm { @@ -301,6 +303,11 @@ class Packet final { //! @copydoc clone_with_phv_reset_metadata std::unique_ptr clone_with_phv_reset_metadata_ptr() const; + //! Clone the current packet, along with its PHV and registers. + Packet clone_with_phv_and_registers() const; + //! @copydoc clone_with_phv_and_registers + std::unique_ptr clone_with_phv_and_registers_ptr() const; + //! Clone the current packet, without the PHV. The value of the fields in the //! clone will be undefined and should not be accessed before setting it //! first. @@ -315,6 +322,17 @@ class Packet final { //! @copydoc clone_choose_context std::unique_ptr clone_choose_context_ptr(cxt_id_t new_cxt) const; + // Packet fanout related methods + //! Returns true if the packet has a next node set + bool has_next_node() const; + //! Get the next node, if it exists + const ControlFlowNode *get_next_node() const; + //! 
Set the next node, which is used to next the packet processing + void set_next_node(const ControlFlowNode *node); + //! Reset the next node + void reset_next_node(); + + //! Deleted copy constructor Packet(const Packet &other) = delete; //! Deleted copy assignment operator @@ -385,6 +403,8 @@ class Packet final { bool checksum_error{false}; + std::optional next_node{std::nullopt}; + private: static CopyIdGenerator *copy_id_gen; }; diff --git a/include/bm/bm_sim/pipeline.h b/include/bm/bm_sim/pipeline.h index 39d62895..9326cc55 100644 --- a/include/bm/bm_sim/pipeline.h +++ b/include/bm/bm_sim/pipeline.h @@ -40,10 +40,12 @@ class Pipeline : public NamedP4Object { : NamedP4Object(name, id), first_node(first_node) {} //! Sends the \p pkt through the correct match-action tables and - //! condiitons. Each step is determined based on the result of the previous + //! conditions. Each step is determined based on the result of the previous //! step (table lookup or condition evaluation), according to the P4 control //! flow graph. void apply(Packet *pkt); + // Start from next_node instead of first_node + void apply_from_next_node(Packet *pkt); //! 
Deleted copy constructor Pipeline(const Pipeline &other) = delete; diff --git a/src/bm_sim/CMakeLists.txt b/src/bm_sim/CMakeLists.txt index a5fd0792..bc5c0b9e 100644 --- a/src/bm_sim/CMakeLists.txt +++ b/src/bm_sim/CMakeLists.txt @@ -28,6 +28,7 @@ add_library(bmsim OBJECT event_logger.cpp expressions.cpp extern.cpp + fanout_pkt_mgr.cpp fields.cpp headers.cpp header_unions.cpp diff --git a/src/bm_sim/Makefile.am b/src/bm_sim/Makefile.am index 56a809e5..b20cda63 100644 --- a/src/bm_sim/Makefile.am +++ b/src/bm_sim/Makefile.am @@ -37,6 +37,7 @@ event_logger.cpp \ expressions.cpp \ extern.cpp \ extract.h \ +fanout_pkt_mgr.cpp \ fields.cpp \ headers.cpp \ header_unions.cpp \ diff --git a/src/bm_sim/P4Objects.cpp b/src/bm_sim/P4Objects.cpp index 2cf9ffd1..6e17ec73 100644 --- a/src/bm_sim/P4Objects.cpp +++ b/src/bm_sim/P4Objects.cpp @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include #include @@ -1655,8 +1657,13 @@ P4Objects::init_pipelines(const Json::Value &cfg_root, std::unique_ptr action_profile( new ActionProfile(act_prof_name, act_prof_id, with_selection)); if (with_selection) { - auto calc = process_cfg_selector(cfg_act_prof["selector"]); - action_profile->set_hash(std::move(calc)); + if (is_selector_fanout(cfg_act_prof["selector"])) { + action_profile->set_selector_fanout(); + FanoutPktMgr::instance().act_profs.push_back(action_profile.get()); + } else { + auto calc = process_cfg_selector(cfg_act_prof["selector"]); + action_profile->set_hash(std::move(calc)); + } } add_action_profile(act_prof_name, std::move(action_profile)); } @@ -2499,6 +2506,16 @@ P4Objects::check_hash(const std::string &name) const { return nullptr; } +bool P4Objects::is_selector_fanout(const Json::Value &cfg_selector) const { + bool is_fanout = cfg_selector.isMember("algo") && + cfg_selector["algo"].asString() == "selector_fanout"; + if (is_fanout && !FanoutPktMgr::pkt_fanout_on) { + throw std::runtime_error("Selector fanout is not enabled, but" + " found selector_fanout 
mode used"); + } + return is_fanout; +} + std::unique_ptr P4Objects::process_cfg_selector(const Json::Value &cfg_selector) const { const auto selector_algo = cfg_selector["algo"].asString(); diff --git a/src/bm_sim/action_profile.cpp b/src/bm_sim/action_profile.cpp index 2bc3dab8..522692ac 100644 --- a/src/bm_sim/action_profile.cpp +++ b/src/bm_sim/action_profile.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -184,6 +185,29 @@ ActionProfile::lookup(const Packet &pkt, const IndirectIndex &index) const { return action_entries[mbr]; } + +std::vector +ActionProfile::get_all_mbrs_from_grp(const grp_hdl_t &grp) const { + assert(is_valid_grp(grp)); + Group group; + MatchErrorCode rc = get_group(grp, &group); + _BM_UNUSED(rc); + assert(rc == MatchErrorCode::SUCCESS); + return group.mbr_handles; +} + +std::vector +ActionProfile::get_entries_with_mbrs( + const std::vector &mbrs) const { + std::vector entries; + entries.reserve(mbrs.size()); + for (auto m : mbrs) { + assert(is_valid_mbr(m)); + entries.push_back(&action_entries[m]); + } + return entries; +} + bool ActionProfile::has_selection() const { return with_selection; } @@ -524,6 +548,8 @@ void ActionProfile::set_group_selector( std::shared_ptr selector) { WriteLock lock = lock_write(); + BMLOG_DEBUG("Setting group selector for action profile '{}'", + get_name()); grp_selector_ = selector; grp_selector = grp_selector_.get(); } @@ -617,4 +643,8 @@ ActionProfile::choose_from_group(grp_hdl_t grp, const Packet &pkt) const { return grp_selector->get_from_hash(grp, h); } +void ActionProfile::set_selector_fanout() { + selector_fanout_enabled = true; +} + } // namespace bm diff --git a/src/bm_sim/event_logger.cpp b/src/bm_sim/event_logger.cpp index 3de163f7..e8b1a9c6 100644 --- a/src/bm_sim/event_logger.cpp +++ b/src/bm_sim/event_logger.cpp @@ -40,6 +40,7 @@ enum EventType { PIPELINE_START, PIPELINE_DONE, CONDITION_EVAL, TABLE_HIT, TABLE_MISS, ACTION_EXECUTE, + FANOUT_GEN, CONFIG_CHANGE = 999 }; 
@@ -52,6 +53,7 @@ struct msg_hdr_t { uint64_t copy_id; } __attribute__((packed)); + namespace { void @@ -267,6 +269,23 @@ EventLogger::action_execute(const Packet &packet, (void) action_data; } +// parent_pkt_copy_id is the copy_id of the parent packet that this +// fanout packet is generated from +void +EventLogger::fanout_gen(const Packet &packet, uint64_t table_id, + uint64_t parent_pkt_copy_id) { + struct msg_t : msg_hdr_t { + uint64_t table_id; + uint64_t parent_packet_copy_id; + } __attribute__((packed)); + + msg_t msg; + fill_msg_hdr(EventType::FANOUT_GEN, device_id, packet, &msg); + msg.table_id = table_id; + msg.parent_packet_copy_id = parent_pkt_copy_id; + transport_instance->send(reinterpret_cast(&msg), sizeof(msg)); +} + void EventLogger::config_change() { msg_hdr_t msg; diff --git a/src/bm_sim/fanout_pkt_mgr.cpp b/src/bm_sim/fanout_pkt_mgr.cpp new file mode 100644 index 00000000..a77436f1 --- /dev/null +++ b/src/bm_sim/fanout_pkt_mgr.cpp @@ -0,0 +1,122 @@ +/* Copyright 2025-present Contributors to the P4 Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include + +namespace bm { + + +void FanoutPktSelection::add_member_to_group(grp_hdl_t grp, mbr_hdl_t mbr) { + (void) grp; + (void) mbr; +} + +void FanoutPktSelection::remove_member_from_group(grp_hdl_t grp, + mbr_hdl_t mbr) { + (void) grp; + (void) mbr; +} + +FanoutPktSelection::mbr_hdl_t +FanoutPktSelection::get_from_hash(grp_hdl_t grp, hash_t h) const { + (void)h; + auto &ctx = FanoutPktMgr::instance().get_fanout_ctx(); + auto *action_profile = ctx.action_profile; + if (!action_profile) { + BMLOG_ERROR("No action profile set for fanout packet selection"); + throw std::runtime_error("No action profile set for " + "fanout packet selection"); + } + + std::vector mbrs = action_profile->get_all_mbrs_from_grp(grp); + mbr_hdl_t selected_mbr = mbrs.back(); + mbrs.pop_back(); + + auto entries = action_profile->get_entries_with_mbrs(mbrs); + BMLOG_DEBUG("Fanout Selected member {} from group {} with hash {} " + "with number of entries {}", + selected_mbr, grp, h, entries.size()); + FanoutPktMgr::instance().replicate_for_entries(entries); + return selected_mbr; +} + +FanoutCtx& FanoutPktMgr::get_fanout_ctx() { + std::thread::id thread_id = std::this_thread::get_id(); + // Lock before the lookup so the map is never read outside the mutex. + std::lock_guard lock(fanout_pkt_mutex); + auto it = fanout_ctx_map.find(thread_id); + if (it == fanout_ctx_map.end()) { + BMLOG_ERROR("No fanout context registered for thread {}", thread_id); + throw std::runtime_error("Fanout context not found for thread"); + } + return it->second; +} +void FanoutPktMgr::set_ctx(MatchTableIndirect *table, + const Packet &pkt, + ActionProfile *action_profile, + bool hit) { + auto &ctx = get_fanout_ctx(); + ctx.cur_table = table; + ctx.cur_pkt = &pkt; + ctx.action_profile = action_profile; + ctx.hit = hit; +} + +void FanoutPktMgr::reset_ctx() { + auto &ctx = get_fanout_ctx(); + ctx.cur_table = nullptr; + ctx.cur_pkt = nullptr; + ctx.action_profile = nullptr; +} + +void FanoutPktMgr::replicate_for_entries( + const std::vector &entries) { + auto &ctx = 
get_fanout_ctx(); + auto *match_table = ctx.cur_table; + const Packet &pkt = *ctx.cur_pkt; + bool hit = ctx.hit; + + // for event logger + uint64_t parent_pkt_copy_id = pkt.get_copy_id(); + uint64_t table_id = match_table->get_id(); + for (auto entry : entries) { + Packet* rep_pkt = pkt.clone_with_phv_and_registers_ptr().release(); + rep_pkt->set_egress_port(pkt.get_egress_port()); + + entry->action_fn(rep_pkt); + BMLOG_DEBUG_PKT(*rep_pkt, "Action {} applied to fanout packet", + *entry); + + auto act_id = entry->action_fn.get_action_id(); + const ControlFlowNode *next_node = hit ? + match_table->get_next_node(act_id) : + match_table->get_next_node_default(act_id); + if (next_node == nullptr) { + BMLOG_DEBUG_PKT(*rep_pkt, "No next node for action id {}", act_id); + } else { + BMLOG_DEBUG_PKT(*rep_pkt, "Next node for action id {}: {}", + act_id, next_node->get_name()); + } + rep_pkt->set_next_node(next_node); + // Log first: buffer_push_fn takes ownership and may free rep_pkt. + BMELOG(fanout_gen, *rep_pkt, table_id, parent_pkt_copy_id); + ctx.buffer_push_fn(rep_pkt); + } + + reset_ctx(); +} + +} // namespace bm diff --git a/src/bm_sim/match_tables.cpp b/src/bm_sim/match_tables.cpp index bd106444..0f67a8ce 100644 --- a/src/bm_sim/match_tables.cpp +++ b/src/bm_sim/match_tables.cpp @@ -716,6 +716,13 @@ MatchTableIndirect::lookup(const Packet &pkt, return empty_action; } + if (FanoutPktMgr::pkt_fanout_on) { + // A bit hacky, in order to get the next_table for fanout. 
+ if (action_profile->is_selector_fanout_enabled()) { + FanoutPktMgr::instance().set_ctx(this, pkt, action_profile, *hit); + } + } + const auto &entry = action_profile->lookup(pkt, index); // Unfortunately this has to be done at this stage and cannot be done when // inserting a member because for 2 match tables sharing the same action diff --git a/src/bm_sim/packet.cpp b/src/bm_sim/packet.cpp index a88302db..06479762 100644 --- a/src/bm_sim/packet.cpp +++ b/src/bm_sim/packet.cpp @@ -164,6 +164,21 @@ Packet::clone_with_phv_ptr() const { return std::unique_ptr(new Packet(clone_with_phv())); } +Packet +Packet::clone_with_phv_and_registers() const { + copy_id_t new_copy_id = copy_id_gen->add_one(packet_id); + Packet pkt(cxt_id, ingress_port, packet_id, new_copy_id, ingress_length, + buffer.clone(buffer.get_data_size()), phv_source); + pkt.phv->copy_headers(*phv); + pkt.registers = registers; + return pkt; +} + +std::unique_ptr +Packet::clone_with_phv_and_registers_ptr() const { + return std::unique_ptr(new Packet(clone_with_phv_and_registers())); +} + Packet Packet::clone_with_phv_reset_metadata() const { copy_id_t new_copy_id = copy_id_gen->add_one(packet_id); @@ -207,6 +222,20 @@ Packet::clone_no_phv_ptr() const { return clone_choose_context_ptr(cxt_id); } +bool Packet::has_next_node() const { + return next_node.has_value(); +} +const ControlFlowNode *Packet::get_next_node() const { + return next_node.value_or(nullptr); +} +void Packet::set_next_node(const ControlFlowNode *node) { + next_node = node; +} + +void Packet::reset_next_node() { + next_node.reset(); +} + /* Cannot get away with defaults here, we need to swap the phvs, otherwise we could "leak" the old phv (i.e. 
not put it back into the pool) */ diff --git a/src/bm_sim/pipeline.cpp b/src/bm_sim/pipeline.cpp index a0bb4864..910e45dd 100644 --- a/src/bm_sim/pipeline.cpp +++ b/src/bm_sim/pipeline.cpp @@ -50,4 +50,19 @@ Pipeline::apply(Packet *pkt) { BMLOG_DEBUG_PKT(*pkt, "Pipeline '{}': end", get_name()); } +void Pipeline::apply_from_next_node(Packet *pkt) { + const ControlFlowNode *node = pkt->get_next_node(); + BMLOG_DEBUG_PKT(*pkt, "Pipeline '{}': packet fanout from node '{}'", + get_name(), node? node->get_name() : "None"); + while (node) { + if (pkt->is_marked_for_exit()) { + BMLOG_DEBUG_PKT(*pkt, "Packet is marked for exit, interrupting pipeline"); + break; + } + node = (*node)(pkt); + } + pkt->reset_next_node(); + BMLOG_DEBUG_PKT(*pkt, "Pipeline '{}': fanout end", get_name()); +} + } // namespace bm diff --git a/targets/simple_switch/simple_switch.cpp b/targets/simple_switch/simple_switch.cpp index ec998fce..42d7334d 100644 --- a/targets/simple_switch/simple_switch.cpp +++ b/targets/simple_switch/simple_switch.cpp @@ -128,6 +128,7 @@ class SimpleSwitch::InputBuffer { NORMAL, RESUBMIT, RECIRCULATE, + SELECTOR_FANOUT, SENTINEL // signal for the ingress thread to terminate }; @@ -141,6 +142,7 @@ class SimpleSwitch::InputBuffer { std::move(item), true); case PacketType::RESUBMIT: case PacketType::RECIRCULATE: + case PacketType::SELECTOR_FANOUT: return push_front(&queue_hi, capacity_hi, &cvar_can_push_hi, std::move(item), false); case PacketType::SENTINEL: @@ -155,7 +157,7 @@ class SimpleSwitch::InputBuffer { Lock lock(mutex); cvar_can_pop.wait( lock, [this] { return (queue_hi.size() + queue_lo.size()) > 0; }); - // give higher priority to resubmit/recirculate queue + // give higher priority to resubmit/recirculate/selector-fanout queue if (queue_hi.size() > 0) { *pItem = std::move(queue_hi.back()); queue_hi.pop_back(); @@ -179,7 +181,10 @@ class SimpleSwitch::InputBuffer { std::unique_ptr &&item, bool blocking) { Lock lock(mutex); while (queue->size() == capacity) { - if 
(!blocking) return 0; + if (!blocking) { + BMLOG_WARN_PKT(*item, "Input buffer is full, dropping packet"); + return 0; + } cvar->wait(lock); } queue->push_front(std::move(item)); @@ -273,10 +278,37 @@ SimpleSwitch::receive_(port_t port_num, const char *buffer, int len) { void SimpleSwitch::start_and_return_() { check_queueing_metadata(); - - threads_.push_back(std::thread(&SimpleSwitch::ingress_thread, this)); - for (size_t i = 0; i < nb_egress_threads; i++) { - threads_.push_back(std::thread(&SimpleSwitch::egress_thread, this, i)); + if (FanoutPktMgr::pkt_fanout_on) { + FanoutPktMgr::instance().set_grp_selector(); + auto ingress_thread = std::thread(&SimpleSwitch::ingress_thread, this); + FanoutPktMgr::instance().register_thread( + ingress_thread.get_id(), [this](const bm::Packet *pkt) { + BMLOG_DEBUG_PKT(*pkt, + "SELECTOR_FANOUT packet pushed to ingress_buffer"); + this->input_buffer->push_front( + InputBuffer::PacketType::SELECTOR_FANOUT, + std::unique_ptr(const_cast(pkt))); + }); + + threads_.push_back(std::move(ingress_thread)); + for (size_t i = 0; i < nb_egress_threads; i++) { + auto egress_thread = std::thread(&SimpleSwitch::egress_thread, this, i); + // Capture i by value: the callback outlives this loop iteration. + FanoutPktMgr::instance().register_thread( + egress_thread.get_id(), [this, i](const bm::Packet *pkt) { + BMLOG_DEBUG_PKT(*pkt, + "SELECTOR_FANOUT packet pushed to egress_buffer"); + this->egress_buffers.push_front(i, 0, + std::unique_ptr(const_cast(pkt))); + }); + + threads_.push_back(std::move(egress_thread)); + } + } else { + threads_.push_back(std::thread(&SimpleSwitch::ingress_thread, this)); + for (size_t i = 0; i < nb_egress_threads; i++) { + threads_.push_back(std::thread(&SimpleSwitch::egress_thread, this, i)); + } } threads_.push_back(std::thread(&SimpleSwitch::transmit_thread, this)); } @@ -492,7 +524,7 @@ SimpleSwitch::ingress_thread() { port_t ingress_port = packet->get_ingress_port(); (void) ingress_port; - BMLOG_DEBUG_PKT(*packet, "Processing packet received on port {}", + BMLOG_DEBUG_PKT(*packet, 
"Processing packet received on ingress port {}", ingress_port); auto ingress_packet_size = @@ -506,20 +538,25 @@ SimpleSwitch::ingress_thread() { parser leave the buffer unchanged, and move the pop logic to the deparser. TODO? */ const Packet::buffer_state_t packet_in_state = packet->save_buffer_state(); - parser->parse(packet.get()); - - if (phv->has_field("standard_metadata.parser_error")) { - phv->get_field("standard_metadata.parser_error").set( - packet->get_error_code().get()); - } - if (phv->has_field("standard_metadata.checksum_error")) { - phv->get_field("standard_metadata.checksum_error").set( - packet->get_checksum_error() ? 1 : 0); - } + // Check if the packet has an optional continue node for pkt fanout + // TODO(Hao): update the doc/simple_switch.md + if (FanoutPktMgr::pkt_fanout_on && packet->has_next_node()) { + ingress_mau->apply_from_next_node(packet.get()); + } else { + parser->parse(packet.get()); + if (phv->has_field("standard_metadata.parser_error")) { + phv->get_field("standard_metadata.parser_error").set( + packet->get_error_code().get()); + } - ingress_mau->apply(packet.get()); + if (phv->has_field("standard_metadata.checksum_error")) { + phv->get_field("standard_metadata.checksum_error").set( + packet->get_checksum_error() ? 
1 : 0); + } + ingress_mau->apply(packet.get()); + } packet->reset_exit(); Field &f_egress_spec = phv->get_field("standard_metadata.egress_spec"); @@ -612,8 +649,8 @@ SimpleSwitch::ingress_thread() { ingress_packet_size); phv_copy->get_field("standard_metadata.packet_length") .set(ingress_packet_size); - input_buffer->push_front( - InputBuffer::PacketType::RESUBMIT, std::move(packet_copy)); + input_buffer->push_front(InputBuffer::PacketType::RESUBMIT, + std::move(packet_copy)); continue; } @@ -652,40 +689,45 @@ SimpleSwitch::egress_thread(size_t worker_id) { egress_buffers.pop_back(worker_id, &port, &priority, &packet); if (packet == nullptr) break; + BMLOG_DEBUG_PKT(*packet, "Processing packet in egress port {}", + port); Deparser *deparser = this->get_deparser("deparser"); Pipeline *egress_mau = this->get_pipeline("egress"); phv = packet->get_phv(); + Field &f_egress_spec = phv->get_field("standard_metadata.egress_spec"); - if (phv->has_field("intrinsic_metadata.egress_global_timestamp")) { - phv->get_field("intrinsic_metadata.egress_global_timestamp") - .set(get_ts().count()); - } + if (FanoutPktMgr::pkt_fanout_on && packet->has_next_node()) { + egress_mau->apply_from_next_node(packet.get()); + } else { + if (phv->has_field("intrinsic_metadata.egress_global_timestamp")) { + phv->get_field("intrinsic_metadata.egress_global_timestamp") + .set(get_ts().count()); + } - if (with_queueing_metadata) { - auto enq_timestamp = + if (with_queueing_metadata) { + auto enq_timestamp = phv->get_field("queueing_metadata.enq_timestamp").get(); - phv->get_field("queueing_metadata.deq_timedelta").set( - get_ts().count() - enq_timestamp); - phv->get_field("queueing_metadata.deq_qdepth").set( - egress_buffers.size(port, priority)); - if (phv->has_field("queueing_metadata.qid")) { - auto &qid_f = phv->get_field("queueing_metadata.qid"); - qid_f.set(priority); + phv->get_field("queueing_metadata.deq_timedelta").set( + get_ts().count() - enq_timestamp); + phv->get_field("queueing_metadata.deq_qdepth").set( + 
egress_buffers.size(port, priority)); + if (phv->has_field("queueing_metadata.qid")) { + auto &qid_f = phv->get_field("queueing_metadata.qid"); + qid_f.set(priority); + } } - } - phv->get_field("standard_metadata.egress_port").set(port); + phv->get_field("standard_metadata.egress_port").set(port); + // When egress_spec == drop_port the packet will be dropped, thus + // here we initialize egress_spec to a value different from drop_port. + f_egress_spec.set(drop_port + 1); - Field &f_egress_spec = phv->get_field("standard_metadata.egress_spec"); - // When egress_spec == drop_port the packet will be dropped, thus - // here we initialize egress_spec to a value different from drop_port. - f_egress_spec.set(drop_port + 1); + phv->get_field("standard_metadata.packet_length").set( + packet->get_register(RegisterAccess::PACKET_LENGTH_REG_IDX)); - phv->get_field("standard_metadata.packet_length").set( - packet->get_register(RegisterAccess::PACKET_LENGTH_REG_IDX)); - - egress_mau->apply(packet.get()); + egress_mau->apply(packet.get()); + } auto clone_mirror_session_id = RegisterAccess::get_clone_mirror_session_id(packet.get()); diff --git a/targets/simple_switch/simple_switch.h b/targets/simple_switch/simple_switch.h index cdc751bd..80f5a2ae 100644 --- a/targets/simple_switch/simple_switch.h +++ b/targets/simple_switch/simple_switch.h @@ -27,6 +27,8 @@ #include #include #include +#include +#include #include #include @@ -56,6 +58,7 @@ using bm::Pipeline; using bm::McSimplePreLAG; using bm::Field; using bm::FieldList; +using bm::FanoutPktMgr; using bm::packet_id_t; using bm::p4object_id_t; diff --git a/tools/nanomsg_client.py b/tools/nanomsg_client.py index 20e1ce33..b20b4b09 100755 --- a/tools/nanomsg_client.py +++ b/tools/nanomsg_client.py @@ -83,7 +83,8 @@ class MSG_TYPES: CHECKSUM_UPDATE, PIPELINE_START, PIPELINE_DONE, CONDITION_EVAL, TABLE_HIT, TABLE_MISS, - ACTION_EXECUTE) = list(range(15)) + ACTION_EXECUTE, + FANOUT_GEN) = list(range(16)) CONFIG_CHANGE = 999 
@staticmethod @@ -104,6 +105,7 @@ def get_msg_class(type_): MSG_TYPES.TABLE_HIT: TableHit, MSG_TYPES.TABLE_MISS: TableMiss, MSG_TYPES.ACTION_EXECUTE: ActionExecute, + MSG_TYPES.FANOUT_GEN: FanoutGen, MSG_TYPES.CONFIG_CHANGE: ConfigChange, } return classes[type_] @@ -126,6 +128,7 @@ def get_str(type_): MSG_TYPES.TABLE_HIT: "TABLE_HIT", MSG_TYPES.TABLE_MISS: "TABLE_MISS", MSG_TYPES.ACTION_EXECUTE: "ACTION_EXECUTE", + MSG_TYPES.FANOUT_GEN: "FANOUT_GEN", MSG_TYPES.CONFIG_CHANGE: "CONFIG_CHANGE", } return strs[type_] @@ -433,6 +436,24 @@ def __str__(self): s += " (" + name + ")" return s +class FanoutGen(Msg): + def __init__(self, msg): + super(FanoutGen, self).__init__(msg) + self.type_ = MSG_TYPES.FANOUT_GEN + self.type_str = MSG_TYPES.get_str(self.type_) + self.struct_ = struct.Struct("QQ") + + def extract(self): + self.table_id, self.parent_packet_copy_id = super(FanoutGen, self).extract() + + def __str__(self): + s = super(FanoutGen, self).__str__() + s += ", table_id: " + str(self.table_id) + name = name_lookup("table", self.table_id) + if name: + s += " (" + name + ")" + s += ", parent_packet_copy_id: " + str(self.parent_packet_copy_id) + return s class ConfigChange(Msg): def __init__(self, msg): @@ -472,8 +493,9 @@ def get_msg_type(msg): try: p = MSG_TYPES.get_msg_class(msg_type)(msg) - except: + except Exception as e: print("Unknown msg type", msg_type) + print("Error:", e) continue p.extract() print(p) diff --git a/tools/runtime_CLI.py b/tools/runtime_CLI.py index 700ccafd..cbcc5b27 100755 --- a/tools/runtime_CLI.py +++ b/tools/runtime_CLI.py @@ -1536,7 +1536,7 @@ def complete_table_indirect_set_default(self, text, line, start_index, end_index @handle_bad_input def do_table_indirect_set_default_with_group(self, line): - "Set default group for indirect match table: table_indirect_set_default " + "Set default group for indirect match table: table_indirect_set_default_with_group
" table_name, handle = self.indirect_set_default_common(line, ws=True)