Skip to content

Commit e85fcc4

Browse files
committed
When work is done signal all threads to exit and join them
Relace the infinite loops with a while (!quit) loop and set quit = 1 when the pipe to the parent is closed. The main thread then signals all threads and uses pthread_join to wait for them. This way all threads can cleanup and exit. OK tb@ job@
1 parent 9fc7f45 commit e85fcc4

File tree

1 file changed

+36
-13
lines changed

1 file changed

+36
-13
lines changed

usr.sbin/rpki-client/parser.c

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* $OpenBSD: parser.c,v 1.156 2025/06/18 09:04:51 tb Exp $ */
1+
/* $OpenBSD: parser.c,v 1.157 2025/06/20 05:00:01 claudio Exp $ */
22
/*
33
* Copyright (c) 2019 Claudio Jeker <[email protected]>
44
* Copyright (c) 2019 Kristaps Dzonsons <[email protected]>
@@ -51,6 +51,8 @@ static struct ibufqueue *globalmsgq;
5151
static pthread_mutex_t globalmsgq_mtx = PTHREAD_MUTEX_INITIALIZER;
5252
static pthread_cond_t globalmsgq_cond = PTHREAD_COND_INITIALIZER;
5353

54+
static volatile int quit;
55+
5456
struct parse_repo {
5557
RB_ENTRY(parse_repo) entry;
5658
char *path;
@@ -1075,9 +1077,9 @@ parse_worker(void *arg)
10751077
if ((myq = ibufq_new()) == NULL)
10761078
err(1, "ibufqueue_new");
10771079

1078-
for (;;) {
1080+
while (!quit) {
10791081
pthread_mutex_lock(&globalq_mtx);
1080-
while (TAILQ_EMPTY(&globalq))
1082+
while (TAILQ_EMPTY(&globalq) && !quit)
10811083
pthread_cond_wait(&globalq_cond, &globalq_mtx);
10821084
n = 0;
10831085
while ((entp = TAILQ_FIRST(&globalq)) != NULL) {
@@ -1100,10 +1102,10 @@ parse_worker(void *arg)
11001102
}
11011103
}
11021104

1103-
11041105
X509_STORE_CTX_free(ctx);
11051106
BN_CTX_free(bn_ctx);
11061107
ibufq_free(myq);
1108+
return NULL;
11071109
}
11081110

11091111
static void *
@@ -1115,11 +1117,12 @@ parse_writer(void *arg)
11151117
if ((myq = msgbuf_new()) == NULL)
11161118
err(1, NULL);
11171119
pfd.fd = *(int *)arg;
1118-
for (;;) {
1120+
while (!quit) {
11191121
if (msgbuf_queuelen(myq) == 0) {
11201122
pthread_mutex_lock(&globalmsgq_mtx);
1121-
while (ibufq_queuelen(globalmsgq) == 0)
1122-
pthread_cond_wait(&globalmsgq_cond, &globalmsgq_mtx);
1123+
while (ibufq_queuelen(globalmsgq) == 0 && !quit)
1124+
pthread_cond_wait(&globalmsgq_cond,
1125+
&globalmsgq_mtx);
11231126
/* enqueue messages to local msgbuf */
11241127
msgbuf_concat(myq, globalmsgq);
11251128
pthread_mutex_unlock(&globalmsgq_mtx);
@@ -1137,8 +1140,10 @@ parse_writer(void *arg)
11371140
errx(1, "poll: bad descriptor");
11381141

11391142
/* If the parent closes, return immediately. */
1140-
if ((pfd.revents & POLLHUP))
1143+
if ((pfd.revents & POLLHUP)) {
1144+
quit = 1;
11411145
break;
1146+
}
11421147

11431148
if (pfd.revents & POLLOUT) {
11441149
if (msgbuf_write(pfd.fd, myq) == -1) {
@@ -1152,6 +1157,7 @@ parse_writer(void *arg)
11521157
}
11531158
}
11541159

1160+
msgbuf_free(myq);
11551161
return NULL;
11561162
}
11571163

@@ -1170,7 +1176,7 @@ proc_parser(int fd, int nthreads)
11701176
struct msgbuf *inbufq;
11711177
struct entity *entp;
11721178
struct ibuf *b;
1173-
pthread_t wthread, dummy;
1179+
pthread_t writer, *workers;
11741180
int i;
11751181

11761182
/* Only allow access to the cache directory. */
@@ -1191,12 +1197,15 @@ proc_parser(int fd, int nthreads)
11911197
NULL)
11921198
err(1, NULL);
11931199

1194-
pthread_create(&wthread, NULL, &parse_writer, &fd);
1200+
if ((workers = calloc(nthreads, sizeof(*workers))) == NULL)
1201+
err(1, NULL);
1202+
1203+
pthread_create(&writer, NULL, &parse_writer, &fd);
11951204
for (i = 0; i < nthreads; i++)
1196-
pthread_create(&dummy, NULL, &parse_worker, NULL);
1205+
pthread_create(&workers[i], NULL, &parse_worker, NULL);
11971206

11981207
pfd.fd = fd;
1199-
for (;;) {
1208+
while (!quit) {
12001209
pfd.events = POLLIN;
12011210
if (poll(&pfd, 1, INFTIM) == -1) {
12021211
if (errno == EINTR)
@@ -1207,8 +1216,10 @@ proc_parser(int fd, int nthreads)
12071216
errx(1, "poll: bad descriptor");
12081217

12091218
/* If the parent closes, return immediately. */
1210-
if ((pfd.revents & POLLHUP))
1219+
if ((pfd.revents & POLLHUP)) {
1220+
quit = 1;
12111221
break;
1222+
}
12121223

12131224
if ((pfd.revents & POLLIN)) {
12141225
switch (ibuf_read(fd, inbufq)) {
@@ -1243,13 +1254,25 @@ proc_parser(int fd, int nthreads)
12431254
}
12441255
}
12451256

1257+
/* signal all threads */
1258+
pthread_cond_broadcast(&globalq_cond);
1259+
pthread_cond_broadcast(&globalmsgq_cond);
1260+
12461261
pthread_mutex_lock(&globalq_mtx);
12471262
while ((entp = TAILQ_FIRST(&globalq)) != NULL) {
12481263
TAILQ_REMOVE(&globalq, entp, entries);
12491264
entity_free(entp);
12501265
}
12511266
pthread_mutex_unlock(&globalq_mtx);
12521267

1268+
if (pthread_join(writer, NULL) != 0)
1269+
errx(1, "pthread_join writer");
1270+
for (i = 0; i < nthreads; i++) {
1271+
if (pthread_join(workers[i], NULL) != 0)
1272+
errx(1, "pthread_join worker %d", i);
1273+
}
1274+
free(workers); /* karl marx */
1275+
12531276
auth_tree_free(&auths);
12541277
crl_tree_free(&crlt);
12551278

0 commit comments

Comments
 (0)