@@ -147,15 +147,177 @@ vector<Selectable *> Orch::getSelectables()
147147 return selectables;
148148}
149149
150- void ConsumerBase::addToSync (const KeyOpFieldsValuesTuple &entry)
150+ void Orch::createRetryCache (const std::string &executorName) {
151+ if (m_retryCaches.find (executorName) == m_retryCaches.end ())
152+ m_retryCaches[executorName] = std::make_shared<RetryCache>(executorName);
153+ }
154+
155+ RetryCache *Orch::getRetryCache (const std::string &executorName)
156+ {
157+ if (m_retryCaches.find (executorName) == m_retryCaches.end ())
158+ return nullptr ;
159+ else
160+ return m_retryCaches[executorName].get ();
161+ }
162+
163+ ConsumerBase* Orch::getConsumerBase (const std::string &executorName)
164+ {
165+ if (m_consumerMap.find (executorName) == m_consumerMap.end ())
166+ return nullptr ;
167+ return dynamic_cast <ConsumerBase*>(m_consumerMap[executorName].get ());
168+ }
169+
170+ bool ConsumerBase::addToRetry (const Task &task, const Constraint &cst) {
171+ auto retryCache = getOrch () ? getOrch ()->getRetryCache (getName ()) : nullptr ;
172+ if (retryCache)
173+ {
174+ Recorder::Instance ().retry .record (dumpTuple (task).append (CACHE));
175+ retryCache->insert (task, cst);
176+ return true ;
177+ }
178+ return false ;
179+ }
180+
181+ bool Orch::addToRetry (const std::string &executorName, const Task &task, const Constraint &cst) {
182+ auto retryCache = getRetryCache (executorName);
183+ if (retryCache)
184+ {
185+ Recorder::Instance ().retry .record (getConsumerBase (executorName)->dumpTuple (task).append (CACHE));
186+ retryCache->insert (task, cst);
187+ return true ;
188+ }
189+ return false ;
190+ }
191+
192+ /* *
193+ * @brief Check the consumer's RetryCache, if the set of resolved constraints is not empty,
194+ * query RetryMap for failed tasks indexed by these resolved constraints,
195+ * and move them back to the consumer's SyncMap, such that they can be retried in the next iteration.
196+ * @param executorName - name of the consumer
197+ * @param quota - maximum number of tasks to be moved back to SyncMap in a single call
198+ * @return number of tasks moved back to SyncMap
199+ */
200+ size_t Orch::retryToSync (const std::string &executorName, size_t quota)
201+ {
202+ auto retryCache = getRetryCache (executorName);
203+
204+ // directly return 0 if no retry cache for this executor or quota is non-positive
205+ if (!retryCache || quota <= 0 )
206+ return 0 ;
207+
208+ std::unordered_set<Constraint>& constraints = retryCache->getResolvedConstraints ();
209+
210+ size_t count = 0 ;
211+
212+ while (!constraints.empty () && count < quota)
213+ {
214+ auto cst = *constraints.begin ();
215+
216+ auto tasks = retryCache->resolve (cst, quota - count);
217+
218+ count += tasks->size ();
219+
220+ getConsumerBase (executorName)->addToSync (tasks, true );
221+
222+ }
223+ return count;
224+ }
225+
226+ void Orch::notifyRetry (Orch *retryOrch, const std::string &executorName, const Constraint &cst)
227+ {
228+ auto retryCache = retryOrch->getRetryCache (executorName);
229+ if (!retryCache)
230+ {
231+ SWSS_LOG_ERROR (" RetryCache not initialized for %s" , executorName.c_str ());
232+ }
233+ else
234+ {
235+ retryCache->mark_resolved (cst);
236+ }
237+ }
238+
239+ size_t ConsumerBase::addToSync (std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries, bool onRetry) {
240+ return addToSync (*entries, onRetry);
241+ }
242+
243+ void ConsumerBase::addToSync (const KeyOpFieldsValuesTuple &entry, bool onRetry)
151244{
152245 SWSS_LOG_ENTER ();
153246
154247 string key = kfvKey (entry);
155248 string op = kfvOp (entry);
156249
157- /* Record incoming tasks */
158- Recorder::Instance ().swss .record (dumpTuple (entry));
250+ if (!onRetry)
251+ /* Record incoming tasks */
252+ Recorder::Instance ().swss .record (dumpTuple (entry));
253+ else
254+ Recorder::Instance ().retry .record (dumpTuple (entry).append (DECACHE));
255+
256+ auto retryCache = getOrch () ? getOrch ()->getRetryCache (getName ()) : nullptr ;
257+
258+ if (retryCache && !onRetry)
259+ {
260+ size_t count = retryCache->getRetryMap ().count (key);
261+
262+ switch (count)
263+ {
264+ case 0 :
265+ // No task with the same key found in the retrycache
266+ break ;
267+
268+ case 1 :
269+ {
270+ // Single task found
271+ auto it = retryCache->getRetryMap ().find (key);
272+ if (it->second .second == entry) // skip duplicate task
273+ {
274+ SWSS_LOG_DEBUG (" Skip, already in retry cache: %s" , dumpTuple (entry).c_str ());
275+ return ;
276+ }
277+
278+ if (op == DEL_COMMAND)
279+ {
280+ if (kfvOp (it->second .second ) == SET_COMMAND)
281+ {
282+ auto old_task = retryCache->evict (key);
283+ Recorder::Instance ().retry .record (dumpTuple (*old_task).append (DECACHE));
284+ }
285+ }
286+ else if (op == SET_COMMAND)
287+ {
288+ if (kfvOp (it->second .second ) == SET_COMMAND)
289+ {
290+ // move the old SET back to m_toSync for later merge
291+ auto old_task = retryCache->evict (key);
292+ m_toSync.emplace (key, *old_task);
293+ Recorder::Instance ().retry .record (dumpTuple (*old_task).append (DECACHE));
294+ }
295+ }
296+ break ;
297+ }
298+ case 2 :
299+ {
300+ // 2 tasks found, must be a DEL + a SET
301+ if (op == DEL_COMMAND)
302+ {
303+ // remove the SET task from the cache, reuse the DEL task
304+ auto old_task = retryCache->evict (key);
305+ Recorder::Instance ().retry .record (dumpTuple (*old_task).append (DECACHE));
306+ return ;
307+ }
308+ else if (op == SET_COMMAND)
309+ {
310+ // Keep the DEL task, move the old SET back to m_toSync for later merge
311+ auto old_task = retryCache->evict (key);
312+ Recorder::Instance ().retry .record (dumpTuple (*old_task).append (DECACHE));
313+ m_toSync.emplace (key, *old_task);
314+ }
315+ break ;
316+ }
317+ default :
318+ SWSS_LOG_ERROR (" Maximum two values per key, found: %zu" , count);
319+ }
320+ }
159321
160322 /*
161323 * m_toSync is a multimap which will allow one key with multiple values,
@@ -230,22 +392,18 @@ void ConsumerBase::addToSync(const KeyOpFieldsValuesTuple &entry)
230392
231393}
232394
233- size_t ConsumerBase::addToSync (const std::deque<KeyOpFieldsValuesTuple> &entries)
395+ size_t ConsumerBase::addToSync (const std::deque<KeyOpFieldsValuesTuple> &entries, bool onRetry )
234396{
235397 SWSS_LOG_ENTER ();
236398
237399 for (auto & entry: entries)
238400 {
239- addToSync (entry);
401+ addToSync (entry, onRetry );
240402 }
241403
242404 return entries.size ();
243405}
244406
245- size_t ConsumerBase::addToSync (std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries) {
246- return addToSync (*entries);
247- }
248-
249407// TODO: Table should be const
250408size_t ConsumerBase::refillToSync (Table* table)
251409{
@@ -326,6 +484,20 @@ void ConsumerBase::dumpPendingTasks(vector<string> &ts)
326484
327485 ts.push_back (s);
328486 }
487+
488+ // check pending tasks in m_toRetry if orch has allocated a retry cache for this consumer
489+ auto rc = getOrch () ? getOrch ()->getRetryCache (getTableName ()) : nullptr ;
490+ if (rc)
491+ {
492+ for (auto &tm : rc->getRetryMap ())
493+ {
494+ KeyOpFieldsValuesTuple& tuple = tm.second .second ;
495+
496+ string s = dumpTuple (tuple);
497+
498+ ts.push_back (s);
499+ }
500+ }
329501}
330502
331503void Consumer::execute ()
@@ -663,8 +835,15 @@ string Orch::objectReferenceInfo(
663835
664836void Orch::doTask ()
665837{
838+ // limit the number of tasks moved from RetryMap to SyncMap in one iteration
839+ // to avoid starvation of new tasks in SyncMap
840+ auto threshold = gBatchSize == 0 ? 30000 : gBatchSize ;
841+
842+ size_t count = 0 ;
843+
666844 for (auto &it : m_consumerMap)
667845 {
846+ count += retryToSync (it.first , threshold - count);
668847 it.second ->drain ();
669848 }
670849}
0 commit comments