@@ -314,9 +314,95 @@ ReturnCode L3MulticastManager::drainMulticastRouterInterfaceEntries(
314314// only.
315315ReturnCode L3MulticastManager::drainMulticastReplicationEntries (
316316 std::deque<swss::KeyOpFieldsValuesTuple>& replication_tuples) {
317- return ReturnCode (StatusCode::SWSS_RC_UNIMPLEMENTED)
318- << " L3MulticastManager::drainMulticastReplicationEntries is not "
319- << " implemented yet" ;
317+ SWSS_LOG_ENTER ();
318+ ReturnCode status;
319+ std::vector<P4MulticastReplicationEntry> multicast_replication_entry_list;
320+ std::deque<swss::KeyOpFieldsValuesTuple> tuple_list;
321+
322+ std::string prev_op;
323+ bool prev_update = false ;
324+
325+ while (!replication_tuples.empty ()) {
326+ auto key_op_fvs_tuple = replication_tuples.front ();
327+ replication_tuples.pop_front ();
328+ std::string table_name;
329+ std::string key;
330+ parseP4RTKey (kfvKey (key_op_fvs_tuple), &table_name, &key);
331+ const std::vector<swss::FieldValueTuple>& attributes =
332+ kfvFieldsValues (key_op_fvs_tuple);
333+
334+ // Form entry object
335+ auto replication_entry_or =
336+ deserializeMulticastReplicationEntry (key, attributes);
337+
338+ if (!replication_entry_or.ok ()) {
339+ status = replication_entry_or.status ();
340+ SWSS_LOG_ERROR (" Unable to deserialize APP DB entry with key %s: %s" ,
341+ QuotedVar (table_name + " :" + key).c_str (),
342+ status.message ().c_str ());
343+ m_publisher->publish (APP_P4RT_TABLE_NAME, kfvKey (key_op_fvs_tuple),
344+ kfvFieldsValues (key_op_fvs_tuple), status,
345+ /* replace=*/ true );
346+ break ;
347+ }
348+ auto & replication_entry = *replication_entry_or;
349+
350+ // Validate entry
351+ const std::string& operation = kfvOp (key_op_fvs_tuple);
352+ status = validateMulticastReplicationEntry (replication_entry, operation);
353+ if (!status.ok ()) {
354+ SWSS_LOG_ERROR (
355+ " Validation failed for replication APP DB entry with key %s: %s" ,
356+ QuotedVar (table_name + " :" + key).c_str (), status.message ().c_str ());
357+ m_publisher->publish (APP_P4RT_TABLE_NAME, kfvKey (key_op_fvs_tuple),
358+ kfvFieldsValues (key_op_fvs_tuple), status,
359+ /* replace=*/ true );
360+ break ;
361+ }
362+
363+ // Now, start processing batch of entries.
364+ auto * replication_entry_ptr = getMulticastReplicationEntry (
365+ replication_entry.multicast_replication_key );
366+ bool update = replication_entry_ptr != nullptr ;
367+
368+ if (prev_op == " " ) {
369+ prev_op = operation;
370+ prev_update = update;
371+ }
372+ // Process the entries if the operation type changes.
373+ if (operation != prev_op || update != prev_update) {
374+ status = processMulticastReplicationEntries (
375+ multicast_replication_entry_list, tuple_list, prev_op, prev_update);
376+ multicast_replication_entry_list.clear ();
377+ tuple_list.clear ();
378+ prev_op = operation;
379+ prev_update = update;
380+ }
381+
382+ if (!status.ok ()) {
383+ // Return SWSS_RC_NOT_EXECUTED if failure has occured.
384+ m_publisher->publish (APP_P4RT_TABLE_NAME, kfvKey (key_op_fvs_tuple),
385+ kfvFieldsValues (key_op_fvs_tuple),
386+ ReturnCode (StatusCode::SWSS_RC_NOT_EXECUTED),
387+ /* replace=*/ true );
388+ break ;
389+ } else {
390+ multicast_replication_entry_list.push_back (replication_entry);
391+ tuple_list.push_back (key_op_fvs_tuple);
392+ }
393+ } // while
394+
395+ // Process any pending entries.
396+ if (!multicast_replication_entry_list.empty ()) {
397+ auto rc = processMulticastReplicationEntries (
398+ multicast_replication_entry_list, tuple_list, prev_op, prev_update);
399+ if (!rc.ok ()) {
400+ status = rc;
401+ }
402+ }
403+
404+ drainMgmtWithNotExecuted (replication_tuples, m_publisher);
405+ return status;
320406}
321407
322408ReturnCodeOr<P4MulticastRouterInterfaceEntry>
@@ -775,6 +861,36 @@ ReturnCode L3MulticastManager::processMulticastRouterInterfaceEntries(
775861 return status;
776862}
777863
864+ ReturnCode L3MulticastManager::processMulticastReplicationEntries (
865+ std::vector<P4MulticastReplicationEntry>& entries,
866+ const std::deque<swss::KeyOpFieldsValuesTuple>& tuple_list,
867+ const std::string& op, bool update) {
868+ SWSS_LOG_ENTER ();
869+ ReturnCode status;
870+
871+ std::vector<ReturnCode> statuses;
872+ // In syncd, bulk SAI calls use mode SAI_BULK_OP_ERROR_MODE_STOP_ON_ERROR.
873+ if (op == SET_COMMAND) {
874+ if (!update) {
875+ statuses = addMulticastReplicationEntries (entries);
876+ } else {
877+ statuses = updateMulticastReplicationEntries (entries);
878+ }
879+ } else {
880+ statuses = deleteMulticastReplicationEntries (entries);
881+ }
882+ // Check status of each entry.
883+ for (size_t i = 0 ; i < entries.size (); ++i) {
884+ m_publisher->publish (APP_P4RT_TABLE_NAME, kfvKey (tuple_list[i]),
885+ kfvFieldsValues (tuple_list[i]), statuses[i],
886+ /* replace=*/ true );
887+ if (status.ok () && !statuses[i].ok ()) {
888+ status = statuses[i];
889+ }
890+ }
891+ return status;
892+ }
893+
778894ReturnCode L3MulticastManager::createRouterInterface (
779895 const std::string& rif_key, P4MulticastRouterInterfaceEntry& entry,
780896 sai_object_id_t * rif_oid) {
0 commit comments