Skip to content

Commit eb3bbb9

Browse files
committed
Update handling of log requests
Threadshift out of the PMIx progress thread before touching PRRTE globals. Have the DVM controller respond with an "ack" message once the log has been completed. Signed-off-by: Ralph Castain <[email protected]>
1 parent 1e0927e commit eb3bbb9

File tree

5 files changed

+327
-254
lines changed

5 files changed

+327
-254
lines changed

examples/log.c

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
1616
* Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
1717
* Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved.
18-
* Copyright (c) 2021 Nanook Consulting. All rights reserved.
18+
* Copyright (c) 2021-2026 Nanook Consulting All rights reserved.
1919
* $COPYRIGHT$
2020
*
2121
* Additional copyrights may follow
@@ -44,6 +44,7 @@ int main(int argc, char **argv)
4444
bool flag;
4545
pmix_proc_t proc;
4646
bool syslog = false, global = false;
47+
char *msg;
4748

4849
/* check for CLI directives */
4950
if (1 < argc) {
@@ -63,10 +64,12 @@ int main(int argc, char **argv)
6364
fprintf(stderr, "Client ns %s rank %d: Running\n", myproc.nspace, myproc.rank);
6465

6566
/* have the even-numbered ranks do these logs (odd ranks log from the else branch below) - doesn't really matter who does it */
66-
if (0 == myproc.rank) {
67+
if (0 == (myproc.rank % 2)) {
6768
/* always output a log message to stderr */
6869
PMIX_INFO_CREATE(info, 1);
69-
PMIX_INFO_LOAD(&info[0], PMIX_LOG_STDERR, "stderr log message\n", PMIX_STRING);
70+
asprintf(&msg, "[%u]: stderr log message\n", myproc.rank);
71+
PMIX_INFO_LOAD(&info[0], PMIX_LOG_STDERR, msg, PMIX_STRING);
72+
free(msg);
7073
PMIX_INFO_CREATE(directives, 1);
7174
PMIX_INFO_LOAD(&directives[0], PMIX_LOG_GENERATE_TIMESTAMP, NULL, PMIX_BOOL);
7275
rc = PMIx_Log(info, 1, directives, 1);
@@ -77,9 +80,11 @@ int main(int argc, char **argv)
7780
}
7881
/* if requested, output one to syslog */
7982
if (syslog) {
80-
fprintf(stderr, "LOG TO LOCAL SYSLOG\n");
83+
fprintf(stderr, "[%u]: LOG TO LOCAL SYSLOG\n", myproc.rank);
8184
PMIX_INFO_CREATE(info, 1);
82-
PMIX_INFO_LOAD(&info[0], PMIX_LOG_LOCAL_SYSLOG, "SYSLOG message\n", PMIX_STRING);
85+
asprintf(&msg, "[%u]: SYSLOG message\n", myproc.rank);
86+
PMIX_INFO_LOAD(&info[0], PMIX_LOG_LOCAL_SYSLOG, msg, PMIX_STRING);
87+
free(msg);
8388
rc = PMIx_Log(info, 1, NULL, 0);
8489
if (PMIX_SUCCESS != rc) {
8590
fprintf(stderr, "Client ns %s rank %d: PMIx_Log syslog failed: %s\n", myproc.nspace,
@@ -88,10 +93,52 @@ int main(int argc, char **argv)
8893
}
8994
}
9095
if (global) {
91-
fprintf(stderr, "LOG TO GLOBAL SYSLOG\n");
96+
fprintf(stderr, "[%u]: LOG TO GLOBAL SYSLOG\n", myproc.rank);
9297
PMIX_INFO_CREATE(info, 1);
93-
PMIX_INFO_LOAD(&info[0], PMIX_LOG_GLOBAL_SYSLOG, "GLOBAL SYSLOG message\n",
94-
PMIX_STRING);
98+
asprintf(&msg, "[%u]: GLOBAL SYSLOG message\n", myproc.rank);
99+
PMIX_INFO_LOAD(&info[0], PMIX_LOG_GLOBAL_SYSLOG, msg, PMIX_STRING);
100+
free(msg);
101+
rc = PMIx_Log(info, 1, NULL, 0);
102+
if (PMIX_SUCCESS != rc) {
103+
fprintf(stderr, "Client ns %s rank %d: PMIx_Log GLOBAL syslog failed: %s\n",
104+
myproc.nspace, myproc.rank, PMIx_Error_string(rc));
105+
goto fence;
106+
}
107+
}
108+
} else {
109+
/* always output a log message to stdout */
110+
PMIX_INFO_CREATE(info, 1);
111+
asprintf(&msg, "[%u]: stdout log message\n", myproc.rank);
112+
PMIX_INFO_LOAD(&info[0], PMIX_LOG_STDERR, msg, PMIX_STRING);
113+
free(msg);
114+
PMIX_INFO_CREATE(directives, 1);
115+
PMIX_INFO_LOAD(&directives[0], PMIX_LOG_GENERATE_TIMESTAMP, NULL, PMIX_BOOL);
116+
rc = PMIx_Log(info, 1, directives, 1);
117+
if (PMIX_SUCCESS != rc) {
118+
fprintf(stderr, "Client ns %s rank %d: PMIx_Log stderr failed: %s\n", myproc.nspace,
119+
myproc.rank, PMIx_Error_string(rc));
120+
goto fence;
121+
}
122+
/* if requested, output one to syslog */
123+
if (syslog) {
124+
fprintf(stderr, "[%u]: LOG TO LOCAL SYSLOG\n", myproc.rank);
125+
PMIX_INFO_CREATE(info, 1);
126+
asprintf(&msg, "[%u]: SYSLOG message\n", myproc.rank);
127+
PMIX_INFO_LOAD(&info[0], PMIX_LOG_LOCAL_SYSLOG, msg, PMIX_STRING);
128+
free(msg);
129+
rc = PMIx_Log(info, 1, NULL, 0);
130+
if (PMIX_SUCCESS != rc) {
131+
fprintf(stderr, "Client ns %s rank %d: PMIx_Log syslog failed: %s\n", myproc.nspace,
132+
myproc.rank, PMIx_Error_string(rc));
133+
goto fence;
134+
}
135+
}
136+
if (global) {
137+
fprintf(stderr, "[%u]: LOG TO GLOBAL SYSLOG\n", myproc.rank);
138+
PMIX_INFO_CREATE(info, 1);
139+
asprintf(&msg, "[%u]: GLOBAL SYSLOG message\n", myproc.rank);
140+
PMIX_INFO_LOAD(&info[0], PMIX_LOG_GLOBAL_SYSLOG, msg, PMIX_STRING);
141+
free(msg);
95142
rc = PMIx_Log(info, 1, NULL, 0);
96143
if (PMIX_SUCCESS != rc) {
97144
fprintf(stderr, "Client ns %s rank %d: PMIx_Log GLOBAL syslog failed: %s\n",

src/prted/pmix/pmix_server.c

Lines changed: 98 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
* All rights reserved.
1919
* Copyright (c) 2014-2019 Research Organization for Information Science
2020
* and Technology (RIST). All rights reserved.
21-
* Copyright (c) 2021-2025 Nanook Consulting All rights reserved.
21+
* Copyright (c) 2021-2026 Nanook Consulting All rights reserved.
2222
* Copyright (c) 2023 Triad National Security, LLC. All rights reserved.
2323
* $COPYRIGHT$
2424
*
@@ -1069,6 +1069,10 @@ void pmix_server_start(void)
10691069
PRTE_RML_RECV(PRTE_NAME_WILDCARD, PRTE_RML_TAG_MONITOR_RESP,
10701070
PRTE_RML_PERSISTENT, pmix_server_monitor_resp, NULL);
10711071

1072+
// setup recv for logging responses
1073+
PRTE_RML_RECV(PRTE_NAME_WILDCARD, PRTE_RML_TAG_LOGGING_RESP,
1074+
PRTE_RML_PERSISTENT, pmix_server_logging_resp, NULL);
1075+
10721076
if (PRTE_PROC_IS_MASTER) {
10731077
/* setup recv for logging requests */
10741078
PRTE_RML_RECV(PRTE_NAME_WILDCARD, PRTE_RML_TAG_LOGGING,
@@ -1099,6 +1103,7 @@ void pmix_server_finalize(void)
10991103
PRTE_RML_CANCEL(PRTE_NAME_WILDCARD, PRTE_RML_TAG_SCHED_RESP);
11001104
PRTE_RML_CANCEL(PRTE_NAME_WILDCARD, PRTE_RML_TAG_MONITOR_REQUEST);
11011105
PRTE_RML_CANCEL(PRTE_NAME_WILDCARD, PRTE_RML_TAG_MONITOR_RESP);
1106+
PRTE_RML_CANCEL(PRTE_NAME_WILDCARD, PRTE_RML_TAG_LOGGING_RESP);
11021107
if (PRTE_PROC_IS_MASTER) {
11031108
PRTE_RML_CANCEL(PRTE_NAME_WILDCARD, PRTE_RML_TAG_LOGGING);
11041109
PRTE_RML_CANCEL(PRTE_NAME_WILDCARD, PRTE_RML_TAG_SCHED);
@@ -1749,18 +1754,54 @@ static void pmix_server_dmdx_resp(int status, pmix_proc_t *sender,
17491754

17501755
static void log_cbfunc(pmix_status_t status, void *cbdata)
17511756
{
1752-
prte_pmix_server_op_caddy_t *scd = (prte_pmix_server_op_caddy_t *) cbdata;
1757+
prte_pmix_server_req_t *req = (prte_pmix_server_req_t *) cbdata;
1758+
pmix_data_buffer_t *buf;
1759+
pmix_status_t rc, lstat;
1760+
1761+
pmix_output_verbose(2, prte_pmix_server_globals.output,
1762+
"Logging callback called");
17531763

17541764
if (PMIX_SUCCESS != status && PMIX_OPERATION_SUCCEEDED != status) {
17551765
pmix_output(prte_pmix_server_globals.output, "LOG FAILED");
17561766
}
1757-
if (NULL != scd->info) {
1758-
PMIX_INFO_FREE(scd->info, scd->ninfo);
1767+
if (PMIX_OPERATION_SUCCEEDED == status) {
1768+
lstat = PMIX_SUCCESS;
1769+
} else {
1770+
lstat = status;
17591771
}
1760-
if (NULL != scd->directives) {
1761-
PMIX_INFO_FREE(scd->directives, scd->ndirs);
1772+
1773+
PMIX_DATA_BUFFER_CREATE(buf);
1774+
1775+
// pack the requestor's index
1776+
rc = PMIx_Data_pack(NULL, buf, &req->remote_index, 1, PMIX_INT);
1777+
if (PMIX_SUCCESS != rc) {
1778+
PMIX_ERROR_LOG(rc);
1779+
PMIX_DATA_BUFFER_RELEASE(buf);
1780+
goto done;
1781+
}
1782+
1783+
// pack the operation's status
1784+
rc = PMIx_Data_pack(NULL, buf, &lstat, 1, PMIX_STATUS);
1785+
if (PMIX_SUCCESS != rc) {
1786+
PMIX_ERROR_LOG(rc);
1787+
PMIX_DATA_BUFFER_RELEASE(buf);
1788+
goto done;
17621789
}
1763-
PMIX_RELEASE(scd);
1790+
1791+
/* send the result to the requestor */
1792+
pmix_output_verbose(2, prte_pmix_server_globals.output,
1793+
"Logging response %s sent to daemon %u",
1794+
PMIx_Error_string(lstat), req->proxy.rank);
1795+
1796+
PRTE_RML_SEND(rc, req->proxy.rank, buf,
1797+
PRTE_RML_TAG_LOGGING_RESP);
1798+
if (PRTE_SUCCESS != rc) {
1799+
PRTE_ERROR_LOG(rc);
1800+
PMIX_DATA_BUFFER_RELEASE(buf);
1801+
}
1802+
1803+
done:
1804+
PMIX_RELEASE(req);
17641805
}
17651806

17661807

@@ -1770,136 +1811,108 @@ static void pmix_server_log(int status, pmix_proc_t *sender,
17701811
{
17711812
int rc;
17721813
int32_t cnt;
1773-
size_t n, ninfo, ndirs;
1774-
pmix_info_t *info;
1775-
pmix_status_t ret;
1776-
pmix_byte_object_t boptr;
1777-
pmix_data_buffer_t pbkt;
1778-
prte_pmix_server_op_caddy_t *scd;
1814+
size_t ndirs;
1815+
prte_pmix_server_req_t *req = NULL;
17791816
pmix_proc_t source;
17801817
prte_job_t *jdata;
17811818
bool noagg;
17821819
bool flag;
1783-
PRTE_HIDE_UNUSED_PARAMS(status, sender, tg, cbdata);
1820+
PRTE_HIDE_UNUSED_PARAMS(status, tg, cbdata);
17841821

17851822
pmix_output_verbose(2, prte_pmix_server_globals.output,
17861823
"Logging info relayed by %s",
17871824
PRTE_NAME_PRINT(sender));
17881825

1826+
req = PMIX_NEW(prte_pmix_server_req_t);
1827+
memcpy(&req->proxy, sender, sizeof(pmix_proc_t));
1828+
// unpack the requestor's local index - this is our remote_index
1829+
cnt = 1;
1830+
rc = PMIx_Data_unpack(NULL, buffer, &req->remote_index, &cnt, PMIX_INT);
1831+
if (PMIX_SUCCESS != rc) {
1832+
PMIX_ERROR_LOG(rc);
1833+
goto respond;
1834+
}
1835+
17891836
/* unpack the source of the request */
17901837
cnt = 1;
17911838
rc = PMIx_Data_unpack(NULL, buffer, &source, &cnt, PMIX_PROC);
17921839
if (PMIX_SUCCESS != rc) {
17931840
PMIX_ERROR_LOG(rc);
1794-
return;
1841+
goto respond;
17951842
}
17961843
/* look up the job for this source */
17971844
jdata = prte_get_job_data_object(source.nspace);
17981845
if (NULL == jdata) {
17991846
/* should never happen */
18001847
PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
1801-
return;
1848+
rc = PMIX_ERR_NOT_FOUND;
1849+
goto respond;
18021850
}
18031851
noagg = prte_get_attribute(&jdata->attributes, PRTE_JOB_NOAGG_HELP, NULL, PMIX_BOOL);
18041852

18051853
/* unpack the number of info */
18061854
cnt = 1;
1807-
rc = PMIx_Data_unpack(NULL, buffer, &ninfo, &cnt, PMIX_SIZE);
1808-
if (PMIX_SUCCESS != rc) {
1809-
PMIX_ERROR_LOG(rc);
1810-
return;
1811-
}
1812-
1813-
/* unpack the number of directives */
1814-
cnt = 1;
1815-
rc = PMIx_Data_unpack(NULL, buffer, &ndirs, &cnt, PMIX_SIZE);
1816-
if (PMIX_SUCCESS != rc) {
1817-
PMIX_ERROR_LOG(rc);
1818-
return;
1819-
}
1820-
1821-
PMIX_BYTE_OBJECT_CONSTRUCT(&boptr);
1822-
/* unpack the info blob */
1823-
cnt = 1;
1824-
rc = PMIx_Data_unpack(NULL, buffer, &boptr, &cnt, PMIX_BYTE_OBJECT);
1855+
rc = PMIx_Data_unpack(NULL, buffer, &req->ninfo, &cnt, PMIX_SIZE);
18251856
if (PMIX_SUCCESS != rc) {
18261857
PMIX_ERROR_LOG(rc);
1827-
return;
1858+
goto respond;
18281859
}
1829-
1830-
PMIX_INFO_CREATE(info, ninfo);
1831-
PMIX_DATA_BUFFER_CONSTRUCT(&pbkt);
1832-
rc = PMIx_Data_load(&pbkt, &boptr);
1833-
for (n = 0; n < ninfo; n++) {
1834-
cnt = 1;
1835-
ret = PMIx_Data_unpack(NULL, &pbkt, (void *) &info[n], &cnt, PMIX_INFO);
1836-
if (PMIX_SUCCESS != ret) {
1837-
PMIX_ERROR_LOG(ret);
1838-
PMIX_INFO_FREE(info, ninfo);
1839-
PMIX_DATA_BUFFER_DESTRUCT(&pbkt);
1840-
PMIX_BYTE_OBJECT_DESTRUCT(&boptr);
1841-
return;
1860+
if (0 < req->ninfo) {
1861+
req->copy = true;
1862+
PMIX_INFO_CREATE(req->info, req->ninfo);
1863+
cnt = req->ninfo;
1864+
rc = PMIx_Data_unpack(NULL, buffer, req->info, &cnt, PMIX_INFO);
1865+
if (PMIX_SUCCESS != rc) {
1866+
PMIX_ERROR_LOG(rc);
1867+
goto respond;
18421868
}
18431869
}
1844-
PMIX_DATA_BUFFER_DESTRUCT(&pbkt);
1845-
PMIX_BYTE_OBJECT_DESTRUCT(&boptr);
18461870

1847-
PMIX_BYTE_OBJECT_CONSTRUCT(&boptr);
1848-
/* unpack the directives blob */
1871+
/* unpack the number of directives */
18491872
cnt = 1;
1850-
rc = PMIx_Data_unpack(NULL, buffer, &boptr, &cnt, PMIX_BYTE_OBJECT);
1873+
rc = PMIx_Data_unpack(NULL, buffer, &ndirs, &cnt, PMIX_SIZE);
18511874
if (PMIX_SUCCESS != rc) {
1852-
PMIX_BYTE_OBJECT_CONSTRUCT(&boptr);
18531875
PMIX_ERROR_LOG(rc);
1854-
return;
1876+
goto respond;
18551877
}
18561878

1857-
scd = PMIX_NEW(prte_pmix_server_op_caddy_t);
18581879
/* if we are not going to aggregate, then indicate so */
18591880
if (noagg) {
1860-
scd->ndirs = ndirs + 3;
1881+
req->ndirs = ndirs + 3;
18611882
} else {
1862-
scd->ndirs = ndirs + 2; // need to locally add two directives
1883+
req->ndirs = ndirs + 2; // need to locally add two directives
18631884
}
1864-
PMIX_INFO_CREATE(scd->directives, scd->ndirs);
1865-
PMIX_DATA_BUFFER_CONSTRUCT(&pbkt);
1866-
rc = PMIx_Data_load(&pbkt, &boptr);
1867-
for (n = 0; n < ndirs; n++) {
1868-
cnt = 1;
1869-
ret = PMIx_Data_unpack(NULL, &pbkt, (void *) &scd->directives[n], &cnt, PMIX_INFO);
1870-
if (PMIX_SUCCESS != ret) {
1871-
PMIX_ERROR_LOG(ret);
1872-
PMIX_INFO_FREE(scd->directives, scd->ndirs);
1873-
PMIX_RELEASE(scd);
1874-
PMIX_DATA_BUFFER_DESTRUCT(&pbkt);
1875-
PMIX_BYTE_OBJECT_CONSTRUCT(&boptr);
1876-
return;
1885+
PMIX_INFO_CREATE(req->directives, req->ndirs);
1886+
req->dircopy = true;
1887+
if (0 < ndirs) {
1888+
cnt = ndirs;
1889+
rc = PMIx_Data_unpack(NULL, buffer, req->directives, &cnt, PMIX_INFO);
1890+
if (PMIX_SUCCESS != rc) {
1891+
PMIX_ERROR_LOG(rc);
1892+
goto respond;
18771893
}
18781894
}
1879-
PMIX_DATA_BUFFER_DESTRUCT(&pbkt);
1880-
PMIX_BYTE_OBJECT_CONSTRUCT(&boptr);
18811895

18821896
/* indicate that only ONE PMIx log component should handle this request */
1883-
PMIX_INFO_LOAD(&scd->directives[ndirs], PMIX_LOG_ONCE, NULL, PMIX_BOOL);
1897+
PMIX_INFO_LOAD(&req->directives[ndirs], PMIX_LOG_ONCE, NULL, PMIX_BOOL);
18841898
/* protect against infinite loop should the PMIx server push
18851899
* this back up to us */
1886-
PMIX_INFO_LOAD(&scd->directives[ndirs+1], "prte.log.noloop", NULL, PMIX_BOOL);
1900+
PMIX_INFO_LOAD(&req->directives[ndirs+1], "prte.log.noloop", NULL, PMIX_BOOL);
18871901
if (noagg) {
18881902
flag = false;
1889-
PMIX_INFO_LOAD(&scd->directives[ndirs+2], PMIX_LOG_AGG, &flag, PMIX_BOOL);
1903+
PMIX_INFO_LOAD(&req->directives[ndirs+2], PMIX_LOG_AGG, &flag, PMIX_BOOL);
18901904
}
1891-
scd->info = info;
1892-
scd->ninfo = ninfo;
1905+
18931906
/* pass the array down to be logged */
1894-
rc = PMIx_Log_nb(scd->info, scd->ninfo, scd->directives, scd->ndirs, log_cbfunc, scd);
1907+
rc = PMIx_Log_nb(req->info, req->ninfo, req->directives, req->ndirs, log_cbfunc, req);
1908+
1909+
respond:
18951910
if (PMIX_SUCCESS != rc) {
1896-
if (NULL != scd->info) {
1897-
PMIX_INFO_FREE(scd->info, scd->ninfo);
1898-
}
1899-
if (NULL != scd->directives) {
1900-
PMIX_INFO_FREE(scd->directives, scd->ndirs);
1911+
// callback fn will not be called - send a message to the requestor
1912+
if (PMIX_OPERATION_SUCCEEDED == rc) {
1913+
rc = PMIX_SUCCESS;
19011914
}
1902-
PMIX_RELEASE(scd);
1915+
log_cbfunc(rc, req); // will clear memory
19031916
}
19041917
}
19051918

0 commit comments

Comments
 (0)