Skip to content

Commit a35fef1

Browse files
Clean up cputopology initialization global communication pattern (#3449)
- Replaces error-prone locking and improper scheduler driving with well-tested pattern from Isomalloc-Sync - Replaces bespoke "empty reduction" with CmiBarrier - Replaces Broadcast with NodeBroadcast and moves send outside of handler - Replaces unnecessary privatization of Converse handler index variables with CmiAssignOnce
1 parent bd9f9af commit a35fef1

File tree

1 file changed

+76
-98
lines changed

1 file changed

+76
-98
lines changed

src/conv-core/cputopology.C

Lines changed: 76 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,10 @@ namespace CpuTopoDetails {
170170
static nodeTopoMsg *topomsg = NULL;
171171
static std::map<skt_ip_t, _procInfo *> hostTable;
172172

173-
CpvStaticDeclare(int, cpuTopoHandlerIdx);
174-
CpvStaticDeclare(int, cpuTopoRecvHandlerIdx);
175-
CpvStaticDeclare(int, topoDoneHandlerIdx);
173+
static int cpuTopoHandlerIdx;
174+
static int cpuTopoRecvHandlerIdx;
176175

177176
static CpuTopology cpuTopo;
178-
static CmiNodeLock topoLock = 0; /* Not spelled 'NULL' to quiet warnings when CmiNodeLock is just 'int' */
179177
static int done = 0;
180178
static int topoDone = 0;
181179
static int _noip = 0;
@@ -198,6 +196,32 @@ static void printTopology(int numNodes)
198196
CmiPrintf("Charm++> Running on %d hosts\n", numNodes);
199197
}
200198

199+
static std::atomic<bool> cpuTopoSyncHandlerDone{};
200+
#if CMK_SMP && !CMK_SMP_NO_COMMTHD
201+
extern void CommunicationServerThread(int sleepTime);
202+
static std::atomic<bool> cpuTopoSyncCommThreadDone{};
203+
#endif
204+
205+
#if CMK_SMP && !CMK_SMP_NO_COMMTHD
206+
static void cpuTopoSyncWaitCommThread()
207+
{
208+
do
209+
CommunicationServerThread(5);
210+
while (!cpuTopoSyncCommThreadDone.load());
211+
212+
CommunicationServerThread(5);
213+
}
214+
#endif
215+
216+
static void cpuTopoSyncWait()
217+
{
218+
do
219+
CsdSchedulePoll();
220+
while (!cpuTopoSyncHandlerDone.load());
221+
222+
CsdSchedulePoll();
223+
}
224+
201225
/* called on PE 0 */
202226
static void cpuTopoHandler(void *m)
203227
{
@@ -208,7 +232,7 @@ static void cpuTopoHandler(void *m)
208232
if (topomsg == NULL) {
209233
int i;
210234
topomsg = (nodeTopoMsg *)CmiAlloc(sizeof(nodeTopoMsg)+CmiNumPes()*sizeof(int));
211-
CmiSetHandler((char *)topomsg, CpvAccess(cpuTopoRecvHandlerIdx));
235+
CmiSetHandler((char *)topomsg, cpuTopoRecvHandlerIdx);
212236
topomsg->nodes = (int *)((char*)topomsg + sizeof(nodeTopoMsg));
213237
for (i=0; i<CmiNumPes(); i++) topomsg->nodes[i] = -1;
214238
}
@@ -244,14 +268,7 @@ static void cpuTopoHandler(void *m)
244268
hostTable.clear();
245269
CmiFree(msg);
246270

247-
CmiSyncBroadcastAllAndFree(sizeof(nodeTopoMsg)+CmiNumPes()*sizeof(int), (char *)topomsg);
248-
}
249-
250-
/* called on PE 0 */
251-
static void topoDoneHandler(void *m) {
252-
CmiLock(topoLock);
253-
topoDone++;
254-
CmiUnlock(topoLock);
271+
cpuTopoSyncHandlerDone = true;
255272
}
256273

257274
/* called on each processor */
@@ -260,17 +277,17 @@ static void cpuTopoRecvHandler(void *msg)
260277
nodeTopoMsg *m = (nodeTopoMsg *)msg;
261278
m->nodes = (int *)((char*)m + sizeof(nodeTopoMsg));
262279

263-
CmiLock(topoLock);
264280
if (cpuTopo.nodeIDs == NULL) {
265281
cpuTopo.nodeIDs = m->nodes;
266282
cpuTopo.sort();
267283
}
268284
else
269285
CmiFree(m);
270286
done++;
271-
CmiUnlock(topoLock);
272287

273288
//if (CmiMyPe() == 0) cpuTopo.print();
289+
290+
cpuTopoSyncHandlerDone = true;
274291
}
275292

276293
// reduction function
@@ -284,7 +301,7 @@ static void * combineMessage(int *size, void *data, void **remote, int count)
284301
hostnameMsg *msg = (hostnameMsg *)CmiAlloc(*size);
285302
msg->procs = (_procInfo*)((char*)msg + sizeof(hostnameMsg));
286303
msg->n = nprocs;
287-
CmiSetHandler((char *)msg, CpvAccess(cpuTopoHandlerIdx));
304+
CmiSetHandler((char *)msg, cpuTopoHandlerIdx);
288305

289306
int n=0;
290307
hostnameMsg *m = (hostnameMsg*)data;
@@ -300,20 +317,6 @@ static void * combineMessage(int *size, void *data, void **remote, int count)
300317
return msg;
301318
}
302319

303-
// reduction function
304-
static void *emptyReduction(int *size, void *data, void **remote, int count)
305-
{
306-
if (CmiMyPe() != 0) {
307-
CmiLock(topoLock);
308-
topoDone++;
309-
CmiUnlock(topoLock);
310-
}
311-
*size = sizeof(topoDoneMsg);
312-
topoDoneMsg *msg = (topoDoneMsg *)CmiAlloc(sizeof(topoDoneMsg));
313-
CmiSetHandler((char *)msg, CpvAccess(topoDoneHandlerIdx));
314-
return msg;
315-
}
316-
317320
/****************** API implementation **********************/
318321

319322
extern "C" int LrtsCpuTopoEnabled()
@@ -375,16 +378,11 @@ extern "C" int LrtsNodeFirst(int node)
375378
extern "C" void LrtsInitCpuTopo(char **argv)
376379
{
377380
static skt_ip_t myip;
378-
hostnameMsg *msg;
379381
double startT;
380382

381383
int obtain_flag = 1; // default on
382384
int show_flag = 0; // default not show topology
383385

384-
if (CmiMyRank() ==0) {
385-
topoLock = CmiCreateLock();
386-
}
387-
388386
#if __FAULT__
389387
obtain_flag = 0;
390388
#endif
@@ -398,17 +396,9 @@ extern "C" void LrtsInitCpuTopo(char **argv)
398396
"Show cpu topology info"))
399397
show_flag = 1;
400398

401-
{
402-
CpvInitialize(int, cpuTopoHandlerIdx);
403-
CpvInitialize(int, cpuTopoRecvHandlerIdx);
404-
CpvInitialize(int, topoDoneHandlerIdx);
405-
CpvAccess(cpuTopoHandlerIdx) =
406-
CmiRegisterHandler((CmiHandler)cpuTopoHandler);
407-
CpvAccess(cpuTopoRecvHandlerIdx) =
408-
CmiRegisterHandler((CmiHandler)cpuTopoRecvHandler);
409-
CpvAccess(topoDoneHandlerIdx) =
410-
CmiRegisterHandler((CmiHandler)topoDoneHandler);
411-
}
399+
CmiAssignOnce(&cpuTopoHandlerIdx, CmiRegisterHandler((CmiHandler)cpuTopoHandler));
400+
CmiAssignOnce(&cpuTopoRecvHandlerIdx, CmiRegisterHandler((CmiHandler)cpuTopoRecvHandler));
401+
412402
if (!obtain_flag) {
413403
if (CmiMyRank() == 0) cpuTopo.sort();
414404
CmiNodeAllBarrier();
@@ -478,24 +468,7 @@ extern "C" void LrtsInitCpuTopo(char **argv)
478468

479469
#else
480470

481-
bool topoInProgress = true;
482-
483-
if (CmiMyPe() >= CmiNumPes()) {
484-
CmiNodeAllBarrier(); // comm thread waiting
485-
#if CMK_MACHINE_PROGRESS_DEFINED
486-
bool waitForSecondReduction = (CmiNumNodes() > 1);
487-
while (topoInProgress) {
488-
CmiNetworkProgress();
489-
CmiLock(topoLock);
490-
if (waitForSecondReduction) topoInProgress = topoDone < CmiMyNodeSize();
491-
else topoInProgress = done < CmiMyNodeSize();
492-
CmiUnlock(topoLock);
493-
}
494-
#endif
495-
return; /* comm thread return */
496-
}
497-
498-
/* get my ip address */
471+
/* get my ip address */
499472
if (CmiMyRank() == 0)
500473
{
501474
#if !CMK_BLUEGENEQ
@@ -510,48 +483,53 @@ extern "C" void LrtsInitCpuTopo(char **argv)
510483
}
511484

512485
CmiNodeAllBarrier();
513-
if (_noip) return;
486+
if (_noip)
487+
{
488+
if (CmiMyRank() == 0) cpuTopo.sort();
489+
CcdRaiseCondition(CcdTOPOLOGY_AVAIL); // call callbacks
490+
return;
491+
}
514492

515-
/* prepare a msg to send */
516-
msg = (hostnameMsg *)CmiAlloc(sizeof(hostnameMsg)+sizeof(_procInfo));
517-
msg->n = 1;
518-
msg->procs = (_procInfo*)((char*)msg + sizeof(hostnameMsg));
519-
CmiSetHandler((char *)msg, CpvAccess(cpuTopoHandlerIdx));
520-
msg->procs[0].pe = CmiMyPe();
521-
msg->procs[0].ip = myip;
522-
msg->procs[0].ncores = CmiNumCores();
523-
msg->procs[0].rank = 0;
524-
msg->procs[0].nodeID = 0;
525-
CmiReduce(msg, sizeof(hostnameMsg)+sizeof(_procInfo), combineMessage);
526-
527-
// blocking here (wait for broadcast from PE0 to reach all PEs in this node)
528-
while (topoInProgress) {
529-
CsdSchedulePoll();
530-
CmiLock(topoLock);
531-
topoInProgress = done < CmiMyNodeSize();
532-
CmiUnlock(topoLock);
493+
#if CMK_SMP && !CMK_SMP_NO_COMMTHD
494+
if (CmiInCommThread())
495+
{
496+
cpuTopoSyncWaitCommThread();
533497
}
498+
else
499+
#endif
500+
{
501+
/* prepare a msg to send */
502+
hostnameMsg *msg = (hostnameMsg *)CmiAlloc(sizeof(hostnameMsg)+sizeof(_procInfo));
503+
msg->n = 1;
504+
msg->procs = (_procInfo*)((char*)msg + sizeof(hostnameMsg));
505+
CmiSetHandler((char *)msg, cpuTopoHandlerIdx);
506+
auto proc = &msg->procs[0];
507+
proc->pe = CmiMyPe();
508+
proc->ip = myip;
509+
proc->ncores = CmiNumCores();
510+
proc->rank = 0;
511+
proc->nodeID = 0;
512+
CmiReduce(msg, sizeof(hostnameMsg)+sizeof(_procInfo), combineMessage);
513+
514+
cpuTopoSyncWait();
515+
516+
if (CmiMyRank() == 0)
517+
{
518+
if (CmiMyPe() == 0)
519+
{
520+
CmiSyncNodeBroadcastAllAndFree(sizeof(nodeTopoMsg)+CmiNumPes()*sizeof(int), (char *)topomsg);
534521

535-
if (CmiNumNodes() > 1) {
536-
topoDoneMsg *msg2 = (topoDoneMsg *)CmiAlloc(sizeof(topoDoneMsg));
537-
CmiSetHandler((char *)msg2, CpvAccess(topoDoneHandlerIdx));
538-
CmiReduce(msg2, sizeof(topoDoneMsg), emptyReduction);
539-
if ((CmiMyPe() == 0) || (CmiNumSpanTreeChildren(CmiMyPe()) > 0)) {
540-
// wait until everyone else has topo info
541-
topoInProgress = true;
542-
while (topoInProgress) {
543522
CsdSchedulePoll();
544-
CmiLock(topoLock);
545-
topoInProgress = topoDone < CmiMyNodeSize();
546-
CmiUnlock(topoLock);
547523
}
548-
} else {
549-
CmiLock(topoLock);
550-
topoDone++;
551-
CmiUnlock(topoLock);
524+
525+
#if CMK_SMP && !CMK_SMP_NO_COMMTHD
526+
cpuTopoSyncCommThreadDone = true;
527+
#endif
552528
}
553529
}
554530

531+
CmiBarrier();
532+
555533
if (CmiMyPe() == 0) {
556534
CmiPrintf("Charm++> cpu topology info is gathered in %.3f seconds.\n", CmiWallTimer()-startT);
557535
}

0 commit comments

Comments
 (0)