@@ -1894,21 +1894,33 @@ void PMTraceConsumer::CompletePresent(std::shared_ptr<PresentEvent> const& p)
18941894void PMTraceConsumer::AddPresentToCompletedList (std::shared_ptr<PresentEvent> const & present)
18951895{
18961896 {
1897- std::lock_guard <std::mutex> lock (mPresentEventMutex );
1897+ std::unique_lock <std::mutex> lock (mPresentEventMutex );
18981898
1899- // If the completed list is full, throw away the oldest completed present, if it IsLost; or this
1900- // present, if it IsLost; or the oldest completed present.
19011899 uint32_t index;
1900+ // if completed buffer is full
19021901 if (mCompletedCount == PRESENTEVENT_CIRCULAR_BUFFER_SIZE) {
1903- if (!mCompletedPresents [mCompletedIndex ]->IsLost && present->IsLost ) {
1904- return ;
1902+ // if we are in offline ETL processing mode, block instead of overwriting events
1903+ // unless either A) the buffer is full of non-ready events or B) backpressure disabled via CLI option
1904+ if (!mIsRealtimeSession && mReadyCount != 0 && !mDisableOfflineBackpressure ) {
1905+ mCompletedRingCondition .wait (lock, [this ] { return mCompletedCount < PRESENTEVENT_CIRCULAR_BUFFER_SIZE; });
1906+ index = GetRingIndex (mCompletedIndex + mCompletedCount );
1907+ mCompletedCount ++;
19051908 }
1909+ // Completed present overflow routine (when not blocking):
1910+ // If the completed list is full, throw away the oldest completed present, if it IsLost; or this
1911+ // present, if it IsLost; or the oldest completed present.
1912+ else {
1913+ if (!mCompletedPresents [mCompletedIndex ]->IsLost && present->IsLost ) {
1914+ return ;
1915+ }
19061916
1907- index = mCompletedIndex ;
1908- mCompletedIndex = GetRingIndex (mCompletedIndex + 1 );
1909- if (mReadyCount > 0 ) {
1910- mReadyCount --;
1917+ index = mCompletedIndex ;
1918+ mCompletedIndex = GetRingIndex (mCompletedIndex + 1 );
1919+ if (mReadyCount > 0 ) {
1920+ mReadyCount --;
1921+ }
19111922 }
1923+ // otherwise, completed buffer still has available space
19121924 } else {
19131925 index = GetRingIndex (mCompletedIndex + mCompletedCount );
19141926 mCompletedCount ++;
@@ -2384,19 +2396,20 @@ void PMTraceConsumer::DequeueProcessEvents(std::vector<ProcessEvent>& outProcess
23842396void PMTraceConsumer::DequeuePresentEvents (std::vector<std::shared_ptr<PresentEvent>>& outPresentEvents)
23852397{
23862398 outPresentEvents.clear ();
2399+ {
2400+ std::lock_guard<std::mutex> lock (mPresentEventMutex );
2401+ if (mReadyCount > 0 ) {
2402+ outPresentEvents.resize (mReadyCount , nullptr );
2403+ for (uint32_t i = 0 ; i < mReadyCount ; ++i) {
2404+ std::swap (outPresentEvents[i], mCompletedPresents [mCompletedIndex ]);
2405+ mCompletedIndex = GetRingIndex (mCompletedIndex + 1 );
2406+ }
23872407
2388- std::lock_guard<std::mutex> lock (mPresentEventMutex );
2389-
2390- if (mReadyCount > 0 ) {
2391- outPresentEvents.resize (mReadyCount , nullptr );
2392- for (uint32_t i = 0 ; i < mReadyCount ; ++i) {
2393- std::swap (outPresentEvents[i], mCompletedPresents [mCompletedIndex ]);
2394- mCompletedIndex = GetRingIndex (mCompletedIndex + 1 );
2408+ mCompletedCount -= mReadyCount ;
2409+ mReadyCount = 0 ;
23952410 }
2396-
2397- mCompletedCount -= mReadyCount ;
2398- mReadyCount = 0 ;
23992411 }
2412+ mCompletedRingCondition .notify_one ();
24002413}
24012414
24022415#ifdef TRACK_PRESENT_PATHS
0 commit comments