Skip to content

Commit ad9d647

Browse files
committed
rework parallel processor to support workload balancing
1 parent 8c1a770 commit ad9d647

File tree

1 file changed

+34
-23
lines changed

1 file changed

+34
-23
lines changed

src/public/vstdlib/jobthread.h

Lines changed: 34 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -872,10 +872,18 @@ template <typename ITEM_TYPE, class ITEM_PROCESSOR_TYPE>
872872
class CParallelProcessor
873873
{
874874
public:
875-
CParallelProcessor( const char *pszDescription )
875+
CParallelProcessor( const char *pszDescription, bool bUnbalanced = false )
876876
{
877-
m_pItems = m_pLimit= 0;
878877
m_szDescription = pszDescription;
878+
if (bUnbalanced)
879+
{
880+
m_iBatchSize = 1;
881+
}
882+
else
883+
{
884+
m_iBatchSize = 0;
885+
}
886+
m_iLeftOver = 0;
879887
}
880888

881889
void Run( ITEM_TYPE *pItems, unsigned nItems, int nMaxParallel = INT_MAX, IThreadPool *pThreadPool = NULL )
@@ -891,7 +899,6 @@ class CParallelProcessor
891899
}
892900

893901
m_pItems = pItems;
894-
m_pLimit = pItems + nItems;
895902

896903
int nJobs = nItems;
897904
#if MEASURE_PARALLEL_WORK
@@ -918,9 +925,16 @@ class CParallelProcessor
918925
#if MEASURE_PARALLEL_WORK
919926
float Start = Plat_FloatTime();
920927
#endif
928+
929+
m_iBatchCount = nJobs;
921930

922931
if ( nJobs > 1 )
923932
{
933+
if ( m_iBatchSize != 1)
934+
{
935+
m_iBatchSize = nItems / nJobs;
936+
m_iLeftOver = nItems % nJobs;
937+
}
924938
// Decrement jobs by 1 for the main thread.
925939
nJobs--;
926940
CJob **jobs = (CJob **)stackalloc( nJobs * sizeof(CJob **) );
@@ -941,14 +955,11 @@ class CParallelProcessor
941955

942956
// Do jobs alongside the threads.
943957
DoExecute();
944-
for (i = 0; i < nJobs; i++)
945-
{
946-
jobs[i]->Abort(); // will either abort ones that never got a thread, or noop on ones that did
947-
jobs[i]->Release();
948-
}
958+
pThreadPool->YieldWait(jobs, nJobs);
949959
}
950960
else
951961
{
962+
m_iBatchSize = 1;
952963
DoExecute();
953964
}
954965
#if MEASURE_PARALLEL_WORK
@@ -967,26 +978,23 @@ class CParallelProcessor
967978
int work = 0;
968979
#endif
969980

970-
if ( m_pItems < m_pLimit )
981+
int iBatchIndex = m_iBatchIndex++;
982+
if (iBatchIndex < m_iBatchCount)
971983
{
972984
m_ItemProcessor.Begin();
973985

974-
ITEM_TYPE *pLimit = m_pLimit;
986+
int iBatchSize = m_iBatchSize;
987+
if (iBatchIndex == m_iBatchCount - 1)
988+
{
989+
iBatchSize += m_iLeftOver;
990+
}
975991

976-
for (;;)
992+
for (int i = 0; i < iBatchSize; i++)
977993
{
978-
ITEM_TYPE *pCurrent = m_pItems++;
979-
if ( pCurrent < pLimit )
980-
{
981994
#if MEASURE_PARALLEL_WORK
982-
++work;
995+
++work;
983996
#endif
984-
m_ItemProcessor.Process( *pCurrent );
985-
}
986-
else
987-
{
988-
break;
989-
}
997+
m_ItemProcessor.Process( *(m_pItems + (iBatchIndex * m_iBatchSize + i)) );
990998
}
991999

9921000
m_ItemProcessor.End();
@@ -996,8 +1004,11 @@ class CParallelProcessor
9961004
DevMsg("Thread %d did %d/%d units of work for %s\n", GetCurrentThreadId(), work, m_iItems, m_szDescription);
9971005
#endif
9981006
}
999-
CInterlockedPtr<ITEM_TYPE> m_pItems;
1000-
ITEM_TYPE * m_pLimit;
1007+
ITEM_TYPE* m_pItems;
1008+
CInterlockedInt m_iBatchIndex; // Which batch are we on?
1009+
int m_iBatchSize; // How big the batches are
1010+
int m_iLeftOver; // The last batch gets the leftovers
1011+
int m_iBatchCount; // How many batches are there?
10011012
#if MEASURE_PARALLEL_WORK
10021013
int m_iItems;
10031014
#endif

0 commit comments

Comments
 (0)