@@ -335,36 +335,46 @@ void Consumer::execute()
335335 auto entries = std::make_shared<std::deque<KeyOpFieldsValuesTuple>>();
336336 getConsumerTable ()->pops (*entries);
337337
338- pushRingBuffer ([=](){
339- addToSync (entries);
340- });
341-
342- pushRingBuffer ([=](){
343- drain ();
344- });
338+ processAnyTask (
339+ // bundle tasks into a lambda function which takes no argument and returns void
340+ // this lambda captures variables by value from the surrounding scope
341+ [=](){
342+ addToSync (entries);
343+ drain ();
344+ }
345+ );
345346}
346347
347- void Executor::pushRingBuffer (AnyTask&& task)
348+ void Executor::processAnyTask (AnyTask&& task)
348349{
350+ // if either gRingBuffer isn't initialized or the ring thread isn't created
349351 if (!gRingBuffer || !gRingBuffer ->thread_created )
350352 {
351- // execute the task right now in this thread if gRingBuffer is not initialized
352- // or the ring thread is not created, or this executor is not served by gRingBuffer
353+ // execute the input task immediately
353354 task ();
355+ // in this case, processAnyTask(task) is equivalent to calling the task()
356+ return ;
354357 }
355- else if (!gRingBuffer ->serves (getName ())) // not served by ring thread
358+
359+ // Ring Buffer Logic
360+
361+ // if this executor isn't served by ring buffer
362+ if (!gRingBuffer ->serves (getName ()))
356363 {
364+ // this executor should execute the input task in the main thread
365+ // but to avoid thread issue, it should wait when the ring buffer is actively working
357366 while (!gRingBuffer ->IsEmpty () || !gRingBuffer ->IsIdle ()) {
358367 gRingBuffer ->notify ();
359368 std::this_thread::sleep_for (std::chrono::milliseconds (SLEEP_MSECONDS));
360369 }
361- // if ring thread is enabled, make sure to execute task after the ring finishes its work
370+ // execute task()
362371 task ();
363372 }
364373 else
365374 {
366- // if this executor is served by gRingBuffer, push the task to gRingBuffer
367- // and notify the ring thread to flush gRingBuffer
375+ // if this executor is served by ring buffer,
376+ // push the task to gRingBuffer
377+ // this task would be executed in the ring thread, not here
368378 while (!gRingBuffer ->push (task)) {
369379 gRingBuffer ->notify ();
370380 SWSS_LOG_WARN (" ring is full...push again" );
0 commit comments